"""
Database management for the three-database approach:
- all_feed_entries.db: All RSS entries for deduplication
- matched_entries_history.db: Historical matches across all topics
- papers.db: Current run processing data
"""
import sqlite3
import os
import datetime
import hashlib
import re
import urllib.parse
from typing import Dict, List, Any, Optional, Iterator
from contextlib import contextmanager
import logging
import glob
from .paths import resolve_data_file
from .doi_utils import extract_doi_from_entry
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Manages the three-database system for feed processing."""

    def __init__(self, config: Dict[str, Any]):
        """Resolve database file paths from config and ensure schemas exist.

        Args:
            config: Parsed application config. Reads the ``database``
                section keys ``all_feeds_path``, ``history_path`` and
                ``path`` (the current-run DB).
        """
        self.config = config
        # Resolve each logical DB key to an absolute file path, creating the
        # parent directory so sqlite3.connect() can create the file.
        self.db_paths = {
            'all_feeds': str(resolve_data_file(config['database']['all_feeds_path'], ensure_parent=True)),
            'history': str(resolve_data_file(config['database']['history_path'], ensure_parent=True)),
            'current': str(resolve_data_file(config['database']['path'], ensure_parent=True)),
        }
        # Create tables, indexes and FTS mirrors up front (idempotent).
        self._init_databases()
def _init_databases(self):
    """Create the schemas of all three databases (idempotent)."""
    for initializer in (
        self._init_all_feeds_db,   # all_feed_entries.db: dedup store
        self._init_history_db,     # matched_entries_history.db
        self._init_current_db,     # papers.db: current run
    ):
        initializer()
@staticmethod
def _apply_pragmas(conn: sqlite3.Connection) -> None:
"""Apply performance pragmas to a database connection.
``page_size`` only takes effect on a freshly created (empty) database
or after a subsequent VACUUM on an existing one; setting it here is
therefore a no-op for databases that already contain data, but it
ensures new databases start with the larger page size.
"""
conn.execute("PRAGMA auto_vacuum = INCREMENTAL")
conn.execute("PRAGMA page_size = 8192")
@staticmethod
def _create_fts5_trigram(conn: sqlite3.Connection, table: str, columns: list[str]) -> None:
    """Create an FTS5 external-content virtual table with trigram tokenizer.

    The virtual table references *table* via ``content=`` so no data is
    duplicated — only the trigram inverted index is stored. INSERT /
    DELETE / UPDATE triggers keep the index in sync automatically.

    If the main table already has rows but the FTS index is empty (e.g.
    after a migration), the index is rebuilt.

    Args:
        conn: Open connection to the database holding *table*.
        table: Name of the content table to index.
        columns: Column names to include in the full-text index.
    """
    fts_table = f"{table}_fts"
    col_list = ", ".join(columns)
    # External-content table: stores only the trigram index; row text is
    # read back from the content table on demand.
    conn.execute(
        f"CREATE VIRTUAL TABLE IF NOT EXISTS {fts_table} "
        f"USING fts5({col_list}, content='{table}', content_rowid='rowid', tokenize='trigram')"
    )
    # Sync triggers — idempotent thanks to IF NOT EXISTS-style naming.
    conn.execute(
        f"""CREATE TRIGGER IF NOT EXISTS {fts_table}_ai AFTER INSERT ON {table} BEGIN
            INSERT INTO {fts_table}(rowid, {col_list})
            VALUES (new.rowid, {', '.join('new.' + c for c in columns)});
        END"""
    )
    # External-content FTS5 deletes rows via the special 'delete' command.
    conn.execute(
        f"""CREATE TRIGGER IF NOT EXISTS {fts_table}_ad AFTER DELETE ON {table} BEGIN
            INSERT INTO {fts_table}({fts_table}, rowid, {col_list})
            VALUES ('delete', old.rowid, {', '.join('old.' + c for c in columns)});
        END"""
    )
    # An update is modelled as delete-old + insert-new index entries.
    conn.execute(
        f"""CREATE TRIGGER IF NOT EXISTS {fts_table}_au AFTER UPDATE ON {table} BEGIN
            INSERT INTO {fts_table}({fts_table}, rowid, {col_list})
            VALUES ('delete', old.rowid, {', '.join('old.' + c for c in columns)});
            INSERT INTO {fts_table}(rowid, {col_list})
            VALUES (new.rowid, {', '.join('new.' + c for c in columns)});
        END"""
    )
    # Rebuild if main table has rows but FTS is empty (post-migration).
    cursor = conn.execute(f"SELECT COUNT(*) FROM {table}")
    main_count = cursor.fetchone()[0]
    if main_count > 0:
        cursor = conn.execute(f"SELECT COUNT(*) FROM {fts_table}")
        fts_count = cursor.fetchone()[0]
        if fts_count == 0:
            logger.info("Rebuilding FTS index for %s (%d rows)", table, main_count)
            conn.execute(f"INSERT INTO {fts_table}({fts_table}) VALUES('rebuild')")
@staticmethod
def _create_fts5_keyword(conn: sqlite3.Connection, table: str, columns: list[str]) -> None:
    """Create an FTS5 external-content virtual table with porter stemming.

    Same pattern as :meth:`_create_fts5_trigram` but uses the ``porter
    unicode61`` tokenizer for keyword search with stemming, BM25 ranking,
    phrase queries, prefix matching, and boolean operators.

    Args:
        conn: Open connection to the database holding *table*.
        table: Name of the content table to index.
        columns: Column names to include in the full-text index.
    """
    kw_table = f"{table}_kw"
    col_list = ", ".join(columns)
    # External-content table: only the inverted index is stored.
    conn.execute(
        f"CREATE VIRTUAL TABLE IF NOT EXISTS {kw_table} "
        f"USING fts5({col_list}, content='{table}', content_rowid='rowid', "
        f"tokenize='porter unicode61')"
    )
    # Sync triggers mirroring the trigram variant: insert / delete / update.
    conn.execute(
        f"""CREATE TRIGGER IF NOT EXISTS {kw_table}_ai AFTER INSERT ON {table} BEGIN
            INSERT INTO {kw_table}(rowid, {col_list})
            VALUES (new.rowid, {', '.join('new.' + c for c in columns)});
        END"""
    )
    conn.execute(
        f"""CREATE TRIGGER IF NOT EXISTS {kw_table}_ad AFTER DELETE ON {table} BEGIN
            INSERT INTO {kw_table}({kw_table}, rowid, {col_list})
            VALUES ('delete', old.rowid, {', '.join('old.' + c for c in columns)});
        END"""
    )
    conn.execute(
        f"""CREATE TRIGGER IF NOT EXISTS {kw_table}_au AFTER UPDATE ON {table} BEGIN
            INSERT INTO {kw_table}({kw_table}, rowid, {col_list})
            VALUES ('delete', old.rowid, {', '.join('old.' + c for c in columns)});
            INSERT INTO {kw_table}(rowid, {col_list})
            VALUES (new.rowid, {', '.join('new.' + c for c in columns)});
        END"""
    )
    # Rebuild the index if the content table has rows but the index is
    # empty (e.g. the virtual table was just created over existing data).
    cursor = conn.execute(f"SELECT COUNT(*) FROM {table}")
    main_count = cursor.fetchone()[0]
    if main_count > 0:
        cursor = conn.execute(f"SELECT COUNT(*) FROM {kw_table}")
        kw_count = cursor.fetchone()[0]
        if kw_count == 0:
            logger.info("Rebuilding keyword FTS index for %s (%d rows)", table, main_count)
            conn.execute(f"INSERT INTO {kw_table}({kw_table}) VALUES('rebuild')")
def _backup_sqlite(self, src_path: str, dest_path: str) -> None:
"""Create a consistent backup copy of a SQLite database.
Creates a consistent copy of `src_path` at `dest_path` using the
SQLite backup API. Overwrites any existing backup file at dest_path.
"""
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
# Use SQLite backup API for safety
src_conn = sqlite3.connect(src_path)
try:
# Remove existing backup to avoid appending old pages
if os.path.exists(dest_path):
os.remove(dest_path)
dest_conn = sqlite3.connect(dest_path)
try:
src_conn.backup(dest_conn)
finally:
dest_conn.close()
finally:
src_conn.close()
def _rotate_backups(self, directory: str, stem: str, keep: int = 3) -> None:
    """Delete all but the newest *keep* backup files for *stem*.

    Backups are named ``{stem}.YYYYMMDD-HHMMSS.backup.db``; because the
    timestamp sorts lexicographically, filename order equals age order,
    so the oldest files are simply the first ones after sorting.
    """
    candidates = sorted(glob.glob(os.path.join(directory, f"{stem}.*.backup.db")))
    excess = len(candidates) - keep
    if excess <= 0:
        return
    for fp in candidates[:excess]:
        try:
            os.remove(fp)
            logger.info(f"Pruned old backup: {fp}")
        except Exception as e:
            logger.warning(f"Failed to remove old backup {fp}: {e}")
def backup_important_databases(self) -> Dict[str, str]:
    """Backup history and all_feeds databases with timestamped rotation.

    - Writes timestamped backups alongside the source DBs in the runtime data directory.
    - Keeps up to 3 most recent backups per database, pruning older ones.

    Returns:
        Dict mapping logical db keys ('all_feeds', 'history') to the
        created backup file paths. Keys are omitted for databases that
        do not exist yet or whose backup raised an error (best-effort:
        failures are logged, never raised).
    """
    backups = {}
    # Timestamp goes into the filename; lexicographic order == age order,
    # which is what _rotate_backups relies on.
    now = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    # logical key -> (filename stem, source DB path)
    mappings = {
        'all_feeds': ('all_feed_entries', self.db_paths['all_feeds']),
        'history': ('matched_entries_history', self.db_paths['history']),
    }
    for key, (stem, path) in mappings.items():
        # Skip if source DB does not yet exist
        if not os.path.exists(path):
            continue
        directory = os.path.dirname(path)
        dest = os.path.join(directory, f"{stem}.{now}.backup.db")
        try:
            self._backup_sqlite(path, dest)
            backups[key] = dest
            logger.info(f"Backed up database '{key}' to {dest}")
            # Rotate: keep only newest 3
            self._rotate_backups(directory, stem, keep=3)
        except Exception as e:
            logger.error(f"Failed to backup database '{key}' from {path} to {dest}: {e}")
    return backups
def _init_all_feeds_db(self):
    """Initialize the all-RSS-entries database (deduplication store).

    Creates the ``feed_entries`` table, its supporting indexes and both
    FTS5 mirrors (trigram fuzzy + porter keyword). Idempotent.
    """
    conn = sqlite3.connect(self.db_paths['all_feeds'])
    self._apply_pragmas(conn)
    cursor = conn.cursor()
    # first_seen/last_seen default to insert time. The UNIQUE constraint
    # is redundant with the entry_id PRIMARY KEY but kept as-is so the
    # schema stays identical for existing databases.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS feed_entries (
            entry_id TEXT PRIMARY KEY,
            feed_name TEXT NOT NULL,
            title TEXT NOT NULL,
            link TEXT NOT NULL,
            summary TEXT,
            authors TEXT,
            published_date TEXT,
            first_seen TEXT DEFAULT (datetime('now')),
            last_seen TEXT DEFAULT (datetime('now')),
            UNIQUE(feed_name, entry_id)
        )
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_feed_entries_feed_name
        ON feed_entries(feed_name)
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_feed_entries_first_seen
        ON feed_entries(first_seen)
    ''')
    # Full-text mirrors used by query_entries' --fuzzy and --search.
    self._create_fts5_trigram(conn, 'feed_entries', ['title', 'summary', 'authors'])
    self._create_fts5_keyword(conn, 'feed_entries', ['title', 'summary', 'authors'])
    conn.commit()
    conn.close()
def _init_history_db(self):
"""Initialize the historical matches database.
Creates the `matched_entries` table if missing. If present but
missing required columns (abstract, doi, topics), it recreates the
table for testing simplicity.
"""
conn = sqlite3.connect(self.db_paths['history'])
self._apply_pragmas(conn)
cursor = conn.cursor()
# Inspect existing table
cursor.execute("PRAGMA table_info(matched_entries)")
info = cursor.fetchall()
columns = {row[1] for row in info}
required_columns = {
'entry_id', 'feed_name', 'topics', 'title', 'link', 'summary',
'authors', 'abstract', 'doi', 'published_date', 'matched_date',
'llm_summary', 'paper_qa_summary', 'rank_score'
}
# If table exists, try lightweight migrations for new columns; otherwise recreate
if len(columns) > 0:
# Add new optional columns if missing (non-destructive)
if 'llm_summary' not in columns:
try:
cursor.execute("ALTER TABLE matched_entries ADD COLUMN llm_summary TEXT")
columns.add('llm_summary')
except Exception as e:
logger.debug(f"Column llm_summary may already exist: {e}")
if 'paper_qa_summary' not in columns:
try:
cursor.execute("ALTER TABLE matched_entries ADD COLUMN paper_qa_summary TEXT")
columns.add('paper_qa_summary')
except Exception as e:
logger.debug(f"Column paper_qa_summary may already exist: {e}")
if 'rank_score' not in columns:
try:
cursor.execute("ALTER TABLE matched_entries ADD COLUMN rank_score REAL")
columns.add('rank_score')
except Exception as e:
logger.debug(f"Column rank_score may already exist: {e}")
need_recreate = (len(columns) == 0) or (not required_columns.issubset(columns))
if need_recreate:
cursor.execute('''
CREATE TABLE matched_entries (
entry_id TEXT PRIMARY KEY,
feed_name TEXT NOT NULL,
topics TEXT NOT NULL,
title TEXT NOT NULL,
link TEXT NOT NULL,
summary TEXT,
authors TEXT,
abstract TEXT,
doi TEXT,
published_date TEXT,
matched_date TEXT DEFAULT (datetime('now')),
llm_summary TEXT,
paper_qa_summary TEXT,
rank_score REAL
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_matched_entries_topics
ON matched_entries(topics)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_matched_entries_matched_date
ON matched_entries(matched_date)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_matched_entries_entry_id
ON matched_entries(entry_id)
''')
self._create_fts5_trigram(conn, 'matched_entries', ['title', 'summary', 'abstract', 'authors'])
self._create_fts5_keyword(conn, 'matched_entries', ['title', 'summary', 'abstract', 'authors'])
conn.commit()
conn.close()
def _init_current_db(self):
"""Initialize the current run processing database.
Uses a composite primary key on (id, topic) so the same entry can
exist once per topic.
Creates the `entries` table if missing. If present but missing
required columns (abstract, doi), it recreates the table for
testing simplicity.
"""
conn = sqlite3.connect(self.db_paths['current'])
self._apply_pragmas(conn)
cursor = conn.cursor()
# Inspect existing table
cursor.execute("PRAGMA table_info(entries)")
info = cursor.fetchall()
columns = {row[1] for row in info}
required_columns = {
'id', 'topic', 'feed_name', 'title', 'link', 'summary', 'authors',
'abstract', 'doi', 'published_date', 'discovered_date', 'status',
'rank_score', 'rank_reasoning', 'llm_summary'
}
need_recreate = (len(columns) == 0) or (not required_columns.issubset(columns))
if need_recreate:
cursor.execute('DROP TABLE IF EXISTS entries_fts')
cursor.execute('DROP TABLE IF EXISTS entries')
cursor.execute('''
CREATE TABLE entries (
id TEXT NOT NULL,
topic TEXT NOT NULL,
feed_name TEXT NOT NULL,
title TEXT NOT NULL,
link TEXT NOT NULL,
summary TEXT,
authors TEXT,
abstract TEXT,
doi TEXT,
published_date TEXT,
discovered_date TEXT DEFAULT (datetime('now')),
status TEXT DEFAULT 'new' CHECK(status IN ('new', 'filtered', 'ranked', 'summarized')),
rank_score REAL,
rank_reasoning TEXT,
llm_summary TEXT,
paper_qa_summary TEXT,
PRIMARY KEY (id, topic),
UNIQUE(feed_name, topic, id)
)
''')
else:
# Lightweight migrations for new optional columns
if 'paper_qa_summary' not in columns:
try:
cursor.execute("ALTER TABLE entries ADD COLUMN paper_qa_summary TEXT")
except Exception as e:
logger.debug(f"Column paper_qa_summary may already exist in entries table: {e}")
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_entries_topic_status
ON entries(topic, status)
''')
self._create_fts5_trigram(conn, 'entries', ['title', 'summary', 'abstract', 'authors'])
self._create_fts5_keyword(conn, 'entries', ['title', 'summary', 'abstract', 'authors'])
conn.commit()
conn.close()
[docs]
def compute_entry_id(self, entry: Dict[str, Any]) -> str:
"""Generate a stable SHA-1 based ID for a feed entry."""
candidate = entry.get("id") or entry.get("link")
if candidate:
parsed = urllib.parse.urlparse(candidate)
candidate = urllib.parse.urlunparse(
parsed._replace(query="", fragment="")
)
return hashlib.sha1(candidate.encode("utf-8")).hexdigest()
parts = [
entry.get("title", ""),
entry.get("published", entry.get("updated", "")),
]
concat = "||".join(parts)
return hashlib.sha1(concat.encode("utf-8")).hexdigest()
def is_new_entry(self, title: str) -> bool:
    """Check if an entry is new (title not in all_feed_entries.db).

    Returns:
        True when no row with this exact title exists.

    NOTE(review): deduplication here is by exact title, not entry_id —
    presumably intentional (the same article republished under a new URL
    still counts as seen). This mirrors save_feed_entry's title-keyed
    first_seen lookup; confirm the pairing is deliberate.
    """
    with self.get_connection('all_feeds', row_factory=False) as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT 1 FROM feed_entries WHERE title = ?",
            (title,)
        )
        result = cursor.fetchone()
        return result is None
def save_feed_entry(self, entry: Dict[str, Any], feed_name: str, entry_id: str):
    """Save an entry to all_feed_entries.db with proper date formatting.

    Upserts by primary key (entry_id), preserving the earliest
    first_seen timestamp while refreshing last_seen.
    """
    with self.get_connection('all_feeds', row_factory=False) as conn:
        cursor = conn.cursor()
        authors = self._extract_authors(entry)
        # Ensure published_date is in YYYY-MM-DD format
        published_date = self._format_published_date(entry)
        title = entry.get('title', '').strip()
        # NOTE(review): the COALESCE subquery preserves first_seen by
        # *title*, not entry_id — consistent with is_new_entry's
        # title-based dedupe, but a retitled article gets a fresh
        # first_seen; confirm this is intended.
        cursor.execute('''
            INSERT OR REPLACE INTO feed_entries
            (entry_id, feed_name, title, link, summary, authors, published_date,
             first_seen, last_seen)
            VALUES (?, ?, ?, ?, ?, ?, ?,
                    COALESCE((SELECT first_seen FROM feed_entries WHERE title = ?), datetime('now')),
                    datetime('now'))
        ''', (
            entry_id, feed_name,
            title,
            entry.get('link', ''),
            entry.get('summary', entry.get('description', '')),
            authors,
            published_date,
            title,  # for COALESCE subquery
        ))

# Note: helper methods `is_entry_in_history` and `get_entry_topics_from_history`
# were unused and have been removed to reduce surface area.
def save_matched_entry(self, entry: Dict[str, Any], feed_name: str, topic: str, entry_id: str):
    """Save a matched entry to matched_entries_history.db, merging topics if entry already exists.

    Topics are stored denormalised as a single ", "-separated, sorted
    string; matched_date is refreshed whenever a new topic is merged in.
    """
    with self.get_connection('history', row_factory=False) as conn:
        cursor = conn.cursor()
        # Check if entry already exists in history
        cursor.execute(
            "SELECT topics FROM matched_entries WHERE entry_id = ?",
            (entry_id,)
        )
        existing = cursor.fetchone()
        if existing:
            # Entry exists, merge the new topic with existing topics
            existing_topics = existing[0].split(', ') if existing[0] else []
            if topic not in existing_topics:
                existing_topics.append(topic)
                merged_topics = ', '.join(sorted(existing_topics))
                cursor.execute('''
                    UPDATE matched_entries
                    SET topics = ?, matched_date = datetime('now')
                    WHERE entry_id = ?
                ''', (merged_topics, entry_id))
                logger.debug(f"Updated entry {entry_id[:8]}... with merged topics: {merged_topics}")
            else:
                logger.debug(f"Entry {entry_id[:8]}... already has topic '{topic}', skipping")
        else:
            # New entry, insert it
            authors = self._extract_authors(entry)
            published_date = self._format_published_date(entry)
            doi = self._extract_doi(entry)
            # rank_score may arrive as a string from upstream; coerce to
            # float or drop it entirely.
            rank_value = entry.get('rank_score')
            if rank_value is not None:
                try:
                    rank_value = float(rank_value)
                except (TypeError, ValueError):
                    rank_value = None
            cursor.execute('''
                INSERT INTO matched_entries
                (entry_id, feed_name, topics, title, link, summary, authors, abstract, doi,
                 published_date, matched_date, rank_score)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), ?)
            ''', (
                entry_id, feed_name, topic,
                entry.get('title', ''),
                entry.get('link', ''),
                entry.get('summary', entry.get('description', '')),
                authors,
                None,  # abstract to be populated later (Crossref)
                doi,
                published_date,
                rank_value
            ))
            logger.debug(f"Added new entry {entry_id[:8]}... to history database with topic: {topic}")
def save_current_entry(self, entry: Dict[str, Any], feed_name: str, topic: str, entry_id: str):
    """Upsert one entry into papers.db for current-run processing.

    The row is written with status 'filtered'; the abstract column is
    left NULL for later enrichment (Crossref).
    """
    row_values = (
        entry_id, topic, feed_name,
        entry.get('title', ''),
        entry.get('link', ''),
        entry.get('summary', entry.get('description', '')),
        self._extract_authors(entry),
        None,  # abstract to be populated later (Crossref)
        self._extract_doi(entry),
        self._format_published_date(entry),
    )
    with self.get_connection('current', row_factory=False) as conn:
        conn.cursor().execute('''
            INSERT OR REPLACE INTO entries
            (id, topic, feed_name, title, link, summary, authors, abstract, doi,
             published_date, discovered_date, status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), 'filtered')
        ''', row_values)
[docs]
def get_current_entries(self, topic: str = None, status: str = None) -> List[Dict[str, Any]]:
"""Get entries from papers.db with optional filtering."""
with self.get_connection('current', row_factory=True) as conn:
cursor = conn.cursor()
query = "SELECT * FROM entries WHERE 1=1"
params = []
if topic:
query += " AND topic = ?"
params.append(topic)
if status:
query += " AND status = ?"
params.append(status)
query += " ORDER BY discovered_date DESC"
cursor.execute(query, params)
rows = cursor.fetchall()
# Convert Row objects to dicts
return [dict(row) for row in rows]
# Note: `get_entries_for_html_generation` has been removed; HTML generation
# reads via `get_current_entries` directly.
def clear_current_db(self):
    """Clear the current run database.

    Re-initialises the FTS index and triggers before deleting rows
    so that DELETE triggers can fire cleanly even if a previous
    migration or crash left the FTS table missing.
    """
    with self.get_connection('current', row_factory=False) as conn:
        cursor = conn.cursor()
        # Ensure FTS tables and triggers exist before DELETE fires them.
        self._create_fts5_trigram(
            conn, 'entries', ['title', 'summary', 'abstract', 'authors']
        )
        self._create_fts5_keyword(
            conn, 'entries', ['title', 'summary', 'abstract', 'authors']
        )
        # Row-wise DELETE (not DROP) so the sync triggers also empty the
        # external-content FTS indexes.
        cursor.execute("DELETE FROM entries")
[docs]
def update_entry_rank(self, entry_id: str, topic: str, score: float | None, reasoning: str | None = None) -> None:
"""Update rank_score (and optionally rank_reasoning) for a single entry.
Args:
entry_id: Entry identifier (sha1 or normalized link id)
topic: Topic name (composite key component)
score: Rank score to persist (cosine similarity or None)
reasoning: Optional concise reasoning string
"""
with self.get_connection('current', row_factory=False) as conn:
cursor = conn.cursor()
if reasoning is None:
cursor.execute(
"UPDATE entries SET rank_score = ?, status = 'ranked' WHERE id = ? AND topic = ?",
(score, entry_id, topic),
)
else:
cursor.execute(
"UPDATE entries SET rank_score = ?, rank_reasoning = ?, status = 'ranked' WHERE id = ? AND topic = ?",
(score, reasoning, entry_id, topic),
)
def update_history_rank(self, entry_id: str, score: float | None) -> None:
    """Update the historical rank_score, keeping the highest score seen.

    Args:
        entry_id: History row identifier.
        score: Candidate score. None is an explicit reset and always
            clears the stored score, bypassing the keep-the-max rule.
    """
    with self.get_connection('history', row_factory=False) as conn:
        cursor = conn.cursor()
        if score is None:
            cursor.execute(
                "UPDATE matched_entries SET rank_score = NULL WHERE entry_id = ?",
                (entry_id,),
            )
        else:
            score_val = float(score)
            # Monotonic update: only overwrite when the new score beats
            # the stored one (or nothing is stored yet).
            cursor.execute(
                """
                UPDATE matched_entries
                SET rank_score = CASE
                    WHEN rank_score IS NULL OR rank_score < ?
                    THEN ?
                    ELSE rank_score
                END
                WHERE entry_id = ?
                """,
                (score_val, score_val, entry_id),
            )
def purge_old_entries(self, days: int):
    """Remove entries from the most recent N days (including today) based on publication date (YYYY-MM-DD).

    Args:
        days: Size of the purge window; ``days=1`` removes only today.

    Only rows with a non-empty, parseable published_date are affected.
    The same window is applied to all three databases.
    """
    today = datetime.datetime.now().date()
    start_date = (today - datetime.timedelta(days=days - 1)).isoformat()
    end_date = today.isoformat()
    logger.info(f"Purging entries from {start_date} to {end_date} (last {days} days)")
    # (logical db key, table name, human-readable label) per database.
    # The DELETE statement is identical apart from the table name, so the
    # three previously copy-pasted stanzas are collapsed into one loop.
    targets = (
        ('all_feeds', 'feed_entries', 'all_feed_entries.db'),
        ('history', 'matched_entries', 'matched_entries_history.db'),
        ('current', 'entries', 'papers.db'),
    )
    for db_key, table, label in targets:
        with self.get_connection(db_key, row_factory=False) as conn:
            cursor = conn.cursor()
            # Table names come from the fixed tuple above, never from
            # user input, so the f-string interpolation is safe.
            cursor.execute(
                f"""
                DELETE FROM {table}
                WHERE published_date IS NOT NULL
                  AND TRIM(published_date) != ''
                  AND DATE(published_date) BETWEEN DATE(?) AND DATE(?)
                """,
                (start_date, end_date),
            )
            logger.info(f"Purged {cursor.rowcount} entries from {label}")
def _extract_authors(self, entry: Dict[str, Any]) -> str:
"""Extract authors string from entry."""
authors = entry.get('authors', [])
if authors:
return ', '.join(author.get('name', '') for author in authors)
return entry.get('author', '')
def _format_published_date(self, entry: Dict[str, Any]) -> str:
"""Ensure published date is in YYYY-MM-DD format."""
import time
# Try to get parsed date first
entry_published = entry.get('published_parsed') or entry.get('updated_parsed')
if entry_published and isinstance(entry_published, time.struct_time):
return datetime.date(*entry_published[:3]).isoformat()
# Try string dates
published_str = entry.get('published') or entry.get('updated', '')
if published_str:
try:
# Try parsing common date formats
for fmt in ['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S%z', '%a, %d %b %Y %H:%M:%S %z']:
try:
dt = datetime.datetime.strptime(published_str[:19], fmt[:19])
return dt.date().isoformat()
except ValueError:
continue
# If all parsing fails, try to extract YYYY-MM-DD if present
import re
match = re.search(r'(\d{4}-\d{2}-\d{2})', published_str)
if match:
return match.group(1)
except (ValueError, TypeError, re.error) as e:
logger.debug(f"Failed to parse published date '{published_str}': {e}")
# Fallback to current date
return datetime.date.today().isoformat()
def _extract_doi(self, entry: Dict[str, Any]) -> Optional[str]:
    """Best-effort DOI extraction from common RSS fields.

    Thin wrapper over :func:`extract_doi_from_entry` (see doi_utils),
    which also scans text-bearing fields often used by publishers,
    including 'summary', 'summary_detail.value', and 'content[].value'.

    Returns:
        A DOI string if found, otherwise None.
    """
    return extract_doi_from_entry(entry)
[docs]
@contextmanager
def get_connection(self, db_key: str = 'current', row_factory: bool = True):
"""Context manager for database connections with automatic commit/rollback.
Args:
db_key: Which database to connect to ('current', 'history', 'all_feeds')
row_factory: If True, use sqlite3.Row factory for dict-like row access
Yields:
sqlite3.Connection: Database connection
Example:
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM entries")
# Auto-commits on success, auto-closes always
"""
conn = sqlite3.connect(self.db_paths[db_key])
if row_factory:
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
[docs]
def iter_targets(self, topic: Optional[str] = None, min_rank: Optional[float] = None) -> Iterator[sqlite3.Row]:
"""Iterator for entries that need abstract fetching.
Args:
topic: Optional topic filter (if None, fetches all topics)
min_rank: Optional minimum rank score filter
Yields:
sqlite3.Row: Database rows with dict-like access
"""
with self.get_connection('current') as conn:
cursor = conn.cursor()
query = """
SELECT id, topic, doi, abstract, rank_score, title,
feed_name, summary, link, authors, published_date
FROM entries
WHERE 1=1
"""
params = []
if topic:
query += " AND topic = ?"
params.append(topic)
if min_rank is not None:
query += " AND rank_score >= ?"
params.append(min_rank)
query += " ORDER BY rank_score DESC"
cursor.execute(query, params)
for row in cursor:
yield row
[docs]
def update_abstracts_batch(self, updates: List[tuple]) -> int:
"""Batch update abstracts for multiple entries.
Args:
updates: List of (abstract, doi, entry_id, topic) tuples
Returns:
Number of rows updated
"""
if not updates:
return 0
with self.get_connection('current') as conn:
cursor = conn.cursor()
cursor.executemany(
"UPDATE entries SET abstract = ?, doi = ? WHERE id = ? AND topic = ?",
updates
)
return cursor.rowcount
[docs]
def update_history_abstracts_batch(self, updates: List[tuple]) -> int:
"""Batch update abstracts in history database.
Args:
updates: List of (abstract, doi, entry_id) tuples
Returns:
Number of rows updated
"""
if not updates:
return 0
with self.get_connection('history') as conn:
cursor = conn.cursor()
cursor.executemany(
"UPDATE matched_entries SET abstract = ?, doi = ? WHERE entry_id = ?",
updates
)
return cursor.rowcount
[docs]
def get_entries_by_criteria(self,
topic: Optional[str] = None,
min_rank: Optional[float] = None,
status: Optional[str] = None,
has_doi: Optional[bool] = None,
order_by: str = 'rank_score DESC') -> List[sqlite3.Row]:
"""Flexible query builder for entries with various criteria.
Args:
topic: Optional topic filter
min_rank: Optional minimum rank score
status: Optional status filter
has_doi: If True, only entries with DOI; if False, only without DOI
order_by: ORDER BY clause (default: 'rank_score DESC')
Returns:
List of sqlite3.Row objects with dict-like access
"""
with self.get_connection('current') as conn:
cursor = conn.cursor()
query = "SELECT * FROM entries WHERE 1=1"
params = []
if topic:
query += " AND topic = ?"
params.append(topic)
if min_rank is not None:
query += " AND rank_score >= ?"
params.append(min_rank)
if status:
query += " AND status = ?"
params.append(status)
if has_doi is True:
query += " AND doi IS NOT NULL AND doi != ''"
elif has_doi is False:
query += " AND (doi IS NULL OR doi = '')"
query += f" ORDER BY {order_by}"
cursor.execute(query, params)
return cursor.fetchall()
[docs]
def iter_history_entries(self, entry_ids: List[str]) -> Iterator[sqlite3.Row]:
"""Iterator for history entries by ID.
Args:
entry_ids: List of entry IDs to fetch
Yields:
sqlite3.Row: Database rows with dict-like access
"""
if not entry_ids:
return
with self.get_connection('history') as conn:
cursor = conn.cursor()
placeholders = ','.join(['?'] * len(entry_ids))
query = f"""
SELECT entry_id, feed_name, topics, title, link, summary,
doi, matched_date, abstract, rank_score
FROM matched_entries
WHERE entry_id IN ({placeholders})
"""
cursor.execute(query, entry_ids)
for row in cursor:
yield row
# ------------------------------------------------------------------
# General-purpose query (used by the ``query`` CLI command)
# ------------------------------------------------------------------
# Logical db key -> name of its main table.
_TABLE_MAP = {
    'current': 'entries',
    'history': 'matched_entries',
    'all_feeds': 'feed_entries',
}
# Logical db key -> column used for since/until date filtering.
_DATE_COL_MAP = {
    'current': 'published_date',
    'history': 'published_date',
    'all_feeds': 'published_date',
}
def query_entries(
    self,
    db_key: str = 'current',
    topic: Optional[str] = None,
    min_rank: Optional[float] = None,
    status: Optional[str] = None,
    has_doi: Optional[bool] = None,
    has_abstract: Optional[bool] = None,
    since: Optional[str] = None,
    until: Optional[str] = None,
    search: Optional[str] = None,
    fuzzy: Optional[str] = None,
    order_by: str = 'rank_score DESC',
    limit: int = 20,
    offset: int = 0,
) -> tuple:
    """General-purpose query across any of the three databases.

    Args:
        db_key: ``'current'``, ``'history'``, or ``'all_feeds'``
        topic: Topic filter (exact match for current, LIKE for history)
        min_rank: Minimum rank_score threshold
        status: Status filter (current DB only)
        has_doi: If True only entries with DOI, if False only without
        has_abstract: If True only entries with abstract
        since: Published on or after this date (YYYY-MM-DD)
        until: Published on or before this date (YYYY-MM-DD)
        search: FTS5 keyword search on title + abstract/summary (supports
            phrases ``"..."``, prefix ``term*``, boolean ``AND/OR/NOT``)
        fuzzy: Fuzzy text search via FTS5 trigram (min 3 chars, mutually
            exclusive with *search*)
        order_by: SQL ORDER BY clause. NOTE(review): interpolated into
            the SQL unescaped — callers must pass trusted values only.
        limit: Max rows (0 = unlimited)
        offset: Skip first N rows

    Returns:
        ``(rows, total_count)`` where *rows* is a list of dicts and
        *total_count* is the count before LIMIT/OFFSET.

    Raises:
        ValueError: If *search* and *fuzzy* are both given, or *fuzzy*
            is shorter than 3 characters.
    """
    if search and fuzzy:
        raise ValueError("--search and --fuzzy are mutually exclusive")
    if fuzzy and len(fuzzy) < 3:
        raise ValueError("--fuzzy requires at least 3 characters (trigram tokenizer minimum)")
    table = self._TABLE_MAP[db_key]
    date_col = self._DATE_COL_MAP[db_key]
    conditions: list[str] = []
    params: list = []
    # Topic
    if topic:
        if db_key == 'history':
            # history stores a ", "-joined topics string -> substring match
            conditions.append("topics LIKE ?")
            params.append(f"%{topic}%")
        elif db_key == 'current':
            conditions.append("topic = ?")
            params.append(topic)
        # all_feeds has no topic column — silently ignored
    # Rank
    if min_rank is not None:
        conditions.append("rank_score >= ?")
        params.append(min_rank)
    # Status
    if status:
        conditions.append("status = ?")
        params.append(status)
    # DOI
    if has_doi is True:
        conditions.append("doi IS NOT NULL AND doi != ''")
    elif has_doi is False:
        conditions.append("(doi IS NULL OR doi = '')")
    # Abstract
    if has_abstract is True:
        conditions.append("abstract IS NOT NULL AND abstract != ''")
    # Date range
    if since:
        conditions.append(f"{date_col} >= ?")
        params.append(since)
    if until:
        conditions.append(f"{date_col} <= ?")
        params.append(until)
    # Keyword search via FTS5 (porter stemming, BM25 ranking)
    kw_table = f"{table}_kw"
    if search:
        conditions.append(
            f"rowid IN (SELECT rowid FROM {kw_table} WHERE {kw_table} MATCH ?)"
        )
        params.append(search)
    # Fuzzy search via FTS5 trigram
    fts_table = f"{table}_fts"
    if fuzzy:
        # Escape double quotes in the search term for FTS5 phrase matching
        escaped = fuzzy.replace('"', '""')
        conditions.append(
            f"rowid IN (SELECT rowid FROM {fts_table} WHERE {fts_table} MATCH ?)"
        )
        params.append(f'"{escaped}"')
    where = (" WHERE " + " AND ".join(conditions)) if conditions else ""
    with self.get_connection(db_key) as conn:
        cursor = conn.cursor()
        # Total count (before limit/offset)
        cursor.execute(f"SELECT COUNT(*) FROM {table}{where}", params)
        total = cursor.fetchone()[0]
        # Fetch rows
        query = f"SELECT * FROM {table}{where} ORDER BY {order_by}"
        if limit:
            query += f" LIMIT {int(limit)}"
        if offset:
            query += f" OFFSET {int(offset)}"
        cursor.execute(query, params)
        columns = [desc[0] for desc in cursor.description]
        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
        # Attach BM25 scores when keyword search is active
        if search and rows:
            rowids = [r.get("rowid") for r in rows]
            # rows from SELECT * may not include rowid; fetch separately
            id_col = "id" if "id" in rows[0] else None
            if id_col and not any(r.get("rowid") for r in rows):
                # Retrieve rowids for returned rows via their id
                placeholders = ",".join("?" * len(rows))
                id_vals = [r[id_col] for r in rows]
                cursor.execute(
                    f"SELECT rowid, {id_col} FROM {table} WHERE {id_col} IN ({placeholders})",
                    id_vals,
                )
                id_to_rowid = {row[1]: row[0] for row in cursor.fetchall()}
                rowids = [id_to_rowid.get(r[id_col]) for r in rows]
            if rowids and rowids[0] is not None:
                # One MATCH query per returned row; acceptable because
                # the page size is bounded by `limit` (default 20).
                bm25_map: dict[int, float] = {}
                for rid in rowids:
                    cursor.execute(
                        f"SELECT rank FROM {kw_table} WHERE {kw_table} MATCH ? AND rowid = ?",
                        (search, rid),
                    )
                    bm25_row = cursor.fetchone()
                    if bm25_row:
                        bm25_map[rid] = bm25_row[0]
                for r, rid in zip(rows, rowids):
                    r["bm25_score"] = bm25_map.get(rid)
    return rows, total
def close_all_connections(self):
    """Close any open database connections (placeholder for connection pooling)."""
    # Connections are opened and closed per-operation by get_connection(),
    # so there is nothing to release yet; kept for interface stability.
    return None