radieo/ingest/radieo/api.py
Pierre-Olivier Mercier 62302ac21d stream: show the queue of upcoming tracks (/queue)
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-04 11:18:00 +08:00

114 lines
4.3 KiB
Python

"""HTTP API exposing the next track to the stream layer.

Endpoints:
    GET /next          -> annotated Liquidsoap URI, or an empty body when
                          nothing is ready (Liquidsoap then falls back to
                          /fallback.m3u).
    GET /fallback.m3u  -> playlist of already-aired files, the stream's
                          safety net; empty (→ silence) until something has
                          played.
    GET /status        -> JSON prefetch state {ready, prefetch}, surfaced to
                          the player (proxied by the stream) so it can show
                          buffering.
    GET /queue         -> JSON list of the upcoming (prefetched) tracks,
                          oldest first, surfaced to the player (proxied by
                          the stream).
    GET /healthz       -> "ok"
"""
import json
import logging
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from . import config
from .db import Database
from .models import Track
from .queue import TrackQueue
# Module-wide logger: each track handed out by /next is logged at INFO, and
# the HTTP server's per-request access lines are logged at DEBUG.
log = logging.getLogger("radieo.api")
def annotate_uri(path: Path, track: Track) -> str:
    """Build an annotated Liquidsoap request URI for a cache file.

    The result has the form ``annotate:key="value",...:<path>``. It always
    carries the track's ``title``, ``artist`` and ``origin``; a ``url`` field
    is added only when the track's locator is an http(s) address.

    Args:
        path: Cache file the URI points at; interpolated verbatim after the
            metadata fields.
        track: Track whose ``title``, ``artist``, ``origin`` and ``locator``
            attributes are read.

    Returns:
        The annotated URI, without a trailing newline. The metadata part
        never contains a raw line break.
    """

    def esc(value: str) -> str:
        # The /next endpoint sends this URI as a single newline-terminated
        # line, so a raw line break inside a metadata value would cut the
        # response in two. Fold any line break to a plain space.
        flat = value.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")
        # Backslashes first, so those added for the quotes are not doubled.
        return flat.replace("\\", "\\\\").replace('"', '\\"')

    fields = [
        f'title="{esc(track.title)}"',
        f'artist="{esc(track.artist)}"',
        # Provider that produced the track (subsonic, ytdlp…), surfaced by the
        # stream so the player can show a discreet source indicator.
        f'origin="{esc(track.origin)}"',
    ]
    # Web page the track was pulled from, so the player can link back to the
    # source. Only http(s) locators qualify (yt-dlp tracks); a Subsonic song id
    # is opaque and points at no public page.
    if track.locator.startswith(("http://", "https://")):
        fields.append(f'url="{esc(track.locator)}"')
    return f'annotate:{",".join(fields)}:{path}'
class IngestServer(ThreadingHTTPServer):
    """Threaded HTTP server that carries the track queue and the database.

    Both objects are stored on the server instance, which is how the request
    handler reaches them (through its ``server`` attribute).
    """

    def __init__(self, address, queue: TrackQueue, db: Database):
        # Every incoming request is handled by _Handler.
        super().__init__(address, RequestHandlerClass=_Handler)
        self.queue, self.db = queue, db
class _Handler(BaseHTTPRequestHandler):
    """GET-only request handler for the ingest API.

    Reads the track queue and the database from the owning server
    (``self.server.queue`` / ``self.server.db``).
    """

    server: IngestServer

    def do_GET(self):  # noqa: N802 (name imposed by BaseHTTPRequestHandler)
        """Dispatch a GET request on its path, ignoring any query string."""
        # self.path is the raw request target and still includes the query
        # component when the client sends one (e.g. a cache-busting
        # "/status?t=123"); comparing it verbatim would answer 404. Route on
        # the path component alone.
        route = self.path.partition("?")[0]
        if route == "/next":
            self._serve_next()
        elif route == "/fallback.m3u":
            self._serve_fallback()
        elif route == "/status":
            self._serve_status()
        elif route == "/queue":
            self._serve_queue()
        elif route == "/healthz":
            self._text(200, "ok\n")
        else:
            self._text(404, "not found\n")

    def _serve_next(self):
        """Pop the next track off the queue and reply with its annotated URI.

        Replies 200 with an empty body when the queue has nothing to offer.
        """
        result = self.server.queue.pop_next()
        if result is None:
            # Empty body: tells Liquidsoap to use its fallback for now.
            self._text(200, "")
            return
        path, track = result
        log.info("next -> %s", track)
        self._text(200, annotate_uri(path, track) + "\n")

    def _serve_fallback(self):
        """Reply with an extended M3U playlist of already-played files."""
        # Only already-aired files, newest first, that still exist on disk.
        # The order is whatever db.played_files() returns.
        lines = ["#EXTM3U"]
        lines.extend(
            p
            for p in self.server.db.played_files(config.RETENTION_KEEP)
            if Path(p).exists()
        )
        self._text(200, "\n".join(lines) + "\n", "audio/x-mpegurl; charset=utf-8")

    def _serve_status(self):
        """Reply with the prefetch state as JSON: ``{ready, prefetch}``."""
        # Prefetch progress: how many tracks are buffered vs. the target. The
        # stream's initial buffer is "full" once ready reaches PREFETCH.
        body = json.dumps({
            "ready": self.server.queue.ready_count(),
            "prefetch": config.PREFETCH,
        })
        self._text(200, body + "\n", "application/json; charset=utf-8")

    def _serve_queue(self):
        """Reply with the queue snapshot serialized as JSON."""
        # Upcoming prefetched tracks, next first. A peek, nothing consumed.
        body = json.dumps(self.server.queue.snapshot())
        self._text(200, body + "\n", "application/json; charset=utf-8")

    def _text(self, code: int, body: str, ctype: str = "text/plain; charset=utf-8"):
        """Send a complete response with a UTF-8 encoded text body.

        Args:
            code: HTTP status code.
            body: Response body; encoded as UTF-8 before sending.
            ctype: Value of the Content-Type header.
        """
        data = body.encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", ctype)
        # Length of the encoded bytes, not of the str.
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, fmt, *args):
        """Send the server's per-request log lines to the module logger (DEBUG)."""
        log.debug("%s - %s", self.address_string(), fmt % args)