Source code for paper_firehose.commands.rank

"""
Rank command: compute and write rank scores into papers.db (``rank_score``).

Initial minimal version
-----------------------

- Read per-topic ranking config (query, model).
- Fetch entries with ``status='filtered'`` for the topic(s).
- Compute cosine similarity (Sentence-Transformers) between query and title.
- Write scores to ``rank_score`` (no status change).

Notes
-----

- If Sentence-Transformers is unavailable or the model download fails, the command
  logs a warning and skips scoring for that topic instead of raising.
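
Example
-------

A minimal, hypothetical per-topic ``ranking`` mapping as this command reads it
(keys match the lookups in ``run``; values are purely illustrative)::

    ranking_cfg = {
        "query": "sparse matrix factorization on GPUs",
        "model": "all-MiniLM-L6-v2",
        "negative_queries": ["survey"],
        "negative_penalty": 0.25,
        "preferred_authors": ["Jane Doe"],
        "priority_author_boost": 0.05,
    }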
"""

from __future__ import annotations

# Set before any heavy imports to silence HF tokenizers warning.
import os as _os
_os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

import logging
from typing import Optional, List, Dict, Any
import re

from ..core.config import ConfigManager
from ..core.database import DatabaseManager
from ..core.command_utils import resolve_topics
from ..core.text_utils import names_match
from ..core.model_manager import ensure_local_model
from ..processors.st_ranker import STRanker

logger = logging.getLogger(__name__)


def _build_entry_text(entry: Dict[str, Any]) -> str:
    """Return the text to be ranked for an entry (title-only for now)."""
    # Keep minimal as requested; can switch to title+summary later
    return (entry.get("title") or "").strip()


def _entry_has_preferred_author(entry: Dict[str, Any], preferred_authors: List[str]) -> bool:
    """Return True when entry authors overlap with the preferred author patterns."""
    if not preferred_authors:
        return False
    authors_blob = entry.get("authors") or ""
    parts = re.split(r"[,;]", authors_blob)
    authors = [p.strip() for p in parts if p.strip()]
    if not authors:
        return False
    for want in preferred_authors:
        for have in authors:
            if names_match(have, want):
                return True
    return False
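
# Illustrative example (hypothetical data): for an entry with
#   {"authors": "Jane Doe; Bob Lee"}
# and preferred_authors=["J. Doe"], the split above yields
# ["Jane Doe", "Bob Lee"]; whether "Jane Doe" matches "J. Doe" is then
# delegated to names_match(), whose exact matching rules live in
# core.text_utils (initial/abbreviation handling is assumed, not shown here).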


def run(
    config_path: str,
    topic: Optional[str] = None,
    *,
    output_json: bool = False,
) -> Optional[Dict[str, Any]]:
    """
    Compute rank scores and write them into papers.db (``rank_score``).

    Args:
        config_path: Path to main config
        topic: Optional topic name; if None, process all topics
        output_json: When True, suppress log noise and return a result dict.

    Returns:
        Result dict when *output_json* is True, otherwise None.
    """
    if output_json:
        logging.getLogger("paper_firehose").setLevel(logging.WARNING)

    logger.info("Starting rank command (write scores only)")

    cfg_mgr = ConfigManager(config_path)
    if not cfg_mgr.validate_config():
        raise ValueError("Configuration validation failed")
    config = cfg_mgr.load_config()
    db = DatabaseManager(config)

    topics = resolve_topics(cfg_mgr, topic)
    topic_results: Dict[str, Dict[str, int]] = {}

    for topic_name in topics:
        try:
            tcfg = cfg_mgr.load_topic_config(topic_name)
        except Exception as e:
            logger.error("Failed to load topic '%s': %s", topic_name, e)
            continue

        ranking_cfg = (tcfg.get("ranking") or {}) if isinstance(tcfg, dict) else {}
        query = ranking_cfg.get("query") or ""
        model_spec = ranking_cfg.get("model") or "all-MiniLM-L6-v2"
        # Ensure local vendored model (best-effort); falls back to spec on failure.
        model_name = ensure_local_model(model_spec)
        if model_name != model_spec:
            logger.info("Topic '%s': using local model at %s", topic_name, model_name)

        negative_terms = [
            t.strip()
            for t in (ranking_cfg.get("negative_queries") or [])
            if isinstance(t, str) and t.strip()
        ]
        preferred_authors = [
            t.strip()
            for t in (ranking_cfg.get("preferred_authors") or [])
            if isinstance(t, str) and t.strip()
        ]
        author_boost = float(ranking_cfg.get("priority_author_boost") or 0.0)

        # Global priority journal boost
        prio_keys = set(config.get("priority_journals", []) or [])
        feeds_cfg = config.get("feeds") or {}
        prio_display_names = set()
        for k in prio_keys:
            feed = feeds_cfg.get(k)
            if isinstance(feed, dict):
                name = feed.get("name")
                if name:
                    prio_display_names.add(str(name))
        journal_boost = float(config.get("priority_journal_boost") or 0.0)

        if not query:
            logger.warning("Topic '%s' has no ranking.query; skipping.", topic_name)
            continue

        # Load candidate entries from papers.db
        entries = db.get_current_entries(topic=topic_name, status="filtered")
        if not entries:
            logger.info("No filtered entries for topic '%s'", topic_name)
            continue

        # Prepare ranker
        ranker = STRanker(model_name=model_name)
        if not ranker.available():
            logger.warning("Ranker unavailable for topic '%s'; skipping.", topic_name)
            continue

        # Build batch (id, topic, text)
        batch = [(e["id"], e["topic"], _build_entry_text(e)) for e in entries]
        scores = ranker.score_entries(query, batch)

        # Apply simple downweight for entries containing any negative term in
        # title or summary.
        if negative_terms:
            neg_set = {t.lower() for t in negative_terms}
            # Build quick lookup from (id, topic) -> entry for text access
            entry_by_key = {(e["id"], e["topic"]): e for e in entries}
            adjusted: list[tuple[str, str, float]] = []
            penalized = 0
            # Negative penalty is configurable: topic.ranking.negative_penalty,
            # else defaults.ranking_negative_penalty (global), default 0.25.
            global_neg_pen = float(
                (config.get("defaults") or {}).get("ranking_negative_penalty", 0.25)
            )
            neg_penalty = float(ranking_cfg.get("negative_penalty", global_neg_pen))
            for eid, tname, score in scores:
                entry = entry_by_key.get((eid, tname)) or {}
                title = (entry.get("title") or "").lower()
                summary = (entry.get("summary") or "").lower()
                blob = f"{title} {summary}"
                has_negative = any(term in blob for term in neg_set)
                if has_negative:
                    # Subtract a configurable penalty and clamp to [0, 1]
                    new_score = max(0.0, float(score) - neg_penalty)
                    penalized += 1
                else:
                    new_score = float(score)
                adjusted.append((eid, tname, new_score))
            logger.info(
                "Topic '%s': applied negative term penalty to %d entries",
                topic_name,
                penalized,
            )
            scores = adjusted

        # Write scores with boosts
        updated = 0
        boosted_auth = 0
        boosted_jour = 0
        entry_by_key = {(e["id"], e["topic"]): e for e in entries}
        for eid, tname, score in scores:
            s = float(score)
            entry = entry_by_key.get((eid, tname)) or {}
            # Preferred author boost
            if (
                preferred_authors
                and author_boost > 0
                and _entry_has_preferred_author(entry, preferred_authors)
            ):
                s += author_boost
                boosted_auth += 1
            # Priority journal boost by display name
            if journal_boost > 0:
                feed_name = (entry.get("feed_name") or "").strip()
                if feed_name in prio_display_names:
                    s += journal_boost
                    boosted_jour += 1
            s = max(0.0, min(1.0, s))
            try:
                db.update_entry_rank(eid, tname, s)
                try:
                    db.update_history_rank(eid, s)
                except Exception as history_err:
                    logger.debug(
                        "Topic '%s': failed to persist rank_score to history for %s: %s",
                        tname,
                        eid[:8],
                        history_err,
                    )
                updated += 1
            except Exception as e:
                logger.error("Failed to update rank for %s/%s: %s", eid[:8], tname, e)

        if preferred_authors and author_boost > 0:
            logger.info(
                "Topic '%s': applied preferred author boost to %d entries (+%.2f)",
                topic_name,
                boosted_auth,
                author_boost,
            )
        if journal_boost > 0:
            logger.info(
                "Topic '%s': applied priority journal boost to %d entries (+%.2f)",
                topic_name,
                boosted_jour,
                journal_boost,
            )
        logger.info("Topic '%s': wrote rank_score for %d entries", topic_name, updated)
        topic_results[topic_name] = {
            "ranked": updated,
            "boosted_author": boosted_auth,
            "boosted_journal": boosted_jour,
        }

    # HTML generation moved to the standalone `html` command.
    db.close_all_connections()
    logger.info("Rank command completed")

    if output_json:
        return {
            "command": "rank",
            "topics": topic_results,
            "total_ranked": sum(t["ranked"] for t in topic_results.values()),
        }
    return None
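

if __name__ == "__main__":  # pragma: no cover
    # Minimal manual-invocation sketch. The config path below is illustrative,
    # not a path this repository is known to ship; pass your own main config.
    import json

    summary = run("config/config.yaml", topic=None, output_json=True)
    print(json.dumps(summary, indent=2))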