radieo/ingest/radieo/api.py
Pierre-Olivier Mercier d302cf1c88
All checks were successful
continuous-integration/drone/push Build is passing
stream: scrobble listened tracks to ListenBrainz
The web player decides when a track counts as listened (caught near its
start and heard to ~90%, capped at 4 min) and triggers POST /scrobble.
The token stays server-side (RADIEO_LISTENBRAINZ_TOKEN), submitting the
listen with the canonical MusicBrainz MBID when available. Each airing is
deduplicated so multiple tabs submit it once.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-04 17:51:41 +08:00

205 lines
8.1 KiB
Python

"""HTTP API exposing the next track to the stream layer.
Endpoints:
GET /next -> annotated Liquidsoap URI, or an empty body when nothing
is ready (Liquidsoap then falls back to /fallback.m3u).
GET /fallback.m3u -> playlist of already-aired files, the stream's safety
net; empty (→ silence) until something has played.
GET /status -> JSON prefetch state {ready, prefetch}, surfaced to the
player (proxied by the stream) so it can show buffering.
GET /queue -> JSON list of the upcoming (prefetched) tracks, oldest
first, surfaced to the player (proxied by the stream).
POST /enqueue?url= -> resolve a yt-dlp URL (single track or whole playlist/
album) and queue it as priority requests; returns
{queued: N}. Proxied by the stream.
POST /dequeue?id= -> drop one upcoming track (the opaque id from /queue)
from the queue; returns {removed: true}, or 404 when the id is not
in the queue (400 when it is not an integer). Proxied by the stream.
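POST /share?id= -> mint a public Subsonic share for one song id; returns
{url: ...}, or 503 when Subsonic is not configured.
GET /healthz -> "ok"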
"""
import json
import logging
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlsplit
from . import config
from .db import Database
from .models import Track
from .queue import TrackQueue
from .subsonic import SubsonicClient
log = logging.getLogger("radieo.api")
def annotate_uri(path: Path, track: Track) -> str:
    """Return the ``annotate:`` Liquidsoap request URI for a cached file.

    The result has the form ``annotate:key="value",...:<path>``. Backslashes
    and double quotes inside each value are backslash-escaped so they cannot
    terminate the quoted value early.

    ``title``, ``artist`` and ``origin`` are always emitted, in that order.
    ``musicbrainz_trackid`` is added only when ``track.mbid`` is truthy, and
    ``url`` only when ``track.page_url`` is not ``None``.
    """
    # Single-pass equivalent of escaping backslashes first, then quotes.
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"'})

    metadata = [
        ("title", track.title),
        ("artist", track.artist),
        # Provider that produced the track (subsonic, ytdlp…), surfaced by
        # the stream so the player can show a discreet source indicator.
        ("origin", track.origin),
    ]

    # Canonical MusicBrainz recording MBID, when one was found. It is passed
    # explicitly rather than relying on Liquidsoap re-reading the file tags,
    # which is format-dependent (see stream/web.liq /scrobble).
    if track.mbid:
        metadata.append(("musicbrainz_trackid", track.mbid))

    # Web page the track was pulled from, so the player can link back to the
    # source (see Track.page_url for how it's derived per backend).
    if track.page_url is not None:
        metadata.append(("url", track.page_url))

    annotations = ",".join(
        f'{key}="{value.translate(escapes)}"' for key, value in metadata
    )
    return f"annotate:{annotations}:{path}"
class IngestServer(ThreadingHTTPServer):
    """HTTP server that carries the ingest state its request handlers use.

    Handlers reach the attributes below through ``self.server``:

    * ``queue`` – the track queue the endpoints read and modify.
    * ``db`` – the database used to build the fallback playlist.
    * ``subsonic`` – optional Subsonic client; ``None`` disables ``/share``.
    """

    def __init__(
        self,
        address,
        queue: TrackQueue,
        db: Database,
        subsonic: SubsonicClient | None = None,
    ):
        # The handler class is fixed: every request is served by _Handler.
        super().__init__(address, _Handler)
        self.queue, self.db, self.subsonic = queue, db, subsonic
class _Handler(BaseHTTPRequestHandler):
    """Route ingest API requests to the queue, database and Subsonic client.

    All shared state is read from ``self.server``. Every response is sent
    through :meth:`_text` (directly or via :meth:`_json`), so it always
    carries a Content-Type and a Content-Length header.
    """

    server: IngestServer

    def do_GET(self):  # noqa: N802 (name imposed by BaseHTTPRequestHandler)
        """Dispatch a GET request on the path component of the request URL."""
        # Route on the path only, exactly as do_POST does, so a request that
        # carries a query string (cache-buster, proxy-added parameter…) still
        # reaches its endpoint instead of falling through to 404.
        route = urlsplit(self.path).path
        if route == "/next":
            self._serve_next()
        elif route == "/fallback.m3u":
            self._serve_fallback()
        elif route == "/status":
            self._serve_status()
        elif route == "/queue":
            self._serve_queue()
        elif route == "/healthz":
            self._text(200, "ok\n")
        else:
            self._text(404, "not found\n")

    def do_POST(self):  # noqa: N802 (name imposed by BaseHTTPRequestHandler)
        """Dispatch a POST request; parameters are read from the query string."""
        parsed = urlsplit(self.path)
        routes = {
            "/share": self._serve_share,
            "/enqueue": self._serve_enqueue,
            "/dequeue": self._serve_dequeue,
        }
        handler = routes.get(parsed.path)
        if handler is None:
            self._text(404, "not found\n")
            return
        handler(parse_qs(parsed.query))

    @staticmethod
    def _first(query: dict[str, list[str]], name: str) -> str:
        """Return the first value of query parameter *name*, or "" if absent."""
        return (query.get(name) or [""])[0]

    def _serve_enqueue(self, query: dict[str, list[str]]):
        """Queue a listener-requested URL as priority requests.

        Responds 400 when ``url`` is missing or not http(s), 502 when the
        queue fails to resolve it, 404 when it resolved to no track, and
        otherwise 200 with ``{"queued": N}``.
        """
        # Queue a listener-requested yt-dlp URL (single track or whole
        # playlist/album) as priority requests. Proxied here by the stream.
        url = self._first(query, "url").strip()
        if not url.startswith(("http://", "https://")):
            self._text(400, "missing or invalid url\n")
            return
        try:
            count = self.server.queue.enqueue_url(url)
        except Exception as exc:  # yt-dlp raises many extractor-specific errors
            log.warning("enqueue failed for %s: %s", url, exc)
            self._text(502, "could not resolve url\n")
            return
        if count == 0:
            self._text(404, "no track found\n")
            return
        self._json(200, {"queued": count})

    def _serve_dequeue(self, query: dict[str, list[str]]):
        """Remove one upcoming track by the opaque id handed out by /queue.

        Responds 400 when ``id`` is missing or not an integer, 404 when the
        queue reports nothing was removed, and otherwise 200 with
        ``{"removed": true}``.
        """
        # Proxied here by the stream. Unknown id (already played, or gone) → 404.
        raw = self._first(query, "id").strip()
        try:
            entry_id = int(raw)
        except ValueError:
            self._text(400, "missing or invalid id\n")
            return
        if not self.server.queue.remove(entry_id):
            self._text(404, "not in queue\n")
            return
        self._json(200, {"removed": True})

    def _serve_share(self, query: dict[str, list[str]]):
        """Mint a public Subsonic share for one song id, on demand.

        Responds 503 when no Subsonic client is configured, 400 when ``id``
        is missing, 502 when creating the share fails, and otherwise 200
        with ``{"url": ...}``.
        """
        # Called by the stream when a listener clicks a subsonic track's
        # source link, so no share is created for tracks nobody opens.
        client = self.server.subsonic
        if client is None:
            self._text(503, "subsonic not configured\n")
            return
        song_id = self._first(query, "id")
        if not song_id:
            self._text(400, "missing id\n")
            return
        try:
            url = client.create_share(song_id)
        except Exception as exc:  # sharing disabled, network error, bad id…
            log.warning("createShare failed for %s: %s", song_id, exc)
            self._text(502, "share unavailable\n")
            return
        self._json(200, {"url": url})

    def _serve_next(self):
        """Pop the next track and answer with its annotated URI."""
        result = self.server.queue.pop_next()
        if result is None:
            # Empty body: tells Liquidsoap to use its fallback for now.
            self._text(200, "")
            return
        path, track = result
        log.info("next -> %s", track)
        self._text(200, annotate_uri(path, track) + "\n")

    def _serve_fallback(self):
        """Serve an M3U playlist of already-aired files still on disk."""
        # Entries keep the order the database returns them in; files that
        # have since disappeared from disk are skipped.
        lines = ["#EXTM3U"]
        lines.extend(
            p
            for p in self.server.db.played_files(config.RETENTION_KEEP)
            if Path(p).exists()
        )
        self._text(200, "\n".join(lines) + "\n", "audio/x-mpegurl; charset=utf-8")

    def _serve_status(self):
        """Report prefetch progress as ``{"ready": ..., "prefetch": ...}``."""
        # Prefetch progress: how many tracks are buffered vs. the target. The
        # stream's initial buffer is "full" once ready reaches PREFETCH.
        self._json(200, {
            "ready": self.server.queue.ready_count(),
            "prefetch": config.PREFETCH,
        })

    def _serve_queue(self):
        """Serve the queue snapshot as JSON. A peek, nothing is consumed."""
        self._json(200, self.server.queue.snapshot())

    def _json(self, code: int, payload):
        """Send *payload* as a newline-terminated JSON response."""
        self._text(
            code, json.dumps(payload) + "\n", "application/json; charset=utf-8"
        )

    def _text(self, code: int, body: str, ctype: str = "text/plain; charset=utf-8"):
        """Send a complete response: status, headers, UTF-8 encoded *body*."""
        data = body.encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", ctype)
        # Length is counted in encoded bytes, not characters.
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, fmt, *args):
        """Send the base class's per-request log lines to the module logger."""
        log.debug("%s - %s", self.address_string(), fmt % args)