aboutsummaryrefslogtreecommitdiffstats
path: root/yt_radio.py
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2026-02-18 13:05:08 -0800
committerPinapelz <yukais@pinapelz.com>2026-02-18 13:05:08 -0800
commit4311b4bc2cc5191039e5498a7a0df3aa103fa043 (patch)
tree4820b4cbc8f9c3e687623b347c0ead492d3193f1 /yt_radio.py
parent91cf086367d350bbb095599f737f13c418d7b3cb (diff)
subscriber synchronization via buffered queue
Diffstat (limited to 'yt_radio.py')
-rw-r--r--yt_radio.py152
1 files changed, 123 insertions, 29 deletions
diff --git a/yt_radio.py b/yt_radio.py
index db0dc8d..40937e5 100644
--- a/yt_radio.py
+++ b/yt_radio.py
@@ -1,4 +1,4 @@
-from flask import Flask, Response, stream_with_context, jsonify
+from flask import Flask, Response, stream_with_context, jsonify, render_template, request
from yt_dlp import YoutubeDL
import subprocess
import json
@@ -8,6 +8,8 @@ import os
from dotenv import load_dotenv
import logging
import time
+from queue import Queue, Empty
+import uuid
load_dotenv()
@@ -17,15 +19,19 @@ logger = logging.getLogger(__name__)
app = Flask(__name__)
BITRATE_KBPS = int(os.environ.get("BITRATE_KBPS", "192"))
-BURST_SECONDS = int(os.environ.get("BURST_SECONDS", "10"))
+BURST_SECONDS = int(os.environ.get("BURST_SECONDS", "10")) # allow pre-buffering of 10 sec on /stream
BASE_URL = os.environ.get("BASE_URL", "http://localhost:8000")
PLAYLIST_URL = os.environ.get("PLAYLIST_URL")
CACHE_FILE = os.environ.get("CACHE_FILE", "cache.json")
+
+# optional / page
+SITE_TITLE = os.environ.get("SITE_TITLE", "yt_radio.py")
+SITE_IMAGE = os.environ.get("SITE_IMAGE", "")
if not PLAYLIST_URL:
raise RuntimeError("Please set PLAYLIST_URL environment variable")
METADATA = {}
-NOW_PLAYING = {"index": None, "title": "Nothing", "artist": "Unknown", "url": ""}
+NOW_PLAYING = {"index": None, "title": "Nothing", "artist": "Unknown", "id": ""}
_CACHE_LOCK = threading.Lock()
try:
@@ -78,7 +84,7 @@ def convert_playlist_to_links(link: str):
def fetch_metadata(index, url):
with _CACHE_LOCK:
cached = _CACHE.get(url)
- if cached:
+ if cached: # we can skip fetching from youtube if we've cached it before
try:
METADATA[index] = {
"title": cached.get("title", f"Track {index+1}"),
@@ -103,12 +109,14 @@ def fetch_metadata(index, url):
"title": data.get("title", f"Track {index+1}"),
"artist": data.get("uploader", "Unknown"),
"duration": data.get("duration", -1),
+ "id": data.get("id", ""),
}
with _CACHE_LOCK:
_CACHE[url] = {
"title": data.get("title"),
"uploader": data.get("uploader"),
"duration": data.get("duration"),
+ "id": data.get("id")
}
_save_cache()
except Exception:
@@ -116,6 +124,7 @@ def fetch_metadata(index, url):
"title": f"Track {index+1}",
"artist": "Unknown",
"duration": -1,
+ "id": ""
}
logger.debug("Failed to fetch metadata for index %s, using fallback", index)
@@ -131,7 +140,6 @@ logger.info("M3U available at %s/playlist.m3u", BASE_URL)
def _ensure_metadata(index):
- """Fetch metadata for a track if not already loaded. Called before playback."""
if index not in METADATA:
fetch_metadata(index, PLAYLIST[index])
@@ -144,7 +152,7 @@ def _stream_track(index):
NOW_PLAYING["index"] = index
NOW_PLAYING["title"] = meta["title"]
NOW_PLAYING["artist"] = meta["artist"]
- NOW_PLAYING["url"] = url
+ NOW_PLAYING["id"] = meta["id"]
logger.info("Now playing [%d/%d]: %s - %s", index + 1, len(PLAYLIST), meta["artist"], meta["title"])
ytdlp = subprocess.Popen(
@@ -203,6 +211,89 @@ def _stream_track(index):
logger.info("Finished sending [%d/%d]: %s - %s", index + 1, len(PLAYLIST), meta["artist"], meta["title"])
+SUBSCRIBERS = {} # sid -> Queue[bytes]
+SUBSCRIBERS_LOCK = threading.Lock()
+RADIO_THREAD = None
+RADIO_STOP = threading.Event()
+
+CHUNK_SIZE = 8192
+QUEUE_MAX_CHUNKS = 256
+
+
+def add_subscriber():
+ q = Queue(maxsize=QUEUE_MAX_CHUNKS)
+ sid = uuid.uuid4().hex
+ with SUBSCRIBERS_LOCK:
+ SUBSCRIBERS[sid] = q
+ logger.info("Subscriber added sid=%s (total=%d)", sid, len(SUBSCRIBERS))
+ return sid, q
+
+
+def remove_subscriber(sid):
+ with SUBSCRIBERS_LOCK:
+ if sid in SUBSCRIBERS:
+ SUBSCRIBERS.pop(sid, None)
+ logger.info("Subscriber removed sid=%s (total=%d)", sid, len(SUBSCRIBERS))
+
+
+def broadcast_chunk(chunk: bytes):
+ with SUBSCRIBERS_LOCK:
+ subs = list(SUBSCRIBERS.items())
+ for sid, q in subs: # push the same chunk to all subscribers
+ if q.full():
+ try:
+ q.get_nowait()
+ except Exception:
+ pass
+ try:
+ q.put_nowait(chunk)
+ except Exception:
+ pass
+
+
+def _radio_loop():
+ played = []
+ while not RADIO_STOP.is_set():
+ if not PLAYLIST:
+ logger.error("Playlist is empty, cannot stream")
+ time.sleep(1)
+ continue
+ available = [i for i in range(len(PLAYLIST)) if i not in played]
+ if not available:
+ played.clear()
+ available = list(range(len(PLAYLIST)))
+
+ index = random.choice(available)
+ played.append(index)
+
+ try:
+ for chunk in _stream_track(index):
+ if RADIO_STOP.is_set():
+ break
+ broadcast_chunk(chunk)
+ except Exception:
+ logger.exception("Error in radio producer, skipping track")
+ time.sleep(1)
+ continue
+
+
+def ensure_radio_running():
+ global RADIO_THREAD
+ if RADIO_THREAD and RADIO_THREAD.is_alive():
+ return
+ RADIO_STOP.clear()
+ RADIO_THREAD = threading.Thread(target=_radio_loop, daemon=True)
+ RADIO_THREAD.start()
+ logger.info("Radio producer started; listeners will share the same track")
+
+
+ensure_radio_running()
+
+@app.route("/")
+def home():
+ return render_template("index.html", title=SITE_TITLE, image_url=SITE_IMAGE)
+
+
@app.route("/playlist.m3u")
def playlist_route():
lines = [
@@ -215,39 +306,42 @@ def playlist_route():
@app.route("/stream")
def stream():
+ ensure_radio_running()
+ sid, q = add_subscriber()
def generate():
- played = []
- while True:
- if not PLAYLIST:
- logger.error("Playlist is empty, cannot stream")
- break
-
- available = [i for i in range(len(PLAYLIST)) if i not in played]
- if not available:
- played.clear()
- available = list(range(len(PLAYLIST)))
-
- index = random.choice(available)
- played.append(index)
-
- try:
- yield from _stream_track(index)
- except GeneratorExit:
- logger.info("Client disconnected")
- return
- except Exception:
- logger.exception("Error streaming track %d, skipping", index)
- time.sleep(1)
- continue
+ try:
+ while True:
+ try:
+ chunk = q.get(timeout=5)
+ except Empty:
+ if RADIO_THREAD and not RADIO_THREAD.is_alive():
+ logger.warning("Producer stopped; restarting")
+ ensure_radio_running()
+ continue
+ yield chunk
+ except GeneratorExit:
+ logger.info("Client disconnected (sid=%s)", sid)
+ finally:
+ remove_subscriber(sid)
return Response(stream_with_context(generate()), mimetype="audio/mpeg")
@app.route("/now_playing")
def now_playing():
+ hx = request.headers.get("HX-Request")
+ accept = request.headers.get("Accept", "")
+ if (hx and hx.lower() == "true") or ("text/html" in accept and "application/json" not in accept):
+ title = NOW_PLAYING.get("title") or "Nothing"
+ artist = NOW_PLAYING.get("artist") or "Unknown"
+ vid = NOW_PLAYING.get("id") or ""
+ thumb_url = f"https://img.youtube.com/vi/{vid}/maxresdefault.jpg" if vid else (SITE_IMAGE or "")
+ return f'<img src="{thumb_url}" alt="Cover" style="width:300px;height:300px;object-fit:cover;display:block;margin:0 auto 12px;"><div>{artist} — {title}</div>'
return jsonify(NOW_PLAYING)
+
+
@app.route("/tracks")
def tracks():
track_list = []
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage