Allow dataframes to be broken into chunks and uploaded. Handles errors to retry uploads, report errors, and save the failed collection
gitstart-nimhdsst committed Aug 9, 2024
1 parent 22712b8 commit c0a33b7
Showing 3 changed files with 163 additions and 115 deletions.
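
In outline, the commit replaces a single bulk upload with a chunk-at-a-time loop that falls back to per-document saves when a batch fails. Below is a minimal, self-contained sketch of that pattern, not the script itself: the MONGO_URI, the "osm" database name, and the Record model are placeholders standing in for the real deployment settings and the Invocation schema.

import asyncio

import numpy as np
import pandas as pd
from motor.motor_asyncio import AsyncIOMotorClient
from odmantic import AIOEngine, Model

MONGO_URI = "mongodb://localhost:27017"  # placeholder, not the real deployment
BATCH_SIZE = 1000  # mirrors the hard-coded size in break_into_chunks


class Record(Model):  # stand-in for the real Invocation schema
    value: float


async def upload(df: pd.DataFrame) -> None:
    client = AsyncIOMotorClient(MONGO_URI)
    engine = AIOEngine(client=client, database="osm")  # database name assumed
    try:
        # np.array_split tolerates a length that is not a multiple of
        # BATCH_SIZE, yielding roughly equal chunks.
        for chunk in np.array_split(df, int(np.ceil(len(df) / BATCH_SIZE))):
            docs = [Record(value=v) for v in chunk["value"]]
            try:
                await engine.save_all(docs)  # one bulk write per chunk
            except Exception:
                # Retry one document at a time so a single bad row does
                # not sink the whole chunk; log stragglers and move on.
                for doc in docs:
                    try:
                        await engine.save(doc)
                    except Exception as err:
                        print(f"failed to save {doc}: {err}")
    finally:
        client.close()


if __name__ == "__main__":
    asyncio.run(upload(pd.DataFrame({"value": np.random.rand(2500)})))

The per-document fallback trades throughput for the ability to pinpoint exactly which rows fail, which is what write_error_to_file in the diff below captures.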
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,8 +1,8 @@
 [project]
 name = "osm"
 description = "Open Science Metrics (OSM) client for tracking scientific transparency and reproducibility metrics"
 readme = "README.md"
-requires-python = ">=3.11"
+requires-python = ">=3.9" # Confirm the required python version since [tool.black] is using a low version
 keywords = [
     "open science",
     "altmetrics",
88 changes: 70 additions & 18 deletions scripts/invocation_upload.py
@@ -1,15 +1,16 @@
+import asyncio
 import logging
 import os
 import pickle
 import tempfile
 from pathlib import Path
 from typing import List
 
+import numpy as np
 import pandas as pd
 import requests
 
+# from motor.motor_tornado import MotorClient
 from motor.motor_asyncio import AsyncIOMotorClient
 from odmantic import AIOEngine
 from pydantic import ValidationError
 
 from osm.schemas import Client, Invocation, Work
@@ -27,7 +28,7 @@
 }
 
 
-def transform_data(df: pd.DataFrame) -> List[Invocation]:
+def transform_data(df) -> List[Invocation]:
     """Handles data transformation as well as mapping"""
     invocations = []
     for index, row in df.iterrows():
@@ -69,24 +70,53 @@ def read_data(data_path: str):
         if data_path.startswith("https"):
             csv = download_csv(data_path)
             df = pd.read_csv(csv)
+        elif data_path.endswith("csv"):
+            df = pd.read_csv(data_path)
         else:
             df = pd.read_feather(data_path)
         return df
     except FileNotFoundError as e:
         raise e
 
 
-async def upload_data(invocations: List[Invocation], mongo_uri: str, db_name: str):
+ERROR_CSV_PATH = "error_log.csv"
+ERROR_LOG_PATH = "error.log"
+
+
+def write_error_to_file(invocation: Invocation, error: Exception):
+    with open(ERROR_CSV_PATH, "a") as csv_file, open(ERROR_LOG_PATH, "a") as log_file:
+        # Write the problematic invocation data to the CSV
+        row_dict = {
+            **invocation.metrics,
+            **invocation.work.dict(),
+            **invocation.client.dict(),
+        }
+        pd.DataFrame([row_dict]).to_csv(
+            csv_file, header=csv_file.tell() == 0, index=False
+        )
+
+        # Log the error details
+        log_file.write(f"Error processing invocation: {invocation}\nError: {error}\n\n")
+
+
+async def upload_data(
+    invocations: List[Invocation], mongo_uri: str, db_name: str, count
+):
     """upload invocations to MongoDB one after the other to prevent timeout"""
     motor_client = AsyncIOMotorClient(mongo_uri)
+    engine = AIOEngine(client=motor_client, database=db_name)
     try:
-        engine = motor_client(client=motor_client, database=db_name)
-        engine.save_all(invocations)
+        await engine.save_all(invocations)
+        logger.info(f"Upload successful {count}")
     except (TypeError, Exception) as e:
-        if isinstance(e, TypeError):
-            raise TypeError(e)
-        elif isinstance(e, Exception):
-            raise Exception(f"Failed to upload data: {e}")
+        breakpoint()
+        logger.error(f"Error uploading batch: {e} {count}")
+        for inv in invocations:
+            try:
+                await engine.save(inv)
+            except Exception as e:
+                write_error_to_file(inv, e)
+        raise e
     finally:
         motor_client.close()

@@ -109,7 +139,16 @@ def download_csv(url):
         raise e
 
 
-def main(data_path="all_indicators.feather"):
+def break_into_chunks(dataframe):
+    batch_size = 1000  # Define your batch size
+    number_of_batches = int(np.ceil(len(dataframe) / batch_size))
+
+    # Use np.array_split to create batches
+    chunks = np.array_split(dataframe, number_of_batches)
+    return chunks
+
+
+def main(db_url: str, db_name: str, data_path="all_indicators.feather"):
     try:
         transformed_pickle = Path("invocations.pkl")
         if transformed_pickle.exists():
@@ -118,19 +157,32 @@ def main(data_path="all_indicators.feather"):
             breakpoint()
             df = read_data(data_path)
             if not df.empty:
-                invocations = transform_data(df)
-                transformed_pickle.write_bytes(pickle.dumps(invocations))
+                chunks = break_into_chunks(df)
+                ind = 1
+                for chunk in chunks:
+                    invocations = transform_data(chunk)
+
+                    logger.info(f"Uploading data to {db_url} {ind}/{len(chunks)}")
+                    asyncio.run(
+                        upload_data(
+                            invocations, db_url, db_name, f"{ind}/{len(chunks)}"
+                        )
+                    )
+                    ind = ind + 1
+
+                # transformed_pickle.write_bytes(pickle.dumps(invocations))
             else:
                 raise Exception("Dataframe is empty")
-        db_url = os.getenv("DATABASE_URL", None)
-        db_name = os.getenv("DATABASE_NAME", None)
-        logger.warning(f"Uploading data to {db_url}")
-        upload_data(invocations, db_url, db_name)
     except Exception as e:
+        breakpoint()
         logger.error(f"Failed to process data: {e}")
         raise e
 
 
 if __name__ == "__main__":
-    main()
+    url = os.getenv(
+        "DATABASE_URL",
+        None,
+    )
+    name = os.getenv("DATABASE_NAME", None)
+    main(url, name)
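
Worth noting for reviewers: np.array_split, unlike np.split, accepts a section count that does not evenly divide the frame, so break_into_chunks produces roughly equal chunks rather than exact batches of 1000. A quick illustration, assuming only numpy and pandas:

import numpy as np
import pandas as pd

df = pd.DataFrame({"x": range(2500)})
chunks = np.array_split(df, int(np.ceil(len(df) / 1000)))
print([len(c) for c in chunks])  # [834, 833, 833]: sizes differ by at most 1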