Source code for paper_firehose.commands.filter

"""
Filter command implementation.
Fetches RSS feeds, applies regex filters, and writes results to databases.
HTML rendering is handled exclusively by the `html` command.
"""

import os
import logging
from typing import Any, Dict, Optional

from ..core.config import ConfigManager
from ..core.database import DatabaseManager
from ..core.command_utils import resolve_topics
from ..processors.feed_processor import FeedProcessor

logger = logging.getLogger(__name__)


[docs] def run( config_path: str, topic: Optional[str] = None, *, output_json: bool = False, ) -> Optional[Dict[str, Any]]: """Run the filtering pipeline for one or all topics. Args: config_path: Path to the main configuration file topic: Optional specific topic to process (if ``None``, process every topic) output_json: When True, suppress log noise and return a result dict. Returns: Result dict when *output_json* is True, otherwise None. """ if output_json: logging.getLogger("paper_firehose").setLevel(logging.WARNING) logger.info("Starting filter command") try: # Initialize components config_manager = ConfigManager(config_path) # Validate configuration if not config_manager.validate_config(): raise ValueError("Configuration validation failed") # Load main config config = config_manager.load_config() # Initialize database manager db_manager = DatabaseManager(config) # Local safety: backup important databases before we modify them db_manager.backup_important_databases() # Clear current run database db_manager.clear_current_db() # Initialize processors feed_processor = FeedProcessor(db_manager, config_manager) # Determine topics to process topics_to_process = resolve_topics(config_manager, topic) if topic: logger.info(f"Processing specific topic: {topic}") else: logger.info(f"Processing all topics: {topics_to_process}") # Process each topic all_processed_entries = {} # Track all entries for saving to dedup DB later topic_counts: Dict[str, int] = {} for topic_name in topics_to_process: try: logger.info(f"Processing topic: {topic_name}") # Load topic configuration topic_config = config_manager.load_topic_config(topic_name) # Fetch feeds first (don't save to dedup DB yet) entries_per_feed = feed_processor.fetch_feeds(topic_name) # Debug: summarize fetched counts per feed try: fetched_total = sum(len(v) for v in entries_per_feed.values()) logger.info(f"Fetched {fetched_total} new entries across {len(entries_per_feed)} feeds for topic '{topic_name}'") for fk, lst in entries_per_feed.items(): logger.debug(f" Feed '{fk}' fetched {len(lst)} new entries (post-dedup)") except Exception: pass # Collect all entries for later saving to dedup DB for feed_name, entries in entries_per_feed.items(): if feed_name not in all_processed_entries: all_processed_entries[feed_name] = [] all_processed_entries[feed_name].extend(entries) # Apply filters and save to papers.db/history.db as appropriate matched_entries = feed_processor.apply_filters(entries_per_feed, topic_name) topic_counts[topic_name] = len(matched_entries) logger.info(f"Completed processing topic '{topic_name}': {len(matched_entries)} entries") except Exception as e: logger.error(f"Error processing topic '{topic_name}': {e}") continue # Save ALL processed entries to deduplication database if all_processed_entries: feed_processor.save_all_entries_to_dedup_db(all_processed_entries) # Close database connections db_manager.close_all_connections() logger.info("Filter command completed successfully") if output_json: return { "command": "filter", "topics": topic_counts, "total_matched": sum(topic_counts.values()), } except Exception as e: logger.error(f"Filter command failed: {e}") raise return None
[docs] def purge(config_path: str, days: Optional[int] = None, all_data: bool = False) -> None: """ Purge old entries from databases. Args: config_path: Path to the main configuration file days: Number of days to keep (if None and not all_data, keep all) all_data: If True, clear all databases completely """ logger.info("Starting purge command") try: # Initialize components config_manager = ConfigManager(config_path) config = config_manager.load_config() db_manager = DatabaseManager(config) # Safety: backup important databases before purge db_manager.backup_important_databases() if all_data: logger.info("Purging all data from databases") # Clear all databases for db_path in db_manager.db_paths.values(): if os.path.exists(db_path): os.remove(db_path) logger.info(f"Removed database: {db_path}") # Reinitialize databases db_manager._init_databases() logger.info("Databases reinitialized") elif days is not None: logger.info(f"Purging entries from the most recent {days} days (including today)") db_manager.purge_old_entries(days) logger.info(f"Purge completed for entries from the most recent {days} days") else: logger.warning("No purge action specified (use --days X or --all)") db_manager.close_all_connections() logger.info("Purge command completed") except Exception as e: logger.error(f"Purge command failed: {e}") raise