diff options
| author | Pinapelz <yukais@pinapelz.com> | 2026-02-18 13:05:08 -0800 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2026-02-18 13:05:08 -0800 |
| commit | 4311b4bc2cc5191039e5498a7a0df3aa103fa043 (patch) | |
| tree | 4820b4cbc8f9c3e687623b347c0ead492d3193f1 | |
| parent | 91cf086367d350bbb095599f737f13c418d7b3cb (diff) | |
subscriber synchronization via buffered queue
| -rw-r--r-- | README.md | 8 | ||||
| -rw-r--r-- | templates/index.html | 44 | ||||
| -rw-r--r-- | yt_radio.py | 152 |
3 files changed, 173 insertions, 31 deletions
@@ -1,9 +1,13 @@ # yt-playlist-radio -Simple demo of YouTube playlist to m3u playlist, all in about 200 lines of Python. +Takes a YouTube playlist and converts it to an audio stream, similar to internet radio +- `/playlist.m3u` provides a playlist of songs to be played, supports track skipping, like a traditional playlist +- `/stream` provides a buffered audio stream ## How to run? First set the environment variables as per `.env.template`, then just run it with gunicorn or something else (gunicorn comes bundled as part of the deps here) ```bash uv sync -uv run gunicorn -w 4 -b 0.0.0.0:8000 yt_radio:app +uv run gunicorn yt_radio:app --bind 0.0.0.0:8000 --worker-class gevent --workers 2 --timeout 0 --keep-alive 5 ``` + +> Note that `--timeout 0` is a strict requirement if using `/stream` endpoint due to Gunicorn's default timeout policy diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..ecad4ce --- /dev/null +++ b/templates/index.html @@ -0,0 +1,44 @@ +<!doctype html> +<html lang="en"> +<head> + <meta charset="utf-8"> + <title>{{ title }}</title> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <style> + html, body { + height: 100%; + margin: 0; + font-family: system-ui, -apple-system, Segoe UI, Roboto, Ubuntu, Cantarell, Arial, sans-serif; + background: #0f1220; + color: #e8eaf6; + } + .center { + min-height: 100%; + display: grid; + place-items: center; + text-align: center; + padding: 16px; + } + img { + max-width: 90vw; + height: auto; + display: block; + margin: 0 auto 12px; + } + h1 { + margin: 0; + font-size: clamp(1.5rem, 4vw, 2.5rem); + font-weight: 500; + } + </style> + <script src="https://unpkg.com/htmx.org@1.9.10"></script> +</head> +<body> + <main class="center"> + <div hx-get="/now_playing" hx-trigger="load, every 10s" hx-swap="innerHTML"> + <img src="{{ image_url }}" alt="Cover"> + </div> + <small><code>/stream /playlist.m3u</code></small> + </main> +</body> +</html> diff --git a/yt_radio.py b/yt_radio.py index db0dc8d..40937e5 100644 --- a/yt_radio.py +++ b/yt_radio.py @@ -1,4 +1,4 @@ -from flask import Flask, Response, stream_with_context, jsonify +from flask import Flask, Response, stream_with_context, jsonify, render_template, request from yt_dlp import YoutubeDL import subprocess import json @@ -8,6 +8,8 @@ import os from dotenv import load_dotenv import logging import time +from queue import Queue, Empty +import uuid load_dotenv() @@ -17,15 +19,19 @@ logger = logging.getLogger(__name__) app = Flask(__name__) BITRATE_KBPS = int(os.environ.get("BITRATE_KBPS", "192")) -BURST_SECONDS = int(os.environ.get("BURST_SECONDS", "10")) +BURST_SECONDS = int(os.environ.get("BURST_SECONDS", "10")) # allow pre-buffering of 10 sec on /stream BASE_URL = os.environ.get("BASE_URL", "http://localhost:8000") PLAYLIST_URL = os.environ.get("PLAYLIST_URL") CACHE_FILE = os.environ.get("CACHE_FILE", "cache.json") + +# optional / page +SITE_TITLE = os.environ.get("SITE_TITLE", "yt_radio.py") +SITE_IMAGE = os.environ.get("SITE_IMAGE", "") if not PLAYLIST_URL: raise RuntimeError("Please set PLAYLIST_URL environment variable") METADATA = {} -NOW_PLAYING = {"index": None, "title": "Nothing", "artist": "Unknown", "url": ""} +NOW_PLAYING = {"index": None, "title": "Nothing", "artist": "Unknown", "id": ""} _CACHE_LOCK = threading.Lock() try: @@ -78,7 +84,7 @@ def convert_playlist_to_links(link: str): def fetch_metadata(index, url): with _CACHE_LOCK: cached = _CACHE.get(url) - if cached: + if cached: # we can skip fetching from youtube if we've cached it before try: METADATA[index] = { "title": cached.get("title", f"Track {index+1}"), @@ -103,12 +109,14 @@ def fetch_metadata(index, url): "title": data.get("title", f"Track {index+1}"), "artist": data.get("uploader", "Unknown"), "duration": data.get("duration", -1), + "id": data.get("id", ""), } with _CACHE_LOCK: _CACHE[url] = { "title": data.get("title"), "uploader": data.get("uploader"), "duration": data.get("duration"), + "id": data.get("id") } _save_cache() except Exception: @@ -116,6 +124,7 @@ def fetch_metadata(index, url): "title": f"Track {index+1}", "artist": "Unknown", "duration": -1, + "id": "" } logger.debug("Failed to fetch metadata for index %s, using fallback", index) @@ -131,7 +140,6 @@ logger.info("M3U available at %s/playlist.m3u", BASE_URL) def _ensure_metadata(index): - """Fetch metadata for a track if not already loaded. Called before playback.""" if index not in METADATA: fetch_metadata(index, PLAYLIST[index]) @@ -144,7 +152,7 @@ def _stream_track(index): NOW_PLAYING["index"] = index NOW_PLAYING["title"] = meta["title"] NOW_PLAYING["artist"] = meta["artist"] - NOW_PLAYING["url"] = url + NOW_PLAYING["id"] = meta["id"] logger.info("Now playing [%d/%d]: %s - %s", index + 1, len(PLAYLIST), meta["artist"], meta["title"]) ytdlp = subprocess.Popen( @@ -203,6 +211,89 @@ def _stream_track(index): logger.info("Finished sending [%d/%d]: %s - %s", index + 1, len(PLAYLIST), meta["artist"], meta["title"]) +SUBSCRIBERS = {} # sid -> Queue[bytes] +SUBSCRIBERS_LOCK = threading.Lock() +RADIO_THREAD = None +RADIO_STOP = threading.Event() + +CHUNK_SIZE = 8192 +QUEUE_MAX_CHUNKS = 256 + + +def add_subscriber(): + q = Queue(maxsize=QUEUE_MAX_CHUNKS) + sid = uuid.uuid4().hex + with SUBSCRIBERS_LOCK: + SUBSCRIBERS[sid] = q + logger.info("Subscriber added sid=%s (total=%d)", sid, len(SUBSCRIBERS)) + return sid, q + + +def remove_subscriber(sid): + with SUBSCRIBERS_LOCK: + if sid in SUBSCRIBERS: + SUBSCRIBERS.pop(sid, None) + logger.info("Subscriber removed sid=%s (total=%d)", sid, len(SUBSCRIBERS)) + + +def broadcast_chunk(chunk: bytes): + with SUBSCRIBERS_LOCK: + subs = list(SUBSCRIBERS.items()) + for sid, q in subs: # push the same chunk to all subscribers + if q.full(): + try: + q.get_nowait() + except Exception: + pass + try: + q.put_nowait(chunk) + except Exception: + pass + + +def _radio_loop(): + played = [] + while not RADIO_STOP.is_set(): + if not PLAYLIST: + logger.error("Playlist is empty, cannot stream") + time.sleep(1) + continue + available = [i for i in range(len(PLAYLIST)) if i not in played] + if not available: + played.clear() + available = list(range(len(PLAYLIST))) + + index = random.choice(available) + played.append(index) + + try: + for chunk in _stream_track(index): + if RADIO_STOP.is_set(): + break + broadcast_chunk(chunk) + except Exception: + logger.exception("Error in radio producer, skipping track") + time.sleep(1) + continue + + +def ensure_radio_running(): + global RADIO_THREAD + if RADIO_THREAD and RADIO_THREAD.is_alive(): + return + RADIO_STOP.clear() + RADIO_THREAD = threading.Thread(target=_radio_loop, daemon=True) + RADIO_THREAD.start() + logger.info("Radio producer started; listeners will share the same track") + + +ensure_radio_running() + +@app.route("/") +def home(): + return render_template("index.html", title=SITE_TITLE, image_url=SITE_IMAGE) + + @app.route("/playlist.m3u") def playlist_route(): lines = [ @@ -215,39 +306,42 @@ def playlist_route(): @app.route("/stream") def stream(): + ensure_radio_running() + sid, q = add_subscriber() def generate(): - played = [] - while True: - if not PLAYLIST: - logger.error("Playlist is empty, cannot stream") - break - - available = [i for i in range(len(PLAYLIST)) if i not in played] - if not available: - played.clear() - available = list(range(len(PLAYLIST))) - - index = random.choice(available) - played.append(index) - - try: - yield from _stream_track(index) - except GeneratorExit: - logger.info("Client disconnected") - return - except Exception: - logger.exception("Error streaming track %d, skipping", index) - time.sleep(1) - continue + try: + while True: + try: + chunk = q.get(timeout=5) + except Empty: + if RADIO_THREAD and not RADIO_THREAD.is_alive(): + logger.warning("Producer stopped; restarting") + ensure_radio_running() + continue + yield chunk + except GeneratorExit: + logger.info("Client disconnected (sid=%s)", sid) + finally: + remove_subscriber(sid) return Response(stream_with_context(generate()), mimetype="audio/mpeg") @app.route("/now_playing") def now_playing(): + hx = request.headers.get("HX-Request") + accept = request.headers.get("Accept", "") + if (hx and hx.lower() == "true") or ("text/html" in accept and "application/json" not in accept): + title = NOW_PLAYING.get("title") or "Nothing" + artist = NOW_PLAYING.get("artist") or "Unknown" + vid = NOW_PLAYING.get("id") or "" + thumb_url = f"https://img.youtube.com/vi/{vid}/maxresdefault.jpg" if vid else (SITE_IMAGE or "") + return f'<img src="{thumb_url}" alt="Cover" style="width:300px;height:300px;object-fit:cover;display:block;margin:0 auto 12px;"><div>{artist} — {title}</div>' return jsonify(NOW_PLAYING) + + @app.route("/tracks") def tracks(): track_list = [] |
