radieo/ingest/radieo/__main__.py
Pierre-Olivier Mercier efd7307cc6 ingest: give every track a source link
The player's "source" link only worked for direct yt-dlp URLs. Two other
cases had no linkable page: ListenBrainz picks resolved via ytsearch1: (the
locator is a search query) and Subsonic library tracks (an opaque song id).

Centralise the rule in Track.page_url and cover both: the yt-dlp fetcher now
records the concrete video URL it resolved into source_url, and a Subsonic
track links to the stream's new /share endpoint, which asks ingest to mint a
public share (createShare) on demand and redirects to it — so a share is only
created when a listener actually clicks, never per played track.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-04 11:42:29 +08:00

197 lines
6.9 KiB
Python

"""Entry point: wire providers/fetchers into the queue and serve GET /next."""
import logging
import signal
import threading
from . import config
from .api import IngestServer
from .db import Database
from .queue import TrackQueue
from .scheduler import Scheduler
log = logging.getLogger("radieo")
class _NullCanonicalizer:
"""Used when canonicalization is disabled: leaves the Track untouched."""
def canonicalize(self, track):
return track
def identify(self, artist, title):
return None, None, None
def close(self):
pass
def _build_pipeline(db: Database, canonicalizer):
    """Assemble whichever sources are enabled.

    ``canonicalizer`` is handed to the yt-dlp fetcher so it can confirm
    guessed bandcamp tags against MusicBrainz.

    Returns a 3-tuple ``(providers, fetchers, subsonic_client)``:

    * ``providers`` -- list of ``(provider, weight)`` pairs, appended in the
      order Subsonic, yt-dlp, ListenBrainz (only the enabled ones);
    * ``fetchers`` -- dict mapping a backend name (``"subsonic"``,
      ``"ytdlp"``) to its fetcher;
    * ``subsonic_client`` -- the shared ``SubsonicClient``, or ``None`` when
      the Subsonic source is not enabled.

    When no source is enabled ``providers`` is empty: the scheduler yields
    nothing and the stream plays its local cache fallback.
    """
    weights = config.SOURCE_WEIGHTS
    providers = []  # list[(provider, weight)]
    fetchers = {}  # backend name -> fetcher
    subsonic_client = None  # reused by ListenBrainz for resolution

    # Backend modules are imported inside the branch that needs them
    # (presumably so a disabled source never loads its dependencies).

    # --- OpenSubsonic ------------------------------------------------------
    if config.SUBSONIC_ENABLED:
        from .fetchers.subsonic import SubsonicFetcher
        from .providers.subsonic import SubsonicProvider
        from .subsonic import SubsonicClient

        subsonic_weight = weights.get("subsonic", 0)
        subsonic_client = SubsonicClient(
            config.SUBSONIC_URL, config.SUBSONIC_USER, config.SUBSONIC_PASSWORD
        )
        provider = SubsonicProvider(subsonic_client, config.SUBSONIC_PLAYLIST, db)
        providers.append((provider, subsonic_weight))
        fetchers["subsonic"] = SubsonicFetcher(subsonic_client, config.CACHE_DIR)
        log.info(
            "OpenSubsonic source enabled (playlist=%r, weight=%d)",
            config.SUBSONIC_PLAYLIST,
            subsonic_weight,
        )
    else:
        log.warning("OpenSubsonic not configured (RADIEO_SUBSONIC_*): source off.")

    # --- yt-dlp ------------------------------------------------------------
    # Needs both a positive weight and a URLs file on disk.
    ytdlp_weight = weights.get("ytdlp", 0)
    if ytdlp_weight > 0 and config.YTDLP_URLS_FILE.exists():
        from .fetchers.ytdlp import YtdlpFetcher
        from .providers.ytdlp import YtdlpProvider

        provider = YtdlpProvider(config.YTDLP_URLS_FILE, db, config.CACHE_DIR)
        providers.append((provider, ytdlp_weight))
        fetchers["ytdlp"] = YtdlpFetcher(config.CACHE_DIR, canonicalizer)
        log.info(
            "yt-dlp source enabled (urls=%s, weight=%d)",
            config.YTDLP_URLS_FILE,
            ytdlp_weight,
        )
    else:
        log.info(
            "yt-dlp source off (weight=%d, urls file present=%s).",
            ytdlp_weight,
            config.YTDLP_URLS_FILE.exists(),
        )

    # --- ListenBrainz ------------------------------------------------------
    listenbrainz_weight = weights.get("listenbrainz", 0)
    if config.LISTENBRAINZ_ENABLED and listenbrainz_weight > 0:
        from .providers.listenbrainz import ListenBrainzProvider

        # ListenBrainz has no backend of its own: it resolves each suggestion to
        # the Subsonic library then yt-dlp. Make sure the yt-dlp fetcher exists as a
        # resolution target even when the yt-dlp *source* is off.
        if "ytdlp" not in fetchers:
            from .fetchers.ytdlp import YtdlpFetcher

            fetchers["ytdlp"] = YtdlpFetcher(config.CACHE_DIR, canonicalizer)
        provider = ListenBrainzProvider(
            config.LISTENBRAINZ_URL,
            db,
            subsonic_client=subsonic_client,
            # Always true at this point: the fetcher was ensured just above.
            ytdlp_enabled="ytdlp" in fetchers,
        )
        providers.append((provider, listenbrainz_weight))
        log.info(
            "ListenBrainz source enabled (feed=%s, weight=%d, resolve=%s)",
            config.LISTENBRAINZ_URL,
            listenbrainz_weight,
            "subsonic+ytdlp" if subsonic_client else "ytdlp",
        )
    else:
        log.info(
            "ListenBrainz source off (weight=%d, feed set=%s).",
            listenbrainz_weight,
            config.LISTENBRAINZ_ENABLED,
        )

    if not providers:
        log.warning("no source active: the stream plays its local cache only.")
    return providers, fetchers, subsonic_client
def _sweep_temp_files() -> None:
    """Remove orphaned download temp files from the cache directory.

    Fetchers download into hidden temp files and rename them into place
    atomically, so the only hidden files we ever create are those temps. When
    a download is interrupted (SIGKILL on ``compose down``) the temp is left
    behind and the stream fallback keeps trying to decode it. Sweeping them
    at startup avoids that noise and reclaims disk.
    """
    swept = 0
    for entry in config.CACHE_DIR.glob(".*"):
        # Spare ``.gitkeep`` and anything that is not a regular file.
        if entry.name != ".gitkeep" and entry.is_file():
            try:
                entry.unlink()
            except OSError:
                # Best effort: report it and carry on with the other files.
                log.warning("could not remove stale temp %s", entry.name)
            else:
                swept += 1
    if swept:
        log.info("swept %d stale temp file(s) from cache", swept)
def main() -> None:
    """Start the ingest service and block until it is told to stop.

    Sets up logging, prepares the cache and state directories, builds the
    canonicalizer, the source pipeline and the track queue, then serves HTTP
    until SIGTERM or SIGINT arrives. On the way out the queue, server,
    providers, canonicalizer and database are stopped/closed in that order.
    """
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=logging.INFO,
    )

    for directory in (config.CACHE_DIR, config.STATE_DIR):
        directory.mkdir(parents=True, exist_ok=True)
    _sweep_temp_files()

    db = Database(config.STATE_DB)

    if not config.CANONICAL_ENABLED:
        canonicalizer = _NullCanonicalizer()
        log.info("Canonicalizer disabled: tracks keyed by (artist, title).")
    else:
        from .canonicalizer import Canonicalizer

        canonicalizer = Canonicalizer(db)
        log.info(
            "Canonicalizer enabled (MusicBrainz, min_score=%d)",
            config.CANONICAL_MIN_SCORE,
        )

    providers, fetchers, subsonic_client = _build_pipeline(db, canonicalizer)
    queue = TrackQueue(Scheduler(providers, canonicalizer, db), fetchers, db)
    queue.start()

    bind_address = (config.HTTP_HOST, config.HTTP_PORT)
    server = IngestServer(bind_address, queue, db, subsonic=subsonic_client)
    log.info(
        "ingest listening on %s:%d (cache=%s, state=%s)",
        config.HTTP_HOST,
        config.HTTP_PORT,
        config.CACHE_DIR,
        config.STATE_DB,
    )

    # Shut down cleanly on `docker compose down` (SIGTERM) instead of being
    # killed after the grace period. shutdown() must run off the serving
    # thread, so it is handed to a helper thread.
    def _handle_signal(signum, _frame):
        log.info("received signal %d, shutting down", signum)
        stopper = threading.Thread(target=server.shutdown, daemon=True)
        stopper.start()

    for handled in (signal.SIGTERM, signal.SIGINT):
        signal.signal(handled, _handle_signal)

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        queue.stop()
        server.server_close()
        # Providers are closed only if they expose a callable ``close``.
        for provider, _weight in providers:
            closer = getattr(provider, "close", None)
            if callable(closer):
                closer()
        canonicalizer.close()
        db.close()
if __name__ == "__main__":
main()