stream: let listeners queue a yt-dlp URL on request
All checks were successful
continuous-integration/drone/push Build is passing

Add an input in the queue tab to enqueue a yt-dlp URL: a single track, or
a whole playlist/album. Requests are a priority lane in the ingest queue —
pop_next serves them before the auto radio, so the next /next plays the
request without cutting the current track. They download lazily (a few
ahead), so a large playlist queues instantly and bypasses anti-repeat.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
nemunaire 2026-07-04 12:18:06 +08:00
commit 493e55ed18
5 changed files with 225 additions and 22 deletions

View file

@ -153,6 +153,12 @@ def main() -> None:
log.info("Canonicalizer disabled: tracks keyed by (artist, title).") log.info("Canonicalizer disabled: tracks keyed by (artist, title).")
providers, fetchers, subsonic_client = _build_pipeline(db, canonicalizer) providers, fetchers, subsonic_client = _build_pipeline(db, canonicalizer)
# Listener requests (POST /enqueue) are always yt-dlp URLs, so make sure the
# yt-dlp fetcher exists even when the yt-dlp *source* is disabled.
if "ytdlp" not in fetchers:
from .fetchers.ytdlp import YtdlpFetcher
fetchers["ytdlp"] = YtdlpFetcher(config.CACHE_DIR, canonicalizer)
scheduler = Scheduler(providers, canonicalizer, db) scheduler = Scheduler(providers, canonicalizer, db)
queue = TrackQueue(scheduler, fetchers, db) queue = TrackQueue(scheduler, fetchers, db)
queue.start() queue.start()

View file

@ -9,6 +9,9 @@ Endpoints:
player (proxied by the stream) so it can show buffering. player (proxied by the stream) so it can show buffering.
GET /queue -> JSON list of the upcoming (prefetched) tracks, oldest GET /queue -> JSON list of the upcoming (prefetched) tracks, oldest
first, surfaced to the player (proxied by the stream). first, surfaced to the player (proxied by the stream).
POST /enqueue?url= -> resolve a yt-dlp URL (single track or whole playlist/
album) and queue it as priority requests; returns
{queued: N}. Proxied by the stream.
GET /healthz -> "ok" GET /healthz -> "ok"
""" """
@ -82,9 +85,31 @@ class _Handler(BaseHTTPRequestHandler):
parsed = urlsplit(self.path) parsed = urlsplit(self.path)
if parsed.path == "/share": if parsed.path == "/share":
self._serve_share(parse_qs(parsed.query)) self._serve_share(parse_qs(parsed.query))
elif parsed.path == "/enqueue":
self._serve_enqueue(parse_qs(parsed.query))
else: else:
self._text(404, "not found\n") self._text(404, "not found\n")
def _serve_enqueue(self, query: dict[str, list[str]]):
# Queue a listener-requested yt-dlp URL (single track or whole
# playlist/album) as priority requests. Proxied here by the stream.
url = (query.get("url") or [""])[0].strip()
if not url.startswith(("http://", "https://")):
self._text(400, "missing or invalid url\n")
return
try:
count = self.server.queue.enqueue_url(url)
except Exception as exc: # yt-dlp raises many extractor-specific errors
log.warning("enqueue failed for %s: %s", url, exc)
self._text(502, "could not resolve url\n")
return
if count == 0:
self._text(404, "no track found\n")
return
self._text(
200, json.dumps({"queued": count}) + "\n", "application/json; charset=utf-8"
)
def _serve_share(self, query: dict[str, list[str]]): def _serve_share(self, query: dict[str, list[str]]):
# Mint a public Subsonic share for one song id, on demand. Called by the # Mint a public Subsonic share for one song id, on demand. Called by the
# stream when a listener clicks a subsonic track's source link, so no # stream when a listener clicks a subsonic track's source link, so no

View file

@ -5,6 +5,13 @@ scheduler what to play next, hands the track to the fetcher registered for its
backend, and enqueues the resulting file. ``pop_next`` hands the oldest ready backend, and enqueues the resulting file. ``pop_next`` hands the oldest ready
track to the HTTP API, records the play and runs LRU retention. track to the HTTP API, records the play and runs LRU retention.
On top of that automatic radio, listeners can push explicit **requests** (a
yt-dlp URL a single track, or a whole playlist/album). Requests are a
priority lane: ``pop_next`` drains them before the auto buffer, so the very next
``/next`` returns the requested music. They are downloaded lazily (only a few
ahead of playback), so a large playlist queues instantly without pulling every
track at once, and they bypass anti-repeat since the listener asked for them.
If no source has anything (e.g. nothing configured, or all unreachable), the If no source has anything (e.g. nothing configured, or all unreachable), the
buffer simply stays empty and ``pop_next`` returns ``None`` the stream then buffer simply stays empty and ``pop_next`` returns ``None`` the stream then
plays its own local-cache fallback. plays its own local-cache fallback.
@ -29,6 +36,11 @@ class TrackQueue:
self._db = db self._db = db
self._lock = threading.Lock() self._lock = threading.Lock()
self._ready: deque[tuple[Path, Track]] = deque() self._ready: deque[tuple[Path, Track]] = deque()
# Listener requests: pending references not yet downloaded, and a small
# priority buffer of the ones already fetched. ``pop_next`` serves
# ``_ready_req`` before ``_ready``.
self._requests: deque[Track] = deque()
self._ready_req: deque[tuple[Path, Track]] = deque()
self._stop = threading.Event() self._stop = threading.Event()
self._thread = threading.Thread( self._thread = threading.Thread(
target=self._run, name="prefetch", daemon=True target=self._run, name="prefetch", daemon=True
@ -51,6 +63,16 @@ class TrackQueue:
self._stop.wait(config.PREFETCH_INTERVAL) self._stop.wait(config.PREFETCH_INTERVAL)
def _prefetch(self) -> None: def _prefetch(self) -> None:
# Listener requests come first: fetch a few ahead into the priority
# buffer, leaving the rest as pending references (downloaded as slots
# free) so a large playlist doesn't pull every track at once.
while not self._stop.is_set():
with self._lock:
if not self._requests or len(self._ready_req) >= config.PREFETCH:
break
track = self._requests.popleft()
self._fetch_into(track, self._ready_req)
# Then top up the automatic radio buffer.
with self._lock: with self._lock:
missing = config.PREFETCH - len(self._ready) missing = config.PREFETCH - len(self._ready)
for _ in range(max(0, missing)): for _ in range(max(0, missing)):
@ -59,20 +81,65 @@ class TrackQueue:
track = self._scheduler.next() track = self._scheduler.next()
if track is None: if track is None:
return # nothing to fetch right now return # nothing to fetch right now
self._fetch_into(track, self._ready)
def _fetch_into(self, track: Track, target: "deque[tuple[Path, Track]]") -> None:
"""Download ``track`` and append the ready ``(path, track)`` to ``target``.
Shared by the automatic radio and the request lane. Never raises: a
missing fetcher or a failed download is logged and skipped so the
prefetch loop keeps going.
"""
fetcher = self._fetchers.get(track.backend) fetcher = self._fetchers.get(track.backend)
if fetcher is None: if fetcher is None:
log.error("no fetcher for backend %r (%s)", track.backend, track) log.error("no fetcher for backend %r (%s)", track.backend, track)
continue return
try: try:
# Fetchers may refine the Track's metadata (e.g. correcting a # Fetchers may refine the Track's metadata (e.g. correcting a
# bandcamp label account to the real artist), so take it back. # bandcamp label account to the real artist), so take it back.
path, track = fetcher.fetch(track) path, track = fetcher.fetch(track)
except Exception: except Exception:
log.exception("fetch failed for %s", track) log.exception("fetch failed for %s", track)
continue return
self._db.register_download(str(path), track.key) self._db.register_download(str(path), track.key)
with self._lock: with self._lock:
self._ready.append((path, track)) target.append((path, track))
# --- listener requests ------------------------------------------------
def enqueue_url(self, url: str) -> int:
"""Resolve a yt-dlp URL and queue it as priority requests.
A container URL (playlist/album) expands to all its tracks; a direct
track URL yields a single one. Returns how many tracks were queued.
Raises on a URL yt-dlp cannot resolve, so the caller can report it.
"""
from . import tagging
from .providers.ytdlp import YtdlpProvider
entries = YtdlpProvider._extract(url)
tracks = []
for entry in entries:
locator = entry["url"]
# Split "Artist - Title" the same way the provider/fetcher do, so
# the queued metadata matches what plays.
guess = tagging.guess_metadata(
{"title": entry.get("title"), "artist": entry.get("artist")}
)
title = guess.title if entry.get("title") else locator
tracks.append(Track(
backend="ytdlp",
locator=locator,
artist=guess.artist,
title=title,
origin="request",
source_url=url if url != locator else None,
))
if tracks:
with self._lock:
self._requests.extend(tracks)
log.info("queued %d requested track(s) from %s", len(tracks), url)
return len(tracks)
# --- introspection ---------------------------------------------------- # --- introspection ----------------------------------------------------
@ -82,16 +149,23 @@ class TrackQueue:
return len(self._ready) return len(self._ready)
def snapshot(self) -> list[dict]: def snapshot(self) -> list[dict]:
"""Display metadata of the upcoming tracks, oldest (next) first. """Display metadata of the upcoming tracks, in play order (next first).
A peek at the prefetch buffer for the player's "up next" view; it does A peek at the buffers for the player's "up next" view; it does not
not consume anything. Mirrors the fields exposed for the current track consume anything. Requests come first (downloaded, then still-pending
(see ``annotate_uri``): a source ``url`` only for http(s) locators. references), then the automatic radio buffer the same order
``pop_next`` serves them. Mirrors the fields exposed for the current
track (see ``annotate_uri``): a source ``url`` only for http(s)
locators.
""" """
with self._lock: with self._lock:
ready = list(self._ready) upcoming = (
[t for _p, t in self._ready_req]
+ list(self._requests)
+ [t for _p, t in self._ready]
)
items = [] items = []
for _path, track in ready: for track in upcoming:
entry = { entry = {
"title": track.title, "title": track.title,
"artist": track.artist, "artist": track.artist,
@ -106,9 +180,13 @@ class TrackQueue:
def pop_next(self) -> tuple[Path, Track] | None: def pop_next(self) -> tuple[Path, Track] | None:
with self._lock: with self._lock:
if not self._ready: # Requests preempt the automatic radio: the next /next serves them.
return None if self._ready_req:
path, track = self._ready_req.popleft()
elif self._ready:
path, track = self._ready.popleft() path, track = self._ready.popleft()
else:
return None
self._db.mark_played(str(path)) self._db.mark_played(str(path))
self._db.record_play(track) self._db.record_play(track)
self._evict() self._evict()

View file

@ -157,6 +157,25 @@
.queue .q-num { color: #6b6480; font-size: .8rem; font-variant-numeric: tabular-nums; .queue .q-num { color: #6b6480; font-size: .8rem; font-variant-numeric: tabular-nums;
min-width: 1.2em; text-align: right; } min-width: 1.2em; text-align: right; }
.queue .q-meta { flex: 1; min-width: 0; } .queue .q-meta { flex: 1; min-width: 0; }
/* Ajout à la file : un input d'URL + bouton, calqués sur le bloc « share ».
Le message de retour s'affiche discrètement sous le formulaire, en rouge
tamisé quand c'est une erreur. */
.enqueue { display: flex; gap: .5rem; margin-bottom: .75rem; }
.enqueue input {
flex: 1; min-width: 0; padding: .55rem .7rem; font-size: .85rem;
color: #cfc9de; background: rgba(255,255,255,.05);
border: 1px solid rgba(255,255,255,.1); border-radius: 10px;
}
.enqueue button {
padding: .55rem .9rem; font-size: .8rem; font-weight: 600; cursor: pointer;
color: #f2f0f7; background: rgba(155,140,255,.18);
border: 1px solid rgba(155,140,255,.35); border-radius: 10px;
transition: background .15s;
}
.enqueue button:hover { background: rgba(155,140,255,.3); }
.enqueue button:disabled { opacity: .5; cursor: default; }
.enqueue-msg { font-size: .8rem; color: #8ad9a0; margin-bottom: .5rem; }
.enqueue-msg.err { color: #e08b9b; }
.dot { display: inline-block; width: 8px; height: 8px; border-radius: 50%; .dot { display: inline-block; width: 8px; height: 8px; border-radius: 50%;
background: #4ade80; margin-right: .4rem; vertical-align: middle; background: #4ade80; margin-right: .4rem; vertical-align: middle;
box-shadow: 0 0 0 0 rgba(74,222,128,.6); animation: pulse 2s infinite; } box-shadow: 0 0 0 0 rgba(74,222,128,.6); animation: pulse 2s infinite; }
@ -193,6 +212,12 @@
<ul id="historyList"></ul> <ul id="historyList"></ul>
</section> </section>
<section class="tab-panel queue" id="panelQueue" role="tabpanel" aria-labelledby="tabQueue" hidden> <section class="tab-panel queue" id="panelQueue" role="tabpanel" aria-labelledby="tabQueue" hidden>
<form class="enqueue" id="enqueueForm">
<input id="enqueueUrl" type="url" inputmode="url"
placeholder="URL yt-dlp (piste, playlist ou album)…" required>
<button type="submit" id="enqueueBtn">Ajouter</button>
</form>
<div class="enqueue-msg" id="enqueueMsg" hidden></div>
<ul id="queueList"></ul> <ul id="queueList"></ul>
</section> </section>
</div> </div>
@ -245,6 +270,8 @@
// Filet de secours : morceau rejoué depuis le cache local (déjà diffusé), // Filet de secours : morceau rejoué depuis le cache local (déjà diffusé),
// quand l'ingest n'a rien à proposer (démarrage, panne…). // quand l'ingest n'a rien à proposer (démarrage, panne…).
cache: "le cache local", cache: "le cache local",
// Morceau mis en file par un auditeur via l'input « Ajouter à la file ».
request: "une demande",
}; };
// Tant que le buffer de préchargement (PREFETCH côté ingest) n'est pas // Tant que le buffer de préchargement (PREFETCH côté ingest) n'est pas
@ -450,6 +477,47 @@
}).join(""); }).join("");
} catch (e) { /* keep last known values */ } } catch (e) { /* keep last known values */ }
} }
// Ajout à la file : on POST l'URL au stream, qui la relaie à l'ingest. Ce
// dernier résout l'URL (piste seule, ou playlist/album entier) et la place
// en file prioritaire — le prochain morceau diffusé sera la demande. On
// affiche un retour bref puis on rafraîchit la file. La résolution d'une
// grosse playlist peut prendre quelques secondes.
const enqueueForm = document.getElementById("enqueueForm");
const enqueueUrl = document.getElementById("enqueueUrl");
const enqueueBtn = document.getElementById("enqueueBtn");
const enqueueMsg = document.getElementById("enqueueMsg");
function showEnqueueMsg(text, ok) {
enqueueMsg.textContent = text;
enqueueMsg.classList.toggle("err", !ok);
enqueueMsg.hidden = false;
}
enqueueForm.addEventListener("submit", async (e) => {
e.preventDefault();
const link = enqueueUrl.value.trim();
if (!link) return;
const prev = enqueueBtn.textContent;
enqueueBtn.disabled = true;
enqueueBtn.textContent = "…";
try {
const r = await fetch("/enqueue?url=" + encodeURIComponent(link), { method: "POST" });
if (r.ok) {
const d = await r.json().catch(() => ({}));
const n = Number(d.queued) || 0;
showEnqueueMsg(n > 1 ? `${n} titres ajoutés à la file` : "1 titre ajouté à la file", true);
enqueueUrl.value = "";
pollQueue();
} else {
showEnqueueMsg("Impossible de résoudre cette URL", false);
}
} catch (err) {
showEnqueueMsg("Erreur réseau", false);
} finally {
enqueueBtn.disabled = false;
enqueueBtn.textContent = prev;
}
});
// Lien nu du flux, à ouvrir dans un lecteur externe (VLC…). // Lien nu du flux, à ouvrir dans un lecteur externe (VLC…).
const shareUrl = location.origin + "/radio.mp3"; const shareUrl = location.origin + "/radio.mp3";
const urlEl = document.getElementById("streamUrl"); const urlEl = document.getElementById("streamUrl");

View file

@ -290,6 +290,32 @@ harbor.http.register(
end end
) )
# Mettre une URL yt-dlp en file d'attente (piste seule, ou playlist/album
# entier). Le player n'a pas accès au réseau interne : on relaie la demande vers
# l'ingest, qui résout l'URL et la place en file prioritaire (le prochain /next
# la servira). On renvoie tel quel son code et son corps JSON ({queued: N} ou
# une erreur). Timeout large : résoudre une grosse playlist peut prendre du
# temps. NB : la variable locale s'appelle `link`, pas `url`, pour ne pas
# masquer le module `url` (url.encode).
ingest_enqueue_url = "http://ingest:8080/enqueue"
harbor.http.register(
port=8000, method="POST", "/enqueue",
fun(req, resp) -> begin
link = list.assoc(default="", "url", req.query)
if link == "" then
resp.status_code(400)
resp.data("missing url")
else
body = http.post(
data="", timeout=60.0, "#{ingest_enqueue_url}?url=#{url.encode(link)}"
)
resp.status_code(body.status_code)
resp.content_type("application/json; charset=utf-8")
resp.data(string.trim(body) ^ "\n")
end
end
)
# Passer au morceau suivant : on saute le morceau en cours sur la source # Passer au morceau suivant : on saute le morceau en cours sur la source
# diffusée. request.dynamic a déjà préchargé le suivant, donc l'enchaînement # diffusée. request.dynamic a déjà préchargé le suivant, donc l'enchaînement
# est immédiat (le prochain /next est demandé au daemon dans la foulée). # est immédiat (le prochain /next est demandé au daemon dans la foulée).