From d1db6a11d868978ce6bbb62b362cf6967fa8c0f2 Mon Sep 17 00:00:00 2001 From: Pierre-Olivier Mercier Date: Thu, 2 Jul 2026 17:58:24 +0800 Subject: [PATCH] Milestone 4: yt-dlp provider and weighted source scheduler Add a second playback source and a weighted scheduler mixing it with Navidrome: - Scheduler picks a provider by SOURCE_WEIGHTS, falling through to the others when one has nothing ready, so no source can stall playback. - YtdlpProvider reads a hand-maintained config/urls.txt; container URLs (playlist/album/label/artist) are flat-extracted and one entry is drawn at random, honouring the anti-repeat window. Adds Track.source_url. - YtdlpFetcher downloads bestaudio via the yt-dlp library, reusing the atomic hidden-temp-then-rename pattern; Liquidsoap decodes the result. - Queue now dispatches to a fetcher registry keyed by backend. - Sweep orphaned download temp files on daemon startup (leftovers from a killed container otherwise pile up and trip the stream fallback). Verified end-to-end: yt-dlp opus decoded and served as 192 kbps MP3, and the 3:1 default mix observed in play history. Co-Authored-By: Claude Opus 4.8 --- .env.example | 9 ++ .gitignore | 3 + README.md | 27 ++++-- config/urls.txt.example | 14 +++ docker-compose.yml | 6 ++ ingest/radieo/__main__.py | 97 +++++++++++++------ ingest/radieo/config.py | 16 ++++ ingest/radieo/fetchers/ytdlp.py | 71 ++++++++++++++ ingest/radieo/models.py | 1 + ingest/radieo/providers/ytdlp.py | 154 +++++++++++++++++++++++++++++++ ingest/radieo/queue.py | 24 +++-- ingest/radieo/scheduler.py | 39 ++++++++ ingest/requirements.txt | 1 + 13 files changed, 417 insertions(+), 45 deletions(-) create mode 100644 config/urls.txt.example create mode 100644 ingest/radieo/fetchers/ytdlp.py create mode 100644 ingest/radieo/providers/ytdlp.py create mode 100644 ingest/radieo/scheduler.py diff --git a/.env.example b/.env.example index bf5fc43..bc8f78f 100644 --- a/.env.example +++ b/.env.example @@ -10,6 +10,15 @@ RADIEO_NAVIDROME_PASSWORD=monmotdepasse # Nom OU identifiant de la playlist à diffuser. RADIEO_NAVIDROME_PLAYLIST=Radio +# --- Source yt-dlp --- +# La liste d'URL se met dans config/urls.txt (copier config/urls.txt.example). +# Rien à mettre ici ; le fichier absent désactive simplement la source. + +# --- Dosage du mix entre sources (optionnel) --- +# Poids relatifs de tirage de chaque source (0 désactive la source). +RADIEO_WEIGHT_NAVIDROME=3 +RADIEO_WEIGHT_YTDLP=1 + # --- Rétention du cache (optionnel) --- # Nombre de morceaux joués conservés sur disque avant éviction (LRU). RADIEO_RETENTION_KEEP=20 diff --git a/.gitignore b/.gitignore index b58bc4d..9163637 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,9 @@ # Secrets locaux .env +# Liste de sources yt-dlp personnelle (garder seulement l'exemple) +/config/urls.txt + # État de l'ingestion (base SQLite persistante) /state/ *.db diff --git a/README.md b/README.md index 795d8f8..b68024c 100644 --- a/README.md +++ b/README.md @@ -58,16 +58,27 @@ cp .env.example .env If the Navidrome variables are left empty, the source is simply disabled and the stream plays whatever is already in `cache/` (the milestone-1/2 behaviour). +For the yt-dlp source, list the URLs to draw from in `config/urls.txt` (copy +`config/urls.txt.example`). Each line is either a direct track URL or a +container URL (playlist, album, label, artist page) from which one track is +picked at random. The relative mix between sources is set by +`RADIEO_WEIGHT_NAVIDROME` / `RADIEO_WEIGHT_YTDLP` (a weight of 0 disables a +source); the file being absent also disables yt-dlp. + ## Current status -**Milestone 3 — Navidrome provider: done.** +**Milestone 4 — yt-dlp provider: done.** -- `ingest` pulls tracks from an OpenSubsonic playlist (Navidrome), downloading - them into the shared cache ahead of playback (prefetch buffer). +- Two playback sources feed a weighted scheduler: a Navidrome/OpenSubsonic + playlist and a hand-maintained list of yt-dlp URLs (`config/urls.txt`). + Container URLs (playlist/album/label/artist) are expanded and one track is + drawn at random, honouring the anti-repeat window. +- Each source has its own fetcher (Subsonic stream / yt-dlp download); files are + cached ahead of playback (prefetch buffer) and decoded by Liquidsoap. - Play history and LRU retention are tracked in a SQLite database under `state/`: only the N most recently played files are kept on disk - (`RADIEO_RETENTION_KEEP`, default 20); anti-repeat avoids replaying a track - seen among the last plays. + (`RADIEO_RETENTION_KEEP`, default 20). Orphaned download temp files are swept + on startup. - `GET /next` returns the next track as an annotated Liquidsoap URI with real title/artist metadata (or an empty body when nothing is ready). - `stream` (Liquidsoap v2.4.5) pulls via `request.dynamic` and falls back to the @@ -75,7 +86,7 @@ the stream plays whatever is already in `cache/` (the milestone-1/2 behaviour). - HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps), multiple simultaneous listeners supported. -The yt-dlp and ListenBrainz sources come next. +The ListenBrainz suggestion feed comes next. ## Roadmap @@ -84,8 +95,8 @@ The yt-dlp and ListenBrainz sources come next. switches to a `request.dynamic` source with the cache as fallback. 3. ✅ **Navidrome provider** — play from an OpenSubsonic playlist, with caching, LRU retention and play history. -4. **yt-dlp provider** — fetch tracks from a maintained URL/artist list; weighted - mixing between sources. +4. ✅ **yt-dlp provider** — fetch tracks from a maintained URL/artist list; + weighted mixing between sources. 5. **Canonicalizer** — ListenBrainz MBID lookup for source-agnostic de-duplication. 6. **ListenBrainz provider** — parse the RSS suggestions feed and resolve each diff --git a/config/urls.txt.example b/config/urls.txt.example new file mode 100644 index 0000000..0d69ff0 --- /dev/null +++ b/config/urls.txt.example @@ -0,0 +1,14 @@ +# radieo — liste de sources yt-dlp. Copier en `config/urls.txt`. +# Une URL par ligne. Les lignes vides et celles commençant par '#' sont ignorées. +# +# Chaque URL peut être : +# - un morceau précis (téléchargé tel quel) +# - une playlist / album / label / page d'artiste +# -> radieo y pioche un morceau au hasard à chaque tour, +# en évitant ceux joués récemment. +# +# Exemples (à remplacer par les tiens) : +# https://www.youtube.com/watch?v=dQw4w9WgXcQ +# https://soundcloud.com/artiste/un-morceau +# https://artiste.bandcamp.com/album/un-album +# https://www.youtube.com/playlist?list=PLxxxxxxxx diff --git a/docker-compose.yml b/docker-compose.yml index de177e4..7e35c21 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ services: volumes: - ./cache:/cache # volume partagé avec le stream (rw : téléchargements) - ./state:/state # état persistant (SQLite) hors du cache éphémère + - ./config:/config:ro # config éditable sans rebuild (urls.txt yt-dlp) environment: - RADIEO_CACHE_DIR=/cache - RADIEO_STATE_DIR=/state @@ -16,6 +17,11 @@ services: - RADIEO_NAVIDROME_PASSWORD=${RADIEO_NAVIDROME_PASSWORD:-} - RADIEO_NAVIDROME_PLAYLIST=${RADIEO_NAVIDROME_PLAYLIST:-} - RADIEO_RETENTION_KEEP=${RADIEO_RETENTION_KEEP:-20} + # Source yt-dlp : liste d'URL dans config/urls.txt (créer depuis l'exemple). + - RADIEO_YTDLP_URLS_FILE=/config/urls.txt + # Dosage du mix entre les sources (0 désactive). + - RADIEO_WEIGHT_NAVIDROME=${RADIEO_WEIGHT_NAVIDROME:-3} + - RADIEO_WEIGHT_YTDLP=${RADIEO_WEIGHT_YTDLP:-1} restart: unless-stopped stream: diff --git a/ingest/radieo/__main__.py b/ingest/radieo/__main__.py index ad6fd08..31c8874 100644 --- a/ingest/radieo/__main__.py +++ b/ingest/radieo/__main__.py @@ -6,39 +6,81 @@ from . import config from .api import IngestServer from .db import Database from .queue import TrackQueue +from .scheduler import Scheduler log = logging.getLogger("radieo") -class _NullProvider: - """Fallback provider when no source is configured: never yields a track.""" - - def next(self): - return None - - def _build_pipeline(db: Database): - """Return (provider, fetcher). Falls back to a no-op provider when the - Navidrome source is not configured, so the daemon still runs and the - stream plays its local cache.""" - if not config.NAVIDROME_ENABLED: - log.warning( - "Navidrome not configured (RADIEO_NAVIDROME_*): no source active, " - "the stream will play its local cache fallback." + """Return (scheduler, fetchers). Assembles whichever sources are enabled; + when none is, the scheduler yields nothing and the stream plays its local + cache fallback.""" + providers = [] # list[(provider, weight)] + fetchers = {} # backend name -> fetcher + + if config.NAVIDROME_ENABLED: + from .fetchers.subsonic import SubsonicFetcher + from .providers.navidrome import NavidromeProvider + from .subsonic import SubsonicClient + + client = SubsonicClient( + config.NAVIDROME_URL, config.NAVIDROME_USER, config.NAVIDROME_PASSWORD ) - return _NullProvider(), None + provider = NavidromeProvider(client, config.NAVIDROME_PLAYLIST, db) + providers.append((provider, config.SOURCE_WEIGHTS.get("navidrome", 0))) + fetchers["subsonic"] = SubsonicFetcher(client, config.CACHE_DIR) + log.info( + "Navidrome source enabled (playlist=%r, weight=%d)", + config.NAVIDROME_PLAYLIST, + config.SOURCE_WEIGHTS.get("navidrome", 0), + ) + else: + log.warning("Navidrome not configured (RADIEO_NAVIDROME_*): source off.") - from .fetchers.subsonic import SubsonicFetcher - from .providers.navidrome import NavidromeProvider - from .subsonic import SubsonicClient + if config.SOURCE_WEIGHTS.get("ytdlp", 0) > 0 and config.YTDLP_URLS_FILE.exists(): + from .fetchers.ytdlp import YtdlpFetcher + from .providers.ytdlp import YtdlpProvider - client = SubsonicClient( - config.NAVIDROME_URL, config.NAVIDROME_USER, config.NAVIDROME_PASSWORD - ) - provider = NavidromeProvider(client, config.NAVIDROME_PLAYLIST, db) - fetcher = SubsonicFetcher(client, config.CACHE_DIR) - log.info("Navidrome source enabled (playlist=%r)", config.NAVIDROME_PLAYLIST) - return provider, fetcher + provider = YtdlpProvider(config.YTDLP_URLS_FILE, db) + providers.append((provider, config.SOURCE_WEIGHTS["ytdlp"])) + fetchers["ytdlp"] = YtdlpFetcher(config.CACHE_DIR) + log.info( + "yt-dlp source enabled (urls=%s, weight=%d)", + config.YTDLP_URLS_FILE, + config.SOURCE_WEIGHTS["ytdlp"], + ) + else: + log.info( + "yt-dlp source off (weight=%d, urls file present=%s).", + config.SOURCE_WEIGHTS.get("ytdlp", 0), + config.YTDLP_URLS_FILE.exists(), + ) + + if not providers: + log.warning("no source active: the stream plays its local cache only.") + return Scheduler(providers), fetchers + + +def _sweep_temp_files() -> None: + """Delete orphaned download temp files left by a killed container. + + Fetchers write to hidden temp files and rename them into place atomically; + the only hidden files we ever create are those temps. If a download is + interrupted (SIGKILL on ``compose down``), the temp survives and the stream + fallback keeps trying to decode it. Clearing them on startup avoids that + noise and reclaims disk. + """ + removed = 0 + for path in config.CACHE_DIR.glob(".*"): + if path.name == ".gitkeep" or not path.is_file(): + continue + try: + path.unlink() + removed += 1 + except OSError: + log.warning("could not remove stale temp %s", path.name) + if removed: + log.info("swept %d stale temp file(s) from cache", removed) def main() -> None: @@ -49,10 +91,11 @@ def main() -> None: config.CACHE_DIR.mkdir(parents=True, exist_ok=True) config.STATE_DIR.mkdir(parents=True, exist_ok=True) + _sweep_temp_files() db = Database(config.STATE_DB) - provider, fetcher = _build_pipeline(db) - queue = TrackQueue(provider, fetcher, db) + scheduler, fetchers = _build_pipeline(db) + queue = TrackQueue(scheduler, fetchers, db) queue.start() server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue) diff --git a/ingest/radieo/config.py b/ingest/radieo/config.py index 0dc8f4f..2b7939e 100644 --- a/ingest/radieo/config.py +++ b/ingest/radieo/config.py @@ -41,3 +41,19 @@ PLAYLIST_REFRESH = float(os.environ.get("RADIEO_PLAYLIST_REFRESH", "300")) NAVIDROME_ENABLED = bool( NAVIDROME_URL and NAVIDROME_USER and NAVIDROME_PASSWORD and NAVIDROME_PLAYLIST ) + +# --- yt-dlp source --- +# Plain text file (one URL per line, '#' comments) mounted into the container. +# May contain direct track URLs or container URLs (playlist/album/label/artist), +# from which one track is picked at random. Left absent disables the source. +YTDLP_URLS_FILE = Path(os.environ.get("RADIEO_YTDLP_URLS_FILE", "/config/urls.txt")) +# How often to reload the URL list and re-expand container URLs, in seconds. +YTDLP_REFRESH = float(os.environ.get("RADIEO_YTDLP_REFRESH", "300")) + +# --- Source mix (weights) --- +# Relative odds of drawing from each source. Isolated here on purpose so the +# mix can later move to a config file. A weight of 0 disables a source. +SOURCE_WEIGHTS = { + "navidrome": int(os.environ.get("RADIEO_WEIGHT_NAVIDROME", "3")), + "ytdlp": int(os.environ.get("RADIEO_WEIGHT_YTDLP", "1")), +} diff --git a/ingest/radieo/fetchers/ytdlp.py b/ingest/radieo/fetchers/ytdlp.py new file mode 100644 index 0000000..79b1d60 --- /dev/null +++ b/ingest/radieo/fetchers/ytdlp.py @@ -0,0 +1,71 @@ +"""YtdlpFetcher: download a ``ytdlp`` track into the cache directory. + +Mirrors SubsonicFetcher's atomic pattern: yt-dlp writes to a hidden temporary +name and the finished file is renamed into place, so the ready file appears +atomically on the shared cache mount. Temps use a distinctive hidden prefix so +the reuse glob ignores them and the daemon's startup sweep can reclaim any left +behind by an interrupted download. + +We grab ``bestaudio`` and keep the original container (webm/opus/m4a/mp3…); +Liquidsoap decodes it via ffmpeg, so no re-encoding is needed. +""" + +import hashlib +import logging +from pathlib import Path +from uuid import uuid4 + +from ..models import Track + +log = logging.getLogger("radieo.fetcher.ytdlp") + + +class YtdlpFetcher: + backend = "ytdlp" + + def __init__(self, cache_dir: Path): + self._cache_dir = cache_dir + + def fetch(self, track: Track) -> Path: + h = hashlib.sha1(track.locator.encode()).hexdigest()[:16] + stem = f"ytdlp-{h}" + # Reuse a still-cached copy rather than downloading again. + for existing in self._cache_dir.glob(f"{stem}.*"): + if not existing.name.endswith(".part"): + return existing + + tmp_base = self._cache_dir / f".{stem}.{uuid4().hex}" + opts = { + "format": "bestaudio/best", + "outtmpl": str(tmp_base) + ".%(ext)s", + "quiet": True, + "no_warnings": True, + "noprogress": True, + "noplaylist": True, + "socket_timeout": 30, + "retries": 2, + } + try: + from yt_dlp import YoutubeDL + + with YoutubeDL(opts) as ydl: + info = ydl.extract_info(track.locator, download=True) + downloaded = self._downloaded_path(info, tmp_base) + dest = self._cache_dir / f"{stem}{downloaded.suffix}" + downloaded.replace(dest) + except BaseException: + for leftover in self._cache_dir.glob(f"{tmp_base.name}*"): + leftover.unlink(missing_ok=True) + raise + log.info("downloaded %s -> %s", track, dest.name) + return dest + + def _downloaded_path(self, info: dict, tmp_base: Path) -> Path: + reqs = info.get("requested_downloads") + if reqs and reqs[0].get("filepath"): + return Path(reqs[0]["filepath"]) + # Fallback: find the produced file (ignore yt-dlp's own .part leftovers). + for candidate in self._cache_dir.glob(f"{tmp_base.name}.*"): + if not candidate.name.endswith(".part"): + return candidate + raise FileNotFoundError(f"yt-dlp produced no file for {tmp_base.name}") diff --git a/ingest/radieo/models.py b/ingest/radieo/models.py index f2aaf8e..3ffb930 100644 --- a/ingest/radieo/models.py +++ b/ingest/radieo/models.py @@ -18,6 +18,7 @@ class Track: origin: str # provider that produced it, e.g. "navidrome" mbid: str | None = None # filled by the Canonicalizer (milestone 5) source_ext: str | None = None # filename hint, e.g. "mp3", "flac" + source_url: str | None = None # container URL a track was picked from @property def key(self) -> str: diff --git a/ingest/radieo/providers/ytdlp.py b/ingest/radieo/providers/ytdlp.py new file mode 100644 index 0000000..8cdff38 --- /dev/null +++ b/ingest/radieo/providers/ytdlp.py @@ -0,0 +1,154 @@ +"""YtdlpProvider: picks tracks from a hand-maintained list of URLs. + +The list is a plain text file (one URL per line, ``#`` comments allowed), +mounted into the container so it can be edited without a rebuild. Each line may +be either a direct track URL or a *container* URL (playlist, album, label, +artist page); container URLs are expanded with a flat yt-dlp extraction and one +entry is picked at random, honouring the anti-repeat window. + +The provider only *resolves* references — it emits ``ytdlp`` tracks whose +locator is the chosen media URL. The actual download happens in the matching +fetcher (milestone 4). Flat extraction is cached per source URL to avoid +re-hitting the network on every pick. +""" + +import logging +import random +import time +from pathlib import Path + +from .. import config +from ..db import Database +from ..models import Track + +log = logging.getLogger("radieo.provider.ytdlp") + + +class YtdlpProvider: + name = "ytdlp" + + def __init__(self, urls_file: Path, db: Database): + self._urls_file = urls_file + self._db = db + self._urls: list[str] = [] + self._loaded_at = 0.0 + # source URL -> (entries, loaded_at); entries are normalized dicts. + self._expanded: dict[str, tuple[list[dict], float]] = {} + + # --- source list ------------------------------------------------------ + + def _load_urls(self) -> None: + now = time.time() + if self._urls and now - self._loaded_at < config.YTDLP_REFRESH: + return + try: + lines = self._urls_file.read_text(encoding="utf-8").splitlines() + except OSError: + self._urls = [] + self._loaded_at = now + return + urls = [] + for line in lines: + line = line.strip() + if line and not line.startswith("#"): + urls.append(line) + if urls != self._urls: + log.info("loaded %d yt-dlp source URL(s)", len(urls)) + self._urls = urls + self._loaded_at = now + + # --- resolution ------------------------------------------------------- + + def next(self) -> Track | None: + self._load_urls() + if not self._urls: + return None + recent = self._db.recent_keys(config.ANTIREPEAT_WINDOW) + # Try source lines in random order until one yields a usable track. + candidates = list(self._urls) + random.shuffle(candidates) + for src in candidates: + track = self._resolve(src, recent) + if track is not None: + return track + return None + + def _resolve(self, src_url: str, recent: set[str]) -> Track | None: + try: + entries = self._entries_for(src_url) + except Exception as exc: # yt-dlp raises many extractor-specific errors + log.warning("yt-dlp could not resolve %s: %s", src_url, exc) + return None + if not entries: + return None + pool = [e for e in entries if f"ytdlp:{e['url']}" not in recent] or entries + entry = random.choice(pool) + return Track( + backend="ytdlp", + locator=entry["url"], + artist=entry.get("artist") or "Unknown artist", + title=entry.get("title") or entry["url"], + origin=self.name, + source_url=src_url if src_url != entry["url"] else None, + ) + + def _entries_for(self, src_url: str) -> list[dict]: + now = time.time() + cached = self._expanded.get(src_url) + if cached and now - cached[1] < config.YTDLP_REFRESH: + return cached[0] + entries = self._extract(src_url) + self._expanded[src_url] = (entries, now) + return entries + + @staticmethod + def _extract(src_url: str) -> list[dict]: + from yt_dlp import YoutubeDL + + opts = { + "quiet": True, + "no_warnings": True, + "extract_flat": "in_playlist", + "skip_download": True, + "socket_timeout": 30, + } + with YoutubeDL(opts) as ydl: + info = ydl.extract_info(src_url, download=False) + if info is None: + return [] + + raw_entries = info.get("entries") + if raw_entries is not None: # container URL -> list of tracks + out = [] + for e in raw_entries: + if not e: + continue + url = e.get("url") or e.get("webpage_url") or e.get("id") + if not url: + continue + out.append( + { + "url": url, + "title": e.get("title"), + "artist": ( + e.get("artist") + or e.get("uploader") + or e.get("channel") + or info.get("uploader") + ), + } + ) + return out + + # direct track URL + return [ + { + "url": info.get("webpage_url") or src_url, + "title": info.get("title"), + "artist": ( + info.get("artist") + or info.get("uploader") + or info.get("channel") + ), + } + ] diff --git a/ingest/radieo/queue.py b/ingest/radieo/queue.py index 657c986..8ba7447 100644 --- a/ingest/radieo/queue.py +++ b/ingest/radieo/queue.py @@ -1,13 +1,13 @@ """Prefetching pipeline feeding ready-to-play tracks to the stream layer. A background thread keeps a small buffer of downloaded tracks: it asks the -provider what to play next, has the matching fetcher download it into the -cache, and enqueues the resulting file. ``pop_next`` hands the oldest ready +scheduler what to play next, hands the track to the fetcher registered for its +backend, and enqueues the resulting file. ``pop_next`` hands the oldest ready track to the HTTP API, records the play and runs LRU retention. -If the provider has nothing (e.g. Navidrome not configured, or unreachable), -the buffer simply stays empty and ``pop_next`` returns ``None`` — the stream -then plays its own local-cache fallback. +If no source has anything (e.g. nothing configured, or all unreachable), the +buffer simply stays empty and ``pop_next`` returns ``None`` — the stream then +plays its own local-cache fallback. """ import logging @@ -23,9 +23,9 @@ log = logging.getLogger("radieo.queue") class TrackQueue: - def __init__(self, provider, fetcher, db: Database): - self._provider = provider - self._fetcher = fetcher + def __init__(self, scheduler, fetchers, db: Database): + self._scheduler = scheduler + self._fetchers = fetchers # dict: backend name -> fetcher self._db = db self._lock = threading.Lock() self._ready: deque[tuple[Path, Track]] = deque() @@ -56,11 +56,15 @@ class TrackQueue: for _ in range(max(0, missing)): if self._stop.is_set(): return - track = self._provider.next() + track = self._scheduler.next() if track is None: return # nothing to fetch right now + fetcher = self._fetchers.get(track.backend) + if fetcher is None: + log.error("no fetcher for backend %r (%s)", track.backend, track) + continue try: - path = self._fetcher.fetch(track) + path = fetcher.fetch(track) except Exception: log.exception("fetch failed for %s", track) continue diff --git a/ingest/radieo/scheduler.py b/ingest/radieo/scheduler.py new file mode 100644 index 0000000..52ddec5 --- /dev/null +++ b/ingest/radieo/scheduler.py @@ -0,0 +1,39 @@ +"""Weighted scheduler mixing several providers. + +Picks a provider at random, weighted by ``SOURCE_WEIGHTS``, and asks it for a +track. If the chosen provider has nothing right now (empty list, unreachable +source…), the remaining providers are tried in weighted-random order, so a +temporarily-idle source never stalls playback. Returns ``None`` only when every +provider is exhausted. + +The weights live in one place (``config.SOURCE_WEIGHTS``) so the mix can later +move to a config file without touching this logic. +""" + +import logging +import random + +log = logging.getLogger("radieo.scheduler") + + +class Scheduler: + def __init__(self, entries): + # entries: list[(provider, weight)]; drop non-positive weights. + self._entries = [(p, w) for p, w in entries if w > 0] + + def next(self): + pool = list(self._entries) + while pool: + weights = [w for _, w in pool] + idx = random.choices(range(len(pool)), weights=weights, k=1)[0] + provider, _ = pool.pop(idx) + try: + track = provider.next() + except Exception: # a provider must never crash the loop + log.exception( + "provider %s failed", getattr(provider, "name", "?") + ) + continue + if track is not None: + return track + return None diff --git a/ingest/requirements.txt b/ingest/requirements.txt index 6ecf620..59b3f06 100644 --- a/ingest/requirements.txt +++ b/ingest/requirements.txt @@ -1 +1,2 @@ httpx>=0.27 +yt-dlp>=2024.1