aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2026-02-18 13:05:08 -0800
committerPinapelz <yukais@pinapelz.com>2026-02-18 13:05:08 -0800
commit4311b4bc2cc5191039e5498a7a0df3aa103fa043 (patch)
tree4820b4cbc8f9c3e687623b347c0ead492d3193f1
parent91cf086367d350bbb095599f737f13c418d7b3cb (diff)
subscriber synchronization via buffered queue
-rw-r--r--README.md8
-rw-r--r--templates/index.html44
-rw-r--r--yt_radio.py152
3 files changed, 173 insertions, 31 deletions
diff --git a/README.md b/README.md
index 3470fa6..4ace374 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,13 @@
# yt-playlist-radio
-Simple demo of YouTube playlist to m3u playlist, all in about 200 lines of Python.
+Takes a YouTube playlist and converts it to an audio stream, similar to internet radio
+- `/playlist.m3u` provides a playlist of songs to be played, supports track skipping, like a traditional playlist
+- `/stream` provides a buffered audio stream
## How to run?
First set the environment variables as per `.env.template`, then just run it with gunicorn or something else (gunicorn comes bundled as part of the deps here)
```bash
uv sync
-uv run gunicorn -w 4 -b 0.0.0.0:8000 yt_radio:app
+uv run gunicorn yt_radio:app --bind 0.0.0.0:8000 --worker-class gevent --workers 2 --timeout 0 --keep-alive 5
```
+
+> Note that `--timeout 0` is a strict requirement if using `/stream` endpoint due to Gunicorn's default timeout policy
diff --git a/templates/index.html b/templates/index.html
new file mode 100644
index 0000000..ecad4ce
--- /dev/null
+++ b/templates/index.html
@@ -0,0 +1,44 @@
+<!doctype html>
+<html lang="en">
+<head>
+ <meta charset="utf-8">
+ <title>{{ title }}</title>
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <style>
+ html, body {
+ height: 100%;
+ margin: 0;
+ font-family: system-ui, -apple-system, Segoe UI, Roboto, Ubuntu, Cantarell, Arial, sans-serif;
+ background: #0f1220;
+ color: #e8eaf6;
+ }
+ .center {
+ min-height: 100%;
+ display: grid;
+ place-items: center;
+ text-align: center;
+ padding: 16px;
+ }
+ img {
+ max-width: 90vw;
+ height: auto;
+ display: block;
+ margin: 0 auto 12px;
+ }
+ h1 {
+ margin: 0;
+ font-size: clamp(1.5rem, 4vw, 2.5rem);
+ font-weight: 500;
+ }
+ </style>
+ <script src="https://unpkg.com/htmx.org@1.9.10"></script>
+</head>
+<body>
+ <main class="center">
+ <div hx-get="/now_playing" hx-trigger="load, every 10s" hx-swap="innerHTML">
+ <img src="{{ image_url }}" alt="Cover">
+ </div>
+ <small><code>/stream /playlist.m3u</code></small>
+ </main>
+</body>
+</html>
diff --git a/yt_radio.py b/yt_radio.py
index db0dc8d..40937e5 100644
--- a/yt_radio.py
+++ b/yt_radio.py
@@ -1,4 +1,4 @@
-from flask import Flask, Response, stream_with_context, jsonify
+from flask import Flask, Response, stream_with_context, jsonify, render_template, request
from yt_dlp import YoutubeDL
import subprocess
import json
@@ -8,6 +8,8 @@ import os
from dotenv import load_dotenv
import logging
import time
+from queue import Queue, Empty
+import uuid
load_dotenv()
@@ -17,15 +19,19 @@ logger = logging.getLogger(__name__)
app = Flask(__name__)
BITRATE_KBPS = int(os.environ.get("BITRATE_KBPS", "192"))
-BURST_SECONDS = int(os.environ.get("BURST_SECONDS", "10"))
+BURST_SECONDS = int(os.environ.get("BURST_SECONDS", "10")) # allow pre-buffering of 10 sec on /stream
BASE_URL = os.environ.get("BASE_URL", "http://localhost:8000")
PLAYLIST_URL = os.environ.get("PLAYLIST_URL")
CACHE_FILE = os.environ.get("CACHE_FILE", "cache.json")
+
+# optional / page
+SITE_TITLE = os.environ.get("SITE_TITLE", "yt_radio.py")
+SITE_IMAGE = os.environ.get("SITE_IMAGE", "")
if not PLAYLIST_URL:
raise RuntimeError("Please set PLAYLIST_URL environment variable")
METADATA = {}
-NOW_PLAYING = {"index": None, "title": "Nothing", "artist": "Unknown", "url": ""}
+NOW_PLAYING = {"index": None, "title": "Nothing", "artist": "Unknown", "id": ""}
_CACHE_LOCK = threading.Lock()
try:
@@ -78,7 +84,7 @@ def convert_playlist_to_links(link: str):
def fetch_metadata(index, url):
with _CACHE_LOCK:
cached = _CACHE.get(url)
- if cached:
+ if cached: # we can skip fetching from youtube if we've cached it before
try:
METADATA[index] = {
"title": cached.get("title", f"Track {index+1}"),
@@ -103,12 +109,14 @@ def fetch_metadata(index, url):
"title": data.get("title", f"Track {index+1}"),
"artist": data.get("uploader", "Unknown"),
"duration": data.get("duration", -1),
+ "id": data.get("id", ""),
}
with _CACHE_LOCK:
_CACHE[url] = {
"title": data.get("title"),
"uploader": data.get("uploader"),
"duration": data.get("duration"),
+ "id": data.get("id")
}
_save_cache()
except Exception:
@@ -116,6 +124,7 @@ def fetch_metadata(index, url):
"title": f"Track {index+1}",
"artist": "Unknown",
"duration": -1,
+ "id": ""
}
logger.debug("Failed to fetch metadata for index %s, using fallback", index)
@@ -131,7 +140,6 @@ logger.info("M3U available at %s/playlist.m3u", BASE_URL)
def _ensure_metadata(index):
- """Fetch metadata for a track if not already loaded. Called before playback."""
if index not in METADATA:
fetch_metadata(index, PLAYLIST[index])
@@ -144,7 +152,7 @@ def _stream_track(index):
NOW_PLAYING["index"] = index
NOW_PLAYING["title"] = meta["title"]
NOW_PLAYING["artist"] = meta["artist"]
- NOW_PLAYING["url"] = url
+ NOW_PLAYING["id"] = meta["id"]
logger.info("Now playing [%d/%d]: %s - %s", index + 1, len(PLAYLIST), meta["artist"], meta["title"])
ytdlp = subprocess.Popen(
@@ -203,6 +211,89 @@ def _stream_track(index):
logger.info("Finished sending [%d/%d]: %s - %s", index + 1, len(PLAYLIST), meta["artist"], meta["title"])
+SUBSCRIBERS = {} # sid -> Queue[bytes]
+SUBSCRIBERS_LOCK = threading.Lock()
+RADIO_THREAD = None
+RADIO_STOP = threading.Event()
+
+CHUNK_SIZE = 8192
+QUEUE_MAX_CHUNKS = 256
+
+
+def add_subscriber():
+ q = Queue(maxsize=QUEUE_MAX_CHUNKS)
+ sid = uuid.uuid4().hex
+ with SUBSCRIBERS_LOCK:
+ SUBSCRIBERS[sid] = q
+ logger.info("Subscriber added sid=%s (total=%d)", sid, len(SUBSCRIBERS))
+ return sid, q
+
+
+def remove_subscriber(sid):
+ with SUBSCRIBERS_LOCK:
+ if sid in SUBSCRIBERS:
+ SUBSCRIBERS.pop(sid, None)
+ logger.info("Subscriber removed sid=%s (total=%d)", sid, len(SUBSCRIBERS))
+
+
+def broadcast_chunk(chunk: bytes):
+ with SUBSCRIBERS_LOCK:
+ subs = list(SUBSCRIBERS.items())
+ for sid, q in subs: # push the same chunk to all subscribers
+ if q.full():
+ try:
+ q.get_nowait()
+ except Exception:
+ pass
+ try:
+ q.put_nowait(chunk)
+ except Exception:
+ pass
+
+
+def _radio_loop():
+ played = []
+ while not RADIO_STOP.is_set():
+ if not PLAYLIST:
+ logger.error("Playlist is empty, cannot stream")
+ time.sleep(1)
+ continue
+ available = [i for i in range(len(PLAYLIST)) if i not in played]
+ if not available:
+ played.clear()
+ available = list(range(len(PLAYLIST)))
+
+ index = random.choice(available)
+ played.append(index)
+
+ try:
+ for chunk in _stream_track(index):
+ if RADIO_STOP.is_set():
+ break
+ broadcast_chunk(chunk)
+ except Exception:
+ logger.exception("Error in radio producer, skipping track")
+ time.sleep(1)
+ continue
+
+
+def ensure_radio_running():
+ global RADIO_THREAD
+ if RADIO_THREAD and RADIO_THREAD.is_alive():
+ return
+ RADIO_STOP.clear()
+ RADIO_THREAD = threading.Thread(target=_radio_loop, daemon=True)
+ RADIO_THREAD.start()
+ logger.info("Radio producer started; listeners will share the same track")
+
+
+ensure_radio_running()
+
+@app.route("/")
+def home():
+ return render_template("index.html", title=SITE_TITLE, image_url=SITE_IMAGE)
+
+
@app.route("/playlist.m3u")
def playlist_route():
lines = [
@@ -215,39 +306,42 @@ def playlist_route():
@app.route("/stream")
def stream():
+ ensure_radio_running()
+ sid, q = add_subscriber()
def generate():
- played = []
- while True:
- if not PLAYLIST:
- logger.error("Playlist is empty, cannot stream")
- break
-
- available = [i for i in range(len(PLAYLIST)) if i not in played]
- if not available:
- played.clear()
- available = list(range(len(PLAYLIST)))
-
- index = random.choice(available)
- played.append(index)
-
- try:
- yield from _stream_track(index)
- except GeneratorExit:
- logger.info("Client disconnected")
- return
- except Exception:
- logger.exception("Error streaming track %d, skipping", index)
- time.sleep(1)
- continue
+ try:
+ while True:
+ try:
+ chunk = q.get(timeout=5)
+ except Empty:
+ if RADIO_THREAD and not RADIO_THREAD.is_alive():
+ logger.warning("Producer stopped; restarting")
+ ensure_radio_running()
+ continue
+ yield chunk
+ except GeneratorExit:
+ logger.info("Client disconnected (sid=%s)", sid)
+ finally:
+ remove_subscriber(sid)
return Response(stream_with_context(generate()), mimetype="audio/mpeg")
@app.route("/now_playing")
def now_playing():
+ hx = request.headers.get("HX-Request")
+ accept = request.headers.get("Accept", "")
+ if (hx and hx.lower() == "true") or ("text/html" in accept and "application/json" not in accept):
+ title = NOW_PLAYING.get("title") or "Nothing"
+ artist = NOW_PLAYING.get("artist") or "Unknown"
+ vid = NOW_PLAYING.get("id") or ""
+ thumb_url = f"https://img.youtube.com/vi/{vid}/maxresdefault.jpg" if vid else (SITE_IMAGE or "")
+ return f'<img src="{thumb_url}" alt="Cover" style="width:300px;height:300px;object-fit:cover;display:block;margin:0 auto 12px;"><div>{artist} — {title}</div>'
return jsonify(NOW_PLAYING)
+
+
@app.route("/tracks")
def tracks():
track_list = []
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage