Milestone 2: ingestion daemon driving the stream

Add the Python `ingest` container exposing `GET /next`, which returns the
next track as an annotated Liquidsoap URI (or an empty body when nothing is
ready). Liquidsoap switches from a static playlist to a `request.dynamic`
source pulling from the daemon, with the local cache as fallback and mksafe
for guaranteed continuous output.

For now the daemon just cycles through the files already in the cache; the
download providers (Navidrome, yt-dlp, ListenBrainz) come in later milestones.

Also commit the implementation plan (PLAN.md).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
nemunaire 2026-07-02 16:52:49 +08:00
commit f8eb0655eb
9 changed files with 247 additions and 19 deletions

View file

@ -14,9 +14,11 @@ sharing a cache volume:
- **`ingest`** (Python) — the brain. It decides what to play next, resolves and
downloads tracks into a local cache, keeps a pre-filled queue, and exposes the
next track over HTTP. *(planned — see roadmap below)*
- **`stream`** (Liquidsoap) — deliberately dumb. It broadcasts the audio over
HTTP and never goes silent thanks to a local fallback.
next track over HTTP at `GET /next`. *(currently it only serves the cache
directory; the download providers come in later milestones — see roadmap)*
- **`stream`** (Liquidsoap) — deliberately dumb. It pulls the next track from
the `ingest` daemon, broadcasts the audio over HTTP, and never goes silent
thanks to a local cache fallback.
Playback sources (planned): a [Navidrome](https://www.navidrome.org/) library
via the OpenSubsonic API, arbitrary tracks fetched with
@ -46,21 +48,25 @@ reloaded when the directory changes).
## Current status
**Milestone 1 — broadcasting skeleton: done.**
**Milestone 2 — ingestion daemon: done.**
- Liquidsoap (v2.4.5) container plays the `cache/` directory in random order.
- `ingest` (Python) container exposes `GET /next`, returning the next track as
an annotated Liquidsoap URI (or an empty body when nothing is ready).
- `stream` (Liquidsoap v2.4.5) pulls from `ingest` via a `request.dynamic`
source, and falls back to the local `cache/` directory when the daemon has
nothing to offer.
- HTTP stream served at `http://localhost:8000/radio.mp3` (MP3, 192 kbps).
- Continuous output guaranteed: silence rather than a crash when the cache is
- Continuous output guaranteed: silence rather than a crash when everything is
empty (`mksafe`).
- Multiple simultaneous listeners supported.
At this stage the playlist is filled manually; the automatic ingestion layer is
not implemented yet.
At this stage the daemon just cycles through the files already in `cache/`; the
download providers (Navidrome, yt-dlp, ListenBrainz) come next.
## Roadmap
1. ✅ **Broadcasting skeleton** — Liquidsoap serving the cache directory.
2. **Ingestion daemon** — Python daemon exposing `GET /next`; Liquidsoap
2. **Ingestion daemon** — Python daemon exposing `GET /next`; Liquidsoap
switches to a `request.dynamic` source with the cache as fallback.
3. **Navidrome provider** — play from an OpenSubsonic playlist, with caching,
LRU retention and play history.

View file

@ -1,9 +1,21 @@
services:
ingest:
build: ./ingest
image: radieo-ingest
volumes:
- ./cache:/cache # volume partagé avec le stream
environment:
- RADIEO_CACHE_DIR=/cache
- RADIEO_HTTP_PORT=8080
restart: unless-stopped
stream:
build: ./stream
image: radieo-stream
depends_on:
- ingest
ports:
- "8000:8000" # flux HTTP : http://localhost:8000/radio.mp3
- "8000:8000" # flux HTTP : http://localhost:8000/radio.mp3
volumes:
- ./cache:/cache:ro # jalon 1 : lecture seule, rempli à la main
- ./cache:/cache:ro # lecture seule : secours + résolution des chemins
restart: unless-stopped

12
ingest/Dockerfile Normal file
View file

@ -0,0 +1,12 @@
FROM python:3.12-slim
WORKDIR /app
# Milestone 2 uses the standard library only; third-party dependencies
# (httpx, yt-dlp, feedparser…) will be added in later milestones.
COPY radieo/ ./radieo/
ENV PYTHONUNBUFFERED=1
EXPOSE 8080
CMD ["python", "-m", "radieo"]

View file

@ -0,0 +1,5 @@
"""radieo ingestion daemon.
Decides what to play next, keeps a queue of ready tracks and exposes the next
one over HTTP for the Liquidsoap stream layer to pick up.
"""

35
ingest/radieo/__main__.py Normal file
View file

@ -0,0 +1,35 @@
"""Entry point: start the HTTP API serving the track queue."""
import logging
from . import config
from .api import IngestServer
from .queue import TrackQueue
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("radieo")
config.CACHE_DIR.mkdir(parents=True, exist_ok=True)
server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), TrackQueue())
log.info(
"ingest listening on %s:%d (cache=%s)",
config.HTTP_HOST,
config.HTTP_PORT,
config.CACHE_DIR,
)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
if __name__ == "__main__":
main()

68
ingest/radieo/api.py Normal file
View file

@ -0,0 +1,68 @@
"""HTTP API exposing the next track to the stream layer.
Endpoints:
GET /next -> annotated Liquidsoap URI, or an empty body when nothing is
ready (Liquidsoap then falls back to the local cache).
GET /healthz -> "ok"
"""
import logging
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from .queue import TrackQueue
log = logging.getLogger("radieo.api")
def annotate_uri(path: Path) -> str:
"""Build an annotated Liquidsoap request URI for a cache file.
Metadata is minimal for now (title derived from the filename); real
metadata will come from the providers in later milestones.
"""
def esc(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
title = esc(path.stem)
artist = esc("radieo")
return f'annotate:title="{title}",artist="{artist}":{path}'
class IngestServer(ThreadingHTTPServer):
def __init__(self, address, queue: TrackQueue):
super().__init__(address, _Handler)
self.queue = queue
class _Handler(BaseHTTPRequestHandler):
server: IngestServer
def do_GET(self): # noqa: N802 (name imposed by BaseHTTPRequestHandler)
if self.path == "/next":
self._serve_next()
elif self.path == "/healthz":
self._text(200, "ok\n")
else:
self._text(404, "not found\n")
def _serve_next(self):
track = self.server.queue.pop_next()
if track is None:
# Empty body: tells Liquidsoap to use its fallback for now.
self._text(200, "")
return
log.info("next -> %s", track.name)
self._text(200, annotate_uri(track) + "\n")
def _text(self, code: int, body: str):
data = body.encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def log_message(self, fmt, *args):
log.debug("%s - %s", self.address_string(), fmt % args)

18
ingest/radieo/config.py Normal file
View file

@ -0,0 +1,18 @@
"""Runtime configuration, read from the environment.
Kept intentionally small; later milestones will add source credentials,
weights and retention settings here (or in a config file).
"""
import os
from pathlib import Path
# Directory shared with the stream container. Paths returned to Liquidsoap must
# be valid inside *that* container, so both mount the cache at the same path.
CACHE_DIR = Path(os.environ.get("RADIEO_CACHE_DIR", "/cache"))
HTTP_HOST = os.environ.get("RADIEO_HTTP_HOST", "0.0.0.0")
HTTP_PORT = int(os.environ.get("RADIEO_HTTP_PORT", "8080"))
# File extensions considered playable when scanning the cache.
AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".opus", ".m4a", ".aac", ".wav"}

50
ingest/radieo/queue.py Normal file
View file

@ -0,0 +1,50 @@
"""Queue of tracks ready to be served to the stream layer.
Milestone 2: a track is simply an audio file already present in the cache
directory. When the queue drains, it is refilled with a fresh shuffle of the
available files. Later milestones will replace the refill logic with providers
that download tracks (Navidrome, yt-dlp, ) before enqueuing them.
"""
import random
import threading
from collections import deque
from pathlib import Path
from . import config
class TrackQueue:
def __init__(self):
self._lock = threading.Lock()
self._upcoming: deque[Path] = deque()
self._last_served: Path | None = None
def _available_files(self) -> list[Path]:
if not config.CACHE_DIR.is_dir():
return []
return sorted(
p
for p in config.CACHE_DIR.iterdir()
if p.is_file() and p.suffix.lower() in config.AUDIO_EXTENSIONS
)
def _refill_locked(self) -> None:
files = self._available_files()
if not files:
return
# Avoid replaying the last served track back-to-back when we can.
pool = [f for f in files if f != self._last_served] or files
random.shuffle(pool)
self._upcoming.extend(pool)
def pop_next(self) -> Path | None:
"""Return the next track to play, or None if the cache is empty."""
with self._lock:
if not self._upcoming:
self._refill_locked()
if not self._upcoming:
return None
track = self._upcoming.popleft()
self._last_served = track
return track

View file

@ -1,9 +1,9 @@
#!/usr/bin/liquidsoap
# radieo — couche diffusion (jalon 1)
# Joue le dossier /cache en boucle aléatoire et le diffuse en HTTP.
# Les jalons suivants remplaceront la source par un request.dynamic piloté
# par le daemon d'ingestion, en gardant ce dossier comme secours.
# radieo — couche diffusion (jalon 2)
# La source principale est pilotée par le daemon d'ingestion via GET /next.
# Le dossier /cache sert de secours quand le daemon n'a rien à proposer
# (daemon indisponible, file momentanément vide…). Si tout est vide : silence.
# --- Journalisation : tout sur la sortie standard (pratique en conteneur) ---
settings.log.stdout := true
@ -13,11 +13,33 @@ settings.log.level := 3
# --- Harbor : écoute sur toutes les interfaces du conteneur ---
settings.harbor.bind_addrs := ["0.0.0.0"]
# --- Source : le dossier de cache, rechargé quand son contenu change ---
radio = playlist(mode="randomize", reload_mode="watch", "/cache")
# URL du daemon d'ingestion (nom de service résolu par docker-compose).
ingest_url = "http://ingest:8080/next"
# mksafe garantit un flux continu : si la source échoue ou est vide,
# Liquidsoap émet du silence plutôt que de planter.
# Callback appelé par request.dynamic pour obtenir le prochain morceau.
# Renvoie une requête à jouer, ou null() si rien n'est disponible (→ secours).
def next_track() =
resp = http.get(ingest_url, timeout=5.0)
body = string.trim(resp)
if resp.status_code == 200 and body != "" then
request.create(body)
else
null
end
end
# Source principale : pilotée par le daemon. prefetch=1 pour anticiper le
# prochain morceau ; retry_delay pour ne pas marteler le daemon en cas de vide.
main = request.dynamic(next_track, prefetch=1, retry_delay=1.0)
# Secours : le cache local, joué en aléatoire, rechargé quand il change.
backup = playlist(mode="randomize", reload_mode="watch", "/cache")
# fallback préfère la source principale et bascule sur le cache si elle n'a
# rien de prêt. track_sensitive=true : on ne coupe pas un morceau en cours.
radio = fallback(track_sensitive=true, [main, backup])
# mksafe garantit un flux continu : silence plutôt que plantage si tout est vide.
radio = mksafe(radio)
# --- Sortie : flux MP3 sur http://<hote>:8000/radio.mp3 ---