Source code for scitex_web._search_pubmed

#!/usr/bin/env python3
# Time-stamp: "2024-11-13 14:30:43 (ywatanabe)"
# File: ./scitex_repo/src/scitex/web/_search_pubmed.py

"""
1. Functionality:
   - Searches PubMed database for scientific articles
   - Retrieves detailed information about matched articles
   - Displays article metadata including title, authors, journal, year, and abstract
2. Input:
   - Search query string (e.g., "epilepsy prediction")
   - Optional parameters for batch size and result limit
3. Output:
   - Formatted article information displayed to stdout
   - BibTeX file with official citations
4. Prerequisites:
   - Internet connection
   - requests and aiohttp packages
   - scitex and matplotlib packages (only for the command-line entry point)
"""

"""Imports"""
import argparse
import asyncio
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional, Union

import aiohttp
import requests

"""Functions & Classes"""


# Tiny stand-in for scitex.str.printc (colored print) — replaces the umbrella
# dep with a 10-line ANSI helper that respects NO_COLOR + TTY detection.
def _printc(text: str, c: str = "white") -> None:
    """Print ``text``, wrapped in the ANSI colour named by ``c`` when possible.

    Colouring is skipped (plain ``print``) when the ``NO_COLOR`` environment
    variable holds a non-empty value or when stdout is not a TTY. An unknown
    colour name adds no colour prefix, but the reset sequence is still
    appended.
    """
    import os as _os
    import sys as _sys

    ansi_by_name = dict(
        red="\033[31m",
        green="\033[32m",
        yellow="\033[33m",
        blue="\033[34m",
        magenta="\033[35m",
        cyan="\033[36m",
        white="\033[37m",
    )

    # Same short-circuit order as before: NO_COLOR is consulted first, and
    # isatty() is only queried when NO_COLOR is unset/empty.
    colorize = not _os.environ.get("NO_COLOR") and _sys.stdout.isatty()
    if colorize:
        print(f"{ansi_by_name.get(c, '')}{text}\033[0m")
    else:
        print(text)


class _ScitexShim:
    """Tiny namespace object that lets call sites write ``scitex.str.printc``."""

    class str:
        """Holder for the coloured-print helper (shadows ``str`` only in this class body)."""

        @staticmethod
        def printc(text: str, c: str = "white") -> None:
            """Forward to the module-level ``_printc`` helper."""
            return _printc(text, c)


# Module-level shim instance: the functions below call ``scitex.str.printc``.
scitex = _ScitexShim()


# Default service roots, used as the ``base_url`` defaults of the helpers below.
# Endpoint names (e.g. ``esearch.fcgi``) or a DOI are appended directly, so the
# trailing slash is required.
_EUTILS_BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
_CROSSREF_BASE_URL = "https://api.crossref.org/works/"


def _search_pubmed(
    query: str, retmax: int = 300, *, base_url: str = _EUTILS_BASE_URL
) -> Dict[str, Any]:
    """Run an ESearch query against PubMed and return the decoded JSON body.

    Parameters
    ----------
    query : str
        Search term, sent as the ``term`` parameter.
    retmax : int
        Value of the ``retmax`` parameter.
    base_url : str
        Root URL to which ``esearch.fcgi`` is appended.

    Returns
    -------
    Dict[str, Any]
        The parsed JSON response, or ``{}`` when the HTTP status is not OK or
        a ``requests`` exception is raised (a red message is printed in both
        failure cases).
    """
    endpoint = f"{base_url}esearch.fcgi"
    query_params = dict(
        db="pubmed",
        term=query,
        retmax=retmax,
        retmode="json",
        usehistory="y",
    )

    try:
        reply = requests.get(endpoint, params=query_params, timeout=10)
        if reply.ok:
            return reply.json()
        scitex.str.printc("PubMed API request failed", c="red")
    except requests.exceptions.RequestException as e:
        scitex.str.printc(f"Network error: {e}", c="red")
    return {}


def _fetch_details(
    webenv: str,
    query_key: str,
    retstart: int = 0,
    retmax: int = 100,
    *,
    base_url: str = _EUTILS_BASE_URL,
) -> Dict[str, Any]:
    """Fetch abstracts (EFetch, XML) and summaries (ESummary, JSON) for a stored search.

    Parameters
    ----------
    webenv : str
        Value sent as ``WebEnv`` with both requests.
    query_key : str
        Value sent as ``query_key`` with both requests.
    retstart : int
        Value sent as ``retstart`` with both requests.
    retmax : int
        Value sent as ``retmax`` with both requests.
    base_url : str
        Root URL to which ``efetch.fcgi`` / ``esummary.fcgi`` are appended.

    Returns
    -------
    Dict[str, Any]
        ``{"abstracts": <XML text>, "details": <decoded JSON>}``, or ``{}``
        when either request fails (non-OK status, network error, timeout, or
        an undecodable JSON body).
    """
    shared_params = {
        "db": "pubmed",
        "query_key": query_key,
        "WebEnv": webenv,
        "retstart": retstart,
        "retmax": retmax,
    }
    efetch_params = {
        **shared_params,
        "retmode": "xml",
        "rettype": "abstract",
        "field": "abstract,mesh",
    }
    esummary_params = {**shared_params, "retmode": "json"}

    try:
        # Same 10 s timeout as _search_pubmed so a stalled server cannot hang
        # the caller forever.
        abstract_response = requests.get(
            f"{base_url}efetch.fcgi", params=efetch_params, timeout=10
        )
        if not abstract_response.ok:
            # The result is {} either way, so skip the second request.
            return {}

        details_response = requests.get(
            f"{base_url}esummary.fcgi", params=esummary_params, timeout=10
        )
        if not details_response.ok:
            return {}

        return {
            "abstracts": abstract_response.text,
            "details": details_response.json(),
        }
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions where
        # the decode error is not a RequestException subclass.
        scitex.str.printc(f"PubMed detail fetch failed: {e}", c="red")
        return {}


def _parse_abstract_xml(xml_text: str) -> Dict[str, tuple]:
    """Parse an EFetch XML response into per-article abstract data.

    Parameters
    ----------
    xml_text : str
        XML response from PubMed.

    Returns
    -------
    Dict[str, tuple]
        Maps each PMID to ``(abstract, keywords, doi)`` where ``abstract`` is
        a string (``""`` if absent), ``keywords`` is the list of MeSH
        descriptor names and ``doi`` is a string (``""`` if absent).

    Raises
    ------
    xml.etree.ElementTree.ParseError
        If ``xml_text`` is not well-formed XML.
    """
    root = ET.fromstring(xml_text)
    results: Dict[str, tuple] = {}

    for article in root.findall(".//PubmedArticle"):
        pmid = (article.findtext(".//PMID") or "").strip()
        if not pmid:
            # Without a PMID there is no key to store the record under.
            continue

        # Structured abstracts are split over several <AbstractText> sections
        # and may contain inline markup (<i>, <sub>, ...); itertext() keeps the
        # text inside that markup and every section is retained.
        sections = (
            "".join(node.itertext()).strip()
            for node in article.findall(".//Abstract/AbstractText")
        )
        abstract = " ".join(section for section in sections if section)

        # <ArticleId IdType="doi"> also appears under <ReferenceList> for
        # *cited* papers; exclude those so an article without its own DOI is
        # not assigned the DOI of one of its references.
        cited_doi_nodes = article.findall(
            ".//ReferenceList//ArticleId[@IdType='doi']"
        )
        doi_element = next(
            (
                node
                for node in article.iterfind(".//ArticleId[@IdType='doi']")
                if node not in cited_doi_nodes
            ),
            None,
        )
        doi = (doi_element.text or "").strip() if doi_element is not None else ""

        # MeSH descriptor names; skip empty elements so the list holds only str.
        keywords = [
            term.text
            for term in article.findall(".//MeshHeading/DescriptorName")
            if term.text
        ]

        results[pmid] = (abstract, keywords, doi)

    return results


def _get_citation(pmid: str, *, base_url: str = _EUTILS_BASE_URL) -> str:
    """Request a BibTeX citation for ``pmid`` from the EFetch endpoint.

    Parameters
    ----------
    pmid : str
        PubMed ID
    base_url : str
        Root URL to which ``efetch.fcgi`` is appended.

    Returns
    -------
    str
        The response body when it looks like a BibTeX entry (starts with
        ``@``); otherwise ``""``. ``""`` is also returned on a non-OK status,
        network error or timeout, so callers can fall back to building the
        entry themselves.
    """
    cite_url = f"{base_url}efetch.fcgi"
    params = {
        "db": "pubmed",
        "id": pmid,
        "rettype": "bibtex",
        "retmode": "text",
    }
    try:
        # Bounded wait, matching _search_pubmed; this is called once per article.
        response = requests.get(cite_url, params=params, timeout=10)
    except requests.exceptions.RequestException as e:
        scitex.str.printc(f"Network error: {e}", c="red")
        return ""

    if not response.ok:
        return ""

    # NOTE(review): a 200 reply is not proof the server honoured
    # rettype="bibtex"; reject bodies that are not BibTeX entries so a
    # different payload is never written verbatim into a .bib file.
    text = response.text
    return text if text.lstrip().startswith("@") else ""


def get_crossref_metrics(
    doi: str,
    api_key: Optional[str] = None,
    email: Optional[str] = None,
    *,
    base_url: str = _CROSSREF_BASE_URL,
) -> Dict[str, Any]:
    """Get article metrics from CrossRef using DOI.

    Parameters
    ----------
    doi : str
        DOI of the article; appended (percent-encoded) to ``base_url``.
    api_key : Optional[str]
        Sent as the ``key`` query parameter when provided.
    email : Optional[str]
        Contact address placed in the ``User-Agent`` header. When omitted,
        the first set variable among ``SCITEX_SCHOLAR_CROSSREF_EMAIL``,
        ``SCITEX_CROSSREF_EMAIL``, ``SCITEX_SCHOLAR_PUBMED_EMAIL`` and
        ``SCITEX_PUBMED_EMAIL`` is used, else ``research@example.com``.
    base_url : str
        Root URL of the works endpoint.

    Returns
    -------
    Dict[str, Any]
        Keys ``citations``, ``type``, ``publisher``, ``references`` and
        ``doi``; or ``{}`` when ``doi`` is empty, the response is not OK, or
        any error occurs (the error is printed).
    """
    import os
    from urllib.parse import quote

    if not doi:
        # An empty DOI would query the collection endpoint instead of a work.
        return {}

    # Use provided email or fallback to environment variables
    if not email:
        email = (
            os.getenv("SCITEX_SCHOLAR_CROSSREF_EMAIL")
            or os.getenv("SCITEX_CROSSREF_EMAIL")
            or os.getenv("SCITEX_SCHOLAR_PUBMED_EMAIL")
            or os.getenv("SCITEX_PUBMED_EMAIL", "research@example.com")
        )

    headers = {"User-Agent": f"SciTeX/1.0 (mailto:{email})"}

    # Add API key as query parameter if provided
    params = {"key": api_key} if api_key else {}

    # DOIs may contain characters such as '#' or '?' that would otherwise be
    # read as URL delimiters and truncate the path. '/' separates prefix and
    # suffix; '%' is left alone so an already-encoded DOI is not double-encoded.
    work_url = f"{base_url}{quote(doi, safe='/%')}"

    try:
        response = requests.get(
            work_url, headers=headers, params=params, timeout=10
        )
        if response.ok:
            data = response.json()["message"]
            return {
                "citations": data.get("is-referenced-by-count", 0),
                "type": data.get("type", ""),
                "publisher": data.get("publisher", ""),
                "references": len(data.get("reference", [])),
                "doi": data.get("DOI", ""),
            }
    except Exception as e:
        # Deliberately best-effort: metrics are optional enrichment, so any
        # failure is reported and an empty result returned.
        print(f"CrossRef API error for DOI {doi}: {e}")
    return {}
def save_bibtex(
    papers: Dict[str, Any],
    abstracts: Dict[str, tuple],
    output_file: str,
    *,
    citation_fn=None,
    format_fn=None,
) -> None:
    """Saves paper metadata as BibTeX file with abstracts.

    ``citation_fn`` / ``format_fn`` default to :func:`_get_citation` /
    :func:`format_bibtex`; injectable so the writing logic is testable
    without network access.

    Parameters
    ----------
    papers : Dict[str, Any]
        Maps PMID to a paper record; the ``"uids"`` key is skipped.
    abstracts : Dict[str, tuple]
        Maps PMID to ``(abstract, keywords, doi)``.
    output_file : str
        Path of the file to (over)write, UTF-8 encoded.
    """
    if citation_fn is None:
        citation_fn = _get_citation
    if format_fn is None:
        format_fn = format_bibtex

    # Used if pmid not in abstracts: abstract, keywords, doi
    default_data = ("", [], "")

    with open(output_file, "w", encoding="utf-8") as bibtex_file:
        for pmid, paper in papers.items():
            if pmid == "uids":
                continue

            citation = citation_fn(pmid)
            if citation:
                # Guarantee a line break so consecutive entries never fuse.
                if not citation.endswith("\n"):
                    citation += "\n"
                bibtex_file.write(citation)
            else:
                bibtex_entry = format_fn(
                    paper, pmid, abstracts.get(pmid, default_data)
                )
                bibtex_file.write(bibtex_entry + "\n")

    # Report the path, not the repr of the (already closed) file object.
    scitex.str.printc(f"Saved to: {output_file}", c="yellow")


def format_bibtex(
    paper: Dict[str, Any], pmid: str, abstract_data: tuple, *, metrics_fn=None
) -> str:
    """Build an ``@article`` BibTeX entry from a paper record.

    ``metrics_fn`` defaults to get_crossref_metrics; injectable for tests.

    Parameters
    ----------
    paper : Dict[str, Any]
        Record read via the keys ``source``, ``authors`` (list of dicts with
        a ``name`` key), ``pubdate`` and ``title``.
    pmid : str
        PubMed ID written to the ``pmid`` field.
    abstract_data : tuple
        ``(abstract, keywords, doi)``; a falsy value is treated as
        ``("", [], "")``.
    metrics_fn : callable, optional
        Called with the DOI (only when one is present); its ``publisher`` and
        ``references`` keys are written to the entry.

    Returns
    -------
    str
        The BibTeX entry, keyed ``First.Last_year_word1_word2``.
    """
    if metrics_fn is None:
        metrics_fn = get_crossref_metrics

    # Tolerate missing pieces (None abstract, empty/None keyword entries).
    abstract, keywords, doi = abstract_data or ("", [], "")
    abstract = abstract or ""
    keywords = [str(keyword) for keyword in (keywords or []) if keyword]
    doi = doi or ""

    # CrossRef is only queried when there is a DOI to look up.
    crossref_metrics = (metrics_fn(doi) if doi else {}) or {}
    publisher = crossref_metrics.get("publisher", "")
    n_references = crossref_metrics.get("references", 0)

    journal = paper.get("source", "Unknown Journal")

    # ``or`` also covers an author list that is present but empty.
    authors = paper.get("authors") or [{"name": "Unknown"}]
    names = [str(author.get("name") or "Unknown") for author in authors]
    author_names = " and ".join(names)

    pubdate = paper.get("pubdate", "") or ""
    year = pubdate.split()[0] if pubdate.strip() else ""
    title = paper.get("title", "No Title") or ""

    # Name formatting (a whitespace-only name falls back to "Unknown")
    name_parts = names[0].split() or ["Unknown"]
    clean_first_name = "".join(ch for ch in name_parts[0] if ch.isalnum())
    clean_last_name = "".join(ch for ch in name_parts[-1] if ch.isalnum())

    # Title words (an empty title contributes empty key components)
    title_words = title.split()
    first_title_word = (
        "".join(ch.lower() for ch in title_words[0] if ch.isalnum())
        if title_words
        else ""
    )
    second_title_word = (
        "".join(ch.lower() for ch in title_words[1] if ch.isalnum())
        if len(title_words) > 1
        else ""
    )

    citation_key = (
        f"{clean_first_name}.{clean_last_name}_{year}"
        f"_{first_title_word}_{second_title_word}"
    )
    keyword_text = ", ".join(keywords)

    entry = f"""@article{{{citation_key},
    author = {{{author_names}}},
    title = {{{title}}},
    journal = {{{journal}}},
    year = {{{year}}},
    pmid = {{{pmid}}},
    doi = {{{doi}}},
    publisher = {{{publisher}}},
    references = {{{n_references}}},
    keywords = {{{keyword_text}}},
    abstract = {{{abstract}}}
}}
"""
    return entry


async def fetch_async(
    session: "aiohttp.ClientSession", url: str, params: Dict
) -> Union[Dict, str]:
    """Asynchronous fetch helper.

    Returns the decoded JSON when ``params["retmode"]`` is ``"json"``, the
    response text otherwise, and ``{}`` for a non-200 status or a client /
    timeout error.
    """
    try:
        async with session.get(url, params=params) as response:
            if response.status != 200:
                return {}
            if params.get("retmode") == "json":
                return await response.json()
            return await response.text()
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # One failed request must not abort the whole gather(); {} is the
        # same "no data" marker used for a non-200 status.
        return {}


async def batch__fetch_details(
    pmids: List[str], batch_size: int = 20, *, base_url: Optional[str] = None
) -> List[Dict]:
    """Fetches details for multiple PMIDs concurrently.

    Parameters
    ----------
    pmids : List[str]
        List of PubMed IDs
    batch_size : int, optional
        Size of each batch for concurrent requests
    base_url : Optional[str]
        Root URL of the E-utilities service; defaults to ``_EUTILS_BASE_URL``.

    Returns
    -------
    List[Dict]
        Response data, two items per batch in order: the EFetch result
        (XML text) followed by the ESummary result (decoded JSON). A failed
        request contributes ``{}``.
    """
    if base_url is None:
        base_url = _EUTILS_BASE_URL

    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(0, len(pmids), batch_size):
            id_list = ",".join(pmids[i : i + batch_size])

            # Queue the abstract and summary requests for this batch together.
            efetch_params = {
                "db": "pubmed",
                "id": id_list,
                "retmode": "xml",
                "rettype": "abstract",
            }
            esummary_params = {
                "db": "pubmed",
                "id": id_list,
                "retmode": "json",
            }

            tasks.append(
                fetch_async(session, f"{base_url}efetch.fcgi", efetch_params)
            )
            tasks.append(
                fetch_async(session, f"{base_url}esummary.fcgi", esummary_params)
            )

        # gather() preserves task order, which keeps the XML/JSON pairing.
        results = await asyncio.gather(*tasks)
        return results
def search_pubmed(
    query: str, n_entries: int = 10, *, search_fn=None, fetch_fn=None
) -> int:
    """Search PubMed for ``query`` and write the hits to a BibTeX file.

    The file is named ``pubmed_<query with spaces replaced by "_">.bib`` and
    is created in the current working directory.

    Parameters
    ----------
    query : str
        Search query.
    n_entries : int
        Maximum number of returned IDs to export.
    search_fn, fetch_fn : callable, optional
        Default to ``_search_pubmed`` / the async batch fetcher; injectable
        so the orchestration is testable offline. ``fetch_fn`` receives the
        list of PMIDs and must return alternating XML-text / summary-JSON
        items, one pair per batch.

    Returns
    -------
    int
        ``0`` on success, ``1`` when the search returned nothing usable.
    """
    search_results = (search_fn or _search_pubmed)(query)
    if not search_results:
        return 1

    # An error payload may lack "esearchresult"/"idlist"; treat it as a
    # failed search instead of raising KeyError.
    pmids = search_results.get("esearchresult", {}).get("idlist")
    if pmids is None:
        return 1

    output_file = f"pubmed_{query.replace(' ', '_')}.bib"

    # Process in larger batches asynchronously
    fetch = fetch_fn or (lambda ids: asyncio.run(batch__fetch_details(ids)))
    results = fetch(pmids[:n_entries])

    # Single pass over the (xml, json) pairs. An earlier duplicate pass fetched
    # every citation a second time and wrote a file that was then overwritten.
    default_data = ("", [], "")  # abstract, keywords, doi
    bibtex_entries = []
    for xml_response, json_response in zip(results[0::2], results[1::2]):
        if not (isinstance(json_response, dict) and "result" in json_response):
            # Failed summary request: skip the batch rather than reusing the
            # previous batch's records.
            continue

        # Abstract data is per batch; a failed XML request yields no abstracts.
        if isinstance(xml_response, str) and xml_response.strip():
            abstracts = _parse_abstract_xml(xml_response)
        else:
            abstracts = {}

        for pmid, paper in json_response["result"].items():
            if pmid == "uids":
                continue
            citation = _get_citation(pmid)
            if citation:
                bibtex_entries.append(citation)
            else:
                bibtex_entries.append(
                    format_bibtex(paper, pmid, abstracts.get(pmid, default_data))
                )

    # Write all entries at once
    with open(output_file, "w", encoding="utf-8") as f:
        f.write("\n".join(bibtex_entries))

    scitex.str.printc(f"Saved to: {output_file}", c="yellow")
    return 0
def parse_args() -> argparse.Namespace:
    """Parse the command-line options and echo the resulting namespace.

    Returns
    -------
    argparse.Namespace
        With attributes ``query`` (str) and ``n_entries`` (int).
    """
    parser = argparse.ArgumentParser(
        description="PubMed article search and retrieval tool"
    )
    parser.add_argument(
        "--query",
        "-q",
        type=str,
        # The help text always advertised this default; actually set it so
        # running without --query does not pass None to search_pubmed.
        default="epilepsy prediction",
        help='Search query (default: "epilepsy prediction")',
    )
    parser.add_argument(
        "--n_entries",
        "-n",
        type=int,
        default=10,
        help="Maximum number of articles to export (default: %(default)s)",
    )
    args = parser.parse_args()
    scitex.str.printc(args, c="yellow")
    return args


def run_main() -> None:
    """Command-line entry point: start a session, run the search, close the session.

    NOTE(review): the local ``import scitex`` binds the installed ``scitex``
    package inside this function only; ``scitex.session.start`` /
    ``scitex.session.close`` are not defined in this file -- confirm their
    contract against that package.
    """
    global CONFIG
    import sys

    import matplotlib.pyplot as plt
    import scitex

    # start() hands back replacements for sys.stdout / sys.stderr and plt.
    CONFIG, sys.stdout, sys.stderr, plt, CC = scitex.session.start(
        sys,
        verbose=False,
    )
    args = parse_args()
    exit_status = search_pubmed(args.query, args.n_entries)
    scitex.session.close(
        CONFIG,
        verbose=False,
        notify=False,
        message="",
        exit_status=exit_status,
    )


if __name__ == "__main__":
    run_main()

# EOF