"""
Multi-source abstract fetcher with fallback logic.
Orchestrates fetching abstracts from multiple sources (Crossref, Semantic Scholar,
OpenAlex, PubMed) with intelligent fallback strategies based on journal/domain.
"""
from __future__ import annotations
import time
import logging
from typing import Optional, Dict, Any, Iterable
import requests
from ..core.database import DatabaseManager
from ..core.apis import (
get_crossref_abstract,
search_crossref_abstract_by_title,
get_semantic_scholar_abstract,
get_openalex_abstract,
get_pubmed_abstract_by_doi,
)
from ..core.text_utils import clean_abstract_for_db
from ..core.abstract_source import AbstractSource, get_default_sources, get_biomedical_sources
logger = logging.getLogger(__name__)
def try_abstract_sources(
sources: list[AbstractSource],
doi: Optional[str],
title: Optional[str],
*,
mailto: str,
session: Optional[requests.Session]
) -> Optional[str]:
"""Try fetching abstract from a list of sources in order.
Args:
sources: List of AbstractSource instances to try in order
doi: Digital Object Identifier (optional)
title: Paper title (optional)
mailto: Contact email for API calls
session: requests.Session for API calls
Returns:
Abstract text or None if not found from any source
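
    Example:
        A minimal sketch, assuming ``get_default_sources()`` returns
        configured ``AbstractSource`` instances; the DOI and email below
        are placeholders:

            import requests

            session = requests.Session()
            abstract = try_abstract_sources(
                get_default_sources(),
                doi="10.1234/placeholder",
                title=None,
                mailto="you@example.org",
                session=session,
            )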
"""
for source in sources:
source_name = source.__class__.__name__
try:
result = source.fetch_abstract(
doi=doi, title=title, mailto=mailto, session=session
)
if result:
logger.debug(f"Abstract fetched successfully from {source_name}")
return result
except Exception as e:
logger.warning(f"Failed to fetch abstract from {source_name}: {e}")
# Continue to next source on error
continue
logger.debug(f"No abstract found from {len(sources)} sources (doi={doi}, title={title[:50] if title else None}...)")
return None
def try_publisher_apis(
doi: Optional[str],
feed_name: str,
link: str,
*,
mailto: str,
session: Optional[requests.Session]
) -> Optional[str]:
"""Try publisher/aggregator APIs based on journal or domain.
    Order of attempts (by typical coverage): Semantic Scholar, then OpenAlex; for PNAS (or other biomedical venues), PubMed is tried as well.
Args:
doi: Digital Object Identifier (optional)
feed_name: Name of the RSS feed source
link: URL to the paper
mailto: Contact email for API calls
session: requests.Session for API calls
Returns:
Abstract text or None if not found
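
    Example:
        Illustrative only: a PNAS feed or pnas.org link routes to the
        biomedical source list, anything else to the defaults (DOI, URL,
        and email are placeholders):

            import requests

            abstract = try_publisher_apis(
                doi="10.1073/pnas.placeholder",
                feed_name="PNAS",
                link="https://www.pnas.org/doi/placeholder",
                mailto="you@example.org",
                session=requests.Session(),
            )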
"""
fn = (feed_name or '').lower()
domain = (link or '').lower()
# Choose appropriate source list based on journal type
if 'pnas' in fn or 'pnas.org' in domain:
sources = get_biomedical_sources()
else:
sources = get_default_sources()
return try_abstract_sources(sources, doi, None, mailto=mailto, session=session)
def iter_targets(
db: DatabaseManager,
topic: str,
threshold: float
) -> Iterable[Dict[str, Any]]:
"""Yield ranked DB rows lacking abstracts for the given topic, highest score first.
Args:
db: DatabaseManager instance
topic: Topic name to filter by
threshold: Minimum rank score to include
Yields:
Dictionary representing each database row
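
    Example:
        A sketch of consuming the iterator; ``db`` is assumed to be an
        initialized ``DatabaseManager``, and the topic/threshold values
        are illustrative:

            for row in iter_targets(db, topic="cond-mat", threshold=0.5):
                print(row['id'], row.get('doi'), row.get('title'))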
"""
# Use DatabaseManager's iter_targets method with additional abstract filtering
for row in db.iter_targets(topic=topic, min_rank=threshold):
# Filter out rows that already have abstracts
abstract = row['abstract']
if abstract is None or (isinstance(abstract, str) and abstract.strip() == ''):
yield dict(row)
def fill_arxiv_summaries(
db: DatabaseManager,
topics: Optional[list[str]] = None
) -> int:
"""First pass: fill abstracts from summary for arXiv/cond-mat entries, no threshold.
Args:
db: DatabaseManager instance
topics: Optional list of topics to process (None = all topics)
Returns:
Number of rows updated
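
    Example:
        Illustrative call; ``db`` is assumed to be an initialized
        ``DatabaseManager``:

            updated = fill_arxiv_summaries(db, topics=["cond-mat"])
            logger.info(f"Filled {updated} abstracts from arXiv summaries")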
"""
with db.get_connection('current') as conn:
cur = conn.cursor()
params: list = []
topic_filter = ""
if topics:
placeholders = ",".join(["?"] * len(topics))
topic_filter = f" AND topic IN ({placeholders})"
params.extend(topics)
cur.execute(
f"""
SELECT id, topic, feed_name, link, summary
FROM entries
WHERE (abstract IS NULL OR TRIM(abstract) = '')
AND (
LOWER(COALESCE(feed_name, '')) LIKE '%cond-mat%'
OR LOWER(COALESCE(feed_name, '')) LIKE '%arxiv%'
OR LOWER(COALESCE(link, '')) LIKE '%arxiv.org%'
)
{topic_filter}
""",
params,
)
rows = cur.fetchall()
# Collect all updates for batch processing
papers_updates = []
history_updates = []
for row in rows:
id_ = row['id']
tpc = row['topic']
summary = row['summary']
if not summary:
continue
cleaned = clean_abstract_for_db(summary)
if cleaned:
# Note: DOI stays None for these arXiv entries
papers_updates.append((cleaned, None, id_, tpc))
history_updates.append((cleaned, None, id_))
# Batch update papers.db using DatabaseManager method
if papers_updates:
db.update_abstracts_batch(papers_updates)
# Batch update history DB (best-effort)
if history_updates:
try:
db.update_history_abstracts_batch(history_updates)
except Exception as e:
logger.warning(f"Failed to update history database in fill_arxiv_summaries: {e}", exc_info=True)
return len(papers_updates)
def crossref_pass(
db: DatabaseManager,
topic: str,
threshold: float,
*,
mailto: str,
session: requests.Session,
min_interval: float,
max_per_topic: Optional[int],
max_retries: int = 3
) -> int:
"""Second pass: Crossref only (DOI first, then title) for entries above threshold.
Args:
db: DatabaseManager instance
topic: Topic name to process
threshold: Minimum rank score to include
mailto: Contact email for Crossref API
session: requests.Session for API calls
min_interval: Minimum seconds between API calls
max_per_topic: Optional maximum fetches per topic
max_retries: Maximum retry attempts for failed requests
Returns:
Number of abstracts fetched
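
    Example:
        A sketch with conservative rate limiting; all values are
        illustrative:

            import requests

            fetched = crossref_pass(
                db,
                topic="cond-mat",
                threshold=0.5,
                mailto="you@example.org",
                session=requests.Session(),
                min_interval=1.0,
                max_per_topic=25,
            )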
"""
# Collect all updates for batch processing
papers_updates = []
history_updates = []
fetched = 0
for row in iter_targets(db, topic, threshold):
doi = row.get('doi')
abstract: Optional[str] = None
if doi:
abstract = get_crossref_abstract(doi, mailto=mailto, session=session, max_retries=max_retries)
time.sleep(min_interval)
if not abstract:
abstract = search_crossref_abstract_by_title(row.get('title') or '', mailto=mailto, session=session, max_retries=max_retries)
time.sleep(min_interval)
        if abstract:
            # Guard against clean_abstract_for_db stripping the text to nothing,
            # mirroring the check in fill_arxiv_summaries
            cleaned = clean_abstract_for_db(abstract)
            if cleaned:
                papers_updates.append((cleaned, doi, row['id'], topic))
                history_updates.append((cleaned, doi, row['id']))
                fetched += 1
if max_per_topic is not None and fetched >= max_per_topic:
break
# Batch update papers.db using DatabaseManager method
if papers_updates:
db.update_abstracts_batch(papers_updates)
# Batch update history DB (best-effort)
if history_updates:
try:
db.update_history_abstracts_batch(history_updates)
except Exception as e:
logger.warning(f"Failed to update history database: {e}", exc_info=True)
return fetched
def fallback_pass(
db: DatabaseManager,
topic: str,
threshold: float,
*,
mailto: str,
session: requests.Session,
min_interval: float,
max_per_topic: Optional[int]
) -> int:
"""Third pass: remaining above-threshold entries → Semantic Scholar / OpenAlex / PubMed.
Args:
db: DatabaseManager instance
topic: Topic name to process
threshold: Minimum rank score to include
mailto: Contact email for API calls
session: requests.Session for API calls
min_interval: Minimum seconds between API calls
max_per_topic: Optional maximum fetches per topic
Returns:
Number of abstracts fetched
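
    Example:
        A sketch of the full three-pass pipeline this module implements;
        topics, threshold, and limits are illustrative:

            import requests

            session = requests.Session()
            fill_arxiv_summaries(db)
            for topic in ["cond-mat"]:
                crossref_pass(db, topic, 0.5, mailto="you@example.org",
                              session=session, min_interval=1.0,
                              max_per_topic=25)
                fallback_pass(db, topic, 0.5, mailto="you@example.org",
                              session=session, min_interval=1.0,
                              max_per_topic=25)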
"""
# Collect all updates for batch processing
papers_updates = []
history_updates = []
fetched = 0
for row in iter_targets(db, topic, threshold):
# Skip rows already filled by previous passes
# iter_targets already filters abstract IS NULL or empty
doi = row.get('doi')
abstract = try_publisher_apis(doi, row.get('feed_name') or '', row.get('link') or '', mailto=mailto, session=session)
        if abstract:
            # Only record and count the fetch if cleaning left usable text,
            # consistent with the other passes
            cleaned = clean_abstract_for_db(abstract)
            if cleaned:
                papers_updates.append((cleaned, doi, row['id'], topic))
                history_updates.append((cleaned, doi, row['id']))
                fetched += 1
time.sleep(min_interval)
if max_per_topic is not None and fetched >= max_per_topic:
break
# Batch update papers.db using DatabaseManager method
if papers_updates:
db.update_abstracts_batch(papers_updates)
# Batch update history DB (best-effort)
if history_updates:
try:
db.update_history_abstracts_batch(history_updates)
except Exception as e:
logger.warning(f"Failed to update history database: {e}", exc_info=True)
return fetched