From f8eb0655eb79c5c778de26f1c207b5c24235ecd3 Mon Sep 17 00:00:00 2001 From: Pierre-Olivier Mercier Date: Thu, 2 Jul 2026 16:52:49 +0800 Subject: [PATCH] Milestone 2: ingestion daemon driving the stream Add the Python `ingest` container exposing `GET /next`, which returns the next track as an annotated Liquidsoap URI (or an empty body when nothing is ready). Liquidsoap switches from a static playlist to a `request.dynamic` source pulling from the daemon, with the local cache as fallback and mksafe for guaranteed continuous output. For now the daemon just cycles through the files already in the cache; the download providers (Navidrome, yt-dlp, ListenBrainz) come in later milestones. Also commit the implementation plan (PLAN.md). Co-Authored-By: Claude Opus 4.8 --- README.md | 24 ++++++++------ docker-compose.yml | 16 +++++++-- ingest/Dockerfile | 12 +++++++ ingest/radieo/__init__.py | 5 +++ ingest/radieo/__main__.py | 35 ++++++++++++++++++++ ingest/radieo/api.py | 68 +++++++++++++++++++++++++++++++++++++++ ingest/radieo/config.py | 18 +++++++++++ ingest/radieo/queue.py | 50 ++++++++++++++++++++++++++++ stream/radio.liq | 38 +++++++++++++++++----- 9 files changed, 247 insertions(+), 19 deletions(-) create mode 100644 ingest/Dockerfile create mode 100644 ingest/radieo/__init__.py create mode 100644 ingest/radieo/__main__.py create mode 100644 ingest/radieo/api.py create mode 100644 ingest/radieo/config.py create mode 100644 ingest/radieo/queue.py diff --git a/README.md b/README.md index 1f3033d..5d71679 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,11 @@ sharing a cache volume: - **`ingest`** (Python) — the brain. It decides what to play next, resolves and downloads tracks into a local cache, keeps a pre-filled queue, and exposes the - next track over HTTP. *(planned — see roadmap below)* -- **`stream`** (Liquidsoap) — deliberately dumb. It broadcasts the audio over - HTTP and never goes silent thanks to a local fallback. + next track over HTTP at `GET /next`. *(currently it only serves the cache + directory; the download providers come in later milestones — see roadmap)* +- **`stream`** (Liquidsoap) — deliberately dumb. It pulls the next track from + the `ingest` daemon, broadcasts the audio over HTTP, and never goes silent + thanks to a local cache fallback. Playback sources (planned): a [Navidrome](https://www.navidrome.org/) library via the OpenSubsonic API, arbitrary tracks fetched with @@ -46,21 +48,25 @@ reloaded when the directory changes). ## Current status -**Milestone 1 — broadcasting skeleton: done.** +**Milestone 2 — ingestion daemon: done.** -- Liquidsoap (v2.4.5) container plays the `cache/` directory in random order. +- `ingest` (Python) container exposes `GET /next`, returning the next track as + an annotated Liquidsoap URI (or an empty body when nothing is ready). +- `stream` (Liquidsoap v2.4.5) pulls from `ingest` via a `request.dynamic` + source, and falls back to the local `cache/` directory when the daemon has + nothing to offer. - HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps). -- Continuous output guaranteed: silence rather than a crash when the cache is +- Continuous output guaranteed: silence rather than a crash when everything is empty (`mksafe`). - Multiple simultaneous listeners supported. -At this stage the playlist is filled manually; the automatic ingestion layer is -not implemented yet. +At this stage the daemon just cycles through the files already in `cache/`; the +download providers (Navidrome, yt-dlp, ListenBrainz) come next. ## Roadmap 1. ✅ **Broadcasting skeleton** — Liquidsoap serving the cache directory. -2. **Ingestion daemon** — Python daemon exposing `GET /next`; Liquidsoap +2. ✅ **Ingestion daemon** — Python daemon exposing `GET /next`; Liquidsoap switches to a `request.dynamic` source with the cache as fallback. 3. **Navidrome provider** — play from an OpenSubsonic playlist, with caching, LRU retention and play history. diff --git a/docker-compose.yml b/docker-compose.yml index 70ff6c2..abc4625 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,21 @@ services: + ingest: + build: ./ingest + image: radieo-ingest + volumes: + - ./cache:/cache # volume partagé avec le stream + environment: + - RADIEO_CACHE_DIR=/cache + - RADIEO_HTTP_PORT=8080 + restart: unless-stopped + stream: build: ./stream image: radieo-stream + depends_on: + - ingest ports: - - "8000:8000" # flux HTTP : http://localhost:8000/radio.mp3 + - "8000:8000" # flux HTTP : http://localhost:8000/radio.mp3 volumes: - - ./cache:/cache:ro # jalon 1 : lecture seule, rempli à la main + - ./cache:/cache:ro # lecture seule : secours + résolution des chemins restart: unless-stopped diff --git a/ingest/Dockerfile b/ingest/Dockerfile new file mode 100644 index 0000000..69a8f6f --- /dev/null +++ b/ingest/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Milestone 2 uses the standard library only; third-party dependencies +# (httpx, yt-dlp, feedparser…) will be added in later milestones. +COPY radieo/ ./radieo/ + +ENV PYTHONUNBUFFERED=1 +EXPOSE 8080 + +CMD ["python", "-m", "radieo"] diff --git a/ingest/radieo/__init__.py b/ingest/radieo/__init__.py new file mode 100644 index 0000000..ad341ae --- /dev/null +++ b/ingest/radieo/__init__.py @@ -0,0 +1,5 @@ +"""radieo ingestion daemon. + +Decides what to play next, keeps a queue of ready tracks and exposes the next +one over HTTP for the Liquidsoap stream layer to pick up. +""" diff --git a/ingest/radieo/__main__.py b/ingest/radieo/__main__.py new file mode 100644 index 0000000..a7e8840 --- /dev/null +++ b/ingest/radieo/__main__.py @@ -0,0 +1,35 @@ +"""Entry point: start the HTTP API serving the track queue.""" + +import logging + +from . import config +from .api import IngestServer +from .queue import TrackQueue + + +def main() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + log = logging.getLogger("radieo") + + config.CACHE_DIR.mkdir(parents=True, exist_ok=True) + + server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), TrackQueue()) + log.info( + "ingest listening on %s:%d (cache=%s)", + config.HTTP_HOST, + config.HTTP_PORT, + config.CACHE_DIR, + ) + try: + server.serve_forever() + except KeyboardInterrupt: + pass + finally: + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/ingest/radieo/api.py b/ingest/radieo/api.py new file mode 100644 index 0000000..dfb2743 --- /dev/null +++ b/ingest/radieo/api.py @@ -0,0 +1,68 @@ +"""HTTP API exposing the next track to the stream layer. + +Endpoints: + GET /next -> annotated Liquidsoap URI, or an empty body when nothing is + ready (Liquidsoap then falls back to the local cache). + GET /healthz -> "ok" +""" + +import logging +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path + +from .queue import TrackQueue + +log = logging.getLogger("radieo.api") + + +def annotate_uri(path: Path) -> str: + """Build an annotated Liquidsoap request URI for a cache file. + + Metadata is minimal for now (title derived from the filename); real + metadata will come from the providers in later milestones. + """ + + def esc(value: str) -> str: + return value.replace("\\", "\\\\").replace('"', '\\"') + + title = esc(path.stem) + artist = esc("radieo") + return f'annotate:title="{title}",artist="{artist}":{path}' + + +class IngestServer(ThreadingHTTPServer): + def __init__(self, address, queue: TrackQueue): + super().__init__(address, _Handler) + self.queue = queue + + +class _Handler(BaseHTTPRequestHandler): + server: IngestServer + + def do_GET(self): # noqa: N802 (name imposed by BaseHTTPRequestHandler) + if self.path == "/next": + self._serve_next() + elif self.path == "/healthz": + self._text(200, "ok\n") + else: + self._text(404, "not found\n") + + def _serve_next(self): + track = self.server.queue.pop_next() + if track is None: + # Empty body: tells Liquidsoap to use its fallback for now. + self._text(200, "") + return + log.info("next -> %s", track.name) + self._text(200, annotate_uri(track) + "\n") + + def _text(self, code: int, body: str): + data = body.encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + def log_message(self, fmt, *args): + log.debug("%s - %s", self.address_string(), fmt % args) diff --git a/ingest/radieo/config.py b/ingest/radieo/config.py new file mode 100644 index 0000000..a095658 --- /dev/null +++ b/ingest/radieo/config.py @@ -0,0 +1,18 @@ +"""Runtime configuration, read from the environment. + +Kept intentionally small; later milestones will add source credentials, +weights and retention settings here (or in a config file). +""" + +import os +from pathlib import Path + +# Directory shared with the stream container. Paths returned to Liquidsoap must +# be valid inside *that* container, so both mount the cache at the same path. +CACHE_DIR = Path(os.environ.get("RADIEO_CACHE_DIR", "/cache")) + +HTTP_HOST = os.environ.get("RADIEO_HTTP_HOST", "0.0.0.0") +HTTP_PORT = int(os.environ.get("RADIEO_HTTP_PORT", "8080")) + +# File extensions considered playable when scanning the cache. +AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".opus", ".m4a", ".aac", ".wav"} diff --git a/ingest/radieo/queue.py b/ingest/radieo/queue.py new file mode 100644 index 0000000..ac182d6 --- /dev/null +++ b/ingest/radieo/queue.py @@ -0,0 +1,50 @@ +"""Queue of tracks ready to be served to the stream layer. + +Milestone 2: a track is simply an audio file already present in the cache +directory. When the queue drains, it is refilled with a fresh shuffle of the +available files. Later milestones will replace the refill logic with providers +that download tracks (Navidrome, yt-dlp, …) before enqueuing them. +""" + +import random +import threading +from collections import deque +from pathlib import Path + +from . import config + + +class TrackQueue: + def __init__(self): + self._lock = threading.Lock() + self._upcoming: deque[Path] = deque() + self._last_served: Path | None = None + + def _available_files(self) -> list[Path]: + if not config.CACHE_DIR.is_dir(): + return [] + return sorted( + p + for p in config.CACHE_DIR.iterdir() + if p.is_file() and p.suffix.lower() in config.AUDIO_EXTENSIONS + ) + + def _refill_locked(self) -> None: + files = self._available_files() + if not files: + return + # Avoid replaying the last served track back-to-back when we can. + pool = [f for f in files if f != self._last_served] or files + random.shuffle(pool) + self._upcoming.extend(pool) + + def pop_next(self) -> Path | None: + """Return the next track to play, or None if the cache is empty.""" + with self._lock: + if not self._upcoming: + self._refill_locked() + if not self._upcoming: + return None + track = self._upcoming.popleft() + self._last_served = track + return track diff --git a/stream/radio.liq b/stream/radio.liq index 8388d60..5ac2796 100644 --- a/stream/radio.liq +++ b/stream/radio.liq @@ -1,9 +1,9 @@ #!/usr/bin/liquidsoap -# radieo — couche diffusion (jalon 1) -# Joue le dossier /cache en boucle aléatoire et le diffuse en HTTP. -# Les jalons suivants remplaceront la source par un request.dynamic piloté -# par le daemon d'ingestion, en gardant ce dossier comme secours. +# radieo — couche diffusion (jalon 2) +# La source principale est pilotée par le daemon d'ingestion via GET /next. +# Le dossier /cache sert de secours quand le daemon n'a rien à proposer +# (daemon indisponible, file momentanément vide…). Si tout est vide : silence. # --- Journalisation : tout sur la sortie standard (pratique en conteneur) --- settings.log.stdout := true @@ -13,11 +13,33 @@ settings.log.level := 3 # --- Harbor : écoute sur toutes les interfaces du conteneur --- settings.harbor.bind_addrs := ["0.0.0.0"] -# --- Source : le dossier de cache, rechargé quand son contenu change --- -radio = playlist(mode="randomize", reload_mode="watch", "/cache") +# URL du daemon d'ingestion (nom de service résolu par docker-compose). +ingest_url = "http://ingest:8080/next" -# mksafe garantit un flux continu : si la source échoue ou est vide, -# Liquidsoap émet du silence plutôt que de planter. +# Callback appelé par request.dynamic pour obtenir le prochain morceau. +# Renvoie une requête à jouer, ou null() si rien n'est disponible (→ secours). +def next_track() = + resp = http.get(ingest_url, timeout=5.0) + body = string.trim(resp) + if resp.status_code == 200 and body != "" then + request.create(body) + else + null + end +end + +# Source principale : pilotée par le daemon. prefetch=1 pour anticiper le +# prochain morceau ; retry_delay pour ne pas marteler le daemon en cas de vide. +main = request.dynamic(next_track, prefetch=1, retry_delay=1.0) + +# Secours : le cache local, joué en aléatoire, rechargé quand il change. +backup = playlist(mode="randomize", reload_mode="watch", "/cache") + +# fallback préfère la source principale et bascule sur le cache si elle n'a +# rien de prêt. track_sensitive=true : on ne coupe pas un morceau en cours. +radio = fallback(track_sensitive=true, [main, backup]) + +# mksafe garantit un flux continu : silence plutôt que plantage si tout est vide. radio = mksafe(radio) # --- Sortie : flux MP3 sur http://:8000/radio.mp3 ---