#!/usr/bin/env python3 """WebSocket mic bridge — recibe PCM s16le 16kHz mono del móvil, vuelca a pacat. Además mantiene un registro de dispositivos conectados (nombre cantante, IP, bytes recibidos, última actividad) y lo expone a admin/ vía JSON atómico en /tmp/karaoke-mic-devices.json. """ import asyncio import json import os import subprocess import sys import tempfile import time import uuid from datetime import datetime, timezone from urllib.parse import parse_qs, urlparse try: import websockets except ImportError: print("ERROR: pip3 install websockets") sys.exit(1) HOST = "0.0.0.0" PORT = 8081 LATENCY_MS = 40 # Ruta de IPC con el panel admin. Escritura atómica vía tmp + rename. DEVICES_FILE = "/tmp/karaoke-mic-devices.json" # Un dispositivo se considera "activo" si recibió audio en los últimos N segundos. ACTIVE_WINDOW_S = 3.0 # IPC admin → mic-server: set de device_ids silenciados. # Admin escribe atómicamente; nosotros leemos cacheado por mtime. MUTED_FILE = "/tmp/karaoke-mic-muted.json" _MUTED_CACHE: set[str] = set() _MUTED_MTIME: float = 0.0 def _load_muted_if_changed() -> set[str]: """Lee MUTED_FILE solo si su mtime cambió. Cache barata en memoria. Retorna el set actual (cached). Si el fichero no existe → set vacío. Errores → mantiene el cache previo (best-effort). """ global _MUTED_CACHE, _MUTED_MTIME try: st = os.stat(MUTED_FILE) except FileNotFoundError: if _MUTED_CACHE: _MUTED_CACHE = set() _MUTED_MTIME = 0.0 return _MUTED_CACHE except Exception: return _MUTED_CACHE if st.st_mtime == _MUTED_MTIME: return _MUTED_CACHE try: with open(MUTED_FILE, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict) and isinstance(data.get("muted"), list): _MUTED_CACHE = set(str(x) for x in data["muted"]) _MUTED_MTIME = st.st_mtime except Exception: # mantiene cache previo pass return _MUTED_CACHE # connection_id -> dict(name, ip, connected_at, last_audio_at, bytes_received) CONNECTED_DEVICES: dict[str, dict] = {} _DEVICES_LOCK = asyncio.Lock() _WRITE_TRIGGER: asyncio.Event | None = None # se inicializa en main() # Conexiones "monitor" — laptops que escuchan el audio reenviado. # Solo contiene websockets activos. NO se persiste a JSON (admin panel no los # muestra). El broadcast a monitors no debe bloquear el envío a pacat. MONITORS: set = set() _MONITORS_LOCK = asyncio.Lock() def _iso_now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def _parse_name_from_path(raw_path: str) -> str: """Extrae ?name= del path del WebSocket. Compat: vacío -> 'Anónimo'.""" try: # raw_path puede ser "/mic-ws?name=Diego" o "/?name=Diego" o "/" q = urlparse(raw_path).query params = parse_qs(q) vals = params.get("name") or [] if not vals: return "Anónimo" name = (vals[0] or "").strip() if not name: return "Anónimo" # Sanea longitud para evitar abusos if len(name) > 60: name = name[:60] return name except Exception: return "Anónimo" def _is_monitor_path(raw_path: str) -> bool: """Detecta si la conexión es un cliente monitor (escucha pasiva). Cualquier `?monitor=1`, `?monitor=true`, `?monitor=yes` cuenta como monitor. El resto es tratado como cliente mic normal (envía audio). """ try: q = urlparse(raw_path).query params = parse_qs(q) vals = params.get("monitor") or [] if not vals: return False v = (vals[0] or "").strip().lower() return v in ("1", "true", "yes", "on") except Exception: return False def _ws_path(websocket) -> str: """Compat entre versiones de websockets: 10.x usa .path, 13+ usa .request.path.""" req = getattr(websocket, "request", None) if req is not None and getattr(req, "path", None): return req.path return getattr(websocket, "path", "") or "" def _snapshot_devices_locked() -> dict: """Construye la representación JSON del estado actual. Llamar bajo lock.""" now = time.time() muted = _load_muted_if_changed() devices = [] for cid, info in CONNECTED_DEVICES.items(): last_audio = info.get("last_audio_at_ts") or 0.0 active = (now - last_audio) < ACTIVE_WINDOW_S if last_audio else False devices.append({ "id": cid, "name": info.get("name") or "Anónimo", "ip": info.get("ip") or "", "connected_at": info.get("connected_at") or _iso_now(), "last_audio_at": info.get("last_audio_at") or None, "bytes_received": int(info.get("bytes_received") or 0), "active": bool(active), "muted": cid in muted, }) # Orden estable: por connected_at devices.sort(key=lambda d: d.get("connected_at") or "") return {"updated_at": _iso_now(), "devices": devices} def _write_devices_blocking(payload: dict) -> None: """Escritura atómica: archivo temporal + os.rename. Ejecutado en executor.""" try: dir_ = os.path.dirname(DEVICES_FILE) or "/tmp" # NamedTemporaryFile en el mismo directorio para que el rename sea atómico. fd, tmp_path = tempfile.mkstemp(prefix=".karaoke-mic-devices.", suffix=".tmp", dir=dir_) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, separators=(",", ":")) os.replace(tmp_path, DEVICES_FILE) except Exception: try: os.unlink(tmp_path) except Exception: pass raise except Exception as e: print(f"[mic] WARN: no pude escribir {DEVICES_FILE}: {e}", file=sys.stderr) async def _persist_devices() -> None: """Toma snapshot bajo lock y delega la escritura a un executor (no bloquea loop).""" async with _DEVICES_LOCK: snapshot = _snapshot_devices_locked() loop = asyncio.get_running_loop() await loop.run_in_executor(None, _write_devices_blocking, snapshot) def _request_persist() -> None: """Solicita una persistencia. La hace el coroutine flusher (coalesce). Si por cualquier razón el trigger no está montado, fallback inline (raro). """ if _WRITE_TRIGGER is not None: _WRITE_TRIGGER.set() else: # Best-effort sin loop activo try: payload = _snapshot_devices_locked() _write_devices_blocking(payload) except Exception: pass async def _devices_flusher() -> None: """Loop que escribe el JSON cuando hay cambios. Coalesce de eventos.""" assert _WRITE_TRIGGER is not None # Estado inicial (lista vacía) await _persist_devices() while True: await _WRITE_TRIGGER.wait() _WRITE_TRIGGER.clear() # Pequeña ventana de coalesce para no saturar IO con ráfagas await asyncio.sleep(0.1) await _persist_devices() async def _activity_pinger() -> None: """Repinta el JSON cada segundo para que el flag `active` decaiga aunque no haya nuevas conexiones (admin ve dispositivos pasivos en gris).""" while True: await asyncio.sleep(1.0) async with _DEVICES_LOCK: has_any = bool(CONNECTED_DEVICES) if has_any: _request_persist() async def _broadcast_to_monitors(payload: bytes) -> None: """Reenvía el chunk PCM a todos los monitors conectados. No bloquea el envío a pacat: si un monitor falla, se descarta silenciosamente. Usamos snapshot del set bajo lock para no romper iteración si entran/salen. """ if not MONITORS: return async with _MONITORS_LOCK: snapshot = list(MONITORS) if not snapshot: return async def _send_one(m): try: await m.send(payload) except Exception: # El que falle será limpiado por su propio handler al detectar close. pass await asyncio.gather(*[_send_one(m) for m in snapshot], return_exceptions=True) async def handle_monitor(websocket): """Cliente que solo escucha — recibe los bytes PCM reenviados. No abre pacat, no se registra en CONNECTED_DEVICES (admin no lo lista). Mantiene la conexión abierta hasta que el cliente cierre. """ addr = websocket.remote_address ip = addr[0] if addr else "" async with _MONITORS_LOCK: MONITORS.add(websocket) print(f"[monitor] Conectado: ip={ip} total={len(MONITORS)}") try: # Esperar a que el cliente cierre. Drenamos cualquier mensaje entrante # (no esperamos audio del monitor; si llegara, lo ignoramos). async for _ in websocket: pass except Exception: pass finally: async with _MONITORS_LOCK: MONITORS.discard(websocket) print(f"[monitor] Desconectado: ip={ip} total={len(MONITORS)}") async def handle_mic(websocket): addr = websocket.remote_address ip = addr[0] if addr else "" path = _ws_path(websocket) # Routing: si es monitor, deriva al handler de monitor (sin pacat, sin tracking). if _is_monitor_path(path): await handle_monitor(websocket) return name = _parse_name_from_path(path) conn_id = uuid.uuid4().hex[:8] now_iso = _iso_now() async with _DEVICES_LOCK: CONNECTED_DEVICES[conn_id] = { "name": name, "ip": ip, "connected_at": now_iso, "last_audio_at": None, "last_audio_at_ts": 0.0, "bytes_received": 0, } _request_persist() print(f"[mic] Conectado: id={conn_id} name={name!r} ip={ip}") proc = subprocess.Popen( [ "pacat", "--playback", "--device=karaoke_mic", "--format=s16le", "--rate=16000", "--channels=1", f"--latency-msec={LATENCY_MS}", ], stdin=subprocess.PIPE, stderr=subprocess.DEVNULL, ) try: async for msg in websocket: if isinstance(msg, bytes) and proc.poll() is None: # Check mute: si este device está muted, NO escribimos a pacat # (drop audio bytes) pero sí actualizamos contadores y monitors # para que admin vea el dispositivo "vivo pero silenciado". muted_set = _load_muted_if_changed() is_muted = conn_id in muted_set if not is_muted: try: proc.stdin.write(msg) proc.stdin.flush() except Exception: pass # Actualiza tracking (sin lock — operaciones atómicas en dict de un solo escritor) info = CONNECTED_DEVICES.get(conn_id) if info is not None: ts = time.time() info["bytes_received"] = int(info.get("bytes_received") or 0) + len(msg) info["last_audio_at_ts"] = ts info["last_audio_at"] = _iso_now() # Reenvío a monitors — independiente del mute (monitor escucha mic crudo). # Si quieres que monitor también respete mute, mover esto al if not is_muted. if MONITORS and not is_muted: try: await _broadcast_to_monitors(msg) except Exception: pass except Exception: pass finally: try: proc.terminate() except Exception: pass async with _DEVICES_LOCK: CONNECTED_DEVICES.pop(conn_id, None) _request_persist() print(f"[mic] Desconectado: id={conn_id} name={name!r} ip={ip}") async def main(): global _WRITE_TRIGGER _WRITE_TRIGGER = asyncio.Event() print(f"[mic] Servidor WebSocket en ws://{HOST}:{PORT}") print(f"[mic] Abre en el móvil: http://192.168.1.182:8080/player/mic.html") print(f"[mic] Estado dispositivos: {DEVICES_FILE}") asyncio.create_task(_devices_flusher()) asyncio.create_task(_activity_pinger()) async with websockets.serve(handle_mic, HOST, PORT): await asyncio.Future() asyncio.run(main())