Milestone 3: Navidrome (OpenSubsonic) playback provider
Replace the directory-scan queue with a real ingestion pipeline: provider -> fetcher -> cache -> ready queue, driven by a background prefetch thread. - subsonic.py: minimal OpenSubsonic client (salted-token auth, getPlaylists/getPlaylist, raw streaming download). - providers/navidrome.py: pick tracks from a playlist (by name or id), with anti-repeat and periodic playlist reload. - fetchers/subsonic.py: atomic download into the shared cache. - db.py: SQLite state — append-only play history (anti-repeat + stats) and cache_files LRU retention (keep the N most recently played). - queue.py: prefetch buffer + retention on play; graceful degradation to the stream's local-cache fallback when no source is configured. - api.py: GET /next now carries real title/artist metadata. - Config via .env (Navidrome credentials), persistent state/ volume, httpx dependency. Verified end-to-end against a live Navidrome: playlist resolved, tracks downloaded and broadcast, retention and history correct. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
f8eb0655eb
commit
8c27498632
17 changed files with 594 additions and 71 deletions
15
.env.example
Normal file
15
.env.example
Normal file
|
|
@ -0,0 +1,15 @@
|
||||||
|
# radieo — configuration locale. Copier en `.env` et remplir.
|
||||||
|
# docker compose lit automatiquement `.env` pour ces variables.
|
||||||
|
|
||||||
|
# --- Source Navidrome / OpenSubsonic ---
|
||||||
|
# URL de base de ton serveur (sans /rest). Laisser les champs vides désactive
|
||||||
|
# la source : le stream joue alors uniquement les fichiers déjà dans cache/.
|
||||||
|
RADIEO_NAVIDROME_URL=https://navidrome.example.org
|
||||||
|
RADIEO_NAVIDROME_USER=monuser
|
||||||
|
RADIEO_NAVIDROME_PASSWORD=monmotdepasse
|
||||||
|
# Nom OU identifiant de la playlist à diffuser.
|
||||||
|
RADIEO_NAVIDROME_PLAYLIST=Radio
|
||||||
|
|
||||||
|
# --- Rétention du cache (optionnel) ---
|
||||||
|
# Nombre de morceaux joués conservés sur disque avant éviction (LRU).
|
||||||
|
RADIEO_RETENTION_KEEP=20
|
||||||
8
.gitignore
vendored
8
.gitignore
vendored
|
|
@ -2,12 +2,16 @@
|
||||||
/cache/*
|
/cache/*
|
||||||
!/cache/.gitkeep
|
!/cache/.gitkeep
|
||||||
|
|
||||||
# État de l'ingestion (jalons suivants)
|
# Secrets locaux
|
||||||
|
.env
|
||||||
|
|
||||||
|
# État de l'ingestion (base SQLite persistante)
|
||||||
|
/state/
|
||||||
*.db
|
*.db
|
||||||
*.db-journal
|
*.db-journal
|
||||||
*.db-wal
|
*.db-wal
|
||||||
|
|
||||||
# Python (jalons suivants)
|
# Python
|
||||||
__pycache__/
|
__pycache__/
|
||||||
*.pyc
|
*.pyc
|
||||||
.venv/
|
.venv/
|
||||||
|
|
|
||||||
40
README.md
40
README.md
|
|
@ -46,29 +46,43 @@ The stream is MP3 at 192 kbps. Multiple clients can listen at the same time.
|
||||||
New files dropped into `cache/` are picked up automatically (the playlist is
|
New files dropped into `cache/` are picked up automatically (the playlist is
|
||||||
reloaded when the directory changes).
|
reloaded when the directory changes).
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Copy `.env.example` to `.env` and fill in your Navidrome details:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
cp .env.example .env
|
||||||
|
# edit .env: RADIEO_NAVIDROME_URL / USER / PASSWORD / PLAYLIST
|
||||||
|
```
|
||||||
|
|
||||||
|
If the Navidrome variables are left empty, the source is simply disabled and
|
||||||
|
the stream plays whatever is already in `cache/` (the milestone-1/2 behaviour).
|
||||||
|
|
||||||
## Current status
|
## Current status
|
||||||
|
|
||||||
**Milestone 2 — ingestion daemon: done.**
|
**Milestone 3 — Navidrome provider: done.**
|
||||||
|
|
||||||
- `ingest` (Python) container exposes `GET /next`, returning the next track as
|
- `ingest` pulls tracks from an OpenSubsonic playlist (Navidrome), downloading
|
||||||
an annotated Liquidsoap URI (or an empty body when nothing is ready).
|
them into the shared cache ahead of playback (prefetch buffer).
|
||||||
- `stream` (Liquidsoap v2.4.5) pulls from `ingest` via a `request.dynamic`
|
- Play history and LRU retention are tracked in a SQLite database under
|
||||||
source, and falls back to the local `cache/` directory when the daemon has
|
`state/`: only the N most recently played files are kept on disk
|
||||||
nothing to offer.
|
(`RADIEO_RETENTION_KEEP`, default 20); anti-repeat avoids replaying a track
|
||||||
- HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps).
|
seen among the last plays.
|
||||||
- Continuous output guaranteed: silence rather than a crash when everything is
|
- `GET /next` returns the next track as an annotated Liquidsoap URI with real
|
||||||
empty (`mksafe`).
|
title/artist metadata (or an empty body when nothing is ready).
|
||||||
- Multiple simultaneous listeners supported.
|
- `stream` (Liquidsoap v2.4.5) pulls via `request.dynamic` and falls back to the
|
||||||
|
local `cache/` directory; `mksafe` guarantees silence rather than a crash.
|
||||||
|
- HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps),
|
||||||
|
multiple simultaneous listeners supported.
|
||||||
|
|
||||||
At this stage the daemon just cycles through the files already in `cache/`; the
|
The yt-dlp and ListenBrainz sources come next.
|
||||||
download providers (Navidrome, yt-dlp, ListenBrainz) come next.
|
|
||||||
|
|
||||||
## Roadmap
|
## Roadmap
|
||||||
|
|
||||||
1. ✅ **Broadcasting skeleton** — Liquidsoap serving the cache directory.
|
1. ✅ **Broadcasting skeleton** — Liquidsoap serving the cache directory.
|
||||||
2. ✅ **Ingestion daemon** — Python daemon exposing `GET /next`; Liquidsoap
|
2. ✅ **Ingestion daemon** — Python daemon exposing `GET /next`; Liquidsoap
|
||||||
switches to a `request.dynamic` source with the cache as fallback.
|
switches to a `request.dynamic` source with the cache as fallback.
|
||||||
3. **Navidrome provider** — play from an OpenSubsonic playlist, with caching,
|
3. ✅ **Navidrome provider** — play from an OpenSubsonic playlist, with caching,
|
||||||
LRU retention and play history.
|
LRU retention and play history.
|
||||||
4. **yt-dlp provider** — fetch tracks from a maintained URL/artist list; weighted
|
4. **yt-dlp provider** — fetch tracks from a maintained URL/artist list; weighted
|
||||||
mixing between sources.
|
mixing between sources.
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,19 @@ services:
|
||||||
build: ./ingest
|
build: ./ingest
|
||||||
image: radieo-ingest
|
image: radieo-ingest
|
||||||
volumes:
|
volumes:
|
||||||
- ./cache:/cache # volume partagé avec le stream
|
- ./cache:/cache # volume partagé avec le stream (rw : téléchargements)
|
||||||
|
- ./state:/state # état persistant (SQLite) hors du cache éphémère
|
||||||
environment:
|
environment:
|
||||||
- RADIEO_CACHE_DIR=/cache
|
- RADIEO_CACHE_DIR=/cache
|
||||||
|
- RADIEO_STATE_DIR=/state
|
||||||
- RADIEO_HTTP_PORT=8080
|
- RADIEO_HTTP_PORT=8080
|
||||||
|
# Source Navidrome / OpenSubsonic (voir .env / .env.example).
|
||||||
|
# Laisser vide désactive la source : le stream joue alors son cache local.
|
||||||
|
- RADIEO_NAVIDROME_URL=${RADIEO_NAVIDROME_URL:-}
|
||||||
|
- RADIEO_NAVIDROME_USER=${RADIEO_NAVIDROME_USER:-}
|
||||||
|
- RADIEO_NAVIDROME_PASSWORD=${RADIEO_NAVIDROME_PASSWORD:-}
|
||||||
|
- RADIEO_NAVIDROME_PLAYLIST=${RADIEO_NAVIDROME_PLAYLIST:-}
|
||||||
|
- RADIEO_RETENTION_KEEP=${RADIEO_RETENTION_KEEP:-20}
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
||||||
stream:
|
stream:
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,9 @@ FROM python:3.12-slim
|
||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
# Milestone 2 uses the standard library only; third-party dependencies
|
COPY requirements.txt ./
|
||||||
# (httpx, yt-dlp, feedparser…) will be added in later milestones.
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
|
||||||
COPY radieo/ ./radieo/
|
COPY radieo/ ./radieo/
|
||||||
|
|
||||||
ENV PYTHONUNBUFFERED=1
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
|
|
||||||
|
|
@ -1,34 +1,76 @@
|
||||||
"""Entry point: start the HTTP API serving the track queue."""
|
"""Entry point: wire providers/fetchers into the queue and serve GET /next."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from . import config
|
from . import config
|
||||||
from .api import IngestServer
|
from .api import IngestServer
|
||||||
|
from .db import Database
|
||||||
from .queue import TrackQueue
|
from .queue import TrackQueue
|
||||||
|
|
||||||
|
log = logging.getLogger("radieo")
|
||||||
|
|
||||||
|
|
||||||
|
class _NullProvider:
|
||||||
|
"""Fallback provider when no source is configured: never yields a track."""
|
||||||
|
|
||||||
|
def next(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _build_pipeline(db: Database):
|
||||||
|
"""Return (provider, fetcher). Falls back to a no-op provider when the
|
||||||
|
Navidrome source is not configured, so the daemon still runs and the
|
||||||
|
stream plays its local cache."""
|
||||||
|
if not config.NAVIDROME_ENABLED:
|
||||||
|
log.warning(
|
||||||
|
"Navidrome not configured (RADIEO_NAVIDROME_*): no source active, "
|
||||||
|
"the stream will play its local cache fallback."
|
||||||
|
)
|
||||||
|
return _NullProvider(), None
|
||||||
|
|
||||||
|
from .fetchers.subsonic import SubsonicFetcher
|
||||||
|
from .providers.navidrome import NavidromeProvider
|
||||||
|
from .subsonic import SubsonicClient
|
||||||
|
|
||||||
|
client = SubsonicClient(
|
||||||
|
config.NAVIDROME_URL, config.NAVIDROME_USER, config.NAVIDROME_PASSWORD
|
||||||
|
)
|
||||||
|
provider = NavidromeProvider(client, config.NAVIDROME_PLAYLIST, db)
|
||||||
|
fetcher = SubsonicFetcher(client, config.CACHE_DIR)
|
||||||
|
log.info("Navidrome source enabled (playlist=%r)", config.NAVIDROME_PLAYLIST)
|
||||||
|
return provider, fetcher
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO,
|
level=logging.INFO,
|
||||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||||
)
|
)
|
||||||
log = logging.getLogger("radieo")
|
|
||||||
|
|
||||||
config.CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
config.CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
config.STATE_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), TrackQueue())
|
db = Database(config.STATE_DB)
|
||||||
|
provider, fetcher = _build_pipeline(db)
|
||||||
|
queue = TrackQueue(provider, fetcher, db)
|
||||||
|
queue.start()
|
||||||
|
|
||||||
|
server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue)
|
||||||
log.info(
|
log.info(
|
||||||
"ingest listening on %s:%d (cache=%s)",
|
"ingest listening on %s:%d (cache=%s, state=%s)",
|
||||||
config.HTTP_HOST,
|
config.HTTP_HOST,
|
||||||
config.HTTP_PORT,
|
config.HTTP_PORT,
|
||||||
config.CACHE_DIR,
|
config.CACHE_DIR,
|
||||||
|
config.STATE_DB,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
server.serve_forever()
|
server.serve_forever()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
|
queue.stop()
|
||||||
server.server_close()
|
server.server_close()
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -10,24 +10,22 @@ import logging
|
||||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
from .models import Track
|
||||||
from .queue import TrackQueue
|
from .queue import TrackQueue
|
||||||
|
|
||||||
log = logging.getLogger("radieo.api")
|
log = logging.getLogger("radieo.api")
|
||||||
|
|
||||||
|
|
||||||
def annotate_uri(path: Path) -> str:
|
def annotate_uri(path: Path, track: Track) -> str:
|
||||||
"""Build an annotated Liquidsoap request URI for a cache file.
|
"""Build an annotated Liquidsoap request URI for a cache file."""
|
||||||
|
|
||||||
Metadata is minimal for now (title derived from the filename); real
|
|
||||||
metadata will come from the providers in later milestones.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def esc(value: str) -> str:
|
def esc(value: str) -> str:
|
||||||
return value.replace("\\", "\\\\").replace('"', '\\"')
|
return value.replace("\\", "\\\\").replace('"', '\\"')
|
||||||
|
|
||||||
title = esc(path.stem)
|
return (
|
||||||
artist = esc("radieo")
|
f'annotate:title="{esc(track.title)}",artist="{esc(track.artist)}"'
|
||||||
return f'annotate:title="{title}",artist="{artist}":{path}'
|
f":{path}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class IngestServer(ThreadingHTTPServer):
|
class IngestServer(ThreadingHTTPServer):
|
||||||
|
|
@ -48,13 +46,14 @@ class _Handler(BaseHTTPRequestHandler):
|
||||||
self._text(404, "not found\n")
|
self._text(404, "not found\n")
|
||||||
|
|
||||||
def _serve_next(self):
|
def _serve_next(self):
|
||||||
track = self.server.queue.pop_next()
|
result = self.server.queue.pop_next()
|
||||||
if track is None:
|
if result is None:
|
||||||
# Empty body: tells Liquidsoap to use its fallback for now.
|
# Empty body: tells Liquidsoap to use its fallback for now.
|
||||||
self._text(200, "")
|
self._text(200, "")
|
||||||
return
|
return
|
||||||
log.info("next -> %s", track.name)
|
path, track = result
|
||||||
self._text(200, annotate_uri(track) + "\n")
|
log.info("next -> %s", track)
|
||||||
|
self._text(200, annotate_uri(path, track) + "\n")
|
||||||
|
|
||||||
def _text(self, code: int, body: str):
|
def _text(self, code: int, body: str):
|
||||||
data = body.encode("utf-8")
|
data = body.encode("utf-8")
|
||||||
|
|
|
||||||
|
|
@ -11,8 +11,33 @@ from pathlib import Path
|
||||||
# be valid inside *that* container, so both mount the cache at the same path.
|
# be valid inside *that* container, so both mount the cache at the same path.
|
||||||
CACHE_DIR = Path(os.environ.get("RADIEO_CACHE_DIR", "/cache"))
|
CACHE_DIR = Path(os.environ.get("RADIEO_CACHE_DIR", "/cache"))
|
||||||
|
|
||||||
|
# Persistent state (SQLite). Kept out of the ephemeral cache directory.
|
||||||
|
STATE_DIR = Path(os.environ.get("RADIEO_STATE_DIR", "/state"))
|
||||||
|
STATE_DB = STATE_DIR / "radieo.db"
|
||||||
|
|
||||||
HTTP_HOST = os.environ.get("RADIEO_HTTP_HOST", "0.0.0.0")
|
HTTP_HOST = os.environ.get("RADIEO_HTTP_HOST", "0.0.0.0")
|
||||||
HTTP_PORT = int(os.environ.get("RADIEO_HTTP_PORT", "8080"))
|
HTTP_PORT = int(os.environ.get("RADIEO_HTTP_PORT", "8080"))
|
||||||
|
|
||||||
# File extensions considered playable when scanning the cache.
|
# --- Prefetching / retention ---
|
||||||
AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".opus", ".m4a", ".aac", ".wav"}
|
# How many downloaded tracks to keep ready ahead of playback.
|
||||||
|
PREFETCH = int(os.environ.get("RADIEO_PREFETCH", "3"))
|
||||||
|
# Seconds between prefetch-loop wake-ups.
|
||||||
|
PREFETCH_INTERVAL = float(os.environ.get("RADIEO_PREFETCH_INTERVAL", "2.0"))
|
||||||
|
# Keep the N most recently played files on disk; evict older ones (LRU).
|
||||||
|
RETENTION_KEEP = int(os.environ.get("RADIEO_RETENTION_KEEP", "20"))
|
||||||
|
# Do not replay a track seen among the last N plays, when avoidable.
|
||||||
|
ANTIREPEAT_WINDOW = int(os.environ.get("RADIEO_ANTIREPEAT_WINDOW", "50"))
|
||||||
|
|
||||||
|
# --- Navidrome / OpenSubsonic source ---
|
||||||
|
# Left empty means the provider is disabled (the stream then plays its own
|
||||||
|
# local-cache fallback). Credentials are expected to come from a .env file.
|
||||||
|
NAVIDROME_URL = os.environ.get("RADIEO_NAVIDROME_URL", "").strip()
|
||||||
|
NAVIDROME_USER = os.environ.get("RADIEO_NAVIDROME_USER", "").strip()
|
||||||
|
NAVIDROME_PASSWORD = os.environ.get("RADIEO_NAVIDROME_PASSWORD", "")
|
||||||
|
NAVIDROME_PLAYLIST = os.environ.get("RADIEO_NAVIDROME_PLAYLIST", "").strip()
|
||||||
|
# How often to reload the playlist contents, in seconds.
|
||||||
|
PLAYLIST_REFRESH = float(os.environ.get("RADIEO_PLAYLIST_REFRESH", "300"))
|
||||||
|
|
||||||
|
NAVIDROME_ENABLED = bool(
|
||||||
|
NAVIDROME_URL and NAVIDROME_USER and NAVIDROME_PASSWORD and NAVIDROME_PLAYLIST
|
||||||
|
)
|
||||||
|
|
|
||||||
109
ingest/radieo/db.py
Normal file
109
ingest/radieo/db.py
Normal file
|
|
@ -0,0 +1,109 @@
|
||||||
|
"""SQLite state: play history (anti-repeat + stats) and cache-file retention.
|
||||||
|
|
||||||
|
Two concerns, two tables:
|
||||||
|
|
||||||
|
- ``history`` is append-only. It drives anti-repeat (recently played track
|
||||||
|
keys) and survives cache eviction, so a track can stay "recently played" even
|
||||||
|
after its file is deleted.
|
||||||
|
- ``cache_files`` tracks downloaded files so we can keep only the N most
|
||||||
|
recently *played* ones (LRU retention). Files not yet played are never
|
||||||
|
evicted.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from .models import Track
|
||||||
|
|
||||||
|
_SCHEMA = """
|
||||||
|
CREATE TABLE IF NOT EXISTS history (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
track_key TEXT NOT NULL,
|
||||||
|
artist TEXT,
|
||||||
|
title TEXT,
|
||||||
|
origin TEXT,
|
||||||
|
played_at REAL NOT NULL
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_history_played_at ON history(played_at);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS cache_files (
|
||||||
|
path TEXT PRIMARY KEY,
|
||||||
|
track_key TEXT,
|
||||||
|
played_at REAL -- NULL until the file has been played
|
||||||
|
);
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class Database:
|
||||||
|
def __init__(self, path: Path):
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._conn = sqlite3.connect(
|
||||||
|
path, check_same_thread=False, isolation_level=None
|
||||||
|
)
|
||||||
|
self._conn.row_factory = sqlite3.Row
|
||||||
|
self._conn.executescript(_SCHEMA)
|
||||||
|
|
||||||
|
# --- anti-repeat / history -------------------------------------------
|
||||||
|
|
||||||
|
def recent_keys(self, limit: int) -> set[str]:
|
||||||
|
with self._lock:
|
||||||
|
rows = self._conn.execute(
|
||||||
|
"SELECT track_key FROM history ORDER BY played_at DESC LIMIT ?",
|
||||||
|
(limit,),
|
||||||
|
).fetchall()
|
||||||
|
return {r["track_key"] for r in rows}
|
||||||
|
|
||||||
|
def record_play(self, track: Track) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute(
|
||||||
|
"INSERT INTO history (track_key, artist, title, origin, played_at)"
|
||||||
|
" VALUES (?, ?, ?, ?, ?)",
|
||||||
|
(track.key, track.artist, track.title, track.origin, time.time()),
|
||||||
|
)
|
||||||
|
|
||||||
|
# --- cache-file retention --------------------------------------------
|
||||||
|
|
||||||
|
def register_download(self, path: str, track_key: str) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute(
|
||||||
|
"INSERT OR REPLACE INTO cache_files (path, track_key, played_at)"
|
||||||
|
" VALUES (?, ?, NULL)",
|
||||||
|
(path, track_key),
|
||||||
|
)
|
||||||
|
|
||||||
|
def mark_played(self, path: str) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute(
|
||||||
|
"UPDATE cache_files SET played_at = ? WHERE path = ?",
|
||||||
|
(time.time(), path),
|
||||||
|
)
|
||||||
|
|
||||||
|
def evict(self, keep: int) -> list[str]:
|
||||||
|
"""Return (and forget) played files beyond the ``keep`` most recent.
|
||||||
|
|
||||||
|
The caller is responsible for deleting the returned files from disk.
|
||||||
|
Never touches files that have not been played yet.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
rows = self._conn.execute(
|
||||||
|
"SELECT path FROM cache_files WHERE played_at IS NOT NULL"
|
||||||
|
" AND path NOT IN ("
|
||||||
|
" SELECT path FROM cache_files WHERE played_at IS NOT NULL"
|
||||||
|
" ORDER BY played_at DESC LIMIT ?"
|
||||||
|
")",
|
||||||
|
(keep,),
|
||||||
|
).fetchall()
|
||||||
|
paths = [r["path"] for r in rows]
|
||||||
|
if paths:
|
||||||
|
self._conn.executemany(
|
||||||
|
"DELETE FROM cache_files WHERE path = ?",
|
||||||
|
[(p,) for p in paths],
|
||||||
|
)
|
||||||
|
return paths
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.close()
|
||||||
1
ingest/radieo/fetchers/__init__.py
Normal file
1
ingest/radieo/fetchers/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
"""Fetchers turn a ``Track`` into a local file in the cache directory."""
|
||||||
46
ingest/radieo/fetchers/subsonic.py
Normal file
46
ingest/radieo/fetchers/subsonic.py
Normal file
|
|
@ -0,0 +1,46 @@
|
||||||
|
"""SubsonicFetcher: download a ``subsonic`` track into the cache directory.
|
||||||
|
|
||||||
|
Files are downloaded to a hidden, non-audio temporary name and then atomically
|
||||||
|
renamed into place, so the stream container never sees a partial file through
|
||||||
|
its shared read-only cache mount.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
from ..models import Track
|
||||||
|
from ..subsonic import SubsonicClient
|
||||||
|
|
||||||
|
log = logging.getLogger("radieo.fetcher.subsonic")
|
||||||
|
|
||||||
|
_SAFE = re.compile(r"[^A-Za-z0-9._-]")
|
||||||
|
|
||||||
|
|
||||||
|
class SubsonicFetcher:
|
||||||
|
backend = "subsonic"
|
||||||
|
|
||||||
|
def __init__(self, client: SubsonicClient, cache_dir: Path):
|
||||||
|
self._client = client
|
||||||
|
self._cache_dir = cache_dir
|
||||||
|
|
||||||
|
def fetch(self, track: Track) -> Path:
|
||||||
|
stem = f"subsonic-{_SAFE.sub('_', track.locator)}"
|
||||||
|
# Reuse a still-cached copy rather than downloading again.
|
||||||
|
for existing in self._cache_dir.glob(f"{stem}.*"):
|
||||||
|
if not existing.name.endswith(".part"):
|
||||||
|
return existing
|
||||||
|
|
||||||
|
tmp = self._cache_dir / f".{uuid4().hex}.part"
|
||||||
|
try:
|
||||||
|
ext = self._client.download(
|
||||||
|
track.locator, tmp, hint_ext=track.source_ext
|
||||||
|
)
|
||||||
|
dest = self._cache_dir / f"{stem}{ext}"
|
||||||
|
tmp.replace(dest)
|
||||||
|
except BaseException:
|
||||||
|
tmp.unlink(missing_ok=True)
|
||||||
|
raise
|
||||||
|
log.info("downloaded %s -> %s", track, dest.name)
|
||||||
|
return dest
|
||||||
34
ingest/radieo/models.py
Normal file
34
ingest/radieo/models.py
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
"""Shared data model.
|
||||||
|
|
||||||
|
A ``Track`` is the uniform object every provider emits: a *resolved* reference
|
||||||
|
(which backend can download it, and where) plus display metadata. Fetchers turn
|
||||||
|
it into a local file; the queue and the state database use ``key`` for
|
||||||
|
de-duplication and anti-repeat.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Track:
|
||||||
|
backend: str # which fetcher handles it: "subsonic" | "ytdlp"
|
||||||
|
locator: str # backend-specific: Subsonic song id, or a media URL
|
||||||
|
artist: str
|
||||||
|
title: str
|
||||||
|
origin: str # provider that produced it, e.g. "navidrome"
|
||||||
|
mbid: str | None = None # filled by the Canonicalizer (milestone 5)
|
||||||
|
source_ext: str | None = None # filename hint, e.g. "mp3", "flac"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def key(self) -> str:
|
||||||
|
"""Stable identity for de-duplication and anti-repeat.
|
||||||
|
|
||||||
|
Until the Canonicalizer (milestone 5) fills ``mbid``, we key on the
|
||||||
|
backend locator, which is unique within a source.
|
||||||
|
"""
|
||||||
|
if self.mbid:
|
||||||
|
return f"mbid:{self.mbid}"
|
||||||
|
return f"{self.backend}:{self.locator}"
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return f"{self.artist} — {self.title} [{self.origin}]"
|
||||||
1
ingest/radieo/providers/__init__.py
Normal file
1
ingest/radieo/providers/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
"""Providers decide *what* to play, emitting resolved ``Track`` objects."""
|
||||||
68
ingest/radieo/providers/navidrome.py
Normal file
68
ingest/radieo/providers/navidrome.py
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
"""NavidromeProvider: picks tracks from an OpenSubsonic playlist.
|
||||||
|
|
||||||
|
Emits ``subsonic`` tracks (locator = song id). The playlist is cached in
|
||||||
|
memory and refreshed periodically. Anti-repeat is applied by filtering out
|
||||||
|
tracks whose key is among the recently played ones; if that empties the pool
|
||||||
|
(short playlist), the filter is dropped so playback never stalls.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from .. import config
|
||||||
|
from ..db import Database
|
||||||
|
from ..models import Track
|
||||||
|
from ..subsonic import SubsonicClient, SubsonicError
|
||||||
|
|
||||||
|
log = logging.getLogger("radieo.provider.navidrome")
|
||||||
|
|
||||||
|
|
||||||
|
class NavidromeProvider:
|
||||||
|
name = "navidrome"
|
||||||
|
|
||||||
|
def __init__(self, client: SubsonicClient, playlist_ref: str, db: Database):
|
||||||
|
self._client = client
|
||||||
|
self._playlist_ref = playlist_ref
|
||||||
|
self._db = db
|
||||||
|
self._playlist_id: str | None = None
|
||||||
|
self._songs: list[dict] = []
|
||||||
|
self._loaded_at = 0.0
|
||||||
|
|
||||||
|
def _ensure_songs(self) -> None:
|
||||||
|
now = time.time()
|
||||||
|
if self._songs and now - self._loaded_at < config.PLAYLIST_REFRESH:
|
||||||
|
return
|
||||||
|
if self._playlist_id is None:
|
||||||
|
self._playlist_id = self._client.resolve_playlist_id(
|
||||||
|
self._playlist_ref
|
||||||
|
)
|
||||||
|
songs = self._client.get_playlist_songs(self._playlist_id)
|
||||||
|
self._songs = songs
|
||||||
|
self._loaded_at = now
|
||||||
|
log.info("loaded %d songs from playlist %r", len(songs), self._playlist_ref)
|
||||||
|
|
||||||
|
def next(self) -> Track | None:
|
||||||
|
try:
|
||||||
|
self._ensure_songs()
|
||||||
|
except (SubsonicError, httpx.HTTPError, OSError) as exc:
|
||||||
|
log.warning("could not load playlist: %s", exc)
|
||||||
|
return None
|
||||||
|
if not self._songs:
|
||||||
|
return None
|
||||||
|
|
||||||
|
recent = self._db.recent_keys(config.ANTIREPEAT_WINDOW)
|
||||||
|
candidates = [
|
||||||
|
s for s in self._songs if f"subsonic:{s['id']}" not in recent
|
||||||
|
] or self._songs
|
||||||
|
song = random.choice(candidates)
|
||||||
|
return Track(
|
||||||
|
backend="subsonic",
|
||||||
|
locator=str(song["id"]),
|
||||||
|
artist=song.get("artist", "Unknown artist"),
|
||||||
|
title=song.get("title", str(song["id"])),
|
||||||
|
origin=self.name,
|
||||||
|
source_ext=song.get("suffix"),
|
||||||
|
)
|
||||||
|
|
@ -1,50 +1,89 @@
|
||||||
"""Queue of tracks ready to be served to the stream layer.
|
"""Prefetching pipeline feeding ready-to-play tracks to the stream layer.
|
||||||
|
|
||||||
Milestone 2: a track is simply an audio file already present in the cache
|
A background thread keeps a small buffer of downloaded tracks: it asks the
|
||||||
directory. When the queue drains, it is refilled with a fresh shuffle of the
|
provider what to play next, has the matching fetcher download it into the
|
||||||
available files. Later milestones will replace the refill logic with providers
|
cache, and enqueues the resulting file. ``pop_next`` hands the oldest ready
|
||||||
that download tracks (Navidrome, yt-dlp, …) before enqueuing them.
|
track to the HTTP API, records the play and runs LRU retention.
|
||||||
|
|
||||||
|
If the provider has nothing (e.g. Navidrome not configured, or unreachable),
|
||||||
|
the buffer simply stays empty and ``pop_next`` returns ``None`` — the stream
|
||||||
|
then plays its own local-cache fallback.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import random
|
import logging
|
||||||
import threading
|
import threading
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from . import config
|
from . import config
|
||||||
|
from .db import Database
|
||||||
|
from .models import Track
|
||||||
|
|
||||||
|
log = logging.getLogger("radieo.queue")
|
||||||
|
|
||||||
|
|
||||||
class TrackQueue:
|
class TrackQueue:
|
||||||
def __init__(self):
|
def __init__(self, provider, fetcher, db: Database):
|
||||||
|
self._provider = provider
|
||||||
|
self._fetcher = fetcher
|
||||||
|
self._db = db
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._upcoming: deque[Path] = deque()
|
self._ready: deque[tuple[Path, Track]] = deque()
|
||||||
self._last_served: Path | None = None
|
self._stop = threading.Event()
|
||||||
|
self._thread = threading.Thread(
|
||||||
def _available_files(self) -> list[Path]:
|
target=self._run, name="prefetch", daemon=True
|
||||||
if not config.CACHE_DIR.is_dir():
|
|
||||||
return []
|
|
||||||
return sorted(
|
|
||||||
p
|
|
||||||
for p in config.CACHE_DIR.iterdir()
|
|
||||||
if p.is_file() and p.suffix.lower() in config.AUDIO_EXTENSIONS
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _refill_locked(self) -> None:
|
def start(self) -> None:
|
||||||
files = self._available_files()
|
self._thread.start()
|
||||||
if not files:
|
|
||||||
return
|
|
||||||
# Avoid replaying the last served track back-to-back when we can.
|
|
||||||
pool = [f for f in files if f != self._last_served] or files
|
|
||||||
random.shuffle(pool)
|
|
||||||
self._upcoming.extend(pool)
|
|
||||||
|
|
||||||
def pop_next(self) -> Path | None:
|
def stop(self) -> None:
|
||||||
"""Return the next track to play, or None if the cache is empty."""
|
self._stop.set()
|
||||||
|
|
||||||
|
# --- background prefetching ------------------------------------------
|
||||||
|
|
||||||
|
def _run(self) -> None:
|
||||||
|
while not self._stop.is_set():
|
||||||
|
try:
|
||||||
|
self._prefetch()
|
||||||
|
except Exception: # never let the loop die
|
||||||
|
log.exception("prefetch loop error")
|
||||||
|
self._stop.wait(config.PREFETCH_INTERVAL)
|
||||||
|
|
||||||
|
def _prefetch(self) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if not self._upcoming:
|
missing = config.PREFETCH - len(self._ready)
|
||||||
self._refill_locked()
|
for _ in range(max(0, missing)):
|
||||||
if not self._upcoming:
|
if self._stop.is_set():
|
||||||
|
return
|
||||||
|
track = self._provider.next()
|
||||||
|
if track is None:
|
||||||
|
return # nothing to fetch right now
|
||||||
|
try:
|
||||||
|
path = self._fetcher.fetch(track)
|
||||||
|
except Exception:
|
||||||
|
log.exception("fetch failed for %s", track)
|
||||||
|
continue
|
||||||
|
self._db.register_download(str(path), track.key)
|
||||||
|
with self._lock:
|
||||||
|
self._ready.append((path, track))
|
||||||
|
|
||||||
|
# --- serving ----------------------------------------------------------
|
||||||
|
|
||||||
|
def pop_next(self) -> tuple[Path, Track] | None:
|
||||||
|
with self._lock:
|
||||||
|
if not self._ready:
|
||||||
return None
|
return None
|
||||||
track = self._upcoming.popleft()
|
path, track = self._ready.popleft()
|
||||||
self._last_served = track
|
self._db.mark_played(str(path))
|
||||||
return track
|
self._db.record_play(track)
|
||||||
|
self._evict()
|
||||||
|
return path, track
|
||||||
|
|
||||||
|
def _evict(self) -> None:
|
||||||
|
for path in self._db.evict(config.RETENTION_KEEP):
|
||||||
|
try:
|
||||||
|
Path(path).unlink(missing_ok=True)
|
||||||
|
log.info("evicted %s", Path(path).name)
|
||||||
|
except OSError:
|
||||||
|
log.exception("could not evict %s", path)
|
||||||
|
|
|
||||||
115
ingest/radieo/subsonic.py
Normal file
115
ingest/radieo/subsonic.py
Normal file
|
|
@ -0,0 +1,115 @@
|
||||||
|
"""Minimal OpenSubsonic client (enough for Navidrome playback).
|
||||||
|
|
||||||
|
Uses salted-token authentication (``t = md5(password + salt)``), the scheme
|
||||||
|
recommended by the Subsonic API since 1.13.0 and supported by Navidrome.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
import secrets
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
log = logging.getLogger("radieo.subsonic")
|
||||||
|
|
||||||
|
# Advertised API version and client name.
|
||||||
|
_API_VERSION = "1.16.1"
|
||||||
|
_CLIENT = "radieo"
|
||||||
|
|
||||||
|
# Content-Type -> file extension, used to name downloaded files.
|
||||||
|
_CTYPE_EXT = {
|
||||||
|
"audio/mpeg": ".mp3",
|
||||||
|
"audio/mp3": ".mp3",
|
||||||
|
"audio/flac": ".flac",
|
||||||
|
"audio/x-flac": ".flac",
|
||||||
|
"audio/ogg": ".ogg",
|
||||||
|
"application/ogg": ".ogg",
|
||||||
|
"audio/opus": ".opus",
|
||||||
|
"audio/mp4": ".m4a",
|
||||||
|
"audio/x-m4a": ".m4a",
|
||||||
|
"audio/aac": ".aac",
|
||||||
|
"audio/wav": ".wav",
|
||||||
|
"audio/x-wav": ".wav",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class SubsonicError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SubsonicClient:
|
||||||
|
def __init__(self, base_url: str, user: str, password: str):
|
||||||
|
self._base = base_url.rstrip("/")
|
||||||
|
self._user = user
|
||||||
|
self._password = password
|
||||||
|
self._http = httpx.Client(timeout=30.0, follow_redirects=True)
|
||||||
|
|
||||||
|
def _auth_params(self) -> dict[str, str]:
|
||||||
|
salt = secrets.token_hex(8)
|
||||||
|
token = hashlib.md5((self._password + salt).encode()).hexdigest()
|
||||||
|
return {
|
||||||
|
"u": self._user,
|
||||||
|
"t": token,
|
||||||
|
"s": salt,
|
||||||
|
"v": _API_VERSION,
|
||||||
|
"c": _CLIENT,
|
||||||
|
"f": "json",
|
||||||
|
}
|
||||||
|
|
||||||
|
def _get_json(self, view: str, **params) -> dict:
|
||||||
|
url = f"{self._base}/rest/{view}"
|
||||||
|
resp = self._http.get(url, params={**self._auth_params(), **params})
|
||||||
|
resp.raise_for_status()
|
||||||
|
body = resp.json()["subsonic-response"]
|
||||||
|
if body.get("status") != "ok":
|
||||||
|
err = body.get("error", {})
|
||||||
|
raise SubsonicError(
|
||||||
|
f"{view}: {err.get('code')} {err.get('message')}"
|
||||||
|
)
|
||||||
|
return body
|
||||||
|
|
||||||
|
def ping(self) -> None:
|
||||||
|
self._get_json("ping")
|
||||||
|
|
||||||
|
def resolve_playlist_id(self, name_or_id: str) -> str:
|
||||||
|
"""Accept either a playlist id or a playlist name."""
|
||||||
|
body = self._get_json("getPlaylists")
|
||||||
|
playlists = body.get("playlists", {}).get("playlist", [])
|
||||||
|
for pl in playlists:
|
||||||
|
if pl.get("id") == name_or_id or pl.get("name") == name_or_id:
|
||||||
|
return pl["id"]
|
||||||
|
raise SubsonicError(f"playlist not found: {name_or_id!r}")
|
||||||
|
|
||||||
|
def get_playlist_songs(self, playlist_id: str) -> list[dict]:
|
||||||
|
body = self._get_json("getPlaylist", id=playlist_id)
|
||||||
|
return body.get("playlist", {}).get("entry", [])
|
||||||
|
|
||||||
|
def download(self, song_id: str, dest: Path, hint_ext: str | None = None) -> str:
|
||||||
|
"""Download a song to ``dest``; return the file extension used.
|
||||||
|
|
||||||
|
``format=raw`` asks Navidrome for the original file (no transcoding),
|
||||||
|
keeping quality and letting Liquidsoap decode it.
|
||||||
|
"""
|
||||||
|
params = {**self._auth_params(), "id": song_id, "format": "raw"}
|
||||||
|
with self._http.stream(
|
||||||
|
"GET", f"{self._base}/rest/stream", params=params
|
||||||
|
) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
ctype = resp.headers.get("content-type", "").split(";")[0].strip()
|
||||||
|
if ctype.startswith(("application/json", "text/xml")):
|
||||||
|
raise SubsonicError(
|
||||||
|
f"stream {song_id}: error response {resp.read()[:200]!r}"
|
||||||
|
)
|
||||||
|
ext = _CTYPE_EXT.get(ctype)
|
||||||
|
if ext is None and hint_ext:
|
||||||
|
ext = "." + hint_ext.lstrip(".")
|
||||||
|
if ext is None:
|
||||||
|
ext = ".mp3"
|
||||||
|
with open(dest, "wb") as fh:
|
||||||
|
for chunk in resp.iter_bytes(chunk_size=65536):
|
||||||
|
fh.write(chunk)
|
||||||
|
return ext
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self._http.close()
|
||||||
1
ingest/requirements.txt
Normal file
1
ingest/requirements.txt
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
httpx>=0.27
|
||||||
Loading…
Add table
Add a link
Reference in a new issue