diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..bf5fc43 --- /dev/null +++ b/.env.example @@ -0,0 +1,15 @@ +# radieo — configuration locale. Copier en `.env` et remplir. +# docker compose lit automatiquement `.env` pour ces variables. + +# --- Source Navidrome / OpenSubsonic --- +# URL de base de ton serveur (sans /rest). Laisser les champs vides désactive +# la source : le stream joue alors uniquement les fichiers déjà dans cache/. +RADIEO_NAVIDROME_URL=https://navidrome.example.org +RADIEO_NAVIDROME_USER=monuser +RADIEO_NAVIDROME_PASSWORD=monmotdepasse +# Nom OU identifiant de la playlist à diffuser. +RADIEO_NAVIDROME_PLAYLIST=Radio + +# --- Rétention du cache (optionnel) --- +# Nombre de morceaux joués conservés sur disque avant éviction (LRU). +RADIEO_RETENTION_KEEP=20 diff --git a/.gitignore b/.gitignore index 71b799e..b58bc4d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,12 +2,16 @@ /cache/* !/cache/.gitkeep -# État de l'ingestion (jalons suivants) +# Secrets locaux +.env + +# État de l'ingestion (base SQLite persistante) +/state/ *.db *.db-journal *.db-wal -# Python (jalons suivants) +# Python __pycache__/ *.pyc .venv/ diff --git a/README.md b/README.md index 5d71679..795d8f8 100644 --- a/README.md +++ b/README.md @@ -46,29 +46,43 @@ The stream is MP3 at 192 kbps. Multiple clients can listen at the same time. New files dropped into `cache/` are picked up automatically (the playlist is reloaded when the directory changes). +## Configuration + +Copy `.env.example` to `.env` and fill in your Navidrome details: + +```sh +cp .env.example .env +# edit .env: RADIEO_NAVIDROME_URL / USER / PASSWORD / PLAYLIST +``` + +If the Navidrome variables are left empty, the source is simply disabled and +the stream plays whatever is already in `cache/` (the milestone-1/2 behaviour). + ## Current status -**Milestone 2 — ingestion daemon: done.** +**Milestone 3 — Navidrome provider: done.** -- `ingest` (Python) container exposes `GET /next`, returning the next track as - an annotated Liquidsoap URI (or an empty body when nothing is ready). -- `stream` (Liquidsoap v2.4.5) pulls from `ingest` via a `request.dynamic` - source, and falls back to the local `cache/` directory when the daemon has - nothing to offer. -- HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps). -- Continuous output guaranteed: silence rather than a crash when everything is - empty (`mksafe`). -- Multiple simultaneous listeners supported. +- `ingest` pulls tracks from an OpenSubsonic playlist (Navidrome), downloading + them into the shared cache ahead of playback (prefetch buffer). +- Play history and LRU retention are tracked in a SQLite database under + `state/`: only the N most recently played files are kept on disk + (`RADIEO_RETENTION_KEEP`, default 20); anti-repeat avoids replaying a track + seen among the last plays. +- `GET /next` returns the next track as an annotated Liquidsoap URI with real + title/artist metadata (or an empty body when nothing is ready). +- `stream` (Liquidsoap v2.4.5) pulls via `request.dynamic` and falls back to the + local `cache/` directory; `mksafe` guarantees silence rather than a crash. +- HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps), + multiple simultaneous listeners supported. -At this stage the daemon just cycles through the files already in `cache/`; the -download providers (Navidrome, yt-dlp, ListenBrainz) come next. +The yt-dlp and ListenBrainz sources come next. ## Roadmap 1. ✅ **Broadcasting skeleton** — Liquidsoap serving the cache directory. 2. ✅ **Ingestion daemon** — Python daemon exposing `GET /next`; Liquidsoap switches to a `request.dynamic` source with the cache as fallback. -3. **Navidrome provider** — play from an OpenSubsonic playlist, with caching, +3. ✅ **Navidrome provider** — play from an OpenSubsonic playlist, with caching, LRU retention and play history. 4. **yt-dlp provider** — fetch tracks from a maintained URL/artist list; weighted mixing between sources. diff --git a/docker-compose.yml b/docker-compose.yml index abc4625..de177e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,10 +3,19 @@ services: build: ./ingest image: radieo-ingest volumes: - - ./cache:/cache # volume partagé avec le stream + - ./cache:/cache # volume partagé avec le stream (rw : téléchargements) + - ./state:/state # état persistant (SQLite) hors du cache éphémère environment: - RADIEO_CACHE_DIR=/cache + - RADIEO_STATE_DIR=/state - RADIEO_HTTP_PORT=8080 + # Source Navidrome / OpenSubsonic (voir .env / .env.example). + # Laisser vide désactive la source : le stream joue alors son cache local. + - RADIEO_NAVIDROME_URL=${RADIEO_NAVIDROME_URL:-} + - RADIEO_NAVIDROME_USER=${RADIEO_NAVIDROME_USER:-} + - RADIEO_NAVIDROME_PASSWORD=${RADIEO_NAVIDROME_PASSWORD:-} + - RADIEO_NAVIDROME_PLAYLIST=${RADIEO_NAVIDROME_PLAYLIST:-} + - RADIEO_RETENTION_KEEP=${RADIEO_RETENTION_KEEP:-20} restart: unless-stopped stream: diff --git a/ingest/Dockerfile b/ingest/Dockerfile index 69a8f6f..8798513 100644 --- a/ingest/Dockerfile +++ b/ingest/Dockerfile @@ -2,8 +2,9 @@ FROM python:3.12-slim WORKDIR /app -# Milestone 2 uses the standard library only; third-party dependencies -# (httpx, yt-dlp, feedparser…) will be added in later milestones. +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + COPY radieo/ ./radieo/ ENV PYTHONUNBUFFERED=1 diff --git a/ingest/radieo/__main__.py b/ingest/radieo/__main__.py index a7e8840..ad6fd08 100644 --- a/ingest/radieo/__main__.py +++ b/ingest/radieo/__main__.py @@ -1,34 +1,76 @@ -"""Entry point: start the HTTP API serving the track queue.""" +"""Entry point: wire providers/fetchers into the queue and serve GET /next.""" import logging from . import config from .api import IngestServer +from .db import Database from .queue import TrackQueue +log = logging.getLogger("radieo") + + +class _NullProvider: + """Fallback provider when no source is configured: never yields a track.""" + + def next(self): + return None + + +def _build_pipeline(db: Database): + """Return (provider, fetcher). Falls back to a no-op provider when the + Navidrome source is not configured, so the daemon still runs and the + stream plays its local cache.""" + if not config.NAVIDROME_ENABLED: + log.warning( + "Navidrome not configured (RADIEO_NAVIDROME_*): no source active, " + "the stream will play its local cache fallback." + ) + return _NullProvider(), None + + from .fetchers.subsonic import SubsonicFetcher + from .providers.navidrome import NavidromeProvider + from .subsonic import SubsonicClient + + client = SubsonicClient( + config.NAVIDROME_URL, config.NAVIDROME_USER, config.NAVIDROME_PASSWORD + ) + provider = NavidromeProvider(client, config.NAVIDROME_PLAYLIST, db) + fetcher = SubsonicFetcher(client, config.CACHE_DIR) + log.info("Navidrome source enabled (playlist=%r)", config.NAVIDROME_PLAYLIST) + return provider, fetcher + def main() -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) - log = logging.getLogger("radieo") config.CACHE_DIR.mkdir(parents=True, exist_ok=True) + config.STATE_DIR.mkdir(parents=True, exist_ok=True) - server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), TrackQueue()) + db = Database(config.STATE_DB) + provider, fetcher = _build_pipeline(db) + queue = TrackQueue(provider, fetcher, db) + queue.start() + + server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue) log.info( - "ingest listening on %s:%d (cache=%s)", + "ingest listening on %s:%d (cache=%s, state=%s)", config.HTTP_HOST, config.HTTP_PORT, config.CACHE_DIR, + config.STATE_DB, ) try: server.serve_forever() except KeyboardInterrupt: pass finally: + queue.stop() server.server_close() + db.close() if __name__ == "__main__": diff --git a/ingest/radieo/api.py b/ingest/radieo/api.py index dfb2743..1209dd2 100644 --- a/ingest/radieo/api.py +++ b/ingest/radieo/api.py @@ -10,24 +10,22 @@ import logging from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path +from .models import Track from .queue import TrackQueue log = logging.getLogger("radieo.api") -def annotate_uri(path: Path) -> str: - """Build an annotated Liquidsoap request URI for a cache file. - - Metadata is minimal for now (title derived from the filename); real - metadata will come from the providers in later milestones. - """ +def annotate_uri(path: Path, track: Track) -> str: + """Build an annotated Liquidsoap request URI for a cache file.""" def esc(value: str) -> str: return value.replace("\\", "\\\\").replace('"', '\\"') - title = esc(path.stem) - artist = esc("radieo") - return f'annotate:title="{title}",artist="{artist}":{path}' + return ( + f'annotate:title="{esc(track.title)}",artist="{esc(track.artist)}"' + f":{path}" + ) class IngestServer(ThreadingHTTPServer): @@ -48,13 +46,14 @@ class _Handler(BaseHTTPRequestHandler): self._text(404, "not found\n") def _serve_next(self): - track = self.server.queue.pop_next() - if track is None: + result = self.server.queue.pop_next() + if result is None: # Empty body: tells Liquidsoap to use its fallback for now. self._text(200, "") return - log.info("next -> %s", track.name) - self._text(200, annotate_uri(track) + "\n") + path, track = result + log.info("next -> %s", track) + self._text(200, annotate_uri(path, track) + "\n") def _text(self, code: int, body: str): data = body.encode("utf-8") diff --git a/ingest/radieo/config.py b/ingest/radieo/config.py index a095658..0dc8f4f 100644 --- a/ingest/radieo/config.py +++ b/ingest/radieo/config.py @@ -11,8 +11,33 @@ from pathlib import Path # be valid inside *that* container, so both mount the cache at the same path. CACHE_DIR = Path(os.environ.get("RADIEO_CACHE_DIR", "/cache")) +# Persistent state (SQLite). Kept out of the ephemeral cache directory. +STATE_DIR = Path(os.environ.get("RADIEO_STATE_DIR", "/state")) +STATE_DB = STATE_DIR / "radieo.db" + HTTP_HOST = os.environ.get("RADIEO_HTTP_HOST", "0.0.0.0") HTTP_PORT = int(os.environ.get("RADIEO_HTTP_PORT", "8080")) -# File extensions considered playable when scanning the cache. -AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".opus", ".m4a", ".aac", ".wav"} +# --- Prefetching / retention --- +# How many downloaded tracks to keep ready ahead of playback. +PREFETCH = int(os.environ.get("RADIEO_PREFETCH", "3")) +# Seconds between prefetch-loop wake-ups. +PREFETCH_INTERVAL = float(os.environ.get("RADIEO_PREFETCH_INTERVAL", "2.0")) +# Keep the N most recently played files on disk; evict older ones (LRU). +RETENTION_KEEP = int(os.environ.get("RADIEO_RETENTION_KEEP", "20")) +# Do not replay a track seen among the last N plays, when avoidable. +ANTIREPEAT_WINDOW = int(os.environ.get("RADIEO_ANTIREPEAT_WINDOW", "50")) + +# --- Navidrome / OpenSubsonic source --- +# Left empty means the provider is disabled (the stream then plays its own +# local-cache fallback). Credentials are expected to come from a .env file. +NAVIDROME_URL = os.environ.get("RADIEO_NAVIDROME_URL", "").strip() +NAVIDROME_USER = os.environ.get("RADIEO_NAVIDROME_USER", "").strip() +NAVIDROME_PASSWORD = os.environ.get("RADIEO_NAVIDROME_PASSWORD", "") +NAVIDROME_PLAYLIST = os.environ.get("RADIEO_NAVIDROME_PLAYLIST", "").strip() +# How often to reload the playlist contents, in seconds. +PLAYLIST_REFRESH = float(os.environ.get("RADIEO_PLAYLIST_REFRESH", "300")) + +NAVIDROME_ENABLED = bool( + NAVIDROME_URL and NAVIDROME_USER and NAVIDROME_PASSWORD and NAVIDROME_PLAYLIST +) diff --git a/ingest/radieo/db.py b/ingest/radieo/db.py new file mode 100644 index 0000000..c9aff9c --- /dev/null +++ b/ingest/radieo/db.py @@ -0,0 +1,109 @@ +"""SQLite state: play history (anti-repeat + stats) and cache-file retention. + +Two concerns, two tables: + +- ``history`` is append-only. It drives anti-repeat (recently played track + keys) and survives cache eviction, so a track can stay "recently played" even + after its file is deleted. +- ``cache_files`` tracks downloaded files so we can keep only the N most + recently *played* ones (LRU retention). Files not yet played are never + evicted. +""" + +import sqlite3 +import threading +import time +from pathlib import Path + +from .models import Track + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS history ( + id INTEGER PRIMARY KEY, + track_key TEXT NOT NULL, + artist TEXT, + title TEXT, + origin TEXT, + played_at REAL NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_history_played_at ON history(played_at); + +CREATE TABLE IF NOT EXISTS cache_files ( + path TEXT PRIMARY KEY, + track_key TEXT, + played_at REAL -- NULL until the file has been played +); +""" + + +class Database: + def __init__(self, path: Path): + path.parent.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() + self._conn = sqlite3.connect( + path, check_same_thread=False, isolation_level=None + ) + self._conn.row_factory = sqlite3.Row + self._conn.executescript(_SCHEMA) + + # --- anti-repeat / history ------------------------------------------- + + def recent_keys(self, limit: int) -> set[str]: + with self._lock: + rows = self._conn.execute( + "SELECT track_key FROM history ORDER BY played_at DESC LIMIT ?", + (limit,), + ).fetchall() + return {r["track_key"] for r in rows} + + def record_play(self, track: Track) -> None: + with self._lock: + self._conn.execute( + "INSERT INTO history (track_key, artist, title, origin, played_at)" + " VALUES (?, ?, ?, ?, ?)", + (track.key, track.artist, track.title, track.origin, time.time()), + ) + + # --- cache-file retention -------------------------------------------- + + def register_download(self, path: str, track_key: str) -> None: + with self._lock: + self._conn.execute( + "INSERT OR REPLACE INTO cache_files (path, track_key, played_at)" + " VALUES (?, ?, NULL)", + (path, track_key), + ) + + def mark_played(self, path: str) -> None: + with self._lock: + self._conn.execute( + "UPDATE cache_files SET played_at = ? WHERE path = ?", + (time.time(), path), + ) + + def evict(self, keep: int) -> list[str]: + """Return (and forget) played files beyond the ``keep`` most recent. + + The caller is responsible for deleting the returned files from disk. + Never touches files that have not been played yet. + """ + with self._lock: + rows = self._conn.execute( + "SELECT path FROM cache_files WHERE played_at IS NOT NULL" + " AND path NOT IN (" + " SELECT path FROM cache_files WHERE played_at IS NOT NULL" + " ORDER BY played_at DESC LIMIT ?" + ")", + (keep,), + ).fetchall() + paths = [r["path"] for r in rows] + if paths: + self._conn.executemany( + "DELETE FROM cache_files WHERE path = ?", + [(p,) for p in paths], + ) + return paths + + def close(self) -> None: + with self._lock: + self._conn.close() diff --git a/ingest/radieo/fetchers/__init__.py b/ingest/radieo/fetchers/__init__.py new file mode 100644 index 0000000..d3d5089 --- /dev/null +++ b/ingest/radieo/fetchers/__init__.py @@ -0,0 +1 @@ +"""Fetchers turn a ``Track`` into a local file in the cache directory.""" diff --git a/ingest/radieo/fetchers/subsonic.py b/ingest/radieo/fetchers/subsonic.py new file mode 100644 index 0000000..52e56fb --- /dev/null +++ b/ingest/radieo/fetchers/subsonic.py @@ -0,0 +1,46 @@ +"""SubsonicFetcher: download a ``subsonic`` track into the cache directory. + +Files are downloaded to a hidden, non-audio temporary name and then atomically +renamed into place, so the stream container never sees a partial file through +its shared read-only cache mount. +""" + +import logging +import re +from pathlib import Path +from uuid import uuid4 + +from ..models import Track +from ..subsonic import SubsonicClient + +log = logging.getLogger("radieo.fetcher.subsonic") + +_SAFE = re.compile(r"[^A-Za-z0-9._-]") + + +class SubsonicFetcher: + backend = "subsonic" + + def __init__(self, client: SubsonicClient, cache_dir: Path): + self._client = client + self._cache_dir = cache_dir + + def fetch(self, track: Track) -> Path: + stem = f"subsonic-{_SAFE.sub('_', track.locator)}" + # Reuse a still-cached copy rather than downloading again. + for existing in self._cache_dir.glob(f"{stem}.*"): + if not existing.name.endswith(".part"): + return existing + + tmp = self._cache_dir / f".{uuid4().hex}.part" + try: + ext = self._client.download( + track.locator, tmp, hint_ext=track.source_ext + ) + dest = self._cache_dir / f"{stem}{ext}" + tmp.replace(dest) + except BaseException: + tmp.unlink(missing_ok=True) + raise + log.info("downloaded %s -> %s", track, dest.name) + return dest diff --git a/ingest/radieo/models.py b/ingest/radieo/models.py new file mode 100644 index 0000000..f2aaf8e --- /dev/null +++ b/ingest/radieo/models.py @@ -0,0 +1,34 @@ +"""Shared data model. + +A ``Track`` is the uniform object every provider emits: a *resolved* reference +(which backend can download it, and where) plus display metadata. Fetchers turn +it into a local file; the queue and the state database use ``key`` for +de-duplication and anti-repeat. +""" + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Track: + backend: str # which fetcher handles it: "subsonic" | "ytdlp" + locator: str # backend-specific: Subsonic song id, or a media URL + artist: str + title: str + origin: str # provider that produced it, e.g. "navidrome" + mbid: str | None = None # filled by the Canonicalizer (milestone 5) + source_ext: str | None = None # filename hint, e.g. "mp3", "flac" + + @property + def key(self) -> str: + """Stable identity for de-duplication and anti-repeat. + + Until the Canonicalizer (milestone 5) fills ``mbid``, we key on the + backend locator, which is unique within a source. + """ + if self.mbid: + return f"mbid:{self.mbid}" + return f"{self.backend}:{self.locator}" + + def __str__(self) -> str: + return f"{self.artist} — {self.title} [{self.origin}]" diff --git a/ingest/radieo/providers/__init__.py b/ingest/radieo/providers/__init__.py new file mode 100644 index 0000000..5413ab4 --- /dev/null +++ b/ingest/radieo/providers/__init__.py @@ -0,0 +1 @@ +"""Providers decide *what* to play, emitting resolved ``Track`` objects.""" diff --git a/ingest/radieo/providers/navidrome.py b/ingest/radieo/providers/navidrome.py new file mode 100644 index 0000000..7b433d7 --- /dev/null +++ b/ingest/radieo/providers/navidrome.py @@ -0,0 +1,68 @@ +"""NavidromeProvider: picks tracks from an OpenSubsonic playlist. + +Emits ``subsonic`` tracks (locator = song id). The playlist is cached in +memory and refreshed periodically. Anti-repeat is applied by filtering out +tracks whose key is among the recently played ones; if that empties the pool +(short playlist), the filter is dropped so playback never stalls. +""" + +import logging +import random +import time + +import httpx + +from .. import config +from ..db import Database +from ..models import Track +from ..subsonic import SubsonicClient, SubsonicError + +log = logging.getLogger("radieo.provider.navidrome") + + +class NavidromeProvider: + name = "navidrome" + + def __init__(self, client: SubsonicClient, playlist_ref: str, db: Database): + self._client = client + self._playlist_ref = playlist_ref + self._db = db + self._playlist_id: str | None = None + self._songs: list[dict] = [] + self._loaded_at = 0.0 + + def _ensure_songs(self) -> None: + now = time.time() + if self._songs and now - self._loaded_at < config.PLAYLIST_REFRESH: + return + if self._playlist_id is None: + self._playlist_id = self._client.resolve_playlist_id( + self._playlist_ref + ) + songs = self._client.get_playlist_songs(self._playlist_id) + self._songs = songs + self._loaded_at = now + log.info("loaded %d songs from playlist %r", len(songs), self._playlist_ref) + + def next(self) -> Track | None: + try: + self._ensure_songs() + except (SubsonicError, httpx.HTTPError, OSError) as exc: + log.warning("could not load playlist: %s", exc) + return None + if not self._songs: + return None + + recent = self._db.recent_keys(config.ANTIREPEAT_WINDOW) + candidates = [ + s for s in self._songs if f"subsonic:{s['id']}" not in recent + ] or self._songs + song = random.choice(candidates) + return Track( + backend="subsonic", + locator=str(song["id"]), + artist=song.get("artist", "Unknown artist"), + title=song.get("title", str(song["id"])), + origin=self.name, + source_ext=song.get("suffix"), + ) diff --git a/ingest/radieo/queue.py b/ingest/radieo/queue.py index ac182d6..657c986 100644 --- a/ingest/radieo/queue.py +++ b/ingest/radieo/queue.py @@ -1,50 +1,89 @@ -"""Queue of tracks ready to be served to the stream layer. +"""Prefetching pipeline feeding ready-to-play tracks to the stream layer. -Milestone 2: a track is simply an audio file already present in the cache -directory. When the queue drains, it is refilled with a fresh shuffle of the -available files. Later milestones will replace the refill logic with providers -that download tracks (Navidrome, yt-dlp, …) before enqueuing them. +A background thread keeps a small buffer of downloaded tracks: it asks the +provider what to play next, has the matching fetcher download it into the +cache, and enqueues the resulting file. ``pop_next`` hands the oldest ready +track to the HTTP API, records the play and runs LRU retention. + +If the provider has nothing (e.g. Navidrome not configured, or unreachable), +the buffer simply stays empty and ``pop_next`` returns ``None`` — the stream +then plays its own local-cache fallback. """ -import random +import logging import threading from collections import deque from pathlib import Path from . import config +from .db import Database +from .models import Track + +log = logging.getLogger("radieo.queue") class TrackQueue: - def __init__(self): + def __init__(self, provider, fetcher, db: Database): + self._provider = provider + self._fetcher = fetcher + self._db = db self._lock = threading.Lock() - self._upcoming: deque[Path] = deque() - self._last_served: Path | None = None - - def _available_files(self) -> list[Path]: - if not config.CACHE_DIR.is_dir(): - return [] - return sorted( - p - for p in config.CACHE_DIR.iterdir() - if p.is_file() and p.suffix.lower() in config.AUDIO_EXTENSIONS + self._ready: deque[tuple[Path, Track]] = deque() + self._stop = threading.Event() + self._thread = threading.Thread( + target=self._run, name="prefetch", daemon=True ) - def _refill_locked(self) -> None: - files = self._available_files() - if not files: - return - # Avoid replaying the last served track back-to-back when we can. - pool = [f for f in files if f != self._last_served] or files - random.shuffle(pool) - self._upcoming.extend(pool) + def start(self) -> None: + self._thread.start() - def pop_next(self) -> Path | None: - """Return the next track to play, or None if the cache is empty.""" + def stop(self) -> None: + self._stop.set() + + # --- background prefetching ------------------------------------------ + + def _run(self) -> None: + while not self._stop.is_set(): + try: + self._prefetch() + except Exception: # never let the loop die + log.exception("prefetch loop error") + self._stop.wait(config.PREFETCH_INTERVAL) + + def _prefetch(self) -> None: with self._lock: - if not self._upcoming: - self._refill_locked() - if not self._upcoming: + missing = config.PREFETCH - len(self._ready) + for _ in range(max(0, missing)): + if self._stop.is_set(): + return + track = self._provider.next() + if track is None: + return # nothing to fetch right now + try: + path = self._fetcher.fetch(track) + except Exception: + log.exception("fetch failed for %s", track) + continue + self._db.register_download(str(path), track.key) + with self._lock: + self._ready.append((path, track)) + + # --- serving ---------------------------------------------------------- + + def pop_next(self) -> tuple[Path, Track] | None: + with self._lock: + if not self._ready: return None - track = self._upcoming.popleft() - self._last_served = track - return track + path, track = self._ready.popleft() + self._db.mark_played(str(path)) + self._db.record_play(track) + self._evict() + return path, track + + def _evict(self) -> None: + for path in self._db.evict(config.RETENTION_KEEP): + try: + Path(path).unlink(missing_ok=True) + log.info("evicted %s", Path(path).name) + except OSError: + log.exception("could not evict %s", path) diff --git a/ingest/radieo/subsonic.py b/ingest/radieo/subsonic.py new file mode 100644 index 0000000..b5365ae --- /dev/null +++ b/ingest/radieo/subsonic.py @@ -0,0 +1,115 @@ +"""Minimal OpenSubsonic client (enough for Navidrome playback). + +Uses salted-token authentication (``t = md5(password + salt)``), the scheme +recommended by the Subsonic API since 1.13.0 and supported by Navidrome. +""" + +import hashlib +import logging +import secrets +from pathlib import Path + +import httpx + +log = logging.getLogger("radieo.subsonic") + +# Advertised API version and client name. +_API_VERSION = "1.16.1" +_CLIENT = "radieo" + +# Content-Type -> file extension, used to name downloaded files. +_CTYPE_EXT = { + "audio/mpeg": ".mp3", + "audio/mp3": ".mp3", + "audio/flac": ".flac", + "audio/x-flac": ".flac", + "audio/ogg": ".ogg", + "application/ogg": ".ogg", + "audio/opus": ".opus", + "audio/mp4": ".m4a", + "audio/x-m4a": ".m4a", + "audio/aac": ".aac", + "audio/wav": ".wav", + "audio/x-wav": ".wav", +} + + +class SubsonicError(Exception): + pass + + +class SubsonicClient: + def __init__(self, base_url: str, user: str, password: str): + self._base = base_url.rstrip("/") + self._user = user + self._password = password + self._http = httpx.Client(timeout=30.0, follow_redirects=True) + + def _auth_params(self) -> dict[str, str]: + salt = secrets.token_hex(8) + token = hashlib.md5((self._password + salt).encode()).hexdigest() + return { + "u": self._user, + "t": token, + "s": salt, + "v": _API_VERSION, + "c": _CLIENT, + "f": "json", + } + + def _get_json(self, view: str, **params) -> dict: + url = f"{self._base}/rest/{view}" + resp = self._http.get(url, params={**self._auth_params(), **params}) + resp.raise_for_status() + body = resp.json()["subsonic-response"] + if body.get("status") != "ok": + err = body.get("error", {}) + raise SubsonicError( + f"{view}: {err.get('code')} {err.get('message')}" + ) + return body + + def ping(self) -> None: + self._get_json("ping") + + def resolve_playlist_id(self, name_or_id: str) -> str: + """Accept either a playlist id or a playlist name.""" + body = self._get_json("getPlaylists") + playlists = body.get("playlists", {}).get("playlist", []) + for pl in playlists: + if pl.get("id") == name_or_id or pl.get("name") == name_or_id: + return pl["id"] + raise SubsonicError(f"playlist not found: {name_or_id!r}") + + def get_playlist_songs(self, playlist_id: str) -> list[dict]: + body = self._get_json("getPlaylist", id=playlist_id) + return body.get("playlist", {}).get("entry", []) + + def download(self, song_id: str, dest: Path, hint_ext: str | None = None) -> str: + """Download a song to ``dest``; return the file extension used. + + ``format=raw`` asks Navidrome for the original file (no transcoding), + keeping quality and letting Liquidsoap decode it. + """ + params = {**self._auth_params(), "id": song_id, "format": "raw"} + with self._http.stream( + "GET", f"{self._base}/rest/stream", params=params + ) as resp: + resp.raise_for_status() + ctype = resp.headers.get("content-type", "").split(";")[0].strip() + if ctype.startswith(("application/json", "text/xml")): + raise SubsonicError( + f"stream {song_id}: error response {resp.read()[:200]!r}" + ) + ext = _CTYPE_EXT.get(ctype) + if ext is None and hint_ext: + ext = "." + hint_ext.lstrip(".") + if ext is None: + ext = ".mp3" + with open(dest, "wb") as fh: + for chunk in resp.iter_bytes(chunk_size=65536): + fh.write(chunk) + return ext + + def close(self) -> None: + self._http.close() diff --git a/ingest/requirements.txt b/ingest/requirements.txt new file mode 100644 index 0000000..6ecf620 --- /dev/null +++ b/ingest/requirements.txt @@ -0,0 +1 @@ +httpx>=0.27