diff --git a/scripts/invocation_upload.py b/scripts/invocation_upload.py index 253d540c..ae2d7e20 100644 --- a/scripts/invocation_upload.py +++ b/scripts/invocation_upload.py @@ -1,26 +1,28 @@ -from motor.motor_asyncio import AsyncIOMotorClient -from odmantic import AIOEngine -from pydantic import ValidationError +import logging +import os +import pickle +import tempfile +from pathlib import Path from typing import List -import requests + import pandas as pd -import tempfile -import os -from osm.schemas import Work, Client, Invocation -from osm._utils import get_compute_context_id -import asyncio -import logging +import requests +from motor import MotorClient +from odmantic import SyncEngine +from pydantic import ValidationError + +from osm.schemas import Client, Invocation, Work -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) logger = logging.getLogger(__name__) -# NOTICE: items without mapping to the dataframe +# NOTICE: output of rt_all without corresponding values in the all_indicators.csv from Rtransparent publication unmapped = { "article": "", "is_relevant": None, "is_explicit": None, - "user_comment": "", - "osm_version": "" } @@ -31,74 +33,23 @@ def transform_data(df: pd.DataFrame) -> List[Invocation]: try: work = Work( user_defined_id=str(row["doi"]), - pmid=str(row["pmid"]), - doi=str(row["doi"]), + pmid=str(row.pop("pmid")), + doi=str(row.pop("doi")), openalex_id=None, scopus_id=None, - filename=row["filename"], + filename=row.pop("filename"), file=None, content_hash=None, ) - client = Client( - compute_context_id=get_compute_context_id(), - email=None - ) - - metrics = { - "article": unmapped["article"], - "pmid": row["pmid"], - "is_coi_pred": row["is_coi_pred"], - "coi_text": row["coi_text"], - "is_funded_pred": row["is_fund_pred"], - "funding_text": row["fund_text"], - "support_1": row["support_1"], - "support_3": row["support_3"], - "support_4": row["support_4"], - "support_5": row["support_5"], - "support_6": row["support_6"], - "support_7": row["support_7"], - "support_8": row["support_8"], - "support_9": row["support_9"], - "support_10": row["support_10"], - "developed_1": row["developed_1"], - "received_1": row["received_1"], - "received_2": row["received_2"], - "recipient_1": row["recipient_1"], - "authors_1": row["authors_1"], - "authors_2": row["authors_2"], - "thank_1": row["thank_1"], - "thank_2": row["thank_2"], - "fund_1": row["fund_1"], - "fund_2": row["fund_2"], - "fund_3": row["fund_3"], - "supported_1": row["supported_1"], - "financial_1": row["financial_1"], - "financial_2": row["financial_2"], - "financial_3": row["financial_3"], - "grant_1": row["grant_1"], - "french_1": row["french_1"], - "common_1": row["common_1"], - "common_2": row["common_2"], - "common_3": row["common_3"], - "common_4": row["common_4"], - "common_5": row["common_5"], - "acknow_1": row["acknow_1"], - "disclosure_1": row["disclosure_1"], - "disclosure_2": row["disclosure_2"], - "is_register_pred": row["is_register_pred"], - "register_text": row["register_text"], - "is_relevant": unmapped["is_relevant"], - "is_method": row["is_method"], - "is_NCT": row["is_NCT"], - "is_explicit": unmapped["is_explicit"] - } + client = Client(compute_context_id=999, email=None) + metrics = {**unmapped, **row.to_dict()} invocation = Invocation( metrics=metrics, - osm_version=unmapped["osm_version"], + osm_version="0.0.1", client=client, work=work, - user_comment=unmapped["user_comment"], + user_comment="Initial database seeding with data from Rtransparent publication", ) invocations.append(invocation) @@ -126,14 +77,10 @@ def read_data(data_path: str): async def upload_data(invocations: List[Invocation], mongo_uri: str, db_name: str): """upload invocations to MongoDB one after the other to prevent timeout""" - motor_client = AsyncIOMotorClient(mongo_uri) + motor_client = MotorClient(mongo_uri) try: - engine = AIOEngine(client=motor_client, database=db_name) - for invocation in invocations: - await engine.save(invocation) - logger.info({ - "message": "upload successful" - }) + engine = SyncEngine(client=motor_client, database=db_name) + engine.save_all(invocations) except (TypeError, Exception) as e: if isinstance(e, TypeError): raise TypeError(e) @@ -148,29 +95,37 @@ def download_csv(url): try: response = requests.get(url) if response.status_code == 200: - temp_file, temp_file_path = tempfile.mkstemp(suffix='.csv') + temp_file, temp_file_path = tempfile.mkstemp(suffix=".csv") os.close(temp_file) # Close the file descriptor - with open(temp_file_path, 'wb') as file: + with open(temp_file_path, "wb") as file: file.write(response.content) return temp_file_path else: - raise Exception(f"Failed to download CSV. Status code: {response.status_code}") + raise Exception( + f"Failed to download CSV. Status code: {response.status_code}" + ) except Exception as e: raise e def main(data_path="all_indicators.feather"): try: - df = read_data(data_path) - if not df.empty: - invocations = transform_data(df) - db_url = os.getenv("DATABASE_URL", None) - db_name = os.getenv("DATABASE_NAME", None) - asyncio.run(upload_data(invocations, - db_url, db_name)) + transformed_pickle = Path("invocations.pkl") + if transformed_pickle.exists(): + df = pickle.loads(transformed_pickle.read_bytes()) else: - raise Exception("Dataframe is empy") + df = read_data(data_path) + if not df.empty: + invocations = transform_data(df) + transformed_pickle.write_bytes(pickle.dumps(invocations)) + else: + raise Exception("Dataframe is empty") + db_url = os.getenv("DATABASE_URL", None) + db_name = os.getenv("DATABASE_NAME", None) + logger.warning(f"Uploading data to {db_url}") + upload_data(invocations, db_url, db_name) except Exception as e: + breakpoint() logger.error(f"Failed to process data: {e}") raise e