diff options
| -rw-r--r-- | server.py | 119 |
1 files changed, 64 insertions, 55 deletions
@@ -3,6 +3,7 @@ import threading import json import queue import os +from typing import Any, Dict, Optional, Set, List, Iterator from flask import Flask, Response, stream_with_context from flask_cors import CORS import numpy as np @@ -10,17 +11,17 @@ import sounddevice as sd from faster_whisper import WhisperModel from gui import select_settings, prompt_input_sample_rate -TARGET_SAMPLE_RATE = 16000 -CAPTURE_SAMPLE_RATE = 0 -BUFFER_SECONDS = 10 -MAX_SAMPLES = 0 -PROCESS_INTERVAL_SECONDS = 2 -SSE_EVENT_SUBTITLE = "subtitle" -SSE_KEEPALIVE_SECONDS = 15 +TARGET_SAMPLE_RATE: int = 16000 +CAPTURE_SAMPLE_RATE: int = 0 +BUFFER_SECONDS: float = 10 +MAX_SAMPLES: int = 0 +PROCESS_INTERVAL_SECONDS: float = 2 +SSE_EVENT_SUBTITLE: str = "subtitle" +SSE_KEEPALIVE_SECONDS: int = 15 -SETTINGS_PATH = os.path.join(os.path.dirname(__file__), "settings.json") +SETTINGS_PATH: str = os.path.join(os.path.dirname(__file__), "settings.json") -DEFAULT_SETTINGS = { +DEFAULT_SETTINGS: Dict[str, Any] = { "audio_device_name": "", "model_name": "medium", "device": "cpu", @@ -32,26 +33,27 @@ DEFAULT_SETTINGS = { "update_interval_seconds": 2, } -MODEL_CHOICES = ["tiny", "base", "small", "medium", "large-v2", "large-v3", "distil-large-v3"] -DEVICE_CHOICES = ["cpu", "cuda", "auto"] -COMPUTE_CHOICES = ["int8", "int8_float16", "float16", "float32"] -TASK_CHOICES = ["translate", "transcribe"] +MODEL_CHOICES: List[str] = ["tiny", "base", "small", "medium", "large-v2", "large-v3", "distil-large-v3"] +DEVICE_CHOICES: List[str] = ["cpu", "cuda", "auto"] +COMPUTE_CHOICES: List[str] = ["int8", "int8_float16", "float16", "float32"] +TASK_CHOICES: List[str] = ["translate", "transcribe"] -audio_buffer = np.zeros(0, dtype=np.float32) -lock = threading.Lock() -model = None -WHISPER_TASK = DEFAULT_SETTINGS["task"] -WHISPER_BEAM_SIZE = DEFAULT_SETTINGS["beam_size"] -WHISPER_LANGUAGE = DEFAULT_SETTINGS["language"] +audio_buffer: np.ndarray = np.zeros(0, dtype=np.float32) +lock: threading.Lock = threading.Lock() +model: Optional[WhisperModel] = None +WHISPER_TASK: str = DEFAULT_SETTINGS["task"] +WHISPER_BEAM_SIZE: int = DEFAULT_SETTINGS["beam_size"] +WHISPER_LANGUAGE: str = DEFAULT_SETTINGS["language"] -last_payload = None -clients = set() -clients_lock = threading.Lock() -SERVER_HOST = "127.0.0.1" -SERVER_PORT = 5000 -app = Flask(__name__) +last_payload: Optional[Dict[str, Any]] = None +clients: Set[queue.Queue] = set() +clients_lock: threading.Lock = threading.Lock() +SERVER_HOST: str = "127.0.0.1" +SERVER_PORT: int = 5000 +app: Flask = Flask(__name__) CORS(app) + def resample_audio(audio_np: np.ndarray, src_rate: int, dst_rate: int) -> np.ndarray: if src_rate == dst_rate: return audio_np @@ -65,7 +67,7 @@ def resample_audio(audio_np: np.ndarray, src_rate: int, dst_rate: int) -> np.nda return np.interp(x_new, x_old, audio_np).astype(np.float32) -def load_settings() -> dict: +def load_settings() -> Dict[str, Any]: if not os.path.exists(SETTINGS_PATH): return DEFAULT_SETTINGS.copy() try: @@ -73,14 +75,14 @@ def load_settings() -> dict: data = json.load(handle) except (OSError, json.JSONDecodeError): return DEFAULT_SETTINGS.copy() - merged = DEFAULT_SETTINGS.copy() + merged: Dict[str, Any] = DEFAULT_SETTINGS.copy() for key, value in data.items(): if key in merged: merged[key] = value return merged -def save_settings(settings: dict) -> None: +def save_settings(settings: Dict[str, Any]) -> None: try: with open(SETTINGS_PATH, "w", encoding="utf-8") as handle: json.dump(settings, handle, indent=2) @@ -89,9 +91,11 @@ def save_settings(settings: dict) -> None: def run_whisper(audio_np: np.ndarray) -> str: - transcribe_kwargs = {"task": WHISPER_TASK, "beam_size": WHISPER_BEAM_SIZE} + transcribe_kwargs: Dict[str, Any] = {"task": WHISPER_TASK, "beam_size": WHISPER_BEAM_SIZE} if WHISPER_LANGUAGE: transcribe_kwargs["language"] = WHISPER_LANGUAGE + # model is expected to be initialized in main() + assert model is not None, "Whisper model is not initialized" segments, _info = model.transcribe(audio_np, **transcribe_kwargs) text = " ".join(seg.text for seg in segments).strip() if text: @@ -102,7 +106,7 @@ def run_whisper(audio_np: np.ndarray) -> str: def broadcast_subtitle(text: str) -> None: global last_payload - payload = {"text": text} + payload: Dict[str, Any] = {"text": text} last_payload = payload with clients_lock: targets = list(clients) @@ -112,12 +116,14 @@ def broadcast_subtitle(text: str) -> None: except queue.Full: pass -def format_sse_event(event: str, payload: dict) -> str: + +def format_sse_event(event: str, payload: Dict[str, Any]) -> str: data = json.dumps(payload) return f"event: {event}\ndata: {data}\n\n" -def event_stream(): - client_queue = queue.Queue(maxsize=10) + +def event_stream() -> Iterator[str]: + client_queue: queue.Queue = queue.Queue(maxsize=10) with clients_lock: clients.add(client_queue) @@ -136,8 +142,9 @@ def event_stream(): with clients_lock: clients.discard(client_queue) + @app.get("/events") -def events(): +def events() -> Response: headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", @@ -147,13 +154,13 @@ def events(): @app.get("/health") -def health(): +def health() -> Response: response = Response("ok", mimetype="text/plain") response.headers["Access-Control-Allow-Origin"] = "*" return response -def start_subtitle_server(): +def start_subtitle_server() -> threading.Thread: thread = threading.Thread( target=lambda: app.run( host=SERVER_HOST, @@ -180,10 +187,12 @@ def list_audio_devices() -> None: io_str = "/".join(io) if io else "none" print(f"[{idx}] {dev['name']} ({io_str})") -def audio_callback(indata, frames, time_info, status): + +def audio_callback(indata: np.ndarray, frames: int, time_info: Any, status: Any) -> None: if status: print(f"Audio status: {status}") - chunk = indata[:, 0].copy() + # Take first channel + chunk: np.ndarray = indata[:, 0].copy() global audio_buffer with lock: @@ -192,30 +201,30 @@ def audio_callback(indata, frames, time_info, status): audio_buffer = audio_buffer[-MAX_SAMPLES:] -def is_silent(audio_16k): +def is_silent(audio_16k: Optional[np.ndarray]) -> bool: if audio_16k is None or len(audio_16k) == 0: return False - rms = float(np.sqrt(np.mean(np.square(audio_16k)))) # root mean square + rms: float = float(np.sqrt(np.mean(np.square(audio_16k)))) # root mean square return rms < 0.003 -def processing_loop(): +def processing_loop() -> None: while True: time.sleep(PROCESS_INTERVAL_SECONDS) with lock: if len(audio_buffer) == 0 or CAPTURE_SAMPLE_RATE <= 0: continue - audio_copy = audio_buffer.copy() - capture_rate = CAPTURE_SAMPLE_RATE - audio_16k = resample_audio(audio_copy, capture_rate, TARGET_SAMPLE_RATE) + audio_copy: np.ndarray = audio_buffer.copy() + capture_rate: int = CAPTURE_SAMPLE_RATE + audio_16k: np.ndarray = resample_audio(audio_copy, capture_rate, TARGET_SAMPLE_RATE) if is_silent(audio_16k): continue run_whisper(audio_16k) def select_input_sample_rate(device_index: int, preferred_rate: int) -> int: - common_rates = [48000, 44100, 32000, 24000, 22050, 16000, 12000, 8000] - tried = set() + common_rates: List[int] = [48000, 44100, 32000, 24000, 22050, 16000, 12000, 8000] + tried: Set[int] = set() for rate in [preferred_rate] + common_rates: if rate in tried or rate <= 0: continue @@ -228,12 +237,12 @@ def select_input_sample_rate(device_index: int, preferred_rate: int) -> int: return prompt_input_sample_rate(device_index, common_rates) -def main(): +def main() -> None: global CAPTURE_SAMPLE_RATE, MAX_SAMPLES, model, WHISPER_TASK, WHISPER_BEAM_SIZE, WHISPER_LANGUAGE global BUFFER_SECONDS, PROCESS_INTERVAL_SECONDS start_subtitle_server() - settings = load_settings() + settings: Dict[str, Any] = load_settings() devices = sd.query_devices() input_devices = [(idx, dev) for idx, dev in enumerate(devices) if dev["max_input_channels"] > 0] settings = select_settings( @@ -247,19 +256,19 @@ def main(): ) save_settings(settings) - device_name = settings.get("audio_device_name", "") - matched_index = None + device_name: str = settings.get("audio_device_name", "") + matched_index: Optional[int] = None for idx, dev in enumerate(devices): if dev.get("name") == device_name and dev.get("max_input_channels", 0) > 0: matched_index = idx break if matched_index is None: raise RuntimeError("Saved audio device not found. Please reselect in the settings window.") - device_index = matched_index + device_index: int = matched_index - model_name = settings["model_name"] - whisper_device = settings["device"] - compute_type = settings["compute_type"] + model_name: str = settings["model_name"] + whisper_device: str = settings["device"] + compute_type: str = settings["compute_type"] WHISPER_TASK = settings["task"] WHISPER_BEAM_SIZE = int(settings["beam_size"]) WHISPER_LANGUAGE = settings["language"].strip() if settings["language"] else "" @@ -273,7 +282,7 @@ def main(): model = WhisperModel(model_name, device=whisper_device, compute_type=compute_type) device_info = sd.query_devices(device_index) - preferred_rate = int(device_info["default_samplerate"]) + preferred_rate: int = int(device_info["default_samplerate"]) if preferred_rate <= 0: preferred_rate = 48000 CAPTURE_SAMPLE_RATE = select_input_sample_rate(device_index, preferred_rate) |
