From 493e55ed181ab1ada0b7744583ad87992903a4c1 Mon Sep 17 00:00:00 2001 From: Pierre-Olivier Mercier Date: Sat, 4 Jul 2026 12:18:06 +0800 Subject: [PATCH] stream: let listeners queue a yt-dlp URL on request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an input in the queue tab to enqueue a yt-dlp URL: a single track, or a whole playlist/album. Requests are a priority lane in the ingest queue — pop_next serves them before the auto radio, so the next /next plays the request without cutting the current track. They download lazily (a few ahead), so a large playlist queues instantly and bypasses anti-repeat. Co-Authored-By: Claude Opus 4.8 --- ingest/radieo/__main__.py | 6 ++ ingest/radieo/api.py | 25 ++++++++ ingest/radieo/queue.py | 120 +++++++++++++++++++++++++++++++------- stream/index.html | 68 +++++++++++++++++++++ stream/radio.liq | 26 +++++++++ 5 files changed, 224 insertions(+), 21 deletions(-) diff --git a/ingest/radieo/__main__.py b/ingest/radieo/__main__.py index ddf01fb..cd3b00b 100644 --- a/ingest/radieo/__main__.py +++ b/ingest/radieo/__main__.py @@ -153,6 +153,12 @@ def main() -> None: log.info("Canonicalizer disabled: tracks keyed by (artist, title).") providers, fetchers, subsonic_client = _build_pipeline(db, canonicalizer) + # Listener requests (POST /enqueue) are always yt-dlp URLs, so make sure the + # yt-dlp fetcher exists even when the yt-dlp *source* is disabled. + if "ytdlp" not in fetchers: + from .fetchers.ytdlp import YtdlpFetcher + + fetchers["ytdlp"] = YtdlpFetcher(config.CACHE_DIR, canonicalizer) scheduler = Scheduler(providers, canonicalizer, db) queue = TrackQueue(scheduler, fetchers, db) queue.start() diff --git a/ingest/radieo/api.py b/ingest/radieo/api.py index bea71fd..e121586 100644 --- a/ingest/radieo/api.py +++ b/ingest/radieo/api.py @@ -9,6 +9,9 @@ Endpoints: player (proxied by the stream) so it can show buffering. GET /queue -> JSON list of the upcoming (prefetched) tracks, oldest first, surfaced to the player (proxied by the stream). + POST /enqueue?url= -> resolve a yt-dlp URL (single track or whole playlist/ + album) and queue it as priority requests; returns + {queued: N}. Proxied by the stream. GET /healthz -> "ok" """ @@ -82,9 +85,31 @@ class _Handler(BaseHTTPRequestHandler): parsed = urlsplit(self.path) if parsed.path == "/share": self._serve_share(parse_qs(parsed.query)) + elif parsed.path == "/enqueue": + self._serve_enqueue(parse_qs(parsed.query)) else: self._text(404, "not found\n") + def _serve_enqueue(self, query: dict[str, list[str]]): + # Queue a listener-requested yt-dlp URL (single track or whole + # playlist/album) as priority requests. Proxied here by the stream. + url = (query.get("url") or [""])[0].strip() + if not url.startswith(("http://", "https://")): + self._text(400, "missing or invalid url\n") + return + try: + count = self.server.queue.enqueue_url(url) + except Exception as exc: # yt-dlp raises many extractor-specific errors + log.warning("enqueue failed for %s: %s", url, exc) + self._text(502, "could not resolve url\n") + return + if count == 0: + self._text(404, "no track found\n") + return + self._text( + 200, json.dumps({"queued": count}) + "\n", "application/json; charset=utf-8" + ) + def _serve_share(self, query: dict[str, list[str]]): # Mint a public Subsonic share for one song id, on demand. Called by the # stream when a listener clicks a subsonic track's source link, so no diff --git a/ingest/radieo/queue.py b/ingest/radieo/queue.py index 2d4d17f..c82e4f6 100644 --- a/ingest/radieo/queue.py +++ b/ingest/radieo/queue.py @@ -5,6 +5,13 @@ scheduler what to play next, hands the track to the fetcher registered for its backend, and enqueues the resulting file. ``pop_next`` hands the oldest ready track to the HTTP API, records the play and runs LRU retention. +On top of that automatic radio, listeners can push explicit **requests** (a +yt-dlp URL — a single track, or a whole playlist/album). Requests are a +priority lane: ``pop_next`` drains them before the auto buffer, so the very next +``/next`` returns the requested music. They are downloaded lazily (only a few +ahead of playback), so a large playlist queues instantly without pulling every +track at once, and they bypass anti-repeat since the listener asked for them. + If no source has anything (e.g. nothing configured, or all unreachable), the buffer simply stays empty and ``pop_next`` returns ``None`` — the stream then plays its own local-cache fallback. @@ -29,6 +36,11 @@ class TrackQueue: self._db = db self._lock = threading.Lock() self._ready: deque[tuple[Path, Track]] = deque() + # Listener requests: pending references not yet downloaded, and a small + # priority buffer of the ones already fetched. ``pop_next`` serves + # ``_ready_req`` before ``_ready``. + self._requests: deque[Track] = deque() + self._ready_req: deque[tuple[Path, Track]] = deque() self._stop = threading.Event() self._thread = threading.Thread( target=self._run, name="prefetch", daemon=True @@ -51,6 +63,16 @@ class TrackQueue: self._stop.wait(config.PREFETCH_INTERVAL) def _prefetch(self) -> None: + # Listener requests come first: fetch a few ahead into the priority + # buffer, leaving the rest as pending references (downloaded as slots + # free) so a large playlist doesn't pull every track at once. + while not self._stop.is_set(): + with self._lock: + if not self._requests or len(self._ready_req) >= config.PREFETCH: + break + track = self._requests.popleft() + self._fetch_into(track, self._ready_req) + # Then top up the automatic radio buffer. with self._lock: missing = config.PREFETCH - len(self._ready) for _ in range(max(0, missing)): @@ -59,20 +81,65 @@ class TrackQueue: track = self._scheduler.next() if track is None: return # nothing to fetch right now - fetcher = self._fetchers.get(track.backend) - if fetcher is None: - log.error("no fetcher for backend %r (%s)", track.backend, track) - continue - try: - # Fetchers may refine the Track's metadata (e.g. correcting a - # bandcamp label account to the real artist), so take it back. - path, track = fetcher.fetch(track) - except Exception: - log.exception("fetch failed for %s", track) - continue - self._db.register_download(str(path), track.key) + self._fetch_into(track, self._ready) + + def _fetch_into(self, track: Track, target: "deque[tuple[Path, Track]]") -> None: + """Download ``track`` and append the ready ``(path, track)`` to ``target``. + + Shared by the automatic radio and the request lane. Never raises: a + missing fetcher or a failed download is logged and skipped so the + prefetch loop keeps going. + """ + fetcher = self._fetchers.get(track.backend) + if fetcher is None: + log.error("no fetcher for backend %r (%s)", track.backend, track) + return + try: + # Fetchers may refine the Track's metadata (e.g. correcting a + # bandcamp label account to the real artist), so take it back. + path, track = fetcher.fetch(track) + except Exception: + log.exception("fetch failed for %s", track) + return + self._db.register_download(str(path), track.key) + with self._lock: + target.append((path, track)) + + # --- listener requests ------------------------------------------------ + + def enqueue_url(self, url: str) -> int: + """Resolve a yt-dlp URL and queue it as priority requests. + + A container URL (playlist/album) expands to all its tracks; a direct + track URL yields a single one. Returns how many tracks were queued. + Raises on a URL yt-dlp cannot resolve, so the caller can report it. + """ + from . import tagging + from .providers.ytdlp import YtdlpProvider + + entries = YtdlpProvider._extract(url) + tracks = [] + for entry in entries: + locator = entry["url"] + # Split "Artist - Title" the same way the provider/fetcher do, so + # the queued metadata matches what plays. + guess = tagging.guess_metadata( + {"title": entry.get("title"), "artist": entry.get("artist")} + ) + title = guess.title if entry.get("title") else locator + tracks.append(Track( + backend="ytdlp", + locator=locator, + artist=guess.artist, + title=title, + origin="request", + source_url=url if url != locator else None, + )) + if tracks: with self._lock: - self._ready.append((path, track)) + self._requests.extend(tracks) + log.info("queued %d requested track(s) from %s", len(tracks), url) + return len(tracks) # --- introspection ---------------------------------------------------- @@ -82,16 +149,23 @@ class TrackQueue: return len(self._ready) def snapshot(self) -> list[dict]: - """Display metadata of the upcoming tracks, oldest (next) first. + """Display metadata of the upcoming tracks, in play order (next first). - A peek at the prefetch buffer for the player's "up next" view; it does - not consume anything. Mirrors the fields exposed for the current track - (see ``annotate_uri``): a source ``url`` only for http(s) locators. + A peek at the buffers for the player's "up next" view; it does not + consume anything. Requests come first (downloaded, then still-pending + references), then the automatic radio buffer — the same order + ``pop_next`` serves them. Mirrors the fields exposed for the current + track (see ``annotate_uri``): a source ``url`` only for http(s) + locators. """ with self._lock: - ready = list(self._ready) + upcoming = ( + [t for _p, t in self._ready_req] + + list(self._requests) + + [t for _p, t in self._ready] + ) items = [] - for _path, track in ready: + for track in upcoming: entry = { "title": track.title, "artist": track.artist, @@ -106,9 +180,13 @@ class TrackQueue: def pop_next(self) -> tuple[Path, Track] | None: with self._lock: - if not self._ready: + # Requests preempt the automatic radio: the next /next serves them. + if self._ready_req: + path, track = self._ready_req.popleft() + elif self._ready: + path, track = self._ready.popleft() + else: return None - path, track = self._ready.popleft() self._db.mark_played(str(path)) self._db.record_play(track) self._evict() diff --git a/stream/index.html b/stream/index.html index b7207dd..2fbd4a8 100644 --- a/stream/index.html +++ b/stream/index.html @@ -157,6 +157,25 @@ .queue .q-num { color: #6b6480; font-size: .8rem; font-variant-numeric: tabular-nums; min-width: 1.2em; text-align: right; } .queue .q-meta { flex: 1; min-width: 0; } + /* Ajout à la file : un input d'URL + bouton, calqués sur le bloc « share ». + Le message de retour s'affiche discrètement sous le formulaire, en rouge + tamisé quand c'est une erreur. */ + .enqueue { display: flex; gap: .5rem; margin-bottom: .75rem; } + .enqueue input { + flex: 1; min-width: 0; padding: .55rem .7rem; font-size: .85rem; + color: #cfc9de; background: rgba(255,255,255,.05); + border: 1px solid rgba(255,255,255,.1); border-radius: 10px; + } + .enqueue button { + padding: .55rem .9rem; font-size: .8rem; font-weight: 600; cursor: pointer; + color: #f2f0f7; background: rgba(155,140,255,.18); + border: 1px solid rgba(155,140,255,.35); border-radius: 10px; + transition: background .15s; + } + .enqueue button:hover { background: rgba(155,140,255,.3); } + .enqueue button:disabled { opacity: .5; cursor: default; } + .enqueue-msg { font-size: .8rem; color: #8ad9a0; margin-bottom: .5rem; } + .enqueue-msg.err { color: #e08b9b; } .dot { display: inline-block; width: 8px; height: 8px; border-radius: 50%; background: #4ade80; margin-right: .4rem; vertical-align: middle; box-shadow: 0 0 0 0 rgba(74,222,128,.6); animation: pulse 2s infinite; } @@ -193,6 +212,12 @@ @@ -245,6 +270,8 @@ // Filet de secours : morceau rejoué depuis le cache local (déjà diffusé), // quand l'ingest n'a rien à proposer (démarrage, panne…). cache: "le cache local", + // Morceau mis en file par un auditeur via l'input « Ajouter à la file ». + request: "une demande", }; // Tant que le buffer de préchargement (PREFETCH côté ingest) n'est pas @@ -450,6 +477,47 @@ }).join(""); } catch (e) { /* keep last known values */ } } + + // Ajout à la file : on POST l'URL au stream, qui la relaie à l'ingest. Ce + // dernier résout l'URL (piste seule, ou playlist/album entier) et la place + // en file prioritaire — le prochain morceau diffusé sera la demande. On + // affiche un retour bref puis on rafraîchit la file. La résolution d'une + // grosse playlist peut prendre quelques secondes. + const enqueueForm = document.getElementById("enqueueForm"); + const enqueueUrl = document.getElementById("enqueueUrl"); + const enqueueBtn = document.getElementById("enqueueBtn"); + const enqueueMsg = document.getElementById("enqueueMsg"); + function showEnqueueMsg(text, ok) { + enqueueMsg.textContent = text; + enqueueMsg.classList.toggle("err", !ok); + enqueueMsg.hidden = false; + } + enqueueForm.addEventListener("submit", async (e) => { + e.preventDefault(); + const link = enqueueUrl.value.trim(); + if (!link) return; + const prev = enqueueBtn.textContent; + enqueueBtn.disabled = true; + enqueueBtn.textContent = "…"; + try { + const r = await fetch("/enqueue?url=" + encodeURIComponent(link), { method: "POST" }); + if (r.ok) { + const d = await r.json().catch(() => ({})); + const n = Number(d.queued) || 0; + showEnqueueMsg(n > 1 ? `${n} titres ajoutés à la file` : "1 titre ajouté à la file", true); + enqueueUrl.value = ""; + pollQueue(); + } else { + showEnqueueMsg("Impossible de résoudre cette URL", false); + } + } catch (err) { + showEnqueueMsg("Erreur réseau", false); + } finally { + enqueueBtn.disabled = false; + enqueueBtn.textContent = prev; + } + }); + // Lien nu du flux, à ouvrir dans un lecteur externe (VLC…). const shareUrl = location.origin + "/radio.mp3"; const urlEl = document.getElementById("streamUrl"); diff --git a/stream/radio.liq b/stream/radio.liq index cae432d..084fae3 100644 --- a/stream/radio.liq +++ b/stream/radio.liq @@ -290,6 +290,32 @@ harbor.http.register( end ) +# Mettre une URL yt-dlp en file d'attente (piste seule, ou playlist/album +# entier). Le player n'a pas accès au réseau interne : on relaie la demande vers +# l'ingest, qui résout l'URL et la place en file prioritaire (le prochain /next +# la servira). On renvoie tel quel son code et son corps JSON ({queued: N} ou +# une erreur). Timeout large : résoudre une grosse playlist peut prendre du +# temps. NB : la variable locale s'appelle `link`, pas `url`, pour ne pas +# masquer le module `url` (url.encode). +ingest_enqueue_url = "http://ingest:8080/enqueue" +harbor.http.register( + port=8000, method="POST", "/enqueue", + fun(req, resp) -> begin + link = list.assoc(default="", "url", req.query) + if link == "" then + resp.status_code(400) + resp.data("missing url") + else + body = http.post( + data="", timeout=60.0, "#{ingest_enqueue_url}?url=#{url.encode(link)}" + ) + resp.status_code(body.status_code) + resp.content_type("application/json; charset=utf-8") + resp.data(string.trim(body) ^ "\n") + end + end +) + # Passer au morceau suivant : on saute le morceau en cours sur la source # diffusée. request.dynamic a déjà préchargé le suivant, donc l'enchaînement # est immédiat (le prochain /next est demandé au daemon dans la foulée).