Merge pull request #30 from nimh-dsst/CLAI-1.2
CLAI-1 Contd - Specification for Uploading Dataset to MongoDB
leej3 authored Aug 9, 2024
2 parents 22712b8 + 03069b8 · commit bbafba2
Showing 2 changed files with 132 additions and 182 deletions.
scripts/invocation_upload.py (128 changes: 41 additions & 87 deletions)
@@ -1,35 +1,32 @@
 import logging
 import os
-import pickle
-import tempfile
-from pathlib import Path
 from typing import List
 
 import pandas as pd
-import requests
 
-# from motor.motor_tornado import MotorClient
-from motor.motor_asyncio import AsyncIOMotorClient
+import pymongo
 from pydantic import ValidationError
 
 from osm.schemas import Client, Invocation, Work
 
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+DB_NAME = os.environ["DB_NAME"]
+MONGO_URI = os.environ["MONGO_URI"]
+ERROR_CSV_PATH = "error_log.csv"
+ERROR_LOG_PATH = "error.log"
 # NOTICE: output of rt_all without corresponding values in the all_indicators.csv from Rtransparent publication
 unmapped = {
     "article": "",
     "is_relevant": None,
     "is_explicit": None,
 }
 
-logging.basicConfig(
-    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
-)
-logger = logging.getLogger(__name__)
-
-def transform_data(df) -> List[Invocation]:
-    """Convert the dataframe to a list of Invocation objects"""
-    invocations = []
+def transform_data(df: pd.DataFrame) -> List[Invocation]:
+    """Handles data transformation as well as mapping"""
     for index, row in df.iterrows():
         try:
             work = Work(
@@ -52,84 +49,41 @@ def transform_data(df: pd.DataFrame) -> List[Invocation]:
                 work=work,
                 user_comment="Initial database seeding with data from Rtransparent publication",
             )
+            yield invocation.dict()
 
-            invocations.append(invocation)
         except (KeyError, ValidationError) as e:
-            if isinstance(e, KeyError):
-                raise KeyError(f"Error key not found in row {index}: {e}")
-            elif isinstance(e, ValidationError):
-                raise e
-
-    return invocations
-
-
-def read_data(data_path: str):
-    """Checks to see if url is a path or https to download or read file"""
-    try:
-        if data_path.startswith("https"):
-            csv = download_csv(data_path)
-            df = pd.read_csv(csv)
-        else:
-            df = pd.read_feather(data_path)
-        return df
-    except FileNotFoundError as e:
-        raise e
-
-
-async def upload_data(invocations: List[Invocation], mongo_uri: str, db_name: str):
-    """upload invocations to MongoDB one after the other to prevent timeout"""
-    motor_client = AsyncIOMotorClient(mongo_uri)
-    try:
-        engine = motor_client(client=motor_client, database=db_name)
-        engine.save_all(invocations)
-    except (TypeError, Exception) as e:
-        if isinstance(e, TypeError):
-            raise TypeError(e)
-        elif isinstance(e, Exception):
-            raise Exception(f"Failed to upload data: {e}")
-    finally:
-        motor_client.close()
-
-
-def download_csv(url):
-    """downloads file and store in a temp location"""
-    try:
-        response = requests.get(url)
-        if response.status_code == 200:
-            temp_file, temp_file_path = tempfile.mkstemp(suffix=".csv")
-            os.close(temp_file)  # Close the file descriptor
-            with open(temp_file_path, "wb") as file:
-                file.write(response.content)
-            return temp_file_path
-        else:
-            raise Exception(
-                f"Failed to download CSV. Status code: {response.status_code}"
-            )
-    except Exception as e:
-        raise e
-
-
-def main(data_path="all_indicators.feather"):
+            logger.error(f"Error processing row {index}")
+            write_error_to_file(invocation, e)
+
+
+def write_error_to_file(invocation: Invocation, error: Exception):
+    with open(ERROR_CSV_PATH, "a") as csv_file, open(ERROR_LOG_PATH, "a") as log_file:
+        # Write the problematic invocation data to the CSV
+        row_dict = {
+            **invocation.metrics,
+            **invocation.work.dict(),
+            **invocation.client.dict(),
+        }
+        pd.DataFrame([row_dict]).to_csv(
+            csv_file, header=csv_file.tell() == 0, index=False
+        )
+
+        # Log the error details
+        log_file.write(f"Error processing invocation: {invocation}\nError: {error}\n\n")
+
+
+def main():
+    df = pd.read_feather("all_indicators.feather", dtype_backend="pyarrow")
+    if df.empty:
+        raise Exception("Dataframe is empty")
     try:
-        transformed_pickle = Path("invocations.pkl")
-        if transformed_pickle.exists():
-            df = pickle.loads(transformed_pickle.read_bytes())
-        else:
-            breakpoint()
-            df = read_data(data_path)
-            if not df.empty:
-                invocations = transform_data(df)
-                transformed_pickle.write_bytes(pickle.dumps(invocations))
-            else:
-                raise Exception("Dataframe is empty")
-        db_url = os.getenv("DATABASE_URL", None)
-        db_name = os.getenv("DATABASE_NAME", None)
-        logger.warning(f"Uploading data to {db_url}")
-        upload_data(invocations, db_url, db_name)
+        db = pymongo.MongoClient(MONGO_URI).osm
+        db.invocation.insert_many(transform_data(df))
     except Exception as e:
-        breakpoint()
         logger.error(f"Failed to process data: {e}")
-        raise e
+        # raise e
+        breakpoint()
 
 
 if __name__ == "__main__":
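The core change above swaps the async motor/engine upload for a single pymongo insert_many call fed by a generator: insert_many accepts any iterable of documents, so rows stream into MongoDB without materializing the full list. Below is a minimal self-contained sketch of that pattern; the demo DataFrame, the "demo" database, and the field names are illustrative stand-ins, not the repo's actual schema.

import os

import pandas as pd
import pymongo


def transform(df: pd.DataFrame):
    """Yield one MongoDB document per DataFrame row."""
    for index, row in df.iterrows():
        # Hypothetical fields standing in for the osm.schemas models.
        yield {"article": str(row["article"]), "is_open_code": bool(row["is_open_code"])}


def main():
    df = pd.DataFrame(
        {"article": ["10.1000/a", "10.1000/b"], "is_open_code": [True, False]}
    )
    client = pymongo.MongoClient(os.environ.get("MONGO_URI", "mongodb://localhost:27017"))
    # insert_many accepts any iterable, pulling documents from the
    # generator as it builds batches; an entirely empty iterable raises
    # an error, which the df.empty guard prevents in the real script.
    client.demo.invocation.insert_many(transform(df))
    client.close()


if __name__ == "__main__":
    main()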
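One subtlety in write_error_to_file: header=csv_file.tell() == 0 works because a file opened in append mode is positioned at end-of-file, so tell() is 0 only while the log is still empty, and the CSV header is written exactly once across repeated appends. A standalone sketch of the idiom, with hypothetical field names:

import pandas as pd

ERROR_CSV_PATH = "error_log.csv"


def append_error_row(row_dict: dict):
    with open(ERROR_CSV_PATH, "a") as csv_file:
        # In "a" mode tell() reports the end-of-file offset, so the header
        # is emitted only when the file has no content yet.
        pd.DataFrame([row_dict]).to_csv(
            csv_file, header=csv_file.tell() == 0, index=False
        )


append_error_row({"article": "10.1000/a", "error": "missing field"})
append_error_row({"article": "10.1000/b", "error": "bad type"})  # header not repeated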
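The transform loop delegates row validation to Pydantic, catching ValidationError (plus KeyError for missing columns) rather than checking fields by hand. Since osm.schemas is not part of this diff, here is the same skip-and-log behavior with a stand-in model; WorkStub and its fields are hypothetical:

import pandas as pd
from pydantic import BaseModel, ValidationError


class WorkStub(BaseModel):
    """Stand-in for osm.schemas.Work; the fields are hypothetical."""

    doi: str
    year: int


def validated_rows(df: pd.DataFrame):
    """Yield validated documents, logging and skipping bad rows."""
    for index, row in df.iterrows():
        try:
            yield WorkStub(doi=row["doi"], year=row["year"]).dict()
        except (KeyError, ValidationError) as e:
            print(f"Skipping row {index}: {e}")


df = pd.DataFrame({"doi": ["10.1000/a", "10.1000/b"], "year": [2021, "unknown"]})
print(list(validated_rows(df)))  # the second row fails int coercion and is skipped

One caveat visible in the committed handler: if Work(...) itself raises, invocation is never bound, so the call write_error_to_file(invocation, e) would fail with a NameError.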