Source code for paper_firehose.commands.export_recent
"""
Export recent command implementation.
Creates a smaller database containing only recent entries from the history database.
"""
import os
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import Optional
from ..core.config import ConfigManager
from ..core.paths import resolve_data_file
logger = logging.getLogger(__name__)
[docs]
def run(config_path: str, days: int = 60, output_name: Optional[str] = None) -> None:
"""Export recent entries from matched_entries_history.db to a smaller file.
Args:
config_path: Path to the main configuration file
days: Number of days to include (default: 60)
output_name: Optional output filename (default: matched_entries_history.recent.db)
"""
logger.info(f"Starting export-recent command (last {days} days)")
try:
# Initialize components
config_manager = ConfigManager(config_path)
config = config_manager.load_config()
# Resolve database paths
history_db_path = str(resolve_data_file(config['database']['history_path']))
if output_name:
output_db_path = str(resolve_data_file(output_name, ensure_parent=True))
else:
# Default: matched_entries_history.recent.db in same directory as history DB
history_dir = os.path.dirname(history_db_path)
output_db_path = os.path.join(history_dir, 'matched_entries_history.recent.db')
# Check source database exists
if not os.path.exists(history_db_path):
logger.error(f"Source database not found: {history_db_path}")
return
# Calculate cutoff date
cutoff_date = datetime.now() - timedelta(days=days)
cutoff_str = cutoff_date.strftime('%Y-%m-%d')
logger.info(f"Cutoff date: {cutoff_str}")
# Connect to source database
src_conn = sqlite3.connect(history_db_path)
src_conn.row_factory = sqlite3.Row
src_cursor = src_conn.cursor()
# Get schema from source database
src_cursor.execute("PRAGMA table_info(matched_entries)")
columns_info = src_cursor.fetchall()
columns = [col[1] for col in columns_info]
logger.info(f"Source database schema has {len(columns)} columns")
# Count total entries
src_cursor.execute("SELECT COUNT(*) FROM matched_entries")
total_entries = src_cursor.fetchone()[0]
logger.info(f"Total entries in source database: {total_entries}")
# Query recent entries
# matched_date format is typically 'YYYY-MM-DD HH:MM:SS' or 'YYYY-MM-DD'
query = """
SELECT * FROM matched_entries
WHERE matched_date >= ?
ORDER BY matched_date DESC
"""
src_cursor.execute(query, (cutoff_str,))
recent_entries = src_cursor.fetchall()
recent_count = len(recent_entries)
logger.info(f"Found {recent_count} entries from the last {days} days")
if recent_count == 0:
logger.warning(f"No entries found in the last {days} days")
# Create destination database
if os.path.exists(output_db_path):
os.remove(output_db_path)
logger.info(f"Removed existing output database: {output_db_path}")
dest_conn = sqlite3.connect(output_db_path)
dest_cursor = dest_conn.cursor()
# Create table with same schema
src_cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='matched_entries'")
create_table_sql = src_cursor.fetchone()[0]
dest_cursor.execute(create_table_sql)
# Create indexes
src_cursor.execute("SELECT sql FROM sqlite_master WHERE type='index' AND tbl_name='matched_entries'")
index_sqls = src_cursor.fetchall()
for (index_sql,) in index_sqls:
if index_sql: # Some indexes may be auto-created (NULL sql)
dest_cursor.execute(index_sql)
logger.info("Created destination database with matching schema")
# Copy filtered entries
if recent_entries:
placeholders = ','.join(['?' for _ in columns])
insert_sql = f"INSERT INTO matched_entries ({','.join(columns)}) VALUES ({placeholders})"
# Convert Row objects to tuples
rows_to_insert = [tuple(row[col] for col in columns) for row in recent_entries]
dest_cursor.executemany(insert_sql, rows_to_insert)
dest_conn.commit()
logger.info(f"Copied {recent_count} entries to destination database")
# Get file sizes
src_size_mb = os.path.getsize(history_db_path) / (1024 * 1024)
dest_size_mb = os.path.getsize(output_db_path) / (1024 * 1024)
# Log statistics
logger.info("=" * 60)
logger.info("Export Summary:")
logger.info(f" Source: {history_db_path}")
logger.info(f" Destination: {output_db_path}")
logger.info(f" Time range: Last {days} days (since {cutoff_str})")
logger.info(f" Total entries: {total_entries}")
logger.info(f" Recent entries: {recent_count}")
logger.info(f" Percentage: {(recent_count/total_entries*100) if total_entries > 0 else 0:.1f}%")
logger.info(f" Source size: {src_size_mb:.2f} MB")
logger.info(f" Output size: {dest_size_mb:.2f} MB")
logger.info(f" Size reduction: {((src_size_mb - dest_size_mb)/src_size_mb*100) if src_size_mb > 0 else 0:.1f}%")
logger.info("=" * 60)
# Close connections
src_conn.close()
dest_conn.close()
logger.info("Export-recent command completed successfully")
except Exception as e:
logger.error(f"Export-recent command failed: {e}")
raise