Milestone 4: yt-dlp provider and weighted source scheduler

Add a second playback source and a weighted scheduler mixing it with
Navidrome:

- Scheduler picks a provider by SOURCE_WEIGHTS, falling through to the
  others when one has nothing ready, so no source can stall playback.
- YtdlpProvider reads a hand-maintained config/urls.txt; container URLs
  (playlist/album/label/artist) are flat-extracted and one entry is
  drawn at random, honouring the anti-repeat window. Adds Track.source_url.
- YtdlpFetcher downloads bestaudio via the yt-dlp library, reusing the
  atomic hidden-temp-then-rename pattern; Liquidsoap decodes the result.
- Queue now dispatches to a fetcher registry keyed by backend.
- Sweep orphaned download temp files on daemon startup (leftovers from a
  killed container otherwise pile up and trip the stream fallback).

Verified end-to-end: yt-dlp opus decoded and served as 192 kbps MP3, and
the 3:1 default mix observed in play history.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
nemunaire 2026-07-02 17:58:24 +08:00
commit d1db6a11d8
13 changed files with 418 additions and 46 deletions

View file

@ -6,39 +6,81 @@ from . import config
from .api import IngestServer
from .db import Database
from .queue import TrackQueue
from .scheduler import Scheduler
log = logging.getLogger("radieo")
class _NullProvider:
"""Fallback provider when no source is configured: never yields a track."""
def next(self):
return None
def _build_pipeline(db: Database):
"""Return (provider, fetcher). Falls back to a no-op provider when the
Navidrome source is not configured, so the daemon still runs and the
stream plays its local cache."""
if not config.NAVIDROME_ENABLED:
log.warning(
"Navidrome not configured (RADIEO_NAVIDROME_*): no source active, "
"the stream will play its local cache fallback."
"""Return (scheduler, fetchers). Assembles whichever sources are enabled;
when none is, the scheduler yields nothing and the stream plays its local
cache fallback."""
providers = [] # list[(provider, weight)]
fetchers = {} # backend name -> fetcher
if config.NAVIDROME_ENABLED:
from .fetchers.subsonic import SubsonicFetcher
from .providers.navidrome import NavidromeProvider
from .subsonic import SubsonicClient
client = SubsonicClient(
config.NAVIDROME_URL, config.NAVIDROME_USER, config.NAVIDROME_PASSWORD
)
return _NullProvider(), None
provider = NavidromeProvider(client, config.NAVIDROME_PLAYLIST, db)
providers.append((provider, config.SOURCE_WEIGHTS.get("navidrome", 0)))
fetchers["subsonic"] = SubsonicFetcher(client, config.CACHE_DIR)
log.info(
"Navidrome source enabled (playlist=%r, weight=%d)",
config.NAVIDROME_PLAYLIST,
config.SOURCE_WEIGHTS.get("navidrome", 0),
)
else:
log.warning("Navidrome not configured (RADIEO_NAVIDROME_*): source off.")
from .fetchers.subsonic import SubsonicFetcher
from .providers.navidrome import NavidromeProvider
from .subsonic import SubsonicClient
if config.SOURCE_WEIGHTS.get("ytdlp", 0) > 0 and config.YTDLP_URLS_FILE.exists():
from .fetchers.ytdlp import YtdlpFetcher
from .providers.ytdlp import YtdlpProvider
client = SubsonicClient(
config.NAVIDROME_URL, config.NAVIDROME_USER, config.NAVIDROME_PASSWORD
)
provider = NavidromeProvider(client, config.NAVIDROME_PLAYLIST, db)
fetcher = SubsonicFetcher(client, config.CACHE_DIR)
log.info("Navidrome source enabled (playlist=%r)", config.NAVIDROME_PLAYLIST)
return provider, fetcher
provider = YtdlpProvider(config.YTDLP_URLS_FILE, db)
providers.append((provider, config.SOURCE_WEIGHTS["ytdlp"]))
fetchers["ytdlp"] = YtdlpFetcher(config.CACHE_DIR)
log.info(
"yt-dlp source enabled (urls=%s, weight=%d)",
config.YTDLP_URLS_FILE,
config.SOURCE_WEIGHTS["ytdlp"],
)
else:
log.info(
"yt-dlp source off (weight=%d, urls file present=%s).",
config.SOURCE_WEIGHTS.get("ytdlp", 0),
config.YTDLP_URLS_FILE.exists(),
)
if not providers:
log.warning("no source active: the stream plays its local cache only.")
return Scheduler(providers), fetchers
def _sweep_temp_files() -> None:
"""Delete orphaned download temp files left by a killed container.
Fetchers write to hidden temp files and rename them into place atomically;
the only hidden files we ever create are those temps. If a download is
interrupted (SIGKILL on ``compose down``), the temp survives and the stream
fallback keeps trying to decode it. Clearing them on startup avoids that
noise and reclaims disk.
"""
removed = 0
for path in config.CACHE_DIR.glob(".*"):
if path.name == ".gitkeep" or not path.is_file():
continue
try:
path.unlink()
removed += 1
except OSError:
log.warning("could not remove stale temp %s", path.name)
if removed:
log.info("swept %d stale temp file(s) from cache", removed)
def main() -> None:
@ -49,10 +91,11 @@ def main() -> None:
config.CACHE_DIR.mkdir(parents=True, exist_ok=True)
config.STATE_DIR.mkdir(parents=True, exist_ok=True)
_sweep_temp_files()
db = Database(config.STATE_DB)
provider, fetcher = _build_pipeline(db)
queue = TrackQueue(provider, fetcher, db)
scheduler, fetchers = _build_pipeline(db)
queue = TrackQueue(scheduler, fetchers, db)
queue.start()
server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue)

View file

@ -41,3 +41,19 @@ PLAYLIST_REFRESH = float(os.environ.get("RADIEO_PLAYLIST_REFRESH", "300"))
NAVIDROME_ENABLED = bool(
NAVIDROME_URL and NAVIDROME_USER and NAVIDROME_PASSWORD and NAVIDROME_PLAYLIST
)
# --- yt-dlp source ---
# Plain text file (one URL per line, '#' comments) mounted into the container.
# May contain direct track URLs or container URLs (playlist/album/label/artist),
# from which one track is picked at random. Left absent disables the source.
YTDLP_URLS_FILE = Path(os.environ.get("RADIEO_YTDLP_URLS_FILE", "/config/urls.txt"))
# How often to reload the URL list and re-expand container URLs, in seconds.
YTDLP_REFRESH = float(os.environ.get("RADIEO_YTDLP_REFRESH", "300"))
# --- Source mix (weights) ---
# Relative odds of drawing from each source. Isolated here on purpose so the
# mix can later move to a config file. A weight of 0 disables a source.
SOURCE_WEIGHTS = {
"navidrome": int(os.environ.get("RADIEO_WEIGHT_NAVIDROME", "3")),
"ytdlp": int(os.environ.get("RADIEO_WEIGHT_YTDLP", "1")),
}

View file

@ -0,0 +1,71 @@
"""YtdlpFetcher: download a ``ytdlp`` track into the cache directory.
Mirrors SubsonicFetcher's atomic pattern: yt-dlp writes to a hidden temporary
name and the finished file is renamed into place, so the ready file appears
atomically on the shared cache mount. Temps use a distinctive hidden prefix so
the reuse glob ignores them and the daemon's startup sweep can reclaim any left
behind by an interrupted download.
We grab ``bestaudio`` and keep the original container (webm/opus/m4a/mp3);
Liquidsoap decodes it via ffmpeg, so no re-encoding is needed.
"""
import hashlib
import logging
from pathlib import Path
from uuid import uuid4
from ..models import Track
log = logging.getLogger("radieo.fetcher.ytdlp")
class YtdlpFetcher:
backend = "ytdlp"
def __init__(self, cache_dir: Path):
self._cache_dir = cache_dir
def fetch(self, track: Track) -> Path:
h = hashlib.sha1(track.locator.encode()).hexdigest()[:16]
stem = f"ytdlp-{h}"
# Reuse a still-cached copy rather than downloading again.
for existing in self._cache_dir.glob(f"{stem}.*"):
if not existing.name.endswith(".part"):
return existing
tmp_base = self._cache_dir / f".{stem}.{uuid4().hex}"
opts = {
"format": "bestaudio/best",
"outtmpl": str(tmp_base) + ".%(ext)s",
"quiet": True,
"no_warnings": True,
"noprogress": True,
"noplaylist": True,
"socket_timeout": 30,
"retries": 2,
}
try:
from yt_dlp import YoutubeDL
with YoutubeDL(opts) as ydl:
info = ydl.extract_info(track.locator, download=True)
downloaded = self._downloaded_path(info, tmp_base)
dest = self._cache_dir / f"{stem}{downloaded.suffix}"
downloaded.replace(dest)
except BaseException:
for leftover in self._cache_dir.glob(f"{tmp_base.name}*"):
leftover.unlink(missing_ok=True)
raise
log.info("downloaded %s -> %s", track, dest.name)
return dest
def _downloaded_path(self, info: dict, tmp_base: Path) -> Path:
reqs = info.get("requested_downloads")
if reqs and reqs[0].get("filepath"):
return Path(reqs[0]["filepath"])
# Fallback: find the produced file (ignore yt-dlp's own .part leftovers).
for candidate in self._cache_dir.glob(f"{tmp_base.name}.*"):
if not candidate.name.endswith(".part"):
return candidate
raise FileNotFoundError(f"yt-dlp produced no file for {tmp_base.name}")

View file

@ -18,6 +18,7 @@ class Track:
origin: str # provider that produced it, e.g. "navidrome"
mbid: str | None = None # filled by the Canonicalizer (milestone 5)
source_ext: str | None = None # filename hint, e.g. "mp3", "flac"
source_url: str | None = None # container URL a track was picked from
@property
def key(self) -> str:

View file

@ -0,0 +1,154 @@
"""YtdlpProvider: picks tracks from a hand-maintained list of URLs.
The list is a plain text file (one URL per line, ``#`` comments allowed),
mounted into the container so it can be edited without a rebuild. Each line may
be either a direct track URL or a *container* URL (playlist, album, label,
artist page); container URLs are expanded with a flat yt-dlp extraction and one
entry is picked at random, honouring the anti-repeat window.
The provider only *resolves* references it emits ``ytdlp`` tracks whose
locator is the chosen media URL. The actual download happens in the matching
fetcher (milestone 4). Flat extraction is cached per source URL to avoid
re-hitting the network on every pick.
"""
import logging
import random
import time
from pathlib import Path
from .. import config
from ..db import Database
from ..models import Track
log = logging.getLogger("radieo.provider.ytdlp")
class YtdlpProvider:
name = "ytdlp"
def __init__(self, urls_file: Path, db: Database):
self._urls_file = urls_file
self._db = db
self._urls: list[str] = []
self._loaded_at = 0.0
# source URL -> (entries, loaded_at); entries are normalized dicts.
self._expanded: dict[str, tuple[list[dict], float]] = {}
# --- source list ------------------------------------------------------
def _load_urls(self) -> None:
now = time.time()
if self._urls and now - self._loaded_at < config.YTDLP_REFRESH:
return
try:
lines = self._urls_file.read_text(encoding="utf-8").splitlines()
except OSError:
self._urls = []
self._loaded_at = now
return
urls = []
for line in lines:
line = line.strip()
if line and not line.startswith("#"):
urls.append(line)
if urls != self._urls:
log.info("loaded %d yt-dlp source URL(s)", len(urls))
self._urls = urls
self._loaded_at = now
# --- resolution -------------------------------------------------------
def next(self) -> Track | None:
self._load_urls()
if not self._urls:
return None
recent = self._db.recent_keys(config.ANTIREPEAT_WINDOW)
# Try source lines in random order until one yields a usable track.
candidates = list(self._urls)
random.shuffle(candidates)
for src in candidates:
track = self._resolve(src, recent)
if track is not None:
return track
return None
def _resolve(self, src_url: str, recent: set[str]) -> Track | None:
try:
entries = self._entries_for(src_url)
except Exception as exc: # yt-dlp raises many extractor-specific errors
log.warning("yt-dlp could not resolve %s: %s", src_url, exc)
return None
if not entries:
return None
pool = [e for e in entries if f"ytdlp:{e['url']}" not in recent] or entries
entry = random.choice(pool)
return Track(
backend="ytdlp",
locator=entry["url"],
artist=entry.get("artist") or "Unknown artist",
title=entry.get("title") or entry["url"],
origin=self.name,
source_url=src_url if src_url != entry["url"] else None,
)
def _entries_for(self, src_url: str) -> list[dict]:
now = time.time()
cached = self._expanded.get(src_url)
if cached and now - cached[1] < config.YTDLP_REFRESH:
return cached[0]
entries = self._extract(src_url)
self._expanded[src_url] = (entries, now)
return entries
@staticmethod
def _extract(src_url: str) -> list[dict]:
from yt_dlp import YoutubeDL
opts = {
"quiet": True,
"no_warnings": True,
"extract_flat": "in_playlist",
"skip_download": True,
"socket_timeout": 30,
}
with YoutubeDL(opts) as ydl:
info = ydl.extract_info(src_url, download=False)
if info is None:
return []
raw_entries = info.get("entries")
if raw_entries is not None: # container URL -> list of tracks
out = []
for e in raw_entries:
if not e:
continue
url = e.get("url") or e.get("webpage_url") or e.get("id")
if not url:
continue
out.append(
{
"url": url,
"title": e.get("title"),
"artist": (
e.get("artist")
or e.get("uploader")
or e.get("channel")
or info.get("uploader")
),
}
)
return out
# direct track URL
return [
{
"url": info.get("webpage_url") or src_url,
"title": info.get("title"),
"artist": (
info.get("artist")
or info.get("uploader")
or info.get("channel")
),
}
]

View file

@ -1,13 +1,13 @@
"""Prefetching pipeline feeding ready-to-play tracks to the stream layer.
A background thread keeps a small buffer of downloaded tracks: it asks the
provider what to play next, has the matching fetcher download it into the
cache, and enqueues the resulting file. ``pop_next`` hands the oldest ready
scheduler what to play next, hands the track to the fetcher registered for its
backend, and enqueues the resulting file. ``pop_next`` hands the oldest ready
track to the HTTP API, records the play and runs LRU retention.
If the provider has nothing (e.g. Navidrome not configured, or unreachable),
the buffer simply stays empty and ``pop_next`` returns ``None`` the stream
then plays its own local-cache fallback.
If no source has anything (e.g. nothing configured, or all unreachable), the
buffer simply stays empty and ``pop_next`` returns ``None`` the stream then
plays its own local-cache fallback.
"""
import logging
@ -23,9 +23,9 @@ log = logging.getLogger("radieo.queue")
class TrackQueue:
def __init__(self, provider, fetcher, db: Database):
self._provider = provider
self._fetcher = fetcher
def __init__(self, scheduler, fetchers, db: Database):
self._scheduler = scheduler
self._fetchers = fetchers # dict: backend name -> fetcher
self._db = db
self._lock = threading.Lock()
self._ready: deque[tuple[Path, Track]] = deque()
@ -56,11 +56,15 @@ class TrackQueue:
for _ in range(max(0, missing)):
if self._stop.is_set():
return
track = self._provider.next()
track = self._scheduler.next()
if track is None:
return # nothing to fetch right now
fetcher = self._fetchers.get(track.backend)
if fetcher is None:
log.error("no fetcher for backend %r (%s)", track.backend, track)
continue
try:
path = self._fetcher.fetch(track)
path = fetcher.fetch(track)
except Exception:
log.exception("fetch failed for %s", track)
continue

View file

@ -0,0 +1,39 @@
"""Weighted scheduler mixing several providers.
Picks a provider at random, weighted by ``SOURCE_WEIGHTS``, and asks it for a
track. If the chosen provider has nothing right now (empty list, unreachable
source), the remaining providers are tried in weighted-random order, so a
temporarily-idle source never stalls playback. Returns ``None`` only when every
provider is exhausted.
The weights live in one place (``config.SOURCE_WEIGHTS``) so the mix can later
move to a config file without touching this logic.
"""
import logging
import random
log = logging.getLogger("radieo.scheduler")
class Scheduler:
def __init__(self, entries):
# entries: list[(provider, weight)]; drop non-positive weights.
self._entries = [(p, w) for p, w in entries if w > 0]
def next(self):
pool = list(self._entries)
while pool:
weights = [w for _, w in pool]
idx = random.choices(range(len(pool)), weights=weights, k=1)[0]
provider, _ = pool.pop(idx)
try:
track = provider.next()
except Exception: # a provider must never crash the loop
log.exception(
"provider %s failed", getattr(provider, "name", "?")
)
continue
if track is not None:
return track
return None

View file

@ -1 +1,2 @@
httpx>=0.27
yt-dlp>=2024.1