"""Configuration management for YAML-based config files."""
import os
import logging
import re
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from .paths import get_data_dir, get_system_path
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Default user-facing config directory and main config file, both located
# under whatever directory get_data_dir() returns.
DEFAULT_CONFIG_DIR = get_data_dir() / "config"
DEFAULT_CONFIG_PATH = DEFAULT_CONFIG_DIR / "config.yaml"

# Template tree used to seed a fresh config directory: a main config.yaml plus
# "topics" and "secrets" subdirectories.
# NOTE(review): presumably get_system_path("config") points at templates
# shipped with the package — confirm against .paths.
_TEMPLATE_DIR = get_system_path("config")
_TEMPLATE_CONFIG = _TEMPLATE_DIR / "config.yaml"
_TEMPLATE_TOPICS_DIR = _TEMPLATE_DIR / "topics"
_TEMPLATE_SECRETS_DIR = _TEMPLATE_DIR / "secrets"
_DEFAULT_EMAIL_SECRET = "# Placeholder SMTP password file. Replace with real credentials.\n"
_DEFAULT_CONFIG_TEMPLATE = """# Auto-generated default configuration for paper-firehose
database:
path: "papers.db"
all_feeds_path: "all_feed_entries.db"
history_path: "matched_entries_history.db"
feeds:
cond-mat:
name: "arXiv cond-mat"
url: "https://rss.arxiv.org/rss/cond-mat"
enabled: true
priority_journals: []
defaults:
time_window_days: 365
top_n_per_topic: 10
rank_threshold: 0.3
ranking_negative_penalty: 0.25
"""
_DEFAULT_TOPIC_TEMPLATE = """name: "example"
description: "Auto-generated starter topic. Update the regex and feeds for your workflow."
feeds:
- "cond-mat"
filter:
pattern: "graphene"
fields: ["title", "summary"]
ranking:
query: >
graphene
condensed matter
"""
def _write_template(path: Path, content: str) -> None:
"""Write templated YAML content to disk with a trailing newline."""
path.write_text(content.strip() + "\n", encoding="utf-8")
def _copy_tree(src: Path, dest: Path) -> bool:
"""Copy files from *src* to *dest* without overwriting existing files."""
if not src.exists():
return False
created = False
for item in src.iterdir():
target = dest / item.name
if item.is_dir():
target.mkdir(parents=True, exist_ok=True)
if _copy_tree(item, target):
created = True
else:
if not target.exists():
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(item, target)
created = True
return created
# Known configuration keys — anything not listed here triggers a warning.
# Top-level keys map to sets of allowed sub-keys (None = no sub-key check).
_KNOWN_MAIN_KEYS: Dict[str, Optional[Dict[str, Any]]] = {
"database": {"path", "all_feeds_path", "history_path"},
"feeds": None, # dynamic feed names, each checked separately
"priority_journals": None,
"priority_journal_boost": None,
"defaults": {
"time_window_days": None,
"top_n_per_topic": None,
"rank_threshold": None,
"ranking_negative_penalty": None,
"abstracts": {"mailto", "rps", "max_retries"},
},
"email": {
"recipients_file": None,
"subject_prefix": None,
"from": None,
"smtp": {"host", "port", "username", "password_file"},
},
}
_KNOWN_FEED_KEYS = {"name", "url", "enabled"}
_KNOWN_TOPIC_KEYS: Dict[str, Optional[Dict[str, Any]]] = {
"name": None,
"description": None,
"feeds": None,
"filter": {"pattern", "fields"},
"ranking": {
"query", "model", "negative_queries", "preferred_authors",
"priority_author_boost", "negative_penalty",
},
"abstract_fetch": {"enabled", "rank_threshold"},
"paperqa": {
"download_rank_threshold", "rps", "max_retries",
"llm", "summary_llm", "prompt",
},
"output": {"filename", "filename_ranked", "filename_summary", "archive"},
}
def _check_keys(data: Dict[str, Any], known: Dict[str, Any],
prefix: str) -> List[str]:
"""Return warnings for keys in *data* that are not in *known*.
*known* maps key names to either ``None`` (leaf — no sub-key check),
a ``set`` of allowed sub-key names (flat section), or a ``dict``
mapping sub-key names to their own allowed sub-keys (nested section).
"""
warnings: List[str] = []
if not isinstance(data, dict):
return warnings
for key in data:
full = f"{prefix}.{key}" if prefix else key
if key not in known:
warnings.append(f"Unknown key '{full}'")
continue
spec = known[key]
if spec is None:
continue
child = data[key]
if not isinstance(child, dict):
continue
if isinstance(spec, set):
for sub in child:
if sub not in spec:
warnings.append(f"Unknown key '{full}.{sub}'")
elif isinstance(spec, dict):
warnings.extend(_check_keys(child, spec, full))
return warnings
class ConfigManager:
    """Manages loading and validation of YAML configuration files.

    The manager works on one main config file plus a sibling ``topics``
    directory holding one YAML file per topic. Parsed files are cached on the
    instance, so each file is read from disk at most once.

    Attributes:
        config_path: Absolute path of the main config file, as a string.
        base_dir: Directory containing the main config file, as a string.
    """

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the manager and ensure baseline config/topic files exist.

        Args:
            config_path: Path of the main config file. ``~`` is expanded and a
                relative path is resolved to an absolute one. Defaults to
                ``DEFAULT_CONFIG_PATH`` when omitted or empty.
        """
        path = Path(config_path or DEFAULT_CONFIG_PATH).expanduser()
        if not path.is_absolute():
            path = path.resolve()
        self.config_path = str(path)
        self.base_dir = str(path.parent)
        self._config: Optional[Dict[str, Any]] = None
        self._topics: Dict[str, Dict[str, Any]] = {}
        self._ensure_default_config()

    def load_config(self) -> Dict[str, Any]:
        """Load the main configuration file (cached after the first call).

        Returns:
            The parsed configuration. An empty file yields ``{}``.

        Raises:
            Exception: Whatever opening or parsing the file raised; the
                error is logged and re-raised unchanged.
        """
        if self._config is None:
            try:
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    loaded = yaml.safe_load(f)
            except Exception as e:
                logger.error(f"Failed to load config from {self.config_path}: {e}")
                raise
            # safe_load returns None for an empty document; normalise it so
            # callers can rely on a mapping and the cache actually sticks.
            self._config = {} if loaded is None else loaded
            logger.info(f"Loaded configuration from {self.config_path}")
        return self._config

    def _resolve_topic_path(self, topic_name: str) -> Path:
        """Return the filesystem path for *topic_name* supporting .yaml and .yml.

        Raises:
            FileNotFoundError: If no matching file exists in the topics
                directory.
        """
        topics_dir = Path(self.base_dir) / "topics"
        for extension in (".yaml", ".yml"):
            candidate = topics_dir / f"{topic_name}{extension}"
            if candidate.exists():
                return candidate

        # Final fallback: scan the directory for "<topic_name>.*" entries whose
        # suffix is .yaml/.yml in any letter case (e.g. topic.YAML). Sorted so
        # the choice is deterministic when several files match.
        for candidate in sorted(topics_dir.glob(f"{topic_name}.*")):
            if candidate.suffix.lower() in {".yaml", ".yml"}:
                return candidate

        raise FileNotFoundError(
            f"Topic configuration file for '{topic_name}' not found (.yaml or .yml) in {topics_dir}"
        )

    def load_topic_config(self, topic_name: str) -> Dict[str, Any]:
        """Load a topic-specific configuration file (cached per topic).

        Returns:
            The parsed topic configuration. An empty file yields ``{}``.

        Raises:
            FileNotFoundError: If the topic file cannot be located.
            Exception: Whatever opening or parsing the file raised; the
                error is logged and re-raised unchanged.
        """
        if topic_name not in self._topics:
            topic_path = self._resolve_topic_path(topic_name)
            try:
                with open(topic_path, 'r', encoding='utf-8') as f:
                    loaded = yaml.safe_load(f)
            except Exception as e:
                logger.error("Failed to load topic config from %s: %s", topic_path, e)
                raise
            # Same normalisation as load_config: empty file -> empty mapping.
            self._topics[topic_name] = {} if loaded is None else loaded
            logger.info("Loaded topic config for '%s' from %s", topic_name, topic_path)
        return self._topics[topic_name]

    def _ensure_default_config(self) -> None:
        """Create default configuration files if they are missing.

        Seeds the main config, the ``topics`` and ``secrets`` directories and
        any other template subdirectory. Template trees are only copied into
        directories that did not exist before this call, and existing files
        are never overwritten. Copy failures are logged as warnings.
        """
        config_file = Path(self.config_path)
        config_file.parent.mkdir(parents=True, exist_ok=True)

        if not config_file.exists():
            if _TEMPLATE_CONFIG.exists():
                try:
                    shutil.copyfile(_TEMPLATE_CONFIG, config_file)
                    logger.info("Created default config.yaml at %s", config_file)
                except Exception as exc:
                    logger.warning("Failed to copy template config: %s", exc)
                    _write_template(config_file, _DEFAULT_CONFIG_TEMPLATE)
            else:
                _write_template(config_file, _DEFAULT_CONFIG_TEMPLATE)
                logger.info("Created fallback default config.yaml at %s", config_file)

        base = Path(self.base_dir)
        topics_dir = base / "topics"
        secrets_dir = base / "secrets"

        # Only seed templates if directories don't exist (one-time initialization)
        topics_existed = topics_dir.exists()
        secrets_existed = secrets_dir.exists()
        topics_dir.mkdir(parents=True, exist_ok=True)
        secrets_dir.mkdir(parents=True, exist_ok=True)

        created_topic = False
        if not topics_existed:
            try:
                if _copy_tree(_TEMPLATE_TOPICS_DIR, topics_dir):
                    created_topic = True
            except Exception as exc:
                logger.warning("Failed to copy topics template tree: %s", exc)

        if not secrets_existed:
            try:
                _copy_tree(_TEMPLATE_SECRETS_DIR, secrets_dir)
            except Exception as exc:
                logger.warning("Failed to copy secrets template tree: %s", exc)

        # Ensure critical secret placeholders exist even if the template tree lacks them
        placeholders = {
            "email_password.env": _DEFAULT_EMAIL_SECRET,
        }
        for filename, content in placeholders.items():
            target = secrets_dir / filename
            if target.exists():
                continue
            try:
                target.write_text(content, encoding="utf-8")
            except Exception as exc:
                logger.warning("Failed to create placeholder secret %s: %s", target, exc)

        # Guarantee at least one topic file so the topic list is never empty.
        if not any(topics_dir.glob("*.yml")) and not any(topics_dir.glob("*.yaml")):
            default_topic_path = topics_dir / "example.yaml"
            _write_template(default_topic_path, _DEFAULT_TOPIC_TEMPLATE)
            created_topic = True
            logger.info("Created fallback default topic config at %s", default_topic_path)

        if _TEMPLATE_DIR.exists():
            for item in _TEMPLATE_DIR.iterdir():
                # topics/ and secrets/ were handled above; plain files are skipped.
                if not item.is_dir() or item.name in {"topics", "secrets"}:
                    continue
                dest_dir = base / item.name
                # Only seed templates if directory doesn't exist (one-time initialization)
                dest_existed = dest_dir.exists()
                dest_dir.mkdir(parents=True, exist_ok=True)
                if not dest_existed:
                    try:
                        _copy_tree(item, dest_dir)
                    except Exception as exc:
                        logger.warning("Failed to copy template directory %s: %s", item, exc)

        # Newly created topic files invalidate anything cached so far.
        if created_topic:
            self._topics.clear()

    def get_available_topics(self) -> List[str]:
        """Get list of available topic configuration files.

        Returns:
            Sorted, de-duplicated topic names (file names without the
            ``.yaml``/``.yml`` extension). The extension match ignores case,
            in line with ``_resolve_topic_path``. Empty when the topics
            directory is missing.
        """
        topics_dir = os.path.join(self.base_dir, "topics")
        if not os.path.isdir(topics_dir):
            return []
        # A set collapses "a.yaml" + "a.yml" into a single topic name.
        names = {
            os.path.splitext(filename)[0]
            for filename in os.listdir(topics_dir)
            if filename.lower().endswith(('.yaml', '.yml'))
        }
        return sorted(names)

    # Note: `get_feeds_for_topic` removed as unused by current code paths.

    def get_enabled_feeds(self) -> Dict[str, Dict[str, Any]]:
        """Get all enabled feeds from the main configuration.

        A feed counts as enabled unless it sets ``enabled`` to a falsy value.
        A missing or null ``feeds`` section yields ``{}``; feed entries that
        are not mappings are skipped.
        """
        feeds = self.load_config().get('feeds') or {}
        if not isinstance(feeds, dict):
            return {}
        return {
            feed_name: feed_config
            for feed_name, feed_config in feeds.items()
            if isinstance(feed_config, dict) and feed_config.get('enabled', True)
        }

    def get_priority_journals(self) -> List[str]:
        """Get the list of priority journals (``[]`` when unset or null)."""
        return self.load_config().get('priority_journals') or []

    def check_unknown_keys(self) -> List[str]:
        """Return warnings for unrecognised keys in main and topic configs.

        This is a best-effort check: if the main config cannot be loaded an
        empty list is returned, and topic files that fail to load are skipped.
        """
        warnings: List[str] = []
        try:
            config = self.load_config()
        except Exception:
            return warnings

        # Main config top-level and nested keys
        warnings.extend(_check_keys(config, _KNOWN_MAIN_KEYS, "config"))

        # Per-feed sub-keys
        feeds = config.get("feeds") if isinstance(config, dict) else None
        if isinstance(feeds, dict):
            for feed_name, feed_cfg in feeds.items():
                if not isinstance(feed_cfg, dict):
                    continue
                warnings.extend(
                    f"Unknown key 'config.feeds.{feed_name}.{sub}'"
                    for sub in feed_cfg
                    if sub not in _KNOWN_FEED_KEYS
                )

        # Topic configs
        for topic_name in self.get_available_topics():
            try:
                topic_cfg = self.load_topic_config(topic_name)
            except Exception:
                continue
            warnings.extend(
                _check_keys(topic_cfg, _KNOWN_TOPIC_KEYS, f"topic[{topic_name}]")
            )
        return warnings

    def validate_config(self) -> bool:
        """Validate the configuration files.

        Returns:
            True when the main config and every topic file pass validation,
            False otherwise. Problems are reported through the module logger;
            malformed input does not raise.
        """
        try:
            config = self.load_config()
            if not self._validate_main_config(config):
                return False

            available_feeds = list(config['feeds'].keys())
            for topic in self.get_available_topics():
                topic_config = self.load_topic_config(topic)
                if not self._validate_topic_config(topic, topic_config, available_feeds):
                    return False

            logger.info("Configuration validation passed")
            return True
        except (yaml.YAMLError, KeyError, TypeError, ValueError, OSError,
                AttributeError) as e:
            # AttributeError is a safety net for wrongly-typed YAML values
            # that the explicit checks did not anticipate.
            logger.error(f"Configuration validation failed: {e}")
            return False

    def _validate_main_config(self, config: Any) -> bool:
        """Check required sections and value types of the main config."""
        if not isinstance(config, dict):
            logger.error("Main config must be a mapping of sections in config.yaml")
            return False

        for section in ('database', 'feeds'):
            if section not in config:
                logger.error(f"Missing required section '{section}' in main config")
                return False

        # Validate database paths
        db_config = config['database']
        if not isinstance(db_config, dict):
            logger.error("'database' must be a mapping in config.yaml")
            return False
        for key in ('path', 'all_feeds_path', 'history_path'):
            if key not in db_config:
                logger.error(f"Missing required database path '{key}'")
                return False

        feeds = config['feeds']
        if not isinstance(feeds, dict):
            logger.error("'feeds' must be a mapping of feed keys in config.yaml")
            return False

        # Validate priority_journals keys and optional boost type
        priority_journals = config.get('priority_journals', [])
        if priority_journals is not None and not isinstance(priority_journals, list):
            logger.error("'priority_journals' must be a list of feed keys in config.yaml")
            return False
        available_feeds = list(feeds.keys())
        for feed_key in priority_journals or []:
            # Unknown keys are only a warning, not a validation failure.
            if feed_key not in available_feeds:
                logger.warning(f"priority_journals contains unknown feed key '{feed_key}'")

        # Optional global boost
        if 'priority_journal_boost' in config:
            if not isinstance(config.get('priority_journal_boost'), (int, float)):
                logger.error("'priority_journal_boost' must be a number (int/float)")
                return False
        return True

    def _validate_topic_config(self, topic: str, topic_config: Any,
                               available_feeds: List[str]) -> bool:
        """Check one topic config against the feeds of the main config."""
        if not isinstance(topic_config, dict):
            logger.error(f"Topic '{topic}' configuration must be a mapping")
            return False

        # Check required fields
        for key in ('name', 'feeds', 'filter'):
            if key not in topic_config:
                logger.error(f"Missing required key '{key}' in topic '{topic}'")
                return False

        # Validate feeds exist in main config
        topic_feeds = topic_config['feeds']
        if isinstance(topic_feeds, str):
            # A bare string would otherwise be iterated character by character.
            logger.error(f"Topic '{topic}' feeds must be a list of feed keys, not a string")
            return False
        for feed in topic_feeds:
            if feed not in available_feeds:
                logger.error(f"Topic '{topic}' references unknown feed '{feed}'")
                return False

        # Validate filter pattern presence and compilability
        filter_cfg = topic_config.get('filter')
        pattern = filter_cfg.get('pattern') if isinstance(filter_cfg, dict) else None
        if not isinstance(pattern, str) or not pattern.strip():
            logger.error(f"Topic '{topic}' filter.pattern must be a non-empty string")
            return False
        try:
            re.compile(pattern, re.IGNORECASE)
        except re.error as e:
            logger.error(f"Topic '{topic}' filter.pattern is not a valid regex: {e}")
            return False

        # Optional ranking config validation
        ranking_cfg = topic_config.get('ranking') or {}
        if not isinstance(ranking_cfg, dict):
            logger.error(f"Topic '{topic}' ranking must be a mapping")
            return False
        for list_key in ('negative_queries', 'preferred_authors'):
            value = ranking_cfg.get(list_key)
            if value is None:
                continue
            if not isinstance(value, list) or not all(isinstance(x, str) for x in value):
                logger.error(f"Topic '{topic}' ranking.{list_key} must be a list of strings")
                return False
        pab = ranking_cfg.get('priority_author_boost')
        if pab is not None and not isinstance(pab, (int, float)):
            logger.error(f"Topic '{topic}' ranking.priority_author_boost must be a number (int/float)")
            return False
        return True
# Public API of this module: the manager class and the default config locations.
__all__ = ["ConfigManager", "DEFAULT_CONFIG_PATH", "DEFAULT_CONFIG_DIR"]