diff --git a/README.md b/README.md index c00bea3..aaadaf4 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ sharing a cache volume: next track over HTTP at `GET /next`. *(currently it only serves the cache directory; the download providers come in later milestones — see roadmap)* - **`stream`** (Liquidsoap) — deliberately dumb. It pulls the next track from - the `ingest` daemon, broadcasts the audio over HTTP, and never goes silent - thanks to a local cache fallback. + the `ingest` daemon, broadcasts the audio over HTTP, and falls back to the + already-aired tracks (via `/fallback.m3u`) if the daemon has nothing ready. Playback sources (planned): a [Navidrome](https://www.navidrome.org/) library via the OpenSubsonic API, arbitrary tracks fetched with @@ -89,9 +89,13 @@ Open the player at `http://localhost:8000/` — a small web page with an `/nowplaying` (JSON), fed from the broadcast source's live metadata — so it is accurate even though the ingest daemon runs a track ahead (prefetch). - A 3 s crossfade smooths transitions between tracks. -- The fallback playlist now ignores non-audio and hidden files (`.gitkeep`, - in-progress `.part`), so the earlier startup ffmpeg "Invalid data" warnings - are gone. +- The fallback only replays tracks **already aired**: the ingest daemon exposes + them as an `/fallback.m3u` playlist (served into a local file the stream + watches). The pre-fetch buffer (downloaded but not-yet-played tracks) is + excluded, so at cold start the list is empty and the stream stays **silent** + rather than looping the two or three tracks being pre-fetched. Once the buffer + drains mid-stream, it degrades gracefully across every track heard so far + instead of a tight loop. - Robustness: the ingest daemon shuts down cleanly on SIGTERM (fast `docker compose down`), has a Docker healthcheck on `/healthz` (the stream waits for it to be healthy), and outgoing HTTP calls retry transient diff --git a/ingest/radieo/__main__.py b/ingest/radieo/__main__.py index 160f4ff..a1d03aa 100644 --- a/ingest/radieo/__main__.py +++ b/ingest/radieo/__main__.py @@ -153,7 +153,7 @@ def main() -> None: queue = TrackQueue(scheduler, fetchers, db) queue.start() - server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue) + server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue, db) log.info( "ingest listening on %s:%d (cache=%s, state=%s)", config.HTTP_HOST, diff --git a/ingest/radieo/api.py b/ingest/radieo/api.py index 1209dd2..1c0d2e9 100644 --- a/ingest/radieo/api.py +++ b/ingest/radieo/api.py @@ -1,15 +1,19 @@ """HTTP API exposing the next track to the stream layer. Endpoints: - GET /next -> annotated Liquidsoap URI, or an empty body when nothing is - ready (Liquidsoap then falls back to the local cache). - GET /healthz -> "ok" + GET /next -> annotated Liquidsoap URI, or an empty body when nothing + is ready (Liquidsoap then falls back to /fallback.m3u). + GET /fallback.m3u -> playlist of already-aired files, the stream's safety + net; empty (→ silence) until something has played. + GET /healthz -> "ok" """ import logging from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path +from . import config +from .db import Database from .models import Track from .queue import TrackQueue @@ -29,9 +33,10 @@ def annotate_uri(path: Path, track: Track) -> str: class IngestServer(ThreadingHTTPServer): - def __init__(self, address, queue: TrackQueue): + def __init__(self, address, queue: TrackQueue, db: Database): super().__init__(address, _Handler) self.queue = queue + self.db = db class _Handler(BaseHTTPRequestHandler): @@ -40,6 +45,8 @@ class _Handler(BaseHTTPRequestHandler): def do_GET(self): # noqa: N802 (name imposed by BaseHTTPRequestHandler) if self.path == "/next": self._serve_next() + elif self.path == "/fallback.m3u": + self._serve_fallback() elif self.path == "/healthz": self._text(200, "ok\n") else: @@ -55,10 +62,18 @@ class _Handler(BaseHTTPRequestHandler): log.info("next -> %s", track) self._text(200, annotate_uri(path, track) + "\n") - def _text(self, code: int, body: str): + def _serve_fallback(self): + # Only already-aired files, newest first, that still exist on disk. + lines = ["#EXTM3U"] + for p in self.server.db.played_files(config.RETENTION_KEEP): + if Path(p).exists(): + lines.append(p) + self._text(200, "\n".join(lines) + "\n", "audio/x-mpegurl; charset=utf-8") + + def _text(self, code: int, body: str, ctype: str = "text/plain; charset=utf-8"): data = body.encode("utf-8") self.send_response(code) - self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) diff --git a/ingest/radieo/db.py b/ingest/radieo/db.py index e48bacf..01519cf 100644 --- a/ingest/radieo/db.py +++ b/ingest/radieo/db.py @@ -141,6 +141,20 @@ class Database: (path, track_key), ) + def played_files(self, limit: int) -> list[str]: + """Files already aired, newest first (the stream's fallback pool). + + Only played tracks appear here, so files still being pre-fetched are + never served as fallback — that is what stopped the cold-start loop. + """ + with self._lock: + rows = self._conn.execute( + "SELECT path FROM cache_files WHERE played_at IS NOT NULL" + " ORDER BY played_at DESC LIMIT ?", + (limit,), + ).fetchall() + return [r["path"] for r in rows] + def mark_played(self, path: str) -> None: with self._lock: self._conn.execute( diff --git a/stream/radio.liq b/stream/radio.liq index eb9c73c..8e536a1 100644 --- a/stream/radio.liq +++ b/stream/radio.liq @@ -34,8 +34,8 @@ end main = request.dynamic(next_track, prefetch=1, retry_delay=1.0) # Filtre du secours : ne garder que les vrais fichiers audio et ignorer les -# fichiers cachés (.gitkeep, téléchargements .part en cours). Évite que le -# décodeur ne tente — et logue en erreur — des fichiers non-audio. +# fichiers cachés (.gitkeep, téléchargements .part en cours). Ceinture et +# bretelles : la m3u ne liste déjà que des fichiers audio réels. audio_ext = [".mp3", ".flac", ".ogg", ".opus", ".m4a", ".aac", ".wav"] def audio_only(r) = u = string.case(lower=true, request.uri(r)) @@ -44,9 +44,29 @@ def audio_only(r) = is_audio and not string.contains(prefix=".", base) end -# Secours : le cache local, joué en aléatoire, rechargé quand il change. +# Secours : uniquement les morceaux DÉJÀ passés à l'antenne. Le daemon les +# expose en m3u (/fallback.m3u) ; on la récupère périodiquement dans un fichier +# LOCAL que playlist surveille. Le buffer de préchargement (morceaux pas encore +# diffusés) en est exclu : au démarrage à froid la liste est vide → silence +# assumé plutôt qu'une boucle sur 2-3 titres. +# Passer par un fichier local (plutôt qu'une URL directe dans playlist) évite la +# détection de type hasardeuse du résolveur http, et garde la dernière liste +# valide si l'ingest devient injoignable. +fallback_url = "http://ingest:8080/fallback.m3u" +fallback_file = "/tmp/fallback.m3u" +file.write(data="#EXTM3U\n", atomic=true, fallback_file) # amorce vide au boot + +def refresh_fallback() = + resp = http.get(fallback_url, timeout=5.0) + if resp.status_code == 200 then + file.write(data=resp, atomic=true, fallback_file) + end +end +thread.run(fast=false, every={30.}, refresh_fallback) + backup = playlist( - mode="randomize", reload_mode="watch", check_next=audio_only, "/cache" + mode="randomize", reload_mode="watch", mime_type="audio/x-mpegurl", + check_next=audio_only, fallback_file ) # fallback préfère la source principale et bascule sur le cache si elle n'a