from yt_dlp import YoutubeDL from file_util import _load_urls_from_file, _create_or_get_cache, _save_cache import subprocess import json import threading import random import os from dotenv import load_dotenv import logging import time from queue import Queue import uuid import tempfile load_dotenv() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) BITRATE_KBPS = int(os.environ.get("BITRATE_KBPS", "192")) BURST_SECONDS = int(os.environ.get("BURST_SECONDS", "10")) # allow pre-buffering of 10 sec on /stream BASE_URL = os.environ.get("BASE_URL", "http://localhost:8000") PLAYLIST_URL = os.environ.get("PLAYLIST_URL") CACHE_FILE = os.environ.get("CACHE_FILE", "cache.json") RANDOMIZE_PLAYLIST = bool(os.environ.get("RANDOMIZE_PLAYLIST", "false")) META_INTERVAL_SECONDS = int(os.environ.get("META_INTERVAL_SECONDS", "5")) # optional / page SITE_TITLE = os.environ.get("SITE_TITLE", "yt_radio.py") SITE_IMAGE = os.environ.get("SITE_IMAGE", "") if not PLAYLIST_URL: raise RuntimeError("Please set PLAYLIST_URL environment variable") METADATA = {} NOW_PLAYING = {"index": None, "title": "Nothing", "artist": "Unknown", "id": ""} _CACHE_LOCK = threading.Lock() _CACHE = _create_or_get_cache(CACHE_FILE) SUBSCRIBERS = {} # sid -> Queue[bytes] SUBSCRIBERS_LOCK = threading.Lock() RADIO_THREAD = None RADIO_STOP = threading.Event() SUBSCRIBER_EVENT = threading.Event() CHUNK_SIZE = 8192 QUEUE_MAX_CHUNKS = 256 def convert_playlist_to_links(link: str): # Loading a .radio file if link.endswith(".radio"): logger.info(".radio file specified, loading from local file") return _load_urls_from_file(link) # Pull YouTube list of URLs from YouTube Playlist ydl_opts = { "quiet": True, "extract_flat": True, } urls = [] logger.info("Starting conversion of playlist to links: %s", link) try: with YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(link, download=False) except Exception: logger.exception("yt-dlp failed to extract playlist info for %s", link) exit(1) entries = info.get("entries") if isinstance(info, dict) else None if isinstance(entries, list): logger.info("Playlist info returned %d entries", len(entries)) else: logger.error("Could not find list of links. Are you providing a YouTube Playlist URL?") exit(1) for idx, entry in enumerate(entries, start=1): entry_id = None if isinstance(entry, dict): entry_id = entry.get("id") or entry.get("url") elif isinstance(entry, str): entry_id = entry if not entry_id: logger.debug("Skipping playlist entry #%d: no id/url present", idx) continue if isinstance(entry_id, str) and entry_id.startswith("http"): urls.append(entry_id) logger.debug("Playlist entry #%d: added direct URL %s", idx, entry_id) else: constructed = f"https://www.youtube.com/watch?v={entry_id}" urls.append(constructed) logger.debug("Playlist entry #%d: constructed URL %s from id %s", idx, constructed, entry_id) logger.info("Finished converting playlist: %d links generated", len(urls)) return urls def fetch_metadata(index, url): with _CACHE_LOCK: cached = _CACHE.get(url) if cached: # we can skip fetching from youtube if we've cached it before try: METADATA[index] = { "title": cached.get("title", f"Track {index+1}"), "artist": cached.get("uploader", "Unknown"), "duration": cached.get("duration", -1), "id": cached.get("id", ""), } return except Exception: logger.exception("Failed to use cached metadata for %s, will refetch", url) # Get metadata via yt-dlp try: result = subprocess.run( ["yt-dlp", "--dump-json", url], capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=30, ) if result.returncode != 0 or not result.stdout: logger.error("Failed to get metadata from yt-dlp, you may or may not be throttled!") raise RuntimeError(f"yt-dlp failed for {url}: {result.stderr.strip()}") data = json.loads(result.stdout) METADATA[index] = { "title": data.get("title", f"Track {index+1}"), "artist": data.get("uploader", "Unknown"), "duration": data.get("duration", -1), "id": data.get("id", ""), } with _CACHE_LOCK: # get lock _CACHE[url] = { "title": data.get("title"), "uploader": data.get("uploader"), "duration": data.get("duration"), "id": data.get("id") } _save_cache(_CACHE, CACHE_FILE) except Exception: # Even if we fail to get meta we may be able to stream music still? So don't exit METADATA[index] = { "title": f"Track {index+1}", "artist": "Unknown", "duration": -1, "id": "" } logger.debug("Failed to fetch metadata for index %s, using fallback", index) # Bootstrap PLAYLIST = convert_playlist_to_links(PLAYLIST_URL) PRELOAD_COUNT = min(4, len(PLAYLIST)) _preload_indices = random.sample(range(len(PLAYLIST)), PRELOAD_COUNT) if PLAYLIST else [] for i in _preload_indices: threading.Thread(target=fetch_metadata, args=(i, PLAYLIST[i]), daemon=True).start() logger.info("Playlist loaded: %d tracks (preloading %d)", len(PLAYLIST), PRELOAD_COUNT) logger.info("Stream available at %s/stream", BASE_URL) logger.info("M3U available at %s/playlist.m3u", BASE_URL) def _ensure_metadata(index): if index not in METADATA: fetch_metadata(index, PLAYLIST[index]) def _stream_track(index): url = PLAYLIST[index] _ensure_metadata(index) meta = dict(METADATA.get(index, {"title": f"Track {index+1}", "artist": "Unknown", "duration": -1, "id": ""})) logger.info("Now playing [%d/%d]: %s - %s", index + 1, len(PLAYLIST), meta.get("artist", ""), meta.get("title", "")) ytdlp_err = tempfile.TemporaryFile() ffmpeg_err = tempfile.TemporaryFile() ytdlp = subprocess.Popen( ["yt-dlp", "-f", "bestaudio", "-o", "-", url], stdout=subprocess.PIPE, stderr=ytdlp_err, ) ffmpeg = subprocess.Popen( [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "pipe:0", "-f", "mp3", "-ab", f"{BITRATE_KBPS}k", "-ar", "44100", "-ac", "2", "pipe:1", ], stdin=ytdlp.stdout, stdout=subprocess.PIPE, stderr=ffmpeg_err, ) if ytdlp.stdout: ytdlp.stdout.close() bytes_per_sec = (BITRATE_KBPS * 1000) // 8 burst_bytes = bytes_per_sec * BURST_SECONDS bytes_sent = 0 start_time = time.monotonic() try: while True: if ffmpeg.stdout is None: logger.warning("No stdout available from FFMPEG") break chunk = ffmpeg.stdout.read(8192) if not chunk: break bytes_sent += len(chunk) yield chunk elapsed = time.monotonic() - start_time expected_bytes = elapsed * bytes_per_sec + burst_bytes if bytes_sent > expected_bytes: sleep_for = (bytes_sent - expected_bytes) / bytes_per_sec time.sleep(sleep_for) finally: try: ffmpeg.kill() except Exception: pass try: ytdlp.kill() except Exception: pass ffmpeg.wait() ytdlp.wait() elapsed = time.monotonic() - start_time logger.info("Finished sending [%d/%d]: %r - %r (bytes_sent=%d, elapsed=%.2fs)", index + 1, len(PLAYLIST), meta.get("artist"), meta.get("title"), bytes_sent, elapsed) try: ffmpeg_err.seek(0) ferr = ffmpeg_err.read().decode("utf-8", errors="replace") if ferr: logger.warning("ffmpeg stderr for track %d: %s", index + 1, ferr.strip()) ytdlp_err.seek(0) yerr = ytdlp_err.read().decode("utf-8", errors="replace") if yerr: logger.warning("yt-dlp stderr for track %d: %s", index + 1, yerr.strip()) except Exception: logger.exception("Failed to read subprocess stderr") finally: try: ffmpeg_err.close() except Exception: pass try: ytdlp_err.close() except Exception: pass def add_subscriber(): q = Queue(maxsize=QUEUE_MAX_CHUNKS) sid = uuid.uuid4().hex with SUBSCRIBERS_LOCK: SUBSCRIBERS[sid] = q SUBSCRIBER_EVENT.set() logger.info("Subscriber added sid=%s (total=%d)", sid, len(SUBSCRIBERS)) return sid, q def remove_subscriber(sid): with SUBSCRIBERS_LOCK: if sid in SUBSCRIBERS: SUBSCRIBERS.pop(sid, None) logger.info("Subscriber removed sid=%s (total=%d)", sid, len(SUBSCRIBERS)) if not SUBSCRIBERS: SUBSCRIBER_EVENT.clear() def broadcast_chunk(track_index: int, chunk: bytes): with SUBSCRIBERS_LOCK: subs = list(SUBSCRIBERS.items()) for _, q in subs: # push the same chunk to all subscribers, we do nothing with sid for now if q.full(): try: q.get_nowait() except Exception: pass try: q.put_nowait((track_index, chunk)) except Exception: pass def _radio_loop(): played = [] while not RADIO_STOP.is_set(): if not SUBSCRIBER_EVENT.wait(timeout=1): continue if not PLAYLIST: logger.error("Playlist is empty, cannot stream") time.sleep(1) continue available = [i for i in range(len(PLAYLIST)) if i not in played] if not available: played.clear() available = list(range(len(PLAYLIST))) index = random.choice(available) played.append(index) try: for chunk in _stream_track(index): if RADIO_STOP.is_set(): break with SUBSCRIBERS_LOCK: if not SUBSCRIBERS: logger.info("No subscribers remaining; stopping track early") break broadcast_chunk(index, chunk) except Exception: logger.exception("Error in radio producer, skipping track") time.sleep(1) continue def ensure_radio_running(): global RADIO_THREAD if RADIO_THREAD and RADIO_THREAD.is_alive(): return RADIO_STOP.clear() RADIO_THREAD = threading.Thread(target=_radio_loop, daemon=True) RADIO_THREAD.start() logger.info("Radio producer started; listeners will share the same track") ensure_radio_running() if __name__ == "__main__": from routes import app app.run(host="0.0.0.0", port=8000, threaded=True)