diff --git a/.gitignore b/.gitignore index e671b5ac..e6f0a174 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ osm_output .terraform .terraform.lock.hcl .public_dns +tempdata diff --git a/compose.development.override.yaml b/compose.development.override.yaml index 015c12da..25dded28 100644 --- a/compose.development.override.yaml +++ b/compose.development.override.yaml @@ -8,6 +8,7 @@ services: - ./external_components/rtransparent:/app web_api: + container_name: web_api environment: - MONGODB_URI=mongodb://db:27017/test build: @@ -17,12 +18,14 @@ services: - 80:80 volumes: - ./web/api:/app/app + - ./osm:/opt/osm/osm working_dir: /app/app command: ["fastapi","dev","--host","0.0.0.0","--port","80"] depends_on: - db dashboard: + container_name: dashboard build: context: . dockerfile: ./web/dashboard/Dockerfile @@ -31,8 +34,12 @@ services: working_dir: /app ports: - "8501:8501" + volumes: + - ./web/dashboard:/app + - ./osm:/opt/osm/osm db: + container_name: db # use old version of mongo to avoid Apple Instruction set error image: mongo:4.4.6 ports: diff --git a/compose.yaml b/compose.yaml index 8ff191b9..4130e65c 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,9 +1,11 @@ services: sciencebeam: + container_name: sciencebeam image: elifesciences/sciencebeam-parser ports: - "8070:8070" rtransparent: + container_name: rtransparent image: nimhdsst/rtransparent:staging ports: - "8071:8071" diff --git a/osm/schemas/custom_fields.py b/osm/schemas/custom_fields.py new file mode 100644 index 00000000..75c3bea0 --- /dev/null +++ b/osm/schemas/custom_fields.py @@ -0,0 +1,98 @@ +from typing import Any, ClassVar, Generic, TypeVar, Union + +import odmantic +from pydantic.annotated_handlers import GetCoreSchemaHandler +from pydantic.json_schema import JsonSchemaValue +from pydantic_core import CoreSchema, core_schema + +T = TypeVar("T", str, bytes) + + +def _display(value: T) -> str: + if isinstance(value, bytes): + return b"..." if value else b"" + return f"{value[:10]}..." 
if value else "" + + +class LongField(Generic[T]): + _inner_schema: ClassVar[CoreSchema] + _error_kind: ClassVar[str] + + @classmethod + def __get_pydantic_core_schema__( + cls, source: type[Any], handler: GetCoreSchemaHandler + ) -> CoreSchema: + def serialize( + value: "LongField[T]", info: core_schema.SerializationInfo + ) -> Union[str, "LongField[T]"]: + if info.mode == "json": + return _display(value.get_value()) + else: + return value + + def get_json_schema( + _core_schema: CoreSchema, handler: GetCoreSchemaHandler + ) -> JsonSchemaValue: + json_schema = handler(cls._inner_schema) + return json_schema + + json_schema = core_schema.no_info_after_validator_function( + source, # construct the type + cls._inner_schema, + ) + + def get_schema(strict: bool) -> CoreSchema: + return core_schema.json_or_python_schema( + python_schema=core_schema.union_schema( + [ + core_schema.is_instance_schema(source), + json_schema, + ], + custom_error_type=cls._error_kind, + strict=strict, + ), + json_schema=json_schema, + serialization=core_schema.plain_serializer_function_ser_schema( + serialize, + info_arg=True, + return_schema=core_schema.str_schema(), + when_used="json", + ), + ) + + return core_schema.lax_or_strict_schema( + lax_schema=get_schema(strict=False), + strict_schema=get_schema(strict=True), + ) + + def __init__(self, value: T): + self._value = value + + def get_value(self) -> T: + return self._value + + def __repr__(self) -> str: + return '""' # Always return an empty string representation + + def __str__(self) -> str: + return _display(self._value) + + +class LongStr(LongField[str]): + """A string that displays '...' instead of the full content in logs or tracebacks.""" + + _inner_schema: ClassVar[CoreSchema] = core_schema.str_schema() + _error_kind: ClassVar[str] = "string_type" + + +class LongBytes(LongField[bytes]): + """A bytes type that displays '...' instead of the full content in logs or tracebacks.""" + + _inner_schema: ClassVar[CoreSchema] = core_schema.bytes_schema() + _error_kind: ClassVar[str] = "bytes_type" + + +class FilePlaceholder(odmantic.EmbeddedModel): + content: LongBytes = odmantic.Field( + default=b"", json_schema_extra={"exclude": True} + ) diff --git a/osm/schemas/metrics_schemas.py b/osm/schemas/metrics_schemas.py index 7aa83934..5b82cba9 100644 --- a/osm/schemas/metrics_schemas.py +++ b/osm/schemas/metrics_schemas.py @@ -1,10 +1,13 @@ +import types from typing import Optional from odmantic import EmbeddedModel +from pydantic import field_validator -# The rtransparent tool can extract from parsed pdfs or from XML directly from pubmed central. The latter has many more fields. +from .custom_fields import LongStr +# The rtransparent tool can extract from parsed pdfs or from XML directly from pubmed central. The latter has many more fields. 
# all_indicators.csv from the rtransparent publication has both but has the following extra fields: # code_text,com_code,com_data_availibility,com_file_formats,com_general_db,com_github_data,com_specific_db,com_suppl_code,com_supplemental_data,data_text,dataset,eigenfactor_score,field,is_art,is_code_pred,is_data_pred,is_relevant_code,is_relevant_data,jif,n_cite,score,year, class RtransparentMetrics(EmbeddedModel): @@ -13,19 +16,19 @@ class RtransparentMetrics(EmbeddedModel): is_open_data: Optional[bool] # Optional fields - year: Optional[float] = None + year: Optional[int] = None filename: Optional[str] = None pmcid_pmc: Optional[int] = None - pmid: Optional[float] = None + pmid: Optional[int] = None doi: Optional[str] = None - year_epub: Optional[float] = None - year_ppub: Optional[float] = None + year_epub: Optional[int] = None + year_ppub: Optional[int] = None journal: Optional[str] = None publisher: Optional[str] = None affiliation_country: Optional[str] = None affiliation_institution: Optional[str] = None type: Optional[str] = None - data_text: Optional[str] = None + data_text: Optional[LongStr] = None is_relevant_data: Optional[bool] = None com_specific_db: Optional[str] = None com_general_db: Optional[str] = None @@ -34,18 +37,18 @@ class RtransparentMetrics(EmbeddedModel): com_file_formats: Optional[str] = None com_supplemental_data: Optional[str] = None com_data_availibility: Optional[str] = None - code_text: Optional[str] = None + code_text: Optional[LongStr] = None is_relevant_code: Optional[bool] = None com_code: Optional[str] = None com_suppl_code: Optional[str] = None is_coi_pred: Optional[bool] = None - coi_text: Optional[str] = None + coi_text: Optional[LongStr] = None is_coi_pmc_fn: Optional[bool] = None - is_coi_pmc_title: Optional[str] = None + is_coi_pmc_title: Optional[bool] = None is_relevant_coi: Optional[bool] = None is_relevant_coi_hi: Optional[bool] = None is_relevant_coi_lo: Optional[bool] = None - is_explicit_coi: Optional[str] = None + is_explicit_coi: Optional[bool] = None coi_1: Optional[bool] = None coi_2: Optional[bool] = None coi_disclosure_1: Optional[bool] = None @@ -66,7 +69,7 @@ class RtransparentMetrics(EmbeddedModel): board_1: Optional[bool] = None no_coi_1: Optional[bool] = None no_funder_role_1: Optional[bool] = None - fund_text: Optional[str] = None + fund_text: Optional[LongStr] = None fund_pmc_institute: Optional[str] = None fund_pmc_source: Optional[str] = None fund_pmc_anysource: Optional[str] = None @@ -109,37 +112,37 @@ class RtransparentMetrics(EmbeddedModel): acknow_1: Optional[bool] = None disclosure_1: Optional[bool] = None disclosure_2: Optional[bool] = None - fund_ack: Optional[str] = None - project_ack: Optional[str] = None + fund_ack: Optional[bool] = None + project_ack: Optional[bool] = None is_register_pred: Optional[bool] = None - register_text: Optional[str] = None + register_text: Optional[LongStr] = None is_research: Optional[bool] = None is_review: Optional[bool] = None is_reg_pmc_title: Optional[bool] = None is_relevant_reg: Optional[bool] = None is_method: Optional[bool] = None is_NCT: Optional[bool] = None - is_explicit_reg: Optional[str] = None - prospero_1: Optional[str] = None - registered_1: Optional[str] = None - registered_2: Optional[str] = None - registered_3: Optional[str] = None - registered_4: Optional[str] = None - registered_5: Optional[str] = None - not_registered_1: Optional[str] = None - registration_1: Optional[str] = None - registration_2: Optional[str] = None - registration_3: Optional[str] = None - 
registration_4: Optional[str] = None
-    registry_1: Optional[str] = None
-    reg_title_1: Optional[str] = None
-    reg_title_2: Optional[str] = None
-    reg_title_3: Optional[str] = None
-    reg_title_4: Optional[str] = None
-    funded_ct_1: Optional[str] = None
-    ct_2: Optional[str] = None
-    ct_3: Optional[str] = None
-    protocol_1: Optional[str] = None
+    is_explicit_reg: Optional[bool] = None
+    prospero_1: Optional[bool] = None
+    registered_1: Optional[bool] = None
+    registered_2: Optional[bool] = None
+    registered_3: Optional[bool] = None
+    registered_4: Optional[bool] = None
+    registered_5: Optional[bool] = None
+    not_registered_1: Optional[bool] = None
+    registration_1: Optional[bool] = None
+    registration_2: Optional[bool] = None
+    registration_3: Optional[bool] = None
+    registration_4: Optional[bool] = None
+    registry_1: Optional[bool] = None
+    reg_title_1: Optional[bool] = None
+    reg_title_2: Optional[bool] = None
+    reg_title_3: Optional[bool] = None
+    reg_title_4: Optional[bool] = None
+    funded_ct_1: Optional[bool] = None
+    ct_2: Optional[bool] = None
+    ct_3: Optional[bool] = None
+    protocol_1: Optional[bool] = None
     is_success: Optional[bool] = None
     is_art: Optional[bool] = None
     field: Optional[str] = None
@@ -150,13 +153,13 @@ class RtransparentMetrics(EmbeddedModel):
     # some extra fields
     affiliation_aff_id: Optional[str] = None
     affiliation_all: Optional[str] = None
-    article: Optional[int] = None
+    article: Optional[str] = None
     author: Optional[str] = None
     author_aff_id: Optional[str] = None
     correspondence: Optional[str] = None
     date_epub: Optional[str] = None
     date_ppub: Optional[str] = None
-    funding_text: Optional[str] = None
+    funding_text: Optional[LongStr] = None
     is_explicit: Optional[bool] = None
     is_fund_pred: Optional[bool] = None
     is_funded_pred: Optional[bool] = None
@@ -174,9 +177,9 @@ class RtransparentMetrics(EmbeddedModel):
     n_ref: Optional[str] = None
     n_table_body: Optional[str] = None
     n_table_floats: Optional[str] = None
-    open_code_statements: Optional[str] = None
-    open_data_category: Optional[str] = None
-    open_data_statements: Optional[str] = None
+    open_code_statements: Optional[LongStr] = None
+    open_data_category: Optional[LongStr] = None
+    open_data_statements: Optional[LongStr] = None
     pii: Optional[str] = None
     pmcid_uid: Optional[str] = None
     publisher_id: Optional[str] = None
@@ -185,6 +188,18 @@ class RtransparentMetrics(EmbeddedModel):
     is_data_pred: Optional[bool] = None
     is_code_pred: Optional[bool] = None
 
+    @field_validator("article", mode="before")
+    def coerce_to_string(cls, v):
+        if isinstance(v, (int, float, bool)):
+            return str(v)
+        elif isinstance(v, types.NoneType):
+            return None
+        elif not isinstance(v, str):
+            raise ValueError(
+                "string required or a type that can be coerced to a string"
+            )
+        return v
+
 
 # Tried to define programmatically but both ways seemed to yield a model class without type annotated fields...
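Note on the new custom fields used above: LongStr (and LongBytes) wrap a value so that logs, tracebacks, and JSON dumps only ever show a truncated placeholder, while the full value remains accessible in Python. A minimal sketch of the intended behaviour, assuming pydantic v2 as used elsewhere in this diff (DemoDoc is a hypothetical model for illustration only, not part of the change):

    from pydantic import BaseModel

    from osm.schemas.custom_fields import LongStr

    class DemoDoc(BaseModel):
        data_text: LongStr

    doc = DemoDoc(data_text="All data are openly available in the OSF repository")
    repr(doc.data_text)           # '""'  - repr never exposes the content
    doc.model_dump(mode="json")   # {'data_text': 'All data a...'}  - first 10 chars plus ellipsis for JSON/logs
    doc.data_text.get_value()     # full original string, still available in Python mode
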
diff --git a/osm/schemas/schemas.py b/osm/schemas/schemas.py
index 0516c3a2..0585d03c 100644
--- a/osm/schemas/schemas.py
+++ b/osm/schemas/schemas.py
@@ -1,19 +1,33 @@
+import base64
+from datetime import datetime
 from typing import Optional
 
-from odmantic import EmbeddedModel, Model
-from pydantic import EmailStr
+from odmantic import EmbeddedModel, Field, Model
+from pydantic import EmailStr, field_validator
 
+from .custom_fields import LongBytes
 from .metrics_schemas import RtransparentMetrics
 
 
 class Component(EmbeddedModel):
+    model_config = {
+        "extra": "forbid",
+        "json_encoders": {
+            LongBytes: lambda v: base64.b64encode(v.get_value()).decode("utf-8"),
+        },
+    }
     name: str
     version: str
     docker_image: Optional[str] = None
     docker_image_id: Optional[str] = None
+    sample: Optional[LongBytes] = Field(
+        default=b"",
+        json_schema_extra={"exclude": True, "select": False, "write_only": True},
+    )
 
 
 class Client(EmbeddedModel):
+    model_config = {"extra": "forbid"}
    compute_context_id: int
     email: Optional[EmailStr] = None
 
@@ -28,15 +42,25 @@ class Work(EmbeddedModel):
     a pdf) provided as part of each "Invocation" or cli call.
     """
 
+    model_config = {"extra": "forbid"}
     user_defined_id: str
-    pmid: Optional[str] = None
+    pmid: Optional[int] = None
     doi: Optional[str] = None
     openalex_id: Optional[str] = None
     scopus_id: Optional[str] = None
     filename: str = ""
-    file: Optional[str] = None
     content_hash: Optional[str] = None
 
+    @field_validator("user_defined_id", mode="before")
+    def coerce_to_string(cls, v):
+        if isinstance(v, (int, float, bool)):
+            return str(v)
+        elif not isinstance(v, str):
+            raise ValueError(
+                "string required or a type that can be coerced to a string"
+            )
+        return v
+
 
 class Invocation(Model):
     """
@@ -44,15 +68,13 @@ class Invocation(Model):
     for the Odmantic document model used to interact with mongodb.
""" - work: Work + model_config = {"extra": "forbid"} metrics: RtransparentMetrics - - components: list[Component] + components: Optional[list[Component]] = None + work: Work client: Client - user_comment: Optional[str] = "" + user_comment: str = "" osm_version: str - # Potentially link to other databases for extra metadata but for now will just use component outputs - - -# Rtransparent: Component.construct(name="rtransparent", version="0.13", docker_image="nimh-dsst/rtransparent:0.13", docker_image_id="dsjfkldsjflkdsjlf2jkl23j") -# ScibeamParser: Component.construct(name="scibeam-parser", version="0.5.1", docker_image="elife/scibeam-parser:0.5.1",docker_image_id="dsjfkldsjflkdsjlf2jkl23j") + funder: Optional[list[str]] = None + data_tags: list[str] = [] + created_at: datetime = Field(default_factory=datetime.utcnow) diff --git a/scripts/invocation_upload.py b/scripts/invocation_upload.py index 780ff696..6aec9a9f 100644 --- a/scripts/invocation_upload.py +++ b/scripts/invocation_upload.py @@ -1,89 +1,141 @@ +import argparse import logging import os -from typing import List +from pathlib import Path import pandas as pd +import pyarrow.dataset as ds import pymongo from pydantic import ValidationError -from osm.schemas import Client, Invocation, Work +from osm import schemas DB_NAME = os.environ["DB_NAME"] -MONGO_URI = os.environ["MONGO_URI"] -ERROR_CSV_PATH = "error_log.csv" -ERROR_LOG_PATH = "error.log" -# NOTICE: output of rt_all without corresponding values in the all_indicators.csv from Rtransparent publication -unmapped = { - "article": "", - "is_relevant": None, - "is_explicit": None, -} +MONGODB_URI = os.environ["MONGODB_URI"] +ERROR_CSV_PATH = Path("error_log.csv") +ERROR_LOG_PATH = Path("error.log") logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) +cols_mapping = {} -def transform_data(df) -> List[Invocation]: +def custom_irp_data_processing(row): + return row + + +def custom_rtransparent_pub_data_processing(row): + row["is_open_code"] = row.pop("is_code_pred") + row["is_open_data"] = row.pop("is_data_pred") + row["pmid"] = None if pd.isnull(row.pmid) else int(row["pmid"]) + row["year_ppub"] = None if pd.isnull(row.year_ppub) else int(row["year_ppub"]) + row["year_epub"] = None if pd.isnull(row.year_epub) else int(row["year_epub"]) + row["year"] = None if pd.isnull(row.year) else int(row["year"]) + row["doi"] = None if pd.isnull(row.doi) else row["doi"] + return row + + +def transform_data(df, tags=None, custom_processing=None) -> list[schemas.Invocation]: """Convert the dataframe to a list of Invocation objects""" + tags = tags or [] for index, row in df.iterrows(): try: - work = Work( - user_defined_id=str(row["doi"]), - pmid=str(row.pop("pmid")), - doi=str(row.pop("doi")), + if custom_processing is not None: + func = globals()[custom_processing] + row = func(row) + work = schemas.Work( + user_defined_id=row.get("doi") or str(row.get("pmid")), + pmid=row.get("pmid"), + doi=row.get("doi"), openalex_id=None, scopus_id=None, - filename=row.pop("filename"), - file=None, + filename=row.get("filename") or "", content_hash=None, ) - client = Client(compute_context_id=999, email=None) + client = schemas.Client(compute_context_id=999, email=None) - metrics = {**unmapped, **row.to_dict()} - invocation = Invocation( + metrics = {**row.to_dict()} + invocation = schemas.Invocation( metrics=metrics, osm_version="0.0.1", client=client, work=work, - user_comment="Initial database seeding with data from 
Rtransparent publication", + user_comment="Initial database seeding with publications from the NIH IRP", + data_tags=["bulk_upload", *tags], + funder=row.get("funder"), + components=[ + schemas.Component(name="Scibeam-parser/Rtransparent", version="NA") + ], ) - yield invocation.dict() + yield invocation.model_dump(mode="json", exclude="id") - except (KeyError, ValidationError) as e: + except (KeyError, ValidationError, Exception) as e: + breakpoint() logger.error(f"Error processing row {index}") - write_error_to_file(invocation, e) - - -def write_error_to_file(invocation: Invocation, error: Exception): - with open(ERROR_CSV_PATH, "a") as csv_file, open(ERROR_LOG_PATH, "a") as log_file: - # Write the problematic invocation data to the CSV - row_dict = { - **invocation.metrics, - **invocation.work.dict(), - **invocation.client.dict(), - } - pd.DataFrame([row_dict]).to_csv( - csv_file, header=csv_file.tell() == 0, index=False + write_error_to_file(row, e) + + +def write_error_to_file(row: pd.Series, error: Exception): + with ERROR_CSV_PATH.open("a") as csv_file, ERROR_LOG_PATH.open("a") as log_file: + # Write the problematic row data to the CSV, add header if not yet populated. + row.to_csv( + csv_file, + header=not ERROR_CSV_PATH.exists() or ERROR_CSV_PATH.stat().st_size == 0, + index=False, ) - # Log the error details - log_file.write(f"Error processing invocation: {invocation}\nError: {error}\n\n") + # Drop string values as they tend to be too long + display_row = ( + row.apply(lambda x: x if not isinstance(x, str) else None) + .dropna() + .to_dict() + ) + log_file.write(f"Error processing data:\n {display_row}\nError: {error}\n\n") + + +def parse_args(): + parser = argparse.ArgumentParser(description="Invocation Upload") + parser.add_argument( + "-i", + "--input_file", + required=True, + help="Path to the input file if it is feather (or a directory for chunked parquet)", + ) + parser.add_argument( + "-t", + "--tags", + nargs="+", + help="Tags to apply to the uploaded data for filtering etc.", + ) + parser.add_argument( + "-c", + "--custom-processing", + help="Name of function that applies custom processing to the data", + ) + return parser.parse_args() def main(): - df = pd.read_feather("all_indicators.feather", dtype_backend="pyarrow") + args = parse_args() + file_in = Path(args.input_file) + if file_in.is_dir(): + df = ds.dataset(file_in, format="parquet").to_table().to_pandas() + else: + df = pd.read_feather(file_in, dtype_backend="pyarrow") + if df.empty: raise Exception("Dataframe is empty") + try: - db = pymongo.MongoClient(MONGO_URI).osm - db.invocation.insert_many(transform_data(df)) + db = pymongo.MongoClient(MONGODB_URI).osm + db.invocation.insert_many( + transform_data(df, tags=args.tags, custom_processing=args.custom_processing) + ) except Exception as e: - breakpoint() logger.error(f"Failed to process data: {e}") - # raise e - breakpoint() + raise e if __name__ == "__main__": diff --git a/scripts/merge_funder.py b/scripts/merge_funder.py new file mode 100644 index 00000000..7e138d1b --- /dev/null +++ b/scripts/merge_funder.py @@ -0,0 +1,158 @@ +import argparse +import logging +from pathlib import Path + +import pandas as pd +import pyarrow as pa +import pyarrow.dataset as ds +import pyarrow.parquet as pq + +from osm import schemas + +logging.basicConfig(level=logging.INFO) + + +def odmantic_to_pyarrow(schema_dict): + type_mapping = { + "integer": pa.int64(), + "number": pa.float64(), + "string": pa.string(), + "boolean": pa.bool_(), + "null": pa.string(), # We will handle 
None manually + "array": pa.list_(pa.string()), + "object": pa.struct([]), + } + + fields = [] + for prop, details in schema_dict["properties"].items(): + primary_type = "string" + nullable = False + + if "anyOf" in details: + primary_type = next( + (t["type"] for t in details["anyOf"] if t["type"] != "null"), "string" + ) + nullable = True + else: + primary_type = details.get("type", "string") + + pyarrow_type = type_mapping.get(primary_type, pa.string()) + fields.append(pa.field(prop, pyarrow_type, nullable=nullable)) + return pa.schema(fields) + + +def read_parquet_chunks_and_combine(chunk_dir, pyarrow_schema): + dataset = ds.dataset(chunk_dir, format="parquet") + + # Select and order columns as per the pyarrow schema + table = dataset.to_table(columns=pyarrow_schema.names) + + # Convert the PyArrow table to a Pandas DataFrame + return table.to_pandas() + + +def save_combined_df_as_feather(df, output_file): + df.reset_index(drop=True).to_feather(output_file) + + +def setup(dset_path, funder_path, merge_col): + dataset = pd.read_feather(dset_path, dtype_backend="pyarrow") + + if str(dset_path) == "tempdata/sharestats.feather": + dataset = dataset.rename(columns={"article": "pmid"}) + + # Read the CSV, allowing pmid to be float64 to handle .0 cases + df = pd.read_csv(funder_path, dtype={merge_col: float}) + + # Convert merge_col to nullable integer type after handling any potential NaNs + df[merge_col] = pd.to_numeric( + df[merge_col], downcast="integer", errors="coerce" + ).astype("Int64") + + funder_columns = df.columns.difference([merge_col]) + df["funder"] = df[funder_columns].apply( + lambda x: funder_columns[x].tolist(), axis=1 + ) + funder = df.loc[df["funder"].astype(bool), [merge_col, "funder"]].set_index( + merge_col + ) + + return dataset, funder, dset_path + + +def clean_funder_column(merged_df): + def clean_funder(funder): + if isinstance(funder, list): + return sorted([str(f) for f in funder]) + return [] + + merged_df["funder"] = merged_df["funder"].apply(clean_funder) + return merged_df + + +def merge_funder(df, funder, merge_col): + merged_df = df.merge(funder, on=merge_col, how="left") + merged_df = clean_funder_column(merged_df) + return merged_df + + +def subset_schema_to_dataframe(schema, df): + fields = [field for field in schema if field.name in df.columns] + # Adjust the schema again to include the 'funder' column + funder_field = pa.field("funder", pa.list_(pa.string()), nullable=True) + fields.append(funder_field) + + return pa.schema(fields) + + +def get_user_args(): + parser = argparse.ArgumentParser() + parser.add_argument("dataset_path", help="Path to the dataset file") + parser.add_argument("funder_path", help="Path to the funders file") + parser.add_argument("merge_col", default="pmcid_pmc", help="Column to merge on") + return parser.parse_args() + + +def main(): + args = get_user_args() + dset_path = Path(args.dataset_path) + odmantic_schema_json = schemas.RtransparentMetrics.model_json_schema() + pyarrow_schema = odmantic_to_pyarrow(odmantic_schema_json) + dataset, funder, dset_path = setup(dset_path, args.funder_path, args.merge_col) + + # Convert float NaNs to None and enforce correct data types + dataset = dataset.where(pd.notna(dataset), None) + + # Convert specific columns to nullable integers, ensuring proper conversion of NA to None + for col in ["year", "year_ppub", "year_epub", "pmid"]: + if col in dataset.columns: + dataset[col] = pd.to_numeric(dataset[col], errors="coerce").astype("Int64") + + # Explicitly replace pd.NA with None after 
conversion + dataset[col] = dataset[col].apply(lambda x: None if pd.isna(x) else int(x)) + + adjusted_schema = subset_schema_to_dataframe(pyarrow_schema, dataset) + + output_dir = Path(f"tempdata/{dset_path.stem}-chunks") + output_dir.mkdir(parents=True, exist_ok=True) + + chunk_size = 400000 + for i in range(0, len(dataset), chunk_size): + print(f"{i} / {len(dataset)}") + chunk = dataset.iloc[i : i + chunk_size] + chunk = merge_funder(chunk, funder, args.merge_col) + chunk_file = output_dir / f"chunk_{i // chunk_size}.parquet" + pq.write_table( + pa.Table.from_pandas(chunk, schema=adjusted_schema), + chunk_file, + compression="snappy", + ) + + df_out = read_parquet_chunks_and_combine(output_dir, adjusted_schema) + save_combined_df_as_feather( + df_out, dset_path.parent / f"{dset_path.stem}-with-funder.feather" + ) + + +if __name__ == "__main__": + main() diff --git a/tests/scripts/test_invocation_upload.py b/tests/scripts/test_invocation_upload.py index cd9a5c61..790cb228 100644 --- a/tests/scripts/test_invocation_upload.py +++ b/tests/scripts/test_invocation_upload.py @@ -272,12 +272,12 @@ def test_transform_data_validation_error(): async def test_upload_data_success(mock_database): # Define test data invocation_list = [] - mongo_uri = "mongodb://test_uri" + MONGODB_URI = "mongodb://test_uri" db_name = "test_db" # If an exception is raised in the above call, the test will fail. # There's no need for a 'with not pytest.raises(Exception):' block. - await upload_data(invocation_list, mongo_uri, db_name, "1/2") + await upload_data(invocation_list, MONGODB_URI, db_name, "1/2") @pytest.mark.asyncio diff --git a/web/dashboard/Dockerfile b/web/dashboard/Dockerfile index 3d2a786b..0b4e035c 100644 --- a/web/dashboard/Dockerfile +++ b/web/dashboard/Dockerfile @@ -2,8 +2,9 @@ FROM tiangolo/uvicorn-gunicorn:python3.11 WORKDIR /app -ENV LOCAL_DATA_PATH=/opt/dashboard_data.feather -COPY ./dashboard_data.feather /opt/dashboard_data.feather +RUN mkdir -p /opt/data +ENV LOCAL_DATA_PATH=/opt/data/dashboard_data.feather +COPY ./tempdata/dashboard_data.feather /opt/data/dashboard_data.feather # Create the environment RUN pip install holoviews panel pymongo odmantic pandas pyarrow pydantic[email] diff --git a/web/dashboard/app.py b/web/dashboard/app.py index db29529f..197aa712 100644 --- a/web/dashboard/app.py +++ b/web/dashboard/app.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import pandas as pd import panel as pn @@ -28,8 +29,9 @@ def flatten_dict(d): def load_data(): - if "LOCAL_DATA_PATH" in os.environ: - return pd.read_feather(os.environ["LOCAL_DATA_PATH"]) + local_path = os.environ.get("LOCAL_DATA_PATH") + if local_path is not None and Path(local_path).exists(): + return pd.read_feather(local_path) client = MongoClient(os.environ["MONGODB_URI"]) engine = SyncEngine(client=client, database="osm") matches = ( @@ -40,33 +42,31 @@ def load_data(): "$match": { "osm_version": {"$eq": "0.0.1"}, # "work.pmid": {"$regex":r"^2"}, - "metrics.year": {"$gt": 2000}, + # "metrics.year": {"$gt": 2000}, # "metrics.is_data_pred": {"$eq": True}, }, }, { "$project": { # "osm_version": True, - # "user_comment": True, - # "client.compute_context_id": True, - "work.user_defined_id": True, + "funder": True, + "data_tags": True, + "work.pmid": True, "metrics.year": True, "metrics.is_code_pred": True, "metrics.is_data_pred": True, "metrics.affiliation_country": True, - "metrics.score": True, - "metrics.eigenfactor_score": True, - "metrics.fund_pmc_anysource": True, - "metrics.fund_pmc_institute": True, - 
"metrics.fund_pmc_source": True, "metrics.journal": True, + "created_at": True, }, }, ] ) .__iter__() ) - return pd.DataFrame(flatten_dict(match) for match in matches) + df = pd.DataFrame(flatten_dict(match) for match in matches) + df.to_feather(local_path) + return df def dashboard_page():