radieo/ingest/radieo/db.py
Pierre-Olivier Mercier 976f009297
Some checks failed
continuous-integration/drone/push Build is failing
stream: let listeners remove a track from the queue
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-04 15:18:58 +08:00

200 lines
7 KiB
Python

"""SQLite state: play history, cache-file retention and MBID canonical cache.
Three concerns, three tables:
- ``history`` is append-only. It drives anti-repeat (recently played canonical
keys, and raw locators for the providers' cheap local filter) and survives
cache eviction, so a track can stay "recently played" even after its file is
deleted.
- ``cache_files`` tracks downloaded files so we can keep only the N most
recently *played* ones (LRU retention). Files not yet played are never
evicted.
- ``canonical_cache`` memoizes ``(artist, title) -> MBID`` lookups (a NULL mbid
means a confirmed no-match) so the Canonicalizer hits the network at most once
per distinct track.
"""
import sqlite3
import threading
import time
from pathlib import Path
from .models import Track
# DDL for the three state tables (history, cache_files, canonical_cache).
# Database.__init__ runs this script every time a database is opened; every
# statement uses IF NOT EXISTS, so re-running it against an existing file
# creates nothing new. Because of that, columns added to a table after a file
# was first created are NOT picked up from here -- Database._migrate adds them.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY,
track_key TEXT NOT NULL,
locator TEXT,
artist TEXT,
title TEXT,
origin TEXT,
played_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_history_played_at ON history(played_at);
CREATE TABLE IF NOT EXISTS cache_files (
path TEXT PRIMARY KEY,
track_key TEXT,
played_at REAL -- NULL until the file has been played
);
CREATE TABLE IF NOT EXISTS canonical_cache (
artist_norm TEXT NOT NULL,
title_norm TEXT NOT NULL,
mbid TEXT, -- NULL means a confirmed no-match
resolved_at REAL NOT NULL,
PRIMARY KEY (artist_norm, title_norm)
);
"""
class Database:
def __init__(self, path: Path):
path.parent.mkdir(parents=True, exist_ok=True)
self._lock = threading.Lock()
self._conn = sqlite3.connect(
path, check_same_thread=False, isolation_level=None
)
self._conn.row_factory = sqlite3.Row
self._conn.executescript(_SCHEMA)
self._migrate()
def _migrate(self) -> None:
# Add columns introduced after a DB may have been created (milestone 5).
cols = {
r["name"]
for r in self._conn.execute("PRAGMA table_info(history)").fetchall()
}
if "locator" not in cols:
self._conn.execute("ALTER TABLE history ADD COLUMN locator TEXT")
# --- anti-repeat / history -------------------------------------------
def recent_keys(self, limit: int) -> set[str]:
with self._lock:
rows = self._conn.execute(
"SELECT track_key FROM history ORDER BY played_at DESC LIMIT ?",
(limit,),
).fetchall()
return {r["track_key"] for r in rows}
def recent_locators(self, limit: int) -> set[str]:
"""Raw backend locators recently played (providers' cheap local filter)."""
with self._lock:
rows = self._conn.execute(
"SELECT locator FROM history WHERE locator IS NOT NULL"
" ORDER BY played_at DESC LIMIT ?",
(limit,),
).fetchall()
return {r["locator"] for r in rows}
def record_play(self, track: Track) -> None:
with self._lock:
self._conn.execute(
"INSERT INTO history"
" (track_key, locator, artist, title, origin, played_at)"
" VALUES (?, ?, ?, ?, ?, ?)",
(
track.key,
track.locator,
track.artist,
track.title,
track.origin,
time.time(),
),
)
# --- MBID canonical cache --------------------------------------------
def get_canonical(self, artist_norm: str, title_norm: str):
"""Return (cached: bool, mbid: str | None). ``cached`` False means the
pair was never looked up; a cached row with mbid None is a known miss."""
with self._lock:
row = self._conn.execute(
"SELECT mbid FROM canonical_cache"
" WHERE artist_norm = ? AND title_norm = ?",
(artist_norm, title_norm),
).fetchone()
if row is None:
return False, None
return True, row["mbid"]
def put_canonical(
self, artist_norm: str, title_norm: str, mbid: str | None
) -> None:
with self._lock:
self._conn.execute(
"INSERT OR REPLACE INTO canonical_cache"
" (artist_norm, title_norm, mbid, resolved_at)"
" VALUES (?, ?, ?, ?)",
(artist_norm, title_norm, mbid, time.time()),
)
# --- cache-file retention --------------------------------------------
def register_download(self, path: str, track_key: str) -> None:
with self._lock:
self._conn.execute(
"INSERT OR REPLACE INTO cache_files (path, track_key, played_at)"
" VALUES (?, ?, NULL)",
(path, track_key),
)
def forget_download(self, path: str) -> None:
"""Drop a cache-file row without touching history.
Used when a queued-but-unplayed track is removed from the queue: its
row (played_at NULL) would otherwise linger forever, since retention
only ever considers already-played files.
"""
with self._lock:
self._conn.execute("DELETE FROM cache_files WHERE path = ?", (path,))
def played_files(self, limit: int) -> list[str]:
"""Files already aired, newest first (the stream's fallback pool).
Only played tracks appear here, so files still being pre-fetched are
never served as fallback — that is what stopped the cold-start loop.
"""
with self._lock:
rows = self._conn.execute(
"SELECT path FROM cache_files WHERE played_at IS NOT NULL"
" ORDER BY played_at DESC LIMIT ?",
(limit,),
).fetchall()
return [r["path"] for r in rows]
def mark_played(self, path: str) -> None:
with self._lock:
self._conn.execute(
"UPDATE cache_files SET played_at = ? WHERE path = ?",
(time.time(), path),
)
def evict(self, keep: int) -> list[str]:
"""Return (and forget) played files beyond the ``keep`` most recent.
The caller is responsible for deleting the returned files from disk.
Never touches files that have not been played yet.
"""
with self._lock:
rows = self._conn.execute(
"SELECT path FROM cache_files WHERE played_at IS NOT NULL"
" AND path NOT IN ("
" SELECT path FROM cache_files WHERE played_at IS NOT NULL"
" ORDER BY played_at DESC LIMIT ?"
")",
(keep,),
).fetchall()
paths = [r["path"] for r in rows]
if paths:
self._conn.executemany(
"DELETE FROM cache_files WHERE path = ?",
[(p,) for p in paths],
)
return paths
def close(self) -> None:
with self._lock:
self._conn.close()