radieo/ingest/radieo/providers/ytdlp.py
Pierre-Olivier Mercier 7e0f08b863 Milestone 5: MusicBrainz MBID canonicalizer
Give tracks a source-agnostic identity so the same song from different
sources no longer replays in a loop.

- Canonicalizer resolves (artist, title) to a MusicBrainz recording MBID
  (no API key; ~1 req/s, descriptive User-Agent, best-effort). Hits and
  confirmed misses are cached in SQLite; transient errors are not.
- Track.key becomes mbid:<id> when resolved, else a normalized
  name:<artist>|<title> fallback — still source-agnostic.
- Scheduler now owns the authoritative anti-repeat on the canonical key,
  canonicalizing the drawn track with a bounded retry; providers keep a
  cheap recent-locator filter to limit retries.
- db: canonical_cache table, history.locator column with migration for
  existing databases, recent_locators().
- Canonicalization can be turned off via RADIEO_CANONICAL_ENABLED=0.

Verified: MBID hit/cache/miss, cross-source key collapse, scheduler
dodging a recent play, schema migration, and full stack (Navidrome +
yt-dlp) with zero Python tracebacks and a valid 192 kbps MP3 stream.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-02 18:46:30 +08:00

154 lines
5.2 KiB
Python

"""YtdlpProvider: picks tracks from a hand-maintained list of URLs.

The list is a plain text file (one URL per line, ``#`` comments allowed),
mounted into the container so it can be edited without a rebuild. Each line may
be either a direct track URL or a *container* URL (playlist, album, label,
artist page); container URLs are expanded with a flat yt-dlp extraction and one
entry is picked at random, honouring the anti-repeat window.

The provider only *resolves* references — it emits ``ytdlp`` tracks whose
locator is the chosen media URL. The actual download happens in the matching
fetcher (milestone 4). Flat extraction is cached per source URL to avoid
re-hitting the network on every pick.
"""
import logging
import random
import time
from pathlib import Path
from .. import config
from ..db import Database
from ..models import Track
# Module-level logger; the name is fixed explicitly rather than derived from
# ``__name__``.
log = logging.getLogger("radieo.provider.ytdlp")
class YtdlpProvider:
    """Pick ``ytdlp`` tracks from a hand-maintained text file of source URLs.

    Each non-comment line of the file is either a direct track URL or a
    container URL. Sources are expanded with a flat yt-dlp extraction (no
    download) and the expansion is cached per source URL for
    ``config.YTDLP_REFRESH`` seconds. The provider only resolves references:
    the tracks it returns carry the chosen URL as their locator.
    """

    name = "ytdlp"

    def __init__(self, urls_file: Path, db: Database):
        """Store the collaborators; no file or network I/O happens here.

        Args:
            urls_file: Text file with one source URL per line; blank lines
                and lines starting with ``#`` are ignored.
            db: Database queried for the recently played locators.
        """
        self._urls_file = urls_file
        self._db = db
        self._urls: list[str] = []
        self._loaded_at = 0.0
        # source URL -> (entries, loaded_at); entries are normalized dicts.
        self._expanded: dict[str, tuple[list[dict], float]] = {}

    # --- source list ------------------------------------------------------

    def _load_urls(self) -> None:
        """Refresh ``self._urls`` from the URL file when the cached list is stale.

        A non-empty list is reused until it is ``config.YTDLP_REFRESH`` seconds
        old. An empty list is never considered fresh, so a missing or empty
        file is re-read on every call. If the file cannot be read the list is
        cleared and the failure is logged.
        """
        now = time.time()
        if self._urls and now - self._loaded_at < config.YTDLP_REFRESH:
            return

        try:
            text = self._urls_file.read_text(encoding="utf-8")
        except OSError as exc:
            # Warn on the first attempt or when a previously loaded list is
            # lost; later retries (one per pick while the file stays
            # unreadable) are logged at debug level to keep the log quiet.
            first_attempt = self._loaded_at == 0.0
            level = logging.WARNING if (self._urls or first_attempt) else logging.DEBUG
            log.log(level, "cannot read yt-dlp URL list %s: %s", self._urls_file, exc)
            self._urls = []
            self._loaded_at = now
            return

        stripped = (line.strip() for line in text.splitlines())
        urls = [line for line in stripped if line and not line.startswith("#")]
        if urls != self._urls:
            log.info("loaded %d yt-dlp source URL(s)", len(urls))
        self._urls = urls
        self._loaded_at = now

    # --- resolution -------------------------------------------------------

    def next(self) -> Track | None:
        """Return a randomly chosen track, preferring ones not played recently.

        Sources are visited in random order. The first source that has an
        entry whose URL is not among the recent locators wins. Only when no
        reachable source has such an entry is a recent one accepted, taken
        from the first visited source that had entries at all. Visiting
        further sources may trigger extra (cached) extractions, but avoids
        handing out a repeat while another source still holds fresh tracks.

        Returns:
            A ``Track``, or ``None`` when the URL list is empty or no source
            could be expanded into at least one entry.
        """
        self._load_urls()
        if not self._urls:
            return None

        recent = self._db.recent_locators(config.ANTIREPEAT_WINDOW)
        candidates = list(self._urls)
        random.shuffle(candidates)

        fallback: tuple[str, list[dict]] | None = None
        for src in candidates:
            entries = self._safe_entries(src)
            if not entries:
                continue
            fresh = [e for e in entries if e["url"] not in recent]
            if fresh:
                return self._track_from(src, random.choice(fresh))
            if fallback is None:
                fallback = (src, entries)

        if fallback is None:
            return None
        # Every reachable entry is recent: accept a repeat rather than go silent.
        src, entries = fallback
        return self._track_from(src, random.choice(entries))

    def _resolve(self, src_url: str, recent: set[str]) -> Track | None:
        """Resolve a single source URL to one track.

        Entries whose URL is in ``recent`` are avoided when possible; if every
        entry is recent, one of them is returned anyway.

        Returns:
            A ``Track``, or ``None`` when the source cannot be expanded or has
            no entries.
        """
        entries = self._safe_entries(src_url)
        if not entries:
            return None
        fresh = [e for e in entries if e["url"] not in recent]
        return self._track_from(src_url, random.choice(fresh or entries))

    def _safe_entries(self, src_url: str) -> list[dict]:
        """Return the entries for ``src_url``, or ``[]`` if extraction fails."""
        try:
            return self._entries_for(src_url)
        except Exception as exc:  # yt-dlp raises many extractor-specific errors
            log.warning("yt-dlp could not resolve %s: %s", src_url, exc)
            return []

    def _track_from(self, src_url: str, entry: dict) -> Track:
        """Build the ``Track`` for a normalized entry found under ``src_url``."""
        locator = entry["url"]
        return Track(
            backend="ytdlp",
            locator=locator,
            artist=entry.get("artist") or "Unknown artist",
            title=entry.get("title") or locator,
            origin=self.name,
            # Only record the source when it differs from the track itself,
            # i.e. when the track came out of a container URL.
            source_url=src_url if src_url != locator else None,
        )

    def _entries_for(self, src_url: str) -> list[dict]:
        """Return the normalized entries for ``src_url``, using the cache.

        A cached expansion is reused until it is ``config.YTDLP_REFRESH``
        seconds old. Failed extractions raise and are therefore not cached.
        """
        now = time.time()
        cached = self._expanded.get(src_url)
        if cached and now - cached[1] < config.YTDLP_REFRESH:
            return cached[0]
        entries = self._extract(src_url)
        self._expanded[src_url] = (entries, now)
        return entries

    @staticmethod
    def _extract(src_url: str) -> list[dict]:
        """Run a flat, download-free yt-dlp extraction of ``src_url``.

        Returns:
            A list of dicts with keys ``url``, ``title`` and ``artist``: one
            per usable entry when yt-dlp reports ``entries``, otherwise a
            single dict describing ``src_url`` itself. Empty when yt-dlp
            returns no info.

        Raises:
            Whatever yt-dlp raises for the URL; callers are expected to catch.
        """
        # Imported lazily so the module can be imported without yt-dlp loaded.
        from yt_dlp import YoutubeDL

        opts = {
            "quiet": True,
            "no_warnings": True,
            "extract_flat": "in_playlist",
            "skip_download": True,
            "socket_timeout": 30,
        }
        with YoutubeDL(opts) as ydl:
            info = ydl.extract_info(src_url, download=False)
            if info is None:
                return []

            raw_entries = info.get("entries")
            if raw_entries is None:
                # direct track URL
                return [
                    {
                        "url": info.get("webpage_url") or src_url,
                        "title": info.get("title"),
                        "artist": (
                            info.get("artist")
                            or info.get("uploader")
                            or info.get("channel")
                        ),
                    }
                ]

            # container URL -> list of tracks. Consumed inside the ``with``
            # block in case ``entries`` is a lazy iterable.
            out = []
            for e in raw_entries:
                if not e:
                    continue
                # NOTE(review): the last fallback stores a bare ``id`` as the
                # locator — confirm the fetcher can handle a non-URL value.
                url = e.get("url") or e.get("webpage_url") or e.get("id")
                if not url:
                    continue
                out.append(
                    {
                        "url": url,
                        "title": e.get("title"),
                        "artist": (
                            e.get("artist")
                            or e.get("uploader")
                            or e.get("channel")
                            or info.get("uploader")
                        ),
                    }
                )
            return out