radieo/ingest/radieo/__main__.py
Pierre-Olivier Mercier fbdb2d6bb3
All checks were successful
continuous-integration/drone/push Build is passing
ingest: tag bandcamp downloads and correct label-account artists
yt-dlp's flat extraction mis-attributes bandcamp uploads: on a label
account the artist becomes the label, and many files ship untagged. After
a download, guess (artist, title, album) from the richer download metadata
and the "Artist - Title" filename shape, confirm against MusicBrainz, and
write ID3 tags into the file. Existing tags are respected; MusicBrainz is
primary, filename parsing only a fallback.

The file's tags are the single source of truth: the provider reads them
back on the next pick, so the corrected identity (incl. MBID) is in place
before the scheduler keys and anti-repeat-checks the track, and cache
reuse inherits it too.

Fetchers now return (path, Track) so the corrected metadata flows back.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-03 11:01:24 +08:00

195 lines
6.8 KiB
Python

"""Entry point: wire providers/fetchers into the queue and serve GET /next."""
import logging
import signal
import threading
from . import config
from .api import IngestServer
from .db import Database
from .queue import TrackQueue
from .scheduler import Scheduler
# Module-level logger: everything this entry point logs goes out under the
# "radieo" logger name.
log = logging.getLogger("radieo")
class _NullCanonicalizer:
"""Used when canonicalization is disabled: leaves the Track untouched."""
def canonicalize(self, track):
return track
def identify(self, artist, title):
return None, None, None
def close(self):
pass
def _build_pipeline(db: Database, canonicalizer):
    """Assemble the enabled sources and return ``(providers, fetchers)``.

    ``providers`` is a list of ``(provider, weight)`` pairs and ``fetchers``
    maps a backend name to the fetcher serving it. Every source is optional:
    when none is enabled, the scheduler yields nothing and the stream plays
    its local cache fallback.

    ``canonicalizer`` is handed to the yt-dlp fetcher so it can confirm
    guessed bandcamp tags against MusicBrainz.
    """
    weights = config.SOURCE_WEIGHTS
    weighted_providers = []  # (provider, weight) pairs, in registration order
    fetchers_by_backend = {}  # backend name -> fetcher
    subsonic = None  # set only when Navidrome is on; reused by ListenBrainz

    # Each source's modules are imported lazily, only inside the branch that
    # actually enables that source.

    # --- Navidrome ---------------------------------------------------------
    if not config.NAVIDROME_ENABLED:
        log.warning("Navidrome not configured (RADIEO_NAVIDROME_*): source off.")
    else:
        from .fetchers.subsonic import SubsonicFetcher
        from .providers.navidrome import NavidromeProvider
        from .subsonic import SubsonicClient

        subsonic = SubsonicClient(
            config.NAVIDROME_URL, config.NAVIDROME_USER, config.NAVIDROME_PASSWORD
        )
        navidrome_weight = weights.get("navidrome", 0)
        navidrome = NavidromeProvider(subsonic, config.NAVIDROME_PLAYLIST, db)
        weighted_providers.append((navidrome, navidrome_weight))
        fetchers_by_backend["subsonic"] = SubsonicFetcher(subsonic, config.CACHE_DIR)
        log.info(
            "Navidrome source enabled (playlist=%r, weight=%d)",
            config.NAVIDROME_PLAYLIST,
            navidrome_weight,
        )

    # --- yt-dlp ------------------------------------------------------------
    ytdlp_weight = weights.get("ytdlp", 0)
    if ytdlp_weight > 0 and config.YTDLP_URLS_FILE.exists():
        from .fetchers.ytdlp import YtdlpFetcher
        from .providers.ytdlp import YtdlpProvider

        ytdlp = YtdlpProvider(config.YTDLP_URLS_FILE, db, config.CACHE_DIR)
        weighted_providers.append((ytdlp, ytdlp_weight))
        fetchers_by_backend["ytdlp"] = YtdlpFetcher(config.CACHE_DIR, canonicalizer)
        log.info(
            "yt-dlp source enabled (urls=%s, weight=%d)",
            config.YTDLP_URLS_FILE,
            ytdlp_weight,
        )
    else:
        log.info(
            "yt-dlp source off (weight=%d, urls file present=%s).",
            ytdlp_weight,
            config.YTDLP_URLS_FILE.exists(),
        )

    # --- ListenBrainz ------------------------------------------------------
    listenbrainz_weight = weights.get("listenbrainz", 0)
    if config.LISTENBRAINZ_ENABLED and listenbrainz_weight > 0:
        from .providers.listenbrainz import ListenBrainzProvider

        # ListenBrainz has no backend of its own: it resolves each suggestion
        # to Navidrome then yt-dlp. Make sure the yt-dlp fetcher exists as a
        # resolution target even when the yt-dlp *source* is off.
        if "ytdlp" not in fetchers_by_backend:
            from .fetchers.ytdlp import YtdlpFetcher

            fetchers_by_backend["ytdlp"] = YtdlpFetcher(
                config.CACHE_DIR, canonicalizer
            )

        listenbrainz = ListenBrainzProvider(
            config.LISTENBRAINZ_URL,
            db,
            subsonic_client=subsonic,
            ytdlp_enabled="ytdlp" in fetchers_by_backend,
        )
        weighted_providers.append((listenbrainz, listenbrainz_weight))
        resolve_chain = "navidrome+ytdlp" if subsonic else "ytdlp"
        log.info(
            "ListenBrainz source enabled (feed=%s, weight=%d, resolve=%s)",
            config.LISTENBRAINZ_URL,
            listenbrainz_weight,
            resolve_chain,
        )
    else:
        log.info(
            "ListenBrainz source off (weight=%d, feed set=%s).",
            listenbrainz_weight,
            config.LISTENBRAINZ_ENABLED,
        )

    if not weighted_providers:
        log.warning("no source active: the stream plays its local cache only.")

    return weighted_providers, fetchers_by_backend
def _sweep_temp_files() -> None:
    """Remove orphaned download temp files from the cache directory.

    Fetchers download into hidden temp files and atomically rename them into
    place, and those temps are the only hidden files this program creates.
    An interrupted download (SIGKILL on ``compose down``) leaves its temp
    behind, where the stream fallback keeps trying to decode it. Sweeping on
    startup silences that noise and reclaims the disk space.

    ``.gitkeep`` and anything that is not a regular file are left alone. A
    file that cannot be deleted is logged and skipped.
    """
    swept = 0
    for candidate in config.CACHE_DIR.glob(".*"):
        # Name check first so ``.gitkeep`` is skipped without touching the
        # filesystem, exactly as before.
        if candidate.name != ".gitkeep" and candidate.is_file():
            try:
                candidate.unlink()
            except OSError:
                log.warning("could not remove stale temp %s", candidate.name)
            else:
                swept += 1
    if swept:
        log.info("swept %d stale temp file(s) from cache", swept)
def main() -> None:
    """Run the ingest service until it is asked to stop.

    Start-up order: configure logging, create the cache and state
    directories, sweep stale temp files, open the database, pick the
    canonicalizer, build the source pipeline, start the track queue, then
    serve HTTP. SIGTERM and SIGINT trigger a server shutdown; on the way out
    the queue, server socket, providers, canonicalizer and database are
    released in that order.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    for directory in (config.CACHE_DIR, config.STATE_DIR):
        directory.mkdir(parents=True, exist_ok=True)
    _sweep_temp_files()

    db = Database(config.STATE_DB)

    if not config.CANONICAL_ENABLED:
        canonicalizer = _NullCanonicalizer()
        log.info("Canonicalizer disabled: tracks keyed by (artist, title).")
    else:
        from .canonicalizer import Canonicalizer

        canonicalizer = Canonicalizer(db)
        log.info(
            "Canonicalizer enabled (MusicBrainz, min_score=%d)",
            config.CANONICAL_MIN_SCORE,
        )

    providers, fetchers = _build_pipeline(db, canonicalizer)
    queue = TrackQueue(Scheduler(providers, canonicalizer, db), fetchers, db)
    queue.start()

    httpd = IngestServer((config.HTTP_HOST, config.HTTP_PORT), queue, db)
    log.info(
        "ingest listening on %s:%d (cache=%s, state=%s)",
        config.HTTP_HOST,
        config.HTTP_PORT,
        config.CACHE_DIR,
        config.STATE_DB,
    )

    # Shut down cleanly on `docker compose down` (SIGTERM) instead of being
    # killed after the grace period. shutdown() must run off the serving
    # thread, so each signal spawns a short-lived helper thread for it.
    def _request_shutdown(signum, _frame):
        log.info("received signal %d, shutting down", signum)
        stopper = threading.Thread(target=httpd.shutdown, daemon=True)
        stopper.start()

    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, _request_shutdown)

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        queue.stop()
        httpd.server_close()
        # Not every provider exposes close(); call it only where it exists.
        for source, _weight in providers:
            closer = getattr(source, "close", None)
            if callable(closer):
                closer()
        canonicalizer.close()
        db.close()
# Script entry guard: start the service only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()