"""Stream Mac addresses"""
import re
import asyncio
import subprocess
from fastapi import FastAPI, Response
from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse, PlainTextResponse
app = FastAPI()
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
async def stream_mac_addresses():
"""Stream Mac Addresses"""
pipe = await asyncio.create_subprocess_shell('tcpdump -i en0 -n -e',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
mac_regex = re.compile(r"(([0-9a-fA-F]:?){12})")
while True:
line = await pipe.stdout.readline()
line = line.decode('utf-8').strip()
macs = mac_regex.findall(line)
if macs:
yield f'data: {macs[0][0]}\n\n'
# New - mount static files
app.mount("/static", StaticFiles(directory="."), name="static")
@app.get("/hello")
def read_root():
"""Hello world"""
return {"Hello": "World"}
@app.get("/stream", response_class=PlainTextResponse)
async def read_stream():
return StreamingResponse(stream_mac_addresses())