From 7e0f08b863e1451b99080dbe4ac270c6c7db9a3c Mon Sep 17 00:00:00 2001 From: Pierre-Olivier Mercier Date: Thu, 2 Jul 2026 18:46:30 +0800 Subject: [PATCH] Milestone 5: MusicBrainz MBID canonicalizer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Give tracks a source-agnostic identity so the same song from different sources no longer replays in a loop. - Canonicalizer resolves (artist, title) to a MusicBrainz recording MBID (no API key; ~1 req/s, descriptive User-Agent, best-effort). Hits and confirmed misses are cached in SQLite; transient errors are not. - Track.key becomes mbid: when resolved, else a normalized name:| fallback — still source-agnostic. - Scheduler now owns the authoritative anti-repeat on the canonical key, canonicalizing the drawn track with a bounded retry; providers keep a cheap recent-locator filter to limit retries. - db: canonical_cache table, history.locator column with migration for existing databases, recent_locators(). - Canonicalization can be turned off via RADIEO_CANONICAL_ENABLED=0. Verified: MBID hit/cache/miss, cross-source key collapse, scheduler dodging a recent play, schema migration, and full stack (Navidrome + yt-dlp) with zero Python tracebacks and a valid 192 kbps MP3 stream. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --- .env.example | 7 +++ README.md | 16 +++-- docker-compose.yml | 3 + ingest/radieo/__main__.py | 29 ++++++++- ingest/radieo/canonicalizer.py | 94 ++++++++++++++++++++++++++++ ingest/radieo/config.py | 17 +++++ ingest/radieo/db.py | 83 +++++++++++++++++++++--- ingest/radieo/models.py | 20 ++++-- ingest/radieo/providers/navidrome.py | 11 ++-- ingest/radieo/providers/ytdlp.py | 4 +- ingest/radieo/scheduler.py | 41 +++++++++--- 11 files changed, 292 insertions(+), 33 deletions(-) create mode 100644 ingest/radieo/canonicalizer.py diff --git a/.env.example b/.env.example index bc8f78f..67fea57 100644 --- a/.env.example +++ b/.env.example @@ -19,6 +19,13 @@ RADIEO_NAVIDROME_PLAYLIST=Radio RADIEO_WEIGHT_NAVIDROME=3 RADIEO_WEIGHT_YTDLP=1 +# --- Canonicalizer MBID (optionnel) --- +# Résout (artiste, titre) -> MBID MusicBrainz pour dédupliquer entre sources. +# Aucune clé requise. Mettre 0 pour désactiver (clé = (artiste, titre)). +RADIEO_CANONICAL_ENABLED=1 +# User-Agent envoyé à MusicBrainz (qui en exige un, descriptif). +RADIEO_USER_AGENT=radieo/0.1 (personal music radio) + # --- Rétention du cache (optionnel) --- # Nombre de morceaux joués conservés sur disque avant éviction (LRU). RADIEO_RETENTION_KEEP=20 diff --git a/README.md b/README.md index b68024c..5e979b4 100644 --- a/README.md +++ b/README.md @@ -67,12 +67,18 @@ source); the file being absent also disables yt-dlp. ## Current status -**Milestone 4 — yt-dlp provider: done.** +**Milestone 5 — MBID canonicalizer: done.** - Two playback sources feed a weighted scheduler: a Navidrome/OpenSubsonic playlist and a hand-maintained list of yt-dlp URLs (`config/urls.txt`). Container URLs (playlist/album/label/artist) are expanded and one track is - drawn at random, honouring the anti-repeat window. + drawn at random. +- Each track is canonicalized to a MusicBrainz recording MBID (no API key + needed; ~1 req/s, best-effort, results cached in SQLite). This gives a + source-agnostic identity, so the same song from two sources collapses to one; + when no confident match is found it falls back to a normalized + `(artist, title)` key. The scheduler uses this canonical key for anti-repeat, + with the providers applying a cheap locator filter first. - Each source has its own fetcher (Subsonic stream / yt-dlp download); files are cached ahead of playback (prefetch buffer) and decoded by Liquidsoap. - Play history and LRU retention are tracked in a SQLite database under @@ -86,7 +92,9 @@ source); the file being absent also disables yt-dlp. - HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps), multiple simultaneous listeners supported. -The ListenBrainz suggestion feed comes next. +The ListenBrainz suggestion feed comes next. (Known cosmetic quirk: at startup +the fallback logs a few harmless ffmpeg "Invalid data" warnings while probing +non-audio files such as `.gitkeep`; to be quieted in the polish milestone.) ## Roadmap @@ -97,7 +105,7 @@ The ListenBrainz suggestion feed comes next. LRU retention and play history. 4. ✅ **yt-dlp provider** — fetch tracks from a maintained URL/artist list; weighted mixing between sources. -5. **Canonicalizer** — ListenBrainz MBID lookup for source-agnostic +5. ✅ **Canonicalizer** — MusicBrainz MBID lookup for source-agnostic de-duplication. 6. **ListenBrainz provider** — parse the RSS suggestions feed and resolve each one to Navidrome or yt-dlp. diff --git a/docker-compose.yml b/docker-compose.yml index 7e35c21..0862ffd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,9 @@ services: # Dosage du mix entre les sources (0 désactive). - RADIEO_WEIGHT_NAVIDROME=${RADIEO_WEIGHT_NAVIDROME:-3} - RADIEO_WEIGHT_YTDLP=${RADIEO_WEIGHT_YTDLP:-1} + # Canonicalizer MusicBrainz (identité MBID inter-sources ; sans clé). + - RADIEO_CANONICAL_ENABLED=${RADIEO_CANONICAL_ENABLED:-1} + - RADIEO_USER_AGENT=${RADIEO_USER_AGENT:-radieo/0.1 (personal music radio)} restart: unless-stopped stream: diff --git a/ingest/radieo/__main__.py b/ingest/radieo/__main__.py index 31c8874..831946c 100644 --- a/ingest/radieo/__main__.py +++ b/ingest/radieo/__main__.py @@ -11,8 +11,18 @@ from .scheduler import Scheduler log = logging.getLogger("radieo") +class _NullCanonicalizer: + """Used when canonicalization is disabled: leaves the Track untouched.""" + + def canonicalize(self, track): + return track + + def close(self): + pass + + def _build_pipeline(db: Database): - """Return (scheduler, fetchers). Assembles whichever sources are enabled; + """Return (providers, fetchers). Assembles whichever sources are enabled; when none is, the scheduler yields nothing and the stream plays its local cache fallback.""" providers = [] # list[(provider, weight)] @@ -58,7 +68,7 @@ def _build_pipeline(db: Database): if not providers: log.warning("no source active: the stream plays its local cache only.") - return Scheduler(providers), fetchers + return providers, fetchers def _sweep_temp_files() -> None: @@ -94,7 +104,19 @@ def main() -> None: _sweep_temp_files() db = Database(config.STATE_DB) - scheduler, fetchers = _build_pipeline(db) + + if config.CANONICAL_ENABLED: + from .canonicalizer import Canonicalizer + + canonicalizer = Canonicalizer(db) + log.info("Canonicalizer enabled (MusicBrainz, min_score=%d)", + config.CANONICAL_MIN_SCORE) + else: + canonicalizer = _NullCanonicalizer() + log.info("Canonicalizer disabled: tracks keyed by (artist, title).") + + providers, fetchers = _build_pipeline(db) + scheduler = Scheduler(providers, canonicalizer, db) queue = TrackQueue(scheduler, fetchers, db) queue.start() @@ -113,6 +135,7 @@ def main() -> None: finally: queue.stop() server.server_close() + canonicalizer.close() db.close() diff --git a/ingest/radieo/canonicalizer.py b/ingest/radieo/canonicalizer.py new file mode 100644 index 0000000..bdcbf05 --- /dev/null +++ b/ingest/radieo/canonicalizer.py @@ -0,0 +1,94 @@ +"""Canonicalizer: resolve a Track to a MusicBrainz recording MBID. + +Given ``(artist, title)`` it queries the MusicBrainz recording search and keeps +the best match above a score threshold. Results — hits *and* confirmed misses — +are cached in SQLite so the network is hit at most once per distinct track. +Genuine no-matches are cached; transient network errors are not, so they get +retried later. + +MusicBrainz asks anonymous clients to stay under ~1 request/second and to send a +descriptive User-Agent; both are honoured here. The whole thing is best-effort: +any failure just leaves ``mbid`` unset and the Track falls back to its +name-based key. +""" + +import logging +import threading +import time +from dataclasses import replace + +import httpx + +from . import config +from .models import Track, norm_name + +log = logging.getLogger("radieo.canonicalizer") + + +class Canonicalizer: + def __init__(self, db): + self._db = db + self._http = httpx.Client( + timeout=httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=10.0), + headers={"User-Agent": config.USER_AGENT}, + follow_redirects=True, + ) + self._rate_lock = threading.Lock() + self._last_call = 0.0 + + def canonicalize(self, track: Track) -> Track: + """Return the Track with ``mbid`` filled when resolvable, else unchanged.""" + artist_norm = norm_name(track.artist) + title_norm = norm_name(track.title) + if not artist_norm or not title_norm: + return track + + cached, mbid = self._db.get_canonical(artist_norm, title_norm) + if not cached: + ok, mbid = self._lookup(track.artist, track.title) + if ok: # cache hits and genuine misses; skip transient errors + self._db.put_canonical(artist_norm, title_norm, mbid) + return replace(track, mbid=mbid) if mbid else track + + # --- MusicBrainz ------------------------------------------------------ + + def _lookup(self, artist: str, title: str) -> tuple[bool, str | None]: + """Return (ok, mbid). ``ok`` False on a transient error (do not cache).""" + query = f'artist:"{_escape(artist)}" AND recording:"{_escape(title)}"' + self._throttle() + try: + resp = self._http.get( + config.MUSICBRAINZ_URL, + params={"query": query, "fmt": "json", "limit": 3}, + ) + resp.raise_for_status() + data = resp.json() + except (httpx.HTTPError, ValueError) as exc: + log.warning("MusicBrainz lookup failed for %s — %s: %s", artist, title, exc) + return False, None + + recordings = data.get("recordings") or [] + if recordings: + best = recordings[0] + if int(best.get("score", 0)) >= config.CANONICAL_MIN_SCORE: + mbid = best.get("id") + log.info("MBID %s for %s — %s", mbid, artist, title) + return True, mbid + log.info("no confident MBID for %s — %s", artist, title) + return True, None # genuine miss: cache it + + def _throttle(self) -> None: + with self._rate_lock: + wait = config.CANONICAL_RATE_INTERVAL - (time.time() - self._last_call) + if wait > 0: + time.sleep(wait) + self._last_call = time.time() + + def close(self) -> None: + self._http.close() + + +# Lucene special characters we quote around; escape the ones that would break a +# quoted phrase. +def _escape(value: str) -> str: + return value.replace("\\", "\\\\").replace('"', '\\"') diff --git a/ingest/radieo/config.py b/ingest/radieo/config.py index 2b7939e..402dcda 100644 --- a/ingest/radieo/config.py +++ b/ingest/radieo/config.py @@ -27,6 +27,23 @@ PREFETCH_INTERVAL = float(os.environ.get("RADIEO_PREFETCH_INTERVAL", "2.0")) RETENTION_KEEP = int(os.environ.get("RADIEO_RETENTION_KEEP", "20")) # Do not replay a track seen among the last N plays, when avoidable. ANTIREPEAT_WINDOW = int(os.environ.get("RADIEO_ANTIREPEAT_WINDOW", "50")) +# How many draws the scheduler tries to dodge a recent repeat before giving up. +SCHEDULER_MAX_TRIES = int(os.environ.get("RADIEO_SCHEDULER_MAX_TRIES", "5")) + +# --- Canonicalizer (MusicBrainz MBID lookup) --- +# Resolves (artist, title) -> recording MBID for source-agnostic de-dup. +CANONICAL_ENABLED = os.environ.get("RADIEO_CANONICAL_ENABLED", "1") != "0" +MUSICBRAINZ_URL = os.environ.get( + "RADIEO_MUSICBRAINZ_URL", "https://musicbrainz.org/ws/2/recording" +) +# Minimum MusicBrainz match score (0-100) to accept a recording as canonical. +CANONICAL_MIN_SCORE = int(os.environ.get("RADIEO_CANONICAL_MIN_SCORE", "90")) +# Minimum seconds between MusicBrainz requests (anonymous limit ~1 req/s). +CANONICAL_RATE_INTERVAL = float(os.environ.get("RADIEO_CANONICAL_RATE_INTERVAL", "1.1")) +# Sent to MusicBrainz (which requires a descriptive User-Agent) and yt-dlp/others. +USER_AGENT = os.environ.get( + "RADIEO_USER_AGENT", "radieo/0.1 (personal music radio)" +) # --- Navidrome / OpenSubsonic source --- # Left empty means the provider is disabled (the stream then plays its own diff --git a/ingest/radieo/db.py b/ingest/radieo/db.py index c9aff9c..e48bacf 100644 --- a/ingest/radieo/db.py +++ b/ingest/radieo/db.py @@ -1,13 +1,17 @@ -"""SQLite state: play history (anti-repeat + stats) and cache-file retention. +"""SQLite state: play history, cache-file retention and MBID canonical cache. -Two concerns, two tables: +Three concerns, three tables: -- ``history`` is append-only. It drives anti-repeat (recently played track - keys) and survives cache eviction, so a track can stay "recently played" even - after its file is deleted. +- ``history`` is append-only. It drives anti-repeat (recently played canonical + keys, and raw locators for the providers' cheap local filter) and survives + cache eviction, so a track can stay "recently played" even after its file is + deleted. - ``cache_files`` tracks downloaded files so we can keep only the N most recently *played* ones (LRU retention). Files not yet played are never evicted. +- ``canonical_cache`` memoizes ``(artist, title) -> MBID`` lookups (a NULL mbid + means a confirmed no-match) so the Canonicalizer hits the network at most once + per distinct track. """ import sqlite3 @@ -21,6 +25,7 @@ _SCHEMA = """ CREATE TABLE IF NOT EXISTS history ( id INTEGER PRIMARY KEY, track_key TEXT NOT NULL, + locator TEXT, artist TEXT, title TEXT, origin TEXT, @@ -33,6 +38,14 @@ CREATE TABLE IF NOT EXISTS cache_files ( track_key TEXT, played_at REAL -- NULL until the file has been played ); + +CREATE TABLE IF NOT EXISTS canonical_cache ( + artist_norm TEXT NOT NULL, + title_norm TEXT NOT NULL, + mbid TEXT, -- NULL means a confirmed no-match + resolved_at REAL NOT NULL, + PRIMARY KEY (artist_norm, title_norm) +); """ @@ -45,6 +58,16 @@ class Database: ) self._conn.row_factory = sqlite3.Row self._conn.executescript(_SCHEMA) + self._migrate() + + def _migrate(self) -> None: + # Add columns introduced after a DB may have been created (milestone 5). + cols = { + r["name"] + for r in self._conn.execute("PRAGMA table_info(history)").fetchall() + } + if "locator" not in cols: + self._conn.execute("ALTER TABLE history ADD COLUMN locator TEXT") # --- anti-repeat / history ------------------------------------------- @@ -56,12 +79,56 @@ class Database: ).fetchall() return {r["track_key"] for r in rows} + def recent_locators(self, limit: int) -> set[str]: + """Raw backend locators recently played (providers' cheap local filter).""" + with self._lock: + rows = self._conn.execute( + "SELECT locator FROM history WHERE locator IS NOT NULL" + " ORDER BY played_at DESC LIMIT ?", + (limit,), + ).fetchall() + return {r["locator"] for r in rows} + def record_play(self, track: Track) -> None: with self._lock: self._conn.execute( - "INSERT INTO history (track_key, artist, title, origin, played_at)" - " VALUES (?, ?, ?, ?, ?)", - (track.key, track.artist, track.title, track.origin, time.time()), + "INSERT INTO history" + " (track_key, locator, artist, title, origin, played_at)" + " VALUES (?, ?, ?, ?, ?, ?)", + ( + track.key, + track.locator, + track.artist, + track.title, + track.origin, + time.time(), + ), + ) + + # --- MBID canonical cache -------------------------------------------- + + def get_canonical(self, artist_norm: str, title_norm: str): + """Return (cached: bool, mbid: str | None). ``cached`` False means the + pair was never looked up; a cached row with mbid None is a known miss.""" + with self._lock: + row = self._conn.execute( + "SELECT mbid FROM canonical_cache" + " WHERE artist_norm = ? AND title_norm = ?", + (artist_norm, title_norm), + ).fetchone() + if row is None: + return False, None + return True, row["mbid"] + + def put_canonical( + self, artist_norm: str, title_norm: str, mbid: str | None + ) -> None: + with self._lock: + self._conn.execute( + "INSERT OR REPLACE INTO canonical_cache" + " (artist_norm, title_norm, mbid, resolved_at)" + " VALUES (?, ?, ?, ?)", + (artist_norm, title_norm, mbid, time.time()), ) # --- cache-file retention -------------------------------------------- diff --git a/ingest/radieo/models.py b/ingest/radieo/models.py index 3ffb930..c9d26fb 100644 --- a/ingest/radieo/models.py +++ b/ingest/radieo/models.py @@ -6,8 +6,18 @@ it into a local file; the queue and the state database use ``key`` for de-duplication and anti-repeat. """ +import re from dataclasses import dataclass +_WS = re.compile(r"\s+") + + +def norm_name(value: str) -> str: + """Normalize an artist/title for stable keying and cache lookups: + case-fold and collapse whitespace. Kept deliberately light (no accent + stripping) so distinct titles never collapse together.""" + return _WS.sub(" ", value.strip()).casefold() + @dataclass(frozen=True) class Track: @@ -22,14 +32,16 @@ class Track: @property def key(self) -> str: - """Stable identity for de-duplication and anti-repeat. + """Stable, source-agnostic identity for de-duplication and anti-repeat. - Until the Canonicalizer (milestone 5) fills ``mbid``, we key on the - backend locator, which is unique within a source. + The Canonicalizer fills ``mbid`` when it can, giving a truly + cross-source identity. When it can't, we fall back to the normalized + ``(artist, title)`` — still source-agnostic, so the same track fetched + from two backends collapses to one key. """ if self.mbid: return f"mbid:{self.mbid}" - return f"{self.backend}:{self.locator}" + return f"name:{norm_name(self.artist)}|{norm_name(self.title)}" def __str__(self) -> str: return f"{self.artist} — {self.title} [{self.origin}]" diff --git a/ingest/radieo/providers/navidrome.py b/ingest/radieo/providers/navidrome.py index 7b433d7..8a3fa0a 100644 --- a/ingest/radieo/providers/navidrome.py +++ b/ingest/radieo/providers/navidrome.py @@ -1,9 +1,10 @@ """NavidromeProvider: picks tracks from an OpenSubsonic playlist. Emits ``subsonic`` tracks (locator = song id). The playlist is cached in -memory and refreshed periodically. Anti-repeat is applied by filtering out -tracks whose key is among the recently played ones; if that empties the pool -(short playlist), the filter is dropped so playback never stalls. +memory and refreshed periodically. A cheap local anti-repeat filters out songs +whose id was played recently; if that empties the pool (short playlist), the +filter is dropped so playback never stalls. The authoritative, source-agnostic +anti-repeat lives in the Scheduler (on the canonical key). """ import logging @@ -53,9 +54,9 @@ class NavidromeProvider: if not self._songs: return None - recent = self._db.recent_keys(config.ANTIREPEAT_WINDOW) + recent = self._db.recent_locators(config.ANTIREPEAT_WINDOW) candidates = [ - s for s in self._songs if f"subsonic:{s['id']}" not in recent + s for s in self._songs if str(s["id"]) not in recent ] or self._songs song = random.choice(candidates) return Track( diff --git a/ingest/radieo/providers/ytdlp.py b/ingest/radieo/providers/ytdlp.py index 8cdff38..6ebca85 100644 --- a/ingest/radieo/providers/ytdlp.py +++ b/ingest/radieo/providers/ytdlp.py @@ -63,7 +63,7 @@ class YtdlpProvider: self._load_urls() if not self._urls: return None - recent = self._db.recent_keys(config.ANTIREPEAT_WINDOW) + recent = self._db.recent_locators(config.ANTIREPEAT_WINDOW) # Try source lines in random order until one yields a usable track. candidates = list(self._urls) random.shuffle(candidates) @@ -81,7 +81,7 @@ class YtdlpProvider: return None if not entries: return None - pool = [e for e in entries if f"ytdlp:{e['url']}" not in recent] or entries + pool = [e for e in entries if e["url"] not in recent] or entries entry = random.choice(pool) return Track( backend="ytdlp", diff --git a/ingest/radieo/scheduler.py b/ingest/radieo/scheduler.py index 52ddec5..a3d2c17 100644 --- a/ingest/radieo/scheduler.py +++ b/ingest/radieo/scheduler.py @@ -1,10 +1,15 @@ -"""Weighted scheduler mixing several providers. +"""Weighted scheduler mixing several providers, with canonical anti-repeat. -Picks a provider at random, weighted by ``SOURCE_WEIGHTS``, and asks it for a -track. If the chosen provider has nothing right now (empty list, unreachable -source…), the remaining providers are tried in weighted-random order, so a -temporarily-idle source never stalls playback. Returns ``None`` only when every -provider is exhausted. +Two responsibilities: + +- **Source mix**: pick a provider at random, weighted by ``SOURCE_WEIGHTS``. If + the chosen one has nothing right now, try the rest in weighted-random order, + so a temporarily-idle source never stalls playback. +- **Authoritative anti-repeat**: canonicalize the picked track (fill its MBID, + cached/best-effort) and reject it if its canonical key was played recently, + retrying a bounded number of times. This is source-agnostic: the same track + coming from two backends collapses to one key. Providers still apply a cheap + locator-based filter first, which keeps the number of retries low. The weights live in one place (``config.SOURCE_WEIGHTS``) so the mix can later move to a config file without touching this logic. @@ -13,15 +18,37 @@ move to a config file without touching this logic. import logging import random +from . import config + log = logging.getLogger("radieo.scheduler") class Scheduler: - def __init__(self, entries): + def __init__(self, entries, canonicalizer, db): # entries: list[(provider, weight)]; drop non-positive weights. self._entries = [(p, w) for p, w in entries if w > 0] + self._canonicalizer = canonicalizer + self._db = db def next(self): + if not self._entries: + return None + recent = self._db.recent_keys(config.ANTIREPEAT_WINDOW) + last = None + for _ in range(config.SCHEDULER_MAX_TRIES): + track = self._pick() + if track is None: + return None # no source has anything right now + track = self._canonicalizer.canonicalize(track) + if track.key not in recent: + return track + last = track # recently played; try another + log.debug("skipping recent %s", track) + # Everything drawn was recent (e.g. tiny library): play the last anyway. + return last + + def _pick(self): + """Weighted provider draw, falling through to the others when empty.""" pool = list(self._entries) while pool: weights = [w for _, w in pool]