radieo/ingest/radieo/api.py
Pierre-Olivier Mercier c12e522fee stream: link the now-playing title to its source page
For yt-dlp tracks the locator is the original web page, so pass it as a
url annotation, carry it through the stream metadata and history, and
turn the track title into a link back to that page (both live and in the
history). Subsonic ids are opaque and stay plain text.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-03 12:14:50 +08:00

85 lines
3 KiB
Python

"""HTTP API exposing the next track to the stream layer.
Endpoints:
    GET /next         -> annotated Liquidsoap URI, or an empty body when
                         nothing is ready (Liquidsoap then falls back to
                         /fallback.m3u).
    GET /fallback.m3u -> playlist of already-aired files, the stream's safety
                         net; empty (→ silence) until something has played.
    GET /healthz      -> "ok"
"""
import logging
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from . import config
from .db import Database
from .models import Track
from .queue import TrackQueue
log = logging.getLogger("radieo.api")
def annotate_uri(path: Path, track: Track) -> str:
    """Build an annotated Liquidsoap request URI for a cache file.

    The result has the form ``annotate:key="value",...:<path>``. ``title``
    and ``artist`` are always emitted; ``url`` is added only when the track's
    locator is an http(s) address.

    Args:
        path: Cache file to play; interpolated verbatim after the metadata.
        track: Provides the ``title``, ``artist`` and ``locator`` values.

    Returns:
        The annotated URI as a single line (no CR/LF characters coming from
        the metadata values).
    """

    def esc(value: str) -> str:
        # Backslashes first, so the backslashes added in front of quotes are
        # not doubled afterwards.
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        # The URI is handed out as one line of text; a raw line break inside
        # a value would cut it in two, so fold CR/LF into a single space.
        return escaped.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")

    fields = [f'title="{esc(track.title)}"', f'artist="{esc(track.artist)}"']
    # Web page the track was pulled from, so the player can link back to the
    # source. Only http(s) locators qualify (yt-dlp tracks); a Subsonic song id
    # is opaque and points at no public page.
    if track.locator.startswith(("http://", "https://")):
        fields.append(f'url="{esc(track.locator)}"')
    return f'annotate:{",".join(fields)}:{path}'
class IngestServer(ThreadingHTTPServer):
    """Threading HTTP server that carries the track queue and the database.

    Requests are handled by ``_Handler``, which reaches both objects through
    its ``server`` attribute.
    """

    def __init__(self, address, queue: TrackQueue, db: Database):
        """Create the server on *address* and remember *queue* and *db*."""
        super().__init__(address, _Handler)
        self.queue, self.db = queue, db
class _Handler(BaseHTTPRequestHandler):
    """Request handler answering the ingest API's GET endpoints.

    The track queue and the database are read from the owning server
    (``self.server.queue`` and ``self.server.db``).
    """

    server: IngestServer

    def do_GET(self):  # noqa: N802 (name imposed by BaseHTTPRequestHandler)
        """Route a GET request by exact path; unknown paths get a 404."""
        requested = self.path
        if requested == "/healthz":
            self._text(200, "ok\n")
            return
        routes = {
            "/next": self._serve_next,
            "/fallback.m3u": self._serve_fallback,
        }
        serve = routes.get(requested)
        if serve is None:
            self._text(404, "not found\n")
            return
        serve()

    def _serve_next(self):
        """Pop the next queued track and reply with its annotated URI."""
        popped = self.server.queue.pop_next()
        if popped is not None:
            cache_path, track = popped
            log.info("next -> %s", track)
            self._text(200, annotate_uri(cache_path, track) + "\n")
        else:
            # Empty body: tells Liquidsoap to use its fallback for now.
            self._text(200, "")

    def _serve_fallback(self):
        """Reply with an M3U playlist of played files that are still on disk."""
        # Only already-aired files, newest first, that still exist on disk.
        played = self.server.db.played_files(config.RETENTION_KEEP)
        entries = [name for name in played if Path(name).exists()]
        playlist = "\n".join(["#EXTM3U", *entries]) + "\n"
        self._text(200, playlist, "audio/x-mpegurl; charset=utf-8")

    def _text(self, code: int, body: str, ctype: str = "text/plain; charset=utf-8"):
        """Send *body*, UTF-8 encoded, with status *code* and type *ctype*."""
        payload = body.encode("utf-8")
        self.send_response(code)
        headers = (
            ("Content-Type", ctype),
            ("Content-Length", str(len(payload))),
        )
        for header, value in headers:
            self.send_header(header, value)
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, fmt, *args):
        """Forward the base handler's log lines to the module logger at DEBUG."""
        client = self.address_string()
        log.debug("%s - %s", client, fmt % args)