diff --git a/README.md b/README.md index 57976355..47fcfc87 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,14 @@ osm -f path/to/pdf-or-xml2 -u uuid2 --user-managed-compose # Contributing -N.B. On Apple silicon you must use emulation i.e. `export DOCKER_DEFAULT_PLATFORM=linux/amd64` +N.B. On Apple silicon you must use emulation and download the mongo container in advance: -If you wish to contribute to this project you can set up a development environment with the following: +``` +export DOCKER_DEFAULT_PLATFORM=linux/amd64 +docker pull mongo:4.4.6 +``` + +To contribute to the project please run the following commands to set up a development environment: ``` pip install -e . diff --git a/compose.development.override.yaml b/compose.development.override.yaml index 25dded28..933fd03c 100644 --- a/compose.development.override.yaml +++ b/compose.development.override.yaml @@ -10,7 +10,7 @@ services: web_api: container_name: web_api environment: - - MONGODB_URI=mongodb://db:27017/test + - MONGODB_URI=mongodb://db:27017/osm build: context: . dockerfile: ./web/api/Dockerfile @@ -30,7 +30,7 @@ services: context: . 
dockerfile: ./web/dashboard/Dockerfile environment: - - MONGODB_URI=mongodb://db:27017/test + - MONGODB_URI=mongodb://db:27017/osm working_dir: /app ports: - "8501:8501" @@ -45,4 +45,4 @@ services: ports: - 27017:27017 environment: - - MONGO_INITDB_DATABASE=test + - MONGO_INITDB_DATABASE=osm diff --git a/compose.yaml b/compose.yaml index 4130e65c..037890bf 100644 --- a/compose.yaml +++ b/compose.yaml @@ -9,8 +9,3 @@ services: image: nimhdsst/rtransparent:staging ports: - "8071:8071" - healthcheck: - test: ["CMD", "curl", "--include", "--request", "GET", "http://localhost:8071/health"] - interval: 1s - timeout: 3s - retries: 3 diff --git a/external_components/rtransparent/app.py b/external_components/rtransparent/app.py index 1f40b6a2..164173b1 100644 --- a/external_components/rtransparent/app.py +++ b/external_components/rtransparent/app.py @@ -5,7 +5,7 @@ import pandas as pd import psutil import rpy2.robjects as ro -from fastapi import FastAPI, HTTPException, Query, Request, status +from fastapi import FastAPI, File, HTTPException, Query, UploadFile, status from fastapi.responses import JSONResponse from pydantic import BaseModel from rpy2.robjects import pandas2ri @@ -51,15 +51,14 @@ def rtransparent_metric_extraction( future = importr("future") future.plan(future.multisession, workers=workers) - # Write the XML content to a temporary file with tempfile.NamedTemporaryFile(delete=False, suffix=".xml") as temp_xml_file: temp_xml_file.write(xml_content) temp_xml_file_path = temp_xml_file.name - if parser == "pmc": + if parser == "PMCParser": + # XML files from pubmedcentral can have extra metadata extracted df = extract_from_pmc_xml(temp_xml_file_path, rtransparent) else: df = extract_from_xml(temp_xml_file_path, rtransparent) - # Clean up the temporary file temp_xml_file.close() Path(temp_xml_file_path).unlink() return df @@ -81,7 +80,11 @@ def extract_from_xml(temp_xml_file_path, rtransparent): def extract_from_pmc_xml(temp_xml_file_path, rtransparent): raise
NotImplementedError( - "Not all XML files provided at pubmedcentral include the datasharing statements." + """ + Not all XML files provided at pubmedcentral include the datasharing + statements so this is a not a priority. The data returned contains R Na + types which need to be converted to an appropriate python type. + """ ) # dfs = {} # with (ro.default_converter + pandas2ri.converter).context(): @@ -105,24 +108,17 @@ def extract_from_pmc_xml(temp_xml_file_path, rtransparent): @app.post("/extract-metrics/") -async def extract_metrics(request: Request, parser: str = Query("other")): +async def extract_metrics(file: UploadFile = File(...), parser: str = Query("other")): try: - # Attempt to read the XML content from the request body - xml_content = await request.body() + xml_content = await file.read() if not xml_content: raise NotImplementedError( """For now the XML content must be provided. Check the output of the parsing stage.""" ) - metrics_df = rtransparent_metric_extraction(xml_content, parser) - - # Log the extracted metrics - logger.info(metrics_df) - - # Return the first row as a JSON response + logger.info(metrics_df.info()) return JSONResponse(content=metrics_df.iloc[0].to_dict(), status_code=200) except Exception as e: - # Handle exceptions and return a 500 Internal Server Error raise HTTPException(status_code=500, detail=str(e)) diff --git a/osm/_utils.py b/osm/_utils.py index aad09d5a..aef17108 100644 --- a/osm/_utils.py +++ b/osm/_utils.py @@ -1,11 +1,11 @@ import argparse +import datetime import logging import os -import shlex -import subprocess import time import types from pathlib import Path +from time import sleep import pandas as pd import requests @@ -47,6 +47,12 @@ def _get_text_dir(output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path: return text_dir +def _get_logs_dir(output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path: + logs_dir = Path(output_dir) / "logs" + logs_dir.mkdir(parents=True, exist_ok=True) + return logs_dir + + def 
_existing_file(path_string): path = Path(path_string) if not path.exists(): @@ -59,6 +65,10 @@ def get_compute_context_id(): def wait_for_containers(): + """ + A hack for now, on Apple Silicon, the parser container fails. Ideally we + would just use the wait kwargs for docker compose up + """ while True: try: response = requests.get("http://localhost:8071/health") @@ -71,19 +81,24 @@ def wait_for_containers(): def compose_up(): - cmd = shlex.split("docker compose up -d --build") - subprocess.run( - cmd, - check=True, - ) + from python_on_whales import docker + + logger.info("Waiting for containers to be ready...") + print("Waiting for containers to be ready...") + docker.compose.up(detach=True, wait=True) + print("Containers ready!") + sleep(5) def compose_down(): - cmd = shlex.split("docker compose down") - subprocess.run( - cmd, - check=True, + from python_on_whales import docker + + docker_log = _get_logs_dir() / ( + datetime.datetime.now().strftime("docker_log_%Y%m%d_%H%M%S.txt") ) + docker_log.write_text(docker.compose.logs()) + docker.compose.down() + print(f"Logs of docker containers are saved at {docker_log}") def _setup(args): @@ -104,17 +119,15 @@ def _setup(args): metrics_path = _get_metrics_dir() / f"{args.uid}.json" if metrics_path.exists(): raise FileExistsError(metrics_path) + # create logs directory if necessary + _ = _get_logs_dir() if not args.user_managed_compose: compose_up() - - logger.info("Waiting for containers to be ready...") - print("Waiting for containers to be ready...") - wait_for_containers() - print("Containers ready!") return xml_path, metrics_path def coerce_to_string(v): + "Can be useful for schemas with permissive string fields" if isinstance(v, (int, float, bool)): return str(v) elif isinstance(v, types.NoneType): diff --git a/osm/cli.py b/osm/cli.py index e6107025..3109a71f 100644 --- a/osm/cli.py +++ b/osm/cli.py @@ -3,12 +3,13 @@ from osm._utils import DEFAULT_OUTPUT_DIR, _existing_file, _setup, compose_down from 
osm.pipeline.core import Pipeline, Savers from osm.pipeline.extractors import RTransparentExtractor -from osm.pipeline.parsers import NoopParser, ScienceBeamParser +from osm.pipeline.parsers import NoopParser, PMCParser, ScienceBeamParser from osm.pipeline.savers import FileSaver, JSONSaver, OSMSaver PARSERS = { "sciencebeam": ScienceBeamParser, "no-op": NoopParser, + "pmc": PMCParser, } EXTRACTORS = { "rtransparent": RTransparentExtractor, @@ -41,7 +42,7 @@ def parse_args(): choices=PARSERS.keys(), default=["sciencebeam"], nargs="+", - help="Select the tool for parsing the input document. Default is 'sciencebeam'.", + help="Select the tool for parsing the input document. Default is 'sciencebeam'. Choose 'pmc' for xml files from pubmed central.", ) parser.add_argument( "--metrics-type", @@ -76,7 +77,7 @@ def main(): xml_path, metrics_path = _setup(args) pipeline = Pipeline( - filepath=args.filepath, + input_path=args.filepath, xml_path=xml_path, metrics_path=metrics_path, parsers=[PARSERS[p]() for p in args.parser], diff --git a/osm/pipeline/core.py b/osm/pipeline/core.py index de62bd0c..5fd70f8c 100644 --- a/osm/pipeline/core.py +++ b/osm/pipeline/core.py @@ -15,9 +15,14 @@ def __init__(self, version: str = "0.0.1"): self._orm_model = None @abstractmethod - def run(self, data: Any, **kwargs) -> Any: + def _run(self, data: bytes, **kwargs) -> Any: + """Abstract method that subclasses must implement.""" pass + def run(self, data: bytes, *args, **kwargs) -> Any: + print(f"{self.name} (version {self.version}) is running.") + return self._run(data, *args, **kwargs) + def _get_orm_fields(self) -> dict[str, Any]: fields = {} for fieldname in self.orm_model_class.model_fields.keys(): @@ -60,19 +65,19 @@ def __iter__(self): yield self.osm_saver def save_file(self, data: str, path: Path): - self.file_saver.run(data, path) + self.file_saver.run(data, path=path) def save_json(self, data: dict, path: Path): - self.json_saver.run(data, path) + self.json_saver.run(data, 
path=path) def save_osm( self, - file_in: bytes, + data: bytes, metrics: dict, components: list, ): # Call the method to save or upload the data - self.osm_saver.run(file_in, metrics, components) + self.osm_saver.run(data, metrics=metrics, components=components) class Pipeline: @@ -82,14 +87,14 @@ def __init__( parsers: list[Component], extractors: list[Component], savers: Savers, - filepath: str, + input_path: str, xml_path: Optional[str] = None, metrics_path: Optional[str] = None, ): self.parsers = parsers self.extractors = extractors self.savers = savers - self.filepath = filepath + self.input_path = input_path self._file_data = None self.xml_path = xml_path self.metrics_path = metrics_path @@ -100,22 +105,21 @@ def run(self): if isinstance(parsed_data, str): self.savers.save_file(parsed_data, self.xml_path) for extractor in self.extractors: - # extracted_metrics = extractor.run(parsed_data,parser=parser.name) - extracted_metrics = extractor.run(parsed_data) + extracted_metrics = extractor.run(parsed_data, parser=parser.name) self.savers.save_osm( - file_in=self.file_data, + data=self.file_data, metrics=extracted_metrics, components=[*self.parsers, *self.extractors, *self.savers], ) self.savers.save_json(extracted_metrics, self.metrics_path) @staticmethod - def read_file(filepath: str) -> bytes: - with open(filepath, "rb") as file: + def read_file(input_path: str) -> bytes: + with open(input_path, "rb") as file: return file.read() @property def file_data(self): if not self._file_data: - self._file_data = self.read_file(self.filepath) + self._file_data = self.read_file(self.input_path) return self._file_data diff --git a/osm/pipeline/extractors.py b/osm/pipeline/extractors.py index 33cdea48..6d804a92 100644 --- a/osm/pipeline/extractors.py +++ b/osm/pipeline/extractors.py @@ -1,26 +1,34 @@ +import io import logging import requests +from osm.schemas.custom_fields import LongBytes + from .core import Component logger = logging.getLogger(__name__) class 
RTransparentExtractor(Component): - def run(self, data: str, parser: str = None) -> dict: - headers = {"Content-Type": "application/octet-stream"} + def _run(self, data: bytes, parser: str = None) -> dict: + self.sample = LongBytes(data) + + # Prepare the file to be sent as a part of form data + files = {"file": ("input.xml", io.BytesIO(data), "application/xml")} + + # Send the request with the file response = requests.post( - "http://localhost:8071/extract-metrics", - data=data, - headers=headers, + "http://localhost:8071/extract-metrics/", + files=files, params={"parser": parser}, ) + if response.status_code == 200: metrics = response.json() # pmid only exists when input filename is correct - metrics.pop("pmid") - # replace bizarre sentinel value + metrics.pop("pmid", None) # Use .pop() with a default to avoid KeyError + # Replace NA value for k, v in metrics.items(): if v == -2147483648: metrics[k] = None diff --git a/osm/pipeline/parsers.py b/osm/pipeline/parsers.py index 8a96e588..fd718604 100644 --- a/osm/pipeline/parsers.py +++ b/osm/pipeline/parsers.py @@ -10,13 +10,24 @@ class NoopParser(Component): """Used if the input is xml and so needs no parsing.""" - def run(self, data: bytes) -> str: - self.sample = LongBytes(data) - return data.decode("utf-8") + def _run(self, data: bytes) -> bytes: + return data + + +class PMCParser(NoopParser): + """ + Used if the input is a PMC derived XML and so needs no parsing. PMC + parsed XMLs can have unique features. For example RTransparent extracts + additional metadata from them aided by the document structure. Sometimes + data sharing statements etc. may not be in the PMC parsed XML despite being + in the pdf version of the publication. 
+ """ + + pass class ScienceBeamParser(Component): - def run(self, data: bytes) -> str: + def _run(self, data: bytes) -> str: self.sample = LongBytes(data) headers = {"Accept": "application/tei+xml", "Content-Type": "application/pdf"} response = requests.post(SCIENCEBEAM_URL, data=data, headers=headers) diff --git a/osm/pipeline/savers.py b/osm/pipeline/savers.py index 943696f9..b19880b0 100644 --- a/osm/pipeline/savers.py +++ b/osm/pipeline/savers.py @@ -1,8 +1,10 @@ import base64 +import getpass import hashlib import json import logging import os +import traceback from pathlib import Path import dill @@ -18,10 +20,14 @@ logger = logging.getLogger(__name__) +def format_error_message() -> str: + return traceback.format_exc().replace(getpass.getuser(), "USER") + + class FileSaver(Component): """Basic saver that writes data to a file.""" - def run(self, data: str, path: Path): + def _run(self, data: str, path: Path): """Write data to a file. Args: @@ -35,7 +41,7 @@ def run(self, data: str, path: Path): class JSONSaver(Component): """Saver that writes JSON data to a file.""" - def run(self, data: dict, path: Path): + def _run(self, data: dict, path: Path): """Write output metrics to a JSON file for the user. Args: @@ -66,31 +72,41 @@ def __init__(self, comment, email, user_defined_id, filename): self.user_defined_id = user_defined_id self.filename = filename - def run(self, file_in: bytes, metrics: dict, components: list[schemas.Component]): + def _run(self, data: bytes, metrics: dict, components: list[schemas.Component]): """Save the extracted metrics to the OSM API. Args: - file_in (bytes): Component input. - metrics (dict): Schema conformant metrics. - components (list[schemas.Component]): parsers, extractors, and savers that constitute the pipeline. + data: Component input. + metrics: Schema conformant metrics. + components: parsers, extractors, and savers that constitute the pipeline. 
""" osm_api = os.environ.get("OSM_API", "https://osm.pythonaisolutions.com/api") + print(f"Using OSM API: {osm_api}") # Build the payload - payload = { - "osm_version": __version__, - "user_comment": self.comment, - "work": { - "user_defined_id": self.user_defined_id, - "filename": self.filename, - "content_hash": hashlib.sha256(file_in).hexdigest(), - }, - "client": { - "compute_context_id": self.compute_context_id, - "email": self.email, - }, - "metrics": schemas.RtransparentMetrics(**metrics), - "components": [comp.orm_model for comp in components], - } + try: + payload = { + "osm_version": __version__, + "user_comment": self.comment, + "work": { + "user_defined_id": self.user_defined_id, + "filename": self.filename, + "content_hash": hashlib.sha256(data).hexdigest(), + }, + "client": { + "compute_context_id": self.compute_context_id, + "email": self.email, + }, + "metrics": metrics, + "components": [comp.orm_model for comp in components], + } + except Exception as e: + requests.put( + f"{osm_api}/payload_error/", + json=schemas.PayloadError( + error_message=format_error_message(), + ).model_dump(mode="json", exclude=["id"]), + ) + raise e try: # Validate the payload validated_data = schemas.Invocation(**payload) @@ -108,22 +124,22 @@ def run(self, file_in: bytes, metrics: dict, components: list[schemas.Component] raise ValueError( f"Failed to upload invocation data: \n {response.text}" ) + except requests.exceptions.ConnectionError: + raise EnvironmentError(f"Cannot connect to OSM API ({osm_api})") except (ValidationError, ValueError) as e: try: # Quarantine the failed payload failure = schemas.Quarantine( payload=base64.b64encode(dill.dumps(payload)).decode("utf-8"), - error_message=str(e), + error_message=format_error_message(), ).model_dump(mode="json", exclude=["id"]) response = requests.put(f"{osm_api}/quarantine/", json=failure) response.raise_for_status() - raise e - except requests.exceptions.RequestException as qe: + except Exception: requests.put( - 
f"{osm_api}/quarantine/", - json=schemas.Quarantine( - error_message=str(e), - recovery_message=str(qe), - ).model_dump(mode="json", exclude=["id"]), + f"{osm_api}/quarantine2/", + files={"file": dill.dumps(payload)}, + data={"error_message": format_error_message()}, ) + finally: raise e diff --git a/osm/schemas/__init__.py b/osm/schemas/__init__.py index c00cca22..eeb4eaa9 100644 --- a/osm/schemas/__init__.py +++ b/osm/schemas/__init__.py @@ -1,6 +1,7 @@ from .schemas import Client as Client from .schemas import Component as Component from .schemas import Invocation as Invocation +from .schemas import PayloadError as PayloadError from .schemas import Quarantine as Quarantine from .schemas import RtransparentMetrics as RtransparentMetrics from .schemas import Work as Work diff --git a/osm/schemas/schemas.py b/osm/schemas/schemas.py index 7f42251f..43efbbd1 100644 --- a/osm/schemas/schemas.py +++ b/osm/schemas/schemas.py @@ -85,6 +85,15 @@ class Invocation(Model): class Quarantine(Model): - payload: str = "" + payload: bytes = b"" error_message: str - recovery_message: Optional[str] = None + created_at: datetime.datetime = Field( + default_factory=lambda: datetime.datetime.now(datetime.UTC) + ) + + +class PayloadError(Model): + error_message: str + created_at: datetime.datetime = Field( + default_factory=lambda: datetime.datetime.now(datetime.UTC) + ) diff --git a/pyproject.toml b/pyproject.toml index 05ddf36a..1808d06b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "pyarrow", "pydantic", "pydantic[email]", + "python_on_whales", "odmantic", "requests", ] diff --git a/web/api/main.py b/web/api/main.py index 95073d0c..08b122eb 100644 --- a/web/api/main.py +++ b/web/api/main.py @@ -1,36 +1,20 @@ """ Sets up a web API for uploading osm metrics to a centralized database - -Usage along the lines of: - -curl -X POST "http://localhost:80/upload/" \ - -H "Content-Type: application/json" \ - -d '{ - "osm_version": "1.0", - "timestamp": 
"2024-07-24T12:00:00Z", - "user_comment": "example comment", - "work": { - "user_defined_id": "123", - "pmid": "pmid_example", - "file": "example_file_content_base64_encoded", - "content_hash": "example_hash", - "timestamp": "2024-07-24T12:00:00Z" - } - }' """ import os import motor.motor_asyncio -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, File, Form, HTTPException, UploadFile +from fastapi.responses import JSONResponse from odmantic import AIOEngine, ObjectId -from osm.schemas import Invocation, Quarantine +from osm.schemas import Invocation, PayloadError, Quarantine app = FastAPI() dburi = os.environ.get("MONGODB_URI", "mongodb://localhost:27017") client = motor.motor_asyncio.AsyncIOMotorClient(dburi) -engine = AIOEngine(client=client, database="test") +engine = AIOEngine(client=client, database="osm") @app.put("/upload/", response_model=Invocation) @@ -39,12 +23,26 @@ async def upload_invocation(invocation: Invocation): return invocation +@app.put("/payload_error/", response_model=PayloadError) +async def upload_failed_payload_construction(payload_error: PayloadError): + await engine.save(payload_error) + return payload_error + + @app.put("/quarantine/", response_model=Quarantine) async def upload_failed_invocation(quarantine: Quarantine): await engine.save(quarantine) return quarantine +@app.put("/quarantine2/") +async def upload_failed_quarantine( + file: UploadFile = File(...), error_message: str = Form(...) +): + await engine.save(Quarantine(payload=file.file.read(), error_message=error_message)) + return JSONResponse(content={"message": "Success"}, status_code=200) + + @app.get("/invocations/{id}", response_model=Invocation) async def get_invocation_by_id(id: ObjectId): invocation = await engine.find_one(Invocation, Invocation.id == id)