stream: fallback only replays already-aired tracks
The fallback played the whole /cache directory, which at cold start holds only the 2-3 tracks being pre-fetched — so it looped them until the request.dynamic buffer filled. Restrict the fallback to tracks already aired: the ingest daemon exposes them at GET /fallback.m3u (played_at set, still on disk), and the stream fetches that into a local /tmp/fallback.m3u that playlist watches. Cold start is now silent (assumed) instead of a tight loop, and a mid-stream drain degrades across the whole listening history. A local file (not a remote playlist URL) is used to avoid Liquidsoap's http resolver mis-sniffing the response as text/html; mime_type is forced so an empty header-only m3u still parses. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
3ff4e24872
commit
80f27d2795
5 changed files with 69 additions and 16 deletions
14
README.md
14
README.md
|
|
@ -17,8 +17,8 @@ sharing a cache volume:
|
||||||
next track over HTTP at `GET /next`. *(currently it only serves the cache
|
next track over HTTP at `GET /next`. *(currently it only serves the cache
|
||||||
directory; the download providers come in later milestones — see roadmap)*
|
directory; the download providers come in later milestones — see roadmap)*
|
||||||
- **`stream`** (Liquidsoap) — deliberately dumb. It pulls the next track from
|
- **`stream`** (Liquidsoap) — deliberately dumb. It pulls the next track from
|
||||||
the `ingest` daemon, broadcasts the audio over HTTP, and never goes silent
|
the `ingest` daemon, broadcasts the audio over HTTP, and falls back to the
|
||||||
thanks to a local cache fallback.
|
already-aired tracks (via `/fallback.m3u`) if the daemon has nothing ready.
|
||||||
|
|
||||||
Playback sources (planned): a [Navidrome](https://www.navidrome.org/) library
|
Playback sources (planned): a [Navidrome](https://www.navidrome.org/) library
|
||||||
via the OpenSubsonic API, arbitrary tracks fetched with
|
via the OpenSubsonic API, arbitrary tracks fetched with
|
||||||
|
|
@ -89,9 +89,13 @@ Open the player at `http://localhost:8000/` — a small web page with an
|
||||||
`/nowplaying` (JSON), fed from the broadcast source's live metadata — so it
|
`/nowplaying` (JSON), fed from the broadcast source's live metadata — so it
|
||||||
is accurate even though the ingest daemon runs a track ahead (prefetch).
|
is accurate even though the ingest daemon runs a track ahead (prefetch).
|
||||||
- A 3 s crossfade smooths transitions between tracks.
|
- A 3 s crossfade smooths transitions between tracks.
|
||||||
- The fallback playlist now ignores non-audio and hidden files (`.gitkeep`,
|
- The fallback only replays tracks **already aired**: the ingest daemon exposes
|
||||||
in-progress `.part`), so the earlier startup ffmpeg "Invalid data" warnings
|
them as an `/fallback.m3u` playlist (served into a local file the stream
|
||||||
are gone.
|
watches). The pre-fetch buffer (downloaded but not-yet-played tracks) is
|
||||||
|
excluded, so at cold start the list is empty and the stream stays **silent**
|
||||||
|
rather than looping the two or three tracks being pre-fetched. Once the buffer
|
||||||
|
drains mid-stream, it degrades gracefully across every track heard so far
|
||||||
|
instead of a tight loop.
|
||||||
- Robustness: the ingest daemon shuts down cleanly on SIGTERM (fast
|
- Robustness: the ingest daemon shuts down cleanly on SIGTERM (fast
|
||||||
`docker compose down`), has a Docker healthcheck on `/healthz` (the stream
|
`docker compose down`), has a Docker healthcheck on `/healthz` (the stream
|
||||||
waits for it to be healthy), and outgoing HTTP calls retry transient
|
waits for it to be healthy), and outgoing HTTP calls retry transient
|
||||||
|
|
|
||||||
|
|
@ -153,7 +153,7 @@ def main() -> None:
|
||||||
queue = TrackQueue(scheduler, fetchers, db)
|
queue = TrackQueue(scheduler, fetchers, db)
|
||||||
queue.start()
|
queue.start()
|
||||||
|
|
||||||
server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue)
|
server = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue, db)
|
||||||
log.info(
|
log.info(
|
||||||
"ingest listening on %s:%d (cache=%s, state=%s)",
|
"ingest listening on %s:%d (cache=%s, state=%s)",
|
||||||
config.HTTP_HOST,
|
config.HTTP_HOST,
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,10 @@
|
||||||
"""HTTP API exposing the next track to the stream layer.
|
"""HTTP API exposing the next track to the stream layer.
|
||||||
|
|
||||||
Endpoints:
|
Endpoints:
|
||||||
GET /next -> annotated Liquidsoap URI, or an empty body when nothing is
|
GET /next -> annotated Liquidsoap URI, or an empty body when nothing
|
||||||
ready (Liquidsoap then falls back to the local cache).
|
is ready (Liquidsoap then falls back to /fallback.m3u).
|
||||||
|
GET /fallback.m3u -> playlist of already-aired files, the stream's safety
|
||||||
|
net; empty (→ silence) until something has played.
|
||||||
GET /healthz -> "ok"
|
GET /healthz -> "ok"
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -10,6 +12,8 @@ import logging
|
||||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
from . import config
|
||||||
|
from .db import Database
|
||||||
from .models import Track
|
from .models import Track
|
||||||
from .queue import TrackQueue
|
from .queue import TrackQueue
|
||||||
|
|
||||||
|
|
@ -29,9 +33,10 @@ def annotate_uri(path: Path, track: Track) -> str:
|
||||||
|
|
||||||
|
|
||||||
class IngestServer(ThreadingHTTPServer):
|
class IngestServer(ThreadingHTTPServer):
|
||||||
def __init__(self, address, queue: TrackQueue):
|
def __init__(self, address, queue: TrackQueue, db: Database):
|
||||||
super().__init__(address, _Handler)
|
super().__init__(address, _Handler)
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
|
||||||
class _Handler(BaseHTTPRequestHandler):
|
class _Handler(BaseHTTPRequestHandler):
|
||||||
|
|
@ -40,6 +45,8 @@ class _Handler(BaseHTTPRequestHandler):
|
||||||
def do_GET(self): # noqa: N802 (name imposed by BaseHTTPRequestHandler)
|
def do_GET(self): # noqa: N802 (name imposed by BaseHTTPRequestHandler)
|
||||||
if self.path == "/next":
|
if self.path == "/next":
|
||||||
self._serve_next()
|
self._serve_next()
|
||||||
|
elif self.path == "/fallback.m3u":
|
||||||
|
self._serve_fallback()
|
||||||
elif self.path == "/healthz":
|
elif self.path == "/healthz":
|
||||||
self._text(200, "ok\n")
|
self._text(200, "ok\n")
|
||||||
else:
|
else:
|
||||||
|
|
@ -55,10 +62,18 @@ class _Handler(BaseHTTPRequestHandler):
|
||||||
log.info("next -> %s", track)
|
log.info("next -> %s", track)
|
||||||
self._text(200, annotate_uri(path, track) + "\n")
|
self._text(200, annotate_uri(path, track) + "\n")
|
||||||
|
|
||||||
def _text(self, code: int, body: str):
|
def _serve_fallback(self):
|
||||||
|
# Only already-aired files, newest first, that still exist on disk.
|
||||||
|
lines = ["#EXTM3U"]
|
||||||
|
for p in self.server.db.played_files(config.RETENTION_KEEP):
|
||||||
|
if Path(p).exists():
|
||||||
|
lines.append(p)
|
||||||
|
self._text(200, "\n".join(lines) + "\n", "audio/x-mpegurl; charset=utf-8")
|
||||||
|
|
||||||
|
def _text(self, code: int, body: str, ctype: str = "text/plain; charset=utf-8"):
|
||||||
data = body.encode("utf-8")
|
data = body.encode("utf-8")
|
||||||
self.send_response(code)
|
self.send_response(code)
|
||||||
self.send_header("Content-Type", "text/plain; charset=utf-8")
|
self.send_header("Content-Type", ctype)
|
||||||
self.send_header("Content-Length", str(len(data)))
|
self.send_header("Content-Length", str(len(data)))
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(data)
|
self.wfile.write(data)
|
||||||
|
|
|
||||||
|
|
@ -141,6 +141,20 @@ class Database:
|
||||||
(path, track_key),
|
(path, track_key),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def played_files(self, limit: int) -> list[str]:
|
||||||
|
"""Files already aired, newest first (the stream's fallback pool).
|
||||||
|
|
||||||
|
Only played tracks appear here, so files still being pre-fetched are
|
||||||
|
never served as fallback — that is what stopped the cold-start loop.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
rows = self._conn.execute(
|
||||||
|
"SELECT path FROM cache_files WHERE played_at IS NOT NULL"
|
||||||
|
" ORDER BY played_at DESC LIMIT ?",
|
||||||
|
(limit,),
|
||||||
|
).fetchall()
|
||||||
|
return [r["path"] for r in rows]
|
||||||
|
|
||||||
def mark_played(self, path: str) -> None:
|
def mark_played(self, path: str) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
self._conn.execute(
|
self._conn.execute(
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,8 @@ end
|
||||||
main = request.dynamic(next_track, prefetch=1, retry_delay=1.0)
|
main = request.dynamic(next_track, prefetch=1, retry_delay=1.0)
|
||||||
|
|
||||||
# Filtre du secours : ne garder que les vrais fichiers audio et ignorer les
|
# Filtre du secours : ne garder que les vrais fichiers audio et ignorer les
|
||||||
# fichiers cachés (.gitkeep, téléchargements .part en cours). Évite que le
|
# fichiers cachés (.gitkeep, téléchargements .part en cours). Ceinture et
|
||||||
# décodeur ne tente — et logue en erreur — des fichiers non-audio.
|
# bretelles : la m3u ne liste déjà que des fichiers audio réels.
|
||||||
audio_ext = [".mp3", ".flac", ".ogg", ".opus", ".m4a", ".aac", ".wav"]
|
audio_ext = [".mp3", ".flac", ".ogg", ".opus", ".m4a", ".aac", ".wav"]
|
||||||
def audio_only(r) =
|
def audio_only(r) =
|
||||||
u = string.case(lower=true, request.uri(r))
|
u = string.case(lower=true, request.uri(r))
|
||||||
|
|
@ -44,9 +44,29 @@ def audio_only(r) =
|
||||||
is_audio and not string.contains(prefix=".", base)
|
is_audio and not string.contains(prefix=".", base)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Secours : le cache local, joué en aléatoire, rechargé quand il change.
|
# Secours : uniquement les morceaux DÉJÀ passés à l'antenne. Le daemon les
|
||||||
|
# expose en m3u (/fallback.m3u) ; on la récupère périodiquement dans un fichier
|
||||||
|
# LOCAL que playlist surveille. Le buffer de préchargement (morceaux pas encore
|
||||||
|
# diffusés) en est exclu : au démarrage à froid la liste est vide → silence
|
||||||
|
# assumé plutôt qu'une boucle sur 2-3 titres.
|
||||||
|
# Passer par un fichier local (plutôt qu'une URL directe dans playlist) évite la
|
||||||
|
# détection de type hasardeuse du résolveur http, et garde la dernière liste
|
||||||
|
# valide si l'ingest devient injoignable.
|
||||||
|
fallback_url = "http://ingest:8080/fallback.m3u"
|
||||||
|
fallback_file = "/tmp/fallback.m3u"
|
||||||
|
file.write(data="#EXTM3U\n", atomic=true, fallback_file) # amorce vide au boot
|
||||||
|
|
||||||
|
def refresh_fallback() =
|
||||||
|
resp = http.get(fallback_url, timeout=5.0)
|
||||||
|
if resp.status_code == 200 then
|
||||||
|
file.write(data=resp, atomic=true, fallback_file)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
thread.run(fast=false, every={30.}, refresh_fallback)
|
||||||
|
|
||||||
backup = playlist(
|
backup = playlist(
|
||||||
mode="randomize", reload_mode="watch", check_next=audio_only, "/cache"
|
mode="randomize", reload_mode="watch", mime_type="audio/x-mpegurl",
|
||||||
|
check_next=audio_only, fallback_file
|
||||||
)
|
)
|
||||||
|
|
||||||
# fallback préfère la source principale et bascule sur le cache si elle n'a
|
# fallback préfère la source principale et bascule sur le cache si elle n'a
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue