Source code for paper_firehose.processors.emailer

"""
Email rendering and sending utilities for Paper Firehose.

Generates a simple, email-friendly HTML digest from papers.db and sends it via
SMTP (SSL) based on configuration stored in the runtime data directory
(`config/config.yaml` under the path resolved by PAPER_FIREHOSE_DATA_DIR).

Uses only the Python standard library.
"""

from __future__ import annotations

from typing import Dict, Any, List, Optional, Tuple
import datetime
import html
import os
import smtplib
import ssl
from email.message import EmailMessage
from html.parser import HTMLParser
from pathlib import Path


def _fmt_score_badge(score: Optional[float]) -> str:
    """Render a small inline badge showing the rank score, or empty string on failure."""
    if score is None:
        return ""
    try:
        s = float(score)
        trunc = int(s * 100) / 100.0
        return f'<span style="background:#eef;border:1px solid #99c;border-radius:6px;padding:2px 6px;margin-left:6px;font-size:12px;color:#224;">Score {trunc:.2f}</span>'
    except Exception:
        return ""


class EmailRenderer:
    """Create compact HTML suitable for email clients (no external JS/CSS).

    All values taken from entries are HTML-escaped before being placed in the
    output; abstracts rendered by ``render_ranked_entries`` go through a
    whitelist sanitizer instead so that inline markup and images survive.
    """

    def __init__(self) -> None:
        """Construct the renderer; currently stateless but kept for symmetry."""

    # --- shared helpers ---

    @staticmethod
    def _rank_and_limit(
        entries: List[Dict[str, Any]],
        max_items: Optional[int],
    ) -> List[Dict[str, Any]]:
        """Return a copy of *entries* ordered by rank_score (desc), cut to *max_items*."""
        try:
            # sorted() leaves the input untouched, so a failed comparison
            # (e.g. mixed str/float scores) cannot leave a half-sorted list.
            ranked = sorted(
                entries,
                key=lambda e: (e.get('rank_score') or 0.0),
                reverse=True,
            )
        except (TypeError, AttributeError):
            ranked = list(entries)
        if max_items is not None:
            ranked = ranked[:max_items]
        return ranked

    @staticmethod
    def _text_field(entry: Dict[str, Any], key: str) -> str:
        """Return the stripped string stored under *key*, or '' when missing/empty."""
        return (entry.get(key) or '').strip()

    @classmethod
    def _href(cls, entry: Dict[str, Any]) -> str:
        """Return the entry link escaped for use inside a double-quoted href attribute."""
        # Feed-supplied links are untrusted: an unescaped quote would let the
        # value break out of the attribute and inject markup.
        return html.escape(cls._text_field(entry, 'link') or '#', quote=True)

    # --- public renderers ---

    def render_topic_digest(
        self,
        topic_display_name: str,
        entries: List[Dict[str, Any]],
        *,
        max_items: Optional[int] = None,
    ) -> str:
        """Return the HTML body for a single topic.

        Entries are expected to contain the keys: title, link, authors,
        published_date, feed_name, abstract, summary, rank_score.

        Args:
            topic_display_name: Human-readable topic name used in the heading.
            entries: Entry dicts; the list itself is not modified.
            max_items: Optional cap applied after sorting by rank_score.

        Returns:
            HTML with a dated heading followed by one block per entry, or a
            "No entries." note when there is nothing to show.
        """
        today = datetime.date.today().isoformat()
        ranked = self._rank_and_limit(entries, max_items)

        parts: List[str] = [
            f'<h2 style="margin:16px 0 8px;">{html.escape(topic_display_name)} — {today}</h2>'
        ]
        if not ranked:
            parts.append('<p style="font-style:italic;color:#555;">No entries.</p>')
            return "\n".join(parts)

        for e in ranked:
            title = html.escape(self._text_field(e, 'title') or 'No title')
            link = self._href(e)
            authors = html.escape(self._text_field(e, 'authors'))
            published = html.escape(self._text_field(e, 'published_date'))
            feed_name = html.escape(self._text_field(e, 'feed_name'))
            score_badge = _fmt_score_badge(e.get('rank_score'))

            # Pick content: abstract first, then summary. Shown as escaped text.
            body = self._text_field(e, 'abstract') or self._text_field(e, 'summary')
            content_html = html.escape(body) if body else '<em>No abstract/summary.</em>'

            parts.append(
                '<div style="margin:12px 0 18px;">\n'
                '  <div style="font-size:16px;line-height:1.35;">\n'
                f'    <a href="{link}" target="_blank" '
                f'style="color:#18457a;text-decoration:none;">{title}</a> {score_badge}\n'
                '  </div>\n'
                f'  <div style="color:#333;margin:6px 0;"><strong>Authors:</strong> {authors}</div>\n'
                f'  <div style="color:#333;margin:6px 0;">{content_html}</div>\n'
                f'  <div style="color:#666;font-size:12px;"><strong>{feed_name}</strong> — '
                f'<em>Published: {published}</em></div>\n'
                '</div>'
            )
        return "\n".join(parts)

    # --- HTML sanitization for abstracts (allow <img>) ---

    def _sanitize_abstract_html(self, html_text: str) -> str:
        """Return a sanitized HTML string suitable for email, preserving <img>.

        - Allows a small whitelist of tags:
          b,strong,i,em,u,sub,sup,br,p,ul,ol,li,span,a,img
        - For <a>, only http/https href; adds rel and target. An <a> without
          a usable href becomes a <span>, and its end tag is emitted as
          </span> so the markup stays balanced.
        - For <img>, only http/https src; forces style max-width:100%;height:auto
        - Drops everything inside <cite> and <footer>, including nested tags
        - Drops text chunks that start with "doi:" or "https://doi.org"
        - Escapes all text and strips disallowed tags/attributes

        Args:
            html_text: Raw abstract/summary, possibly containing markup.

        Returns:
            Sanitized HTML; the fully escaped input if parsing fails.
        """
        if not html_text or ('<' not in html_text and '>' not in html_text):
            # No tags likely present; escape and return.
            return html.escape(html_text or '')

        allowed_tags = frozenset({
            'b', 'strong', 'i', 'em', 'u', 'sub', 'sup', 'br', 'p',
            'ul', 'ol', 'li', 'span', 'a', 'img',
        })
        allowed_attrs = {
            'a': {'href'},
            'img': {'src', 'alt', 'width', 'height'},
            'span': {'style'},
            'p': {'style'},
        }
        skip_tags = frozenset({'cite', 'footer'})  # drop content fully inside these
        void_tags = ('img', 'br')  # never get an end tag

        def http_url(url: Optional[str]) -> str:
            """Return the stripped URL when it is http(s), else '' (rejects mailto/javascript/etc.)."""
            candidate = (url or '').strip()
            if candidate.lower().startswith(('http://', 'https://')):
                return candidate
            return ''

        out: List[str] = []
        skip_stack: List[str] = []
        # One closing tag per currently open <a>: '</a>' or, when the anchor
        # was downgraded to a span, '</span>'.
        anchor_closers: List[str] = []

        class Sanitizer(HTMLParser):
            def handle_starttag(self, tag, attrs):
                """Emit sanitized start tags or replace with safe alternatives."""
                if tag in skip_tags:
                    skip_stack.append(tag)
                    return
                if skip_stack or tag not in allowed_tags:
                    return

                if tag == 'a':
                    href = ''
                    for k, v in attrs:
                        if k == 'href':
                            href = http_url(v)
                            if href:
                                break
                    if href:
                        safe = html.escape(href, quote=True)
                        out.append(
                            f'<a href="{safe}" target="_blank" rel="noopener noreferrer">'
                        )
                        anchor_closers.append('</a>')
                    else:
                        out.append('<span>')
                        anchor_closers.append('</span>')
                elif tag == 'img':
                    src = alt = width = height = ''
                    for k, v in attrs:
                        if k == 'src':
                            url = http_url(v)
                            if url:
                                src = html.escape(url, quote=True)
                        elif k == 'alt':
                            alt = html.escape(v or '', quote=True)
                        elif k == 'width':
                            width = html.escape(v or '', quote=True)
                        elif k == 'height':
                            height = html.escape(v or '', quote=True)
                    if src:
                        dim = ''
                        if width:
                            dim += f' width="{width}"'
                        if height:
                            dim += f' height="{height}"'
                        out.append(
                            f'<img src="{src}" alt="{alt}" '
                            f'style="max-width:100%;height:auto;"{dim}>'
                        )
                else:
                    # Generic allowed tag; keep only whitelisted attributes.
                    keep = allowed_attrs.get(tag, set())
                    attrs_map = {k: v for k, v in attrs if k in keep}
                    attr_str = ''.join(
                        f' {k}="{html.escape(v or "", quote=True)}"'
                        for k, v in attrs_map.items()
                    )
                    out.append(f'<{tag}{attr_str}>')

            def handle_endtag(self, tag):
                """Emit matching end tags for allowed elements, respecting replacements."""
                if skip_stack:
                    if tag == skip_stack[-1]:
                        skip_stack.pop()
                    return
                if tag not in allowed_tags or tag in void_tags:
                    return
                if tag == 'a':
                    # A stray </a> with no open anchor is dropped.
                    if anchor_closers:
                        out.append(anchor_closers.pop())
                    return
                out.append(f'</{tag}>')

            def handle_data(self, data):
                """Append escaped text content, dropping boilerplate like DOI references."""
                if skip_stack:
                    return
                stripped = data.strip()
                if not stripped:
                    return
                # Drop common publisher footer lines like DOI.
                if stripped.lower().startswith(('doi:', 'https://doi.org')):
                    return
                out.append(html.escape(data))

        # convert_charrefs=True (the default) delivers entities as decoded text
        # through handle_data, where they are re-escaped.
        parser = Sanitizer(convert_charrefs=True)
        try:
            parser.feed(html_text)
            # close() flushes text the parser is still buffering; without it
            # trailing text containing a bare '&' (e.g. "AT&T") is lost.
            parser.close()
        except Exception:
            # Deliberate catch-all: on any parse error, escape the whole
            # content rather than fail the digest.
            return html.escape(html_text)

        # Close anchors the source left open so they cannot swallow later content.
        while anchor_closers:
            out.append(anchor_closers.pop())
        return ''.join(out)

    def render_full_email(
        self,
        title: str,
        sections: List[Tuple[str, str]],
    ) -> str:
        """Return a complete HTML email with a title and named sections.

        Args:
            title: Used for both <title> and the top <h1>; escaped here.
            sections: List of (section_title, section_html). The title is
                escaped; section_html is inserted verbatim and must already
                be safe HTML.

        Returns:
            A full HTML document as a string.
        """
        safe_title = html.escape(title)
        # No external assets, for maximum deliverability.
        head = (
            '<!DOCTYPE html>\n'
            '<html>\n'
            '<head>\n'
            '  <meta charset="UTF-8">\n'
            f'  <title>{safe_title}</title>\n'
            '  <style>\n'
            '    body { font-family: Arial, sans-serif; margin: 12px 16px; color: #111; }\n'
            '    h1 { color: #153e75; font-size: 22px; margin: 0 0 12px; }\n'
            '    h2 { color: #1e5aa8; font-size: 18px; margin: 16px 0 8px; }\n'
            '    a { color: #18457a; }\n'
            '    hr { border: none; border-top: 1px solid #ddd; margin: 12px 0; }\n'
            '  </style>\n'
            '</head>\n'
            '<body>\n'
            f'  <h1>{safe_title}</h1>'
        )
        body_parts: List[str] = [head]
        for sec_title, sec_html in sections:
            body_parts.append(f"<h2>{html.escape(sec_title)}</h2>")
            body_parts.append(sec_html)
            body_parts.append("<hr>")
        body_parts.append("</body></html>")
        return "\n".join(body_parts)

    def _format_pqa_summary(self, pqa_raw: Optional[str]) -> Optional[str]:
        """Format paper_qa_summary JSON for email.

        Expects a JSON object with optional ``summary`` and ``methods`` keys.
        Also handles a double-encoded payload where ``summary`` is itself a
        JSON object string carrying the real summary/methods.

        Args:
            pqa_raw: Raw stored value; may be JSON or plain text.

        Returns:
            A compact HTML block with Summary and/or Methods lines; the
            escaped raw text if the value is not a JSON object; ``None`` when
            the input is empty or the object has neither field.
        """
        if not pqa_raw:
            return None

        import json

        def as_text(value: Any) -> str:
            """Coerce a JSON value to text so one odd field cannot break rendering."""
            if value is None:
                return ''
            return value if isinstance(value, str) else str(value)

        try:
            data = json.loads(pqa_raw)
        except (TypeError, ValueError):
            # Not JSON: show the raw text, escaped.
            return html.escape(str(pqa_raw))
        if not isinstance(data, dict):
            return html.escape(str(pqa_raw))

        summary_val = data.get('summary') or ''
        methods_val = data.get('methods') or ''

        # Double-encoded JSON: the summary field holds a serialized object.
        if isinstance(summary_val, str) and summary_val.strip().startswith('{'):
            try:
                nested = json.loads(summary_val)
            except ValueError:
                nested = None  # not valid JSON; use the string as-is
            if isinstance(nested, dict):
                summary_val = nested.get('summary', summary_val)
                # Only use nested methods if the outer one is empty.
                if not methods_val:
                    methods_val = nested.get('methods', methods_val)

        summary = html.escape(as_text(summary_val))
        methods = html.escape(as_text(methods_val))

        parts: List[str] = []
        if summary:
            parts.append(f"<div><strong>Summary:</strong> {summary}</div>")
        if methods:
            parts.append(f"<div><strong>Methods:</strong> {methods}</div>")
        return "\n".join(parts) if parts else None

    def render_ranked_entries(
        self,
        topic_display_name: str,
        entries: List[Dict[str, Any]],
        *,
        max_items: Optional[int] = None,
    ) -> str:
        """Render a ranked-style section for email with minimal, inline CSS.

        Entry layout:
        - Title (link) with Score badge
        - Authors
        - Feed name
        - Abstract if present; otherwise summary if available (sanitized HTML)
        - Optional "Fulltext summary" box built from paper_qa_summary

        Args:
            topic_display_name: Accepted for interface symmetry; no header is
                rendered here because the caller provides it.
            entries: Entry dicts; the list itself is not modified.
            max_items: Optional cap applied after sorting by rank_score.

        Returns:
            HTML for all entries, or an empty string when there are none.
        """
        items = self._rank_and_limit(entries, max_items)
        if not items:
            return ""

        parts: List[str] = []
        for e in items:
            title = html.escape(self._text_field(e, 'title') or 'No title')
            link = self._href(e)
            authors = html.escape(self._text_field(e, 'authors'))
            feed_name = html.escape(self._text_field(e, 'feed_name'))
            score_badge = _fmt_score_badge(e.get('rank_score'))

            content_src = self._text_field(e, 'abstract') or self._text_field(e, 'summary')
            if content_src:
                body_text = self._sanitize_abstract_html(content_src)
            else:
                body_text = '<em>No abstract/summary.</em>'

            pqa_block = self._format_pqa_summary(e.get('paper_qa_summary'))
            if pqa_block:
                pqa_html = (
                    '<div style="background:#fff8d5;border-left:4px solid #d4b106;padding:8px 10px;margin:8px 0;">\n'
                    '<div style="font-weight:bold;color:#8a6d3b;margin-bottom:4px;">Fulltext summary</div>'
                    f"{pqa_block}"
                    '</div>'
                )
            else:
                pqa_html = ''

            parts.append(
                '<div style="margin:12px 0 18px;">\n'
                '  <div style="font-size:16px;line-height:1.35;">\n'
                f'    <a href="{link}" target="_blank" '
                f'style="color:#18457a;text-decoration:none;">{title}</a> {score_badge}\n'
                '  </div>\n'
                f'  <div style="color:#333;margin:6px 0;"><strong>Authors:</strong> {authors}</div>\n'
                f'  <div style="color:#333;margin:6px 0;"><strong>{feed_name}</strong></div>\n'
                f'  <div style="color:#333;margin:6px 0;">{body_text}</div>\n'
                f'  {pqa_html}\n'
                '</div>'
            )
        return "\n".join(parts)
class SMTPSender:
    """Send emails via SMTP (SSL) using settings under config['email']['smtp']."""

    def __init__(self, smtp_cfg: Dict[str, Any], config_dir: Optional[str] = None) -> None:
        """Initialize SMTP connection parameters and optional password lookup directory.

        Args:
            smtp_cfg: Mapping read for the keys ``host``, ``port`` (defaults
                to 465), ``username``, ``password`` and ``password_file``.
            config_dir: Directory against which a relative ``password_file``
                is resolved; when omitted, relative paths are used as given.
        """
        self.host = str(smtp_cfg.get('host') or '')
        self.port = int(smtp_cfg.get('port') or 465)
        self.username = str(smtp_cfg.get('username') or '')
        self.password = str(smtp_cfg.get('password') or '')  # discouraged; prefer file
        self.password_file = smtp_cfg.get('password_file')
        self._config_dir = Path(config_dir).expanduser().resolve() if config_dir else None

    def _load_password(self) -> str:
        """Fetch the SMTP password.

        Lookup order: inline ``password`` from config, then the contents of
        ``password_file`` (stripped), then the ``SMTP_PASSWORD`` environment
        variable.

        Returns:
            The password, or an empty string when none of the sources has one.
        """
        if self.password:
            return self.password

        if self.password_file:
            candidate = Path(str(self.password_file)).expanduser()
            if not candidate.is_absolute() and self._config_dir:
                candidate = (self._config_dir / candidate).resolve()
            # is_file() rather than exists(): a directory at that path must
            # not be opened as if it were the secret file.
            if candidate.is_file():
                with open(candidate, 'r', encoding='utf-8') as f:
                    secret = f.read().strip()
                if secret:
                    return secret

        # Last resort: fixed environment variable name.
        return os.environ.get('SMTP_PASSWORD', '')

    def send(
        self,
        *,
        subject: str,
        from_addr: str,
        to_addrs: List[str],
        html_body: str,
        text_body: Optional[str] = None,
    ) -> None:
        """Send a multipart email with HTML alternative using SMTP over SSL.

        Args:
            subject: Subject header.
            from_addr: Used for From, Reply-To and the List-Unsubscribe mailto.
            to_addrs: Recipient addresses; joined into a single To header.
            html_body: HTML part of the message.
            text_body: Plain-text part; derived from *html_body* when omitted.

        Raises:
            RuntimeError: If host/port/username are missing, no recipients are
                given, or no password can be found.
        """
        if not self.host or not self.port or not self.username:
            raise RuntimeError("SMTP configuration incomplete: host/port/username required")
        if not to_addrs:
            # Fail before connecting instead of after login with an empty To header.
            raise RuntimeError("No recipient addresses given")

        password = self._load_password()
        if not password:
            raise RuntimeError(
                "SMTP password not found. Set email.smtp.password_file or email.smtp.password in config."
            )

        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = from_addr
        msg['To'] = ", ".join(to_addrs)
        msg['Reply-To'] = from_addr

        # Headers identifying this as a bulk digest with an unsubscribe address.
        msg['X-Mailer'] = 'Paper Firehose Research Digest'
        msg['List-Unsubscribe'] = f'<mailto:{from_addr}?subject=unsubscribe>'
        msg['Precedence'] = 'bulk'

        # Generate a plain-text version if not provided.
        if not text_body:
            text_body = self._html_to_text(html_body)

        msg.set_content(text_body)
        msg.add_alternative(html_body, subtype='html')

        context = ssl.create_default_context()
        with smtplib.SMTP_SSL(self.host, self.port, context=context) as server:
            server.login(self.username, password)
            server.send_message(msg)

    def _html_to_text(self, html_body: str) -> str:
        """Convert an HTML email body to plain text for the multipart fallback.

        - <head>, <title>, <style> and <script> blocks are removed with their
          content (otherwise the title would appear twice, next to the <h1>).
        - <h1>/<h2> become underlined headings.
        - Links become ``text (url)``.
        - Remaining tags are replaced by a space; entities are then decoded so
          the text shows ``&`` rather than ``&amp;``.

        Args:
            html_body: HTML document or fragment.

        Returns:
            Plain text with collapsed blank lines and spaces.
        """
        import re

        flags = re.DOTALL | re.IGNORECASE

        # Drop non-content containers first so their text never leaks through.
        text = re.sub(
            r'<(head|style|script|title)\b[^>]*>.*?</\1\s*>', '', html_body, flags=flags
        )

        # Replace headers with underlined text equivalents.
        text = re.sub(
            r'<h1\b[^>]*>(.*?)</h1>',
            lambda m: f"\n{m.group(1)}\n{'=' * 50}\n",
            text,
            flags=flags,
        )
        text = re.sub(
            r'<h2\b[^>]*>(.*?)</h2>',
            lambda m: f"\n\n{m.group(1)}\n{'-' * 40}\n",
            text,
            flags=flags,
        )

        # Replace links with "text (url)"; \b keeps <abbr>, <article>, ... out.
        text = re.sub(
            r'<a\b[^>]*href=["\']([^"\']+)["\'][^>]*>(.*?)</a>',
            r'\2 (\1)',
            text,
            flags=flags,
        )

        # Remove all remaining HTML tags.
        text = re.sub(r'<[^>]+>', ' ', text)

        # Decode entities only after tags are gone, so escaped markup in the
        # content is not mistaken for real tags.
        text = html.unescape(text)

        # Clean up whitespace.
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)  # multiple blank lines to double
        text = re.sub(r' +', ' ', text)  # multiple spaces to single
        return text.strip()