"""Stream Mac addresses""" import re import asyncio import subprocess from fastapi import FastAPI, Response from fastapi.staticfiles import StaticFiles from fastapi.responses import StreamingResponse, PlainTextResponse app = FastAPI() from fastapi.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) async def stream_mac_addresses(): """Stream Mac Addresses""" pipe = await asyncio.create_subprocess_shell('tcpdump -i en0 -n -e', stdout=subprocess.PIPE, stderr=subprocess.PIPE) mac_regex = re.compile(r"(([0-9a-fA-F]:?){12})") while True: line = await pipe.stdout.readline() line = line.decode('utf-8').strip() macs = mac_regex.findall(line) if macs: yield f'data: {macs[0][0]}\n\n' # New - mount static files app.mount("/static", StaticFiles(directory="."), name="static") @app.get("/hello") def read_root(): """Hello world""" return {"Hello": "World"} @app.get("/stream", response_class=PlainTextResponse) async def read_stream(): return StreamingResponse(stream_mac_addresses())