Merge pull request #167 from shikshalokam/release-5.1.0
Memory issue addressed
aks30 authored Dec 27, 2023
2 parents 28d2ea9 + 1d42ec2 commit 017fda4
Showing 1 changed file with 69 additions and 102 deletions.
171 changes: 69 additions & 102 deletions urgent_data_metrics/imp_project_metrics.py
@@ -169,10 +169,10 @@ def melt(df: DataFrame,id_vars: Iterable[str], value_vars: Iterable[str],
]), True),
])



def searchEntities(url,ids_list):
try:
returnData = {}
apiSuccessFlag = False
headers = {
'Authorization': config.get('API_HEADERS', 'authorization_access_token'),
'content-Type': 'application/json'
@@ -188,7 +188,7 @@ def searchEntities(url,ids_list):
response = requests.request("POST", url, headers=headers, data=payload)
delta_ids = []
entity_name_mapping = {}

if response.status_code == 200:
# convert the response to dictionary
response = response.json()
@@ -205,65 +205,19 @@ def searchEntities(url,ids_list):

# compare with the input ids to make sure no data is missing from the location search
delta_ids = list(set(ids_list) - set(ids_from_api))
apiSuccessFlag = True
else :
delta_ids = ids_list
# if any ids are missing, fetch their data from mongo

if len(delta_ids) > 0 :
# mongo aggregation pipeline to fetch the missing data
delta_loc = projectsCollec.aggregate([
{
"$match": {
"userProfile.userLocations": {
"$elemMatch": {
"id": {
"$in": delta_ids
}
}
}
}
},
{
"$unwind": "$userProfile.userLocations"
},
{
"$match": {
"userProfile.userLocations.id": {
"$in": delta_ids
}
}
},
{
"$sort": {
"createdAt": -1
}
},
{
"$group": {
"_id": "$userProfile.userLocations.id",
"mostRecentDocument": { "$first": "$$ROOT" }
}
},
{
"$replaceRoot": { "newRoot": "$mostRecentDocument" }
},
{
"$project": {
"_id": 1,
"userProfile.userLocations": 1
}
}
])
# add delta entities to master variable
for index in delta_loc:
entity_name_mapping[index['userProfile']["userLocations"]['id']] = index['userProfile']["userLocations"]['name']

return entity_name_mapping
returnData['mapping'] = entity_name_mapping
returnData['apiSuccessFlag'] = apiSuccessFlag
returnData['delta'] = delta_ids
return returnData

except Exception as e:
errorLogger.error(e,exc_info=True)

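After this change, searchEntities returns a dict rather than a bare id-to-name mapping. A minimal sketch of how a caller might consume the new contract (the keys come from the diff above; the url and ids are hypothetical placeholders):

# Sketch only: consuming the reworked searchEntities return value.
# 'mapping', 'apiSuccessFlag' and 'delta' are the keys introduced in this commit;
# the url and ids below are illustrative, not the project's real values.
result = searchEntities("https://example.org/location/search", ["loc-1", "loc-2"])
if result and result['apiSuccessFlag']:
    entity_names = result['mapping']   # ids the location-search API resolved
    unresolved = result['delta']       # ids to backfill from userLocations_df
else:
    entity_names, unresolved = {}, ["loc-1", "loc-2"]   # treat every id as unresolved
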
projects_df = spark.createDataFrame(projects_cursorMongo,projects_schema)

projects_df = projects_df.withColumn(
"project_evidence_status",
F.when(
@@ -309,6 +263,20 @@ def searchEntities(url,ids_list):
projects_df["district"],
projects_df["state"],
)
# DataFrame for user location values, states and districts only
userLocations_df = melt(projects_df,
id_vars=["_id","exploded_userLocations.name","exploded_userLocations.type","exploded_userLocations.id"],
value_vars=["exploded_userLocations.code"]
).select("_id","id","name","value","type").filter((col("type") == "state") | (col("type") == "district")).dropDuplicates()

# Fetch only the latest location data from the DF
userLocations_df = userLocations_df.groupBy("id").agg(
first("_id", ignorenulls=True).alias("projectId"),
first("name", ignorenulls=True).alias("name"),
first("value", ignorenulls=True).alias("value"),
first("type", ignorenulls=True).alias("type")
)

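The groupBy/first aggregation above collapses the exploded locations to one row per location id. A self-contained sketch of the same pattern on toy data (assumes a local SparkSession; all names are illustrative):

# Sketch only: one row per id, keeping the first non-null value per column.
from pyspark.sql import SparkSession
from pyspark.sql.functions import first

spark_local = SparkSession.builder.master("local[1]").getOrCreate()
toy_df = spark_local.createDataFrame(
    [("d1", "District A", "DA01"), ("d1", None, "DA01"), ("s1", "State X", "SX01")],
    ["id", "name", "value"],
)
deduped = toy_df.groupBy("id").agg(
    first("name", ignorenulls=True).alias("name"),
    first("value", ignorenulls=True).alias("value"),
)
# Caveat: first() is non-deterministic without an explicit ordering, so this
# keeps *a* non-null value per id rather than a timestamp-ordered latest one.
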
projects_df_final = projects_df_final.dropDuplicates()

district_final_df = projects_df_final.groupBy("state","district")\
@@ -335,50 +303,49 @@ def searchEntities(url,ids_list):
# call the function to get the entities from the location master
response = searchEntities(config.get("API_ENDPOINTS", "base_url") + config.get("API_ENDPOINTS", "location_search"),ids_list)

if response :
# Convert dictionary to list of tuples
data_tuples = list(response.items())

# Define the schema
state_schema = StructType([StructField("id", StringType(), True), StructField("state_name", StringType(), True)])
district_schema = StructType([StructField("id", StringType(), True), StructField("district_name", StringType(), True)])

# Create a DataFrame
state_id_mapping = spark.createDataFrame(data_tuples, schema=state_schema)

# Create a DataFrame
district_id_mapping = spark.createDataFrame(data_tuples, schema=district_schema)

district_final_df = district_final_df.join(state_id_mapping, district_final_df["state"] == state_id_mapping["id"], "left")

district_final_df = district_final_df.join(district_id_mapping, district_final_df["district"] == district_id_mapping["id"], "left")

final_data_to_csv = district_final_df.select("state_name","district_name","Total_Micro_Improvement_Projects","Total_Micro_Improvement_Started","Total_Micro_Improvement_InProgress","Total_Micro_Improvement_Submitted","Total_Micro_Improvement_Submitted_With_Evidence").sort("state_name","district_name")

# DF To file
local_path = config.get("COMMON", "nvsk_imp_projects_data_local_path")
blob_path = config.get("COMMON", "nvsk_imp_projects_data_blob_path")
final_data_to_csv.coalesce(1).write.format("csv").option("header",True).mode("overwrite").save(local_path)
final_data_to_csv.unpersist()

# Renaming a file
path = local_path
extension = 'csv'
os.chdir(path)
result = glob.glob(f'*.{extension}')
os.rename(f'{path}' + f'{result[0]}', f'{path}' + 'data.csv')

# Uploading file to Cloud
cloud_init.upload_to_cloud(blob_Path = blob_path, local_Path = local_path, file_Name = 'data.csv')

print("file got uploaded to Cloud.")
print("DONE")

else:
try:
raise ValueError("Entity Search API failed.")
except Exception as e:
# Log the custom error message along with exception information
error_message = "API error: {}".format(e)
errorLogger.error(error_message, exc_info=True)

data_tuples = [] # empty list for creating the DF

# if the location search API succeeded, get the mapping details from the API
if response['apiSuccessFlag']:
# Convert dictionary to list of tuples
data_tuples = list(response['mapping'].items())

# if any delta ids were found, fetch their details from the DF
if response['delta']:
delta_ids_from_response = userLocations_df.filter(col("id").isin(response['delta']))
for row in delta_ids_from_response.collect() :
data_tuples.append((row['id'],row['name']))

# Define the schema for State details
state_schema = StructType([StructField("id", StringType(), True), StructField("state_name", StringType(), True)])

# Define the schema for District details
district_schema = StructType([StructField("id", StringType(), True), StructField("district_name", StringType(), True)])

# Create a DataFrame for State
state_id_mapping = spark.createDataFrame(data_tuples, schema=state_schema)

# Create a DataFrame for District
district_id_mapping = spark.createDataFrame(data_tuples, schema=district_schema)

# Join to get the state names from state ids
district_final_df = district_final_df.join(state_id_mapping, district_final_df["state"] == state_id_mapping["id"], "left")
# Join to get the district names from district ids
district_final_df = district_final_df.join(district_id_mapping, district_final_df["district"] == district_id_mapping["id"], "left")
# Select only the relevant fields for the final DF, sorted by state and district name
final_data_to_csv = district_final_df.select("state_name","district_name","Total_Micro_Improvement_Projects","Total_Micro_Improvement_Started","Total_Micro_Improvement_InProgress","Total_Micro_Improvement_Submitted","Total_Micro_Improvement_Submitted_With_Evidence").sort("state_name","district_name")
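Note that state_id_mapping and district_id_mapping are built from the same (id, name) tuples; only the column alias differs, so each left join resolves whichever ids happen to match and leaves nulls elsewhere. A toy sketch of that pattern (illustrative data and names, reusing the script's spark session):

# Sketch only: the same (id, name) pairs serve both joins; ids that do not
# match a given join simply yield null in the mapped column.
pairs = [("s1", "State X"), ("d1", "District A")]
state_map = spark.createDataFrame(pairs, ["id", "state_name"])
district_map = spark.createDataFrame(pairs, ["id", "district_name"])
facts = spark.createDataFrame([("s1", "d1", 10)], ["state", "district", "total"])
resolved = (facts
    .join(state_map, facts["state"] == state_map["id"], "left")
    .drop(state_map["id"])
    .join(district_map, facts["district"] == district_map["id"], "left")
    .drop(district_map["id"]))
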
# DF To file
local_path = config.get("COMMON", "nvsk_imp_projects_data_local_path")
blob_path = config.get("COMMON", "nvsk_imp_projects_data_blob_path")
final_data_to_csv.coalesce(1).write.format("csv").option("header",True).mode("overwrite").save(local_path)
final_data_to_csv.unpersist()
# Renaming a file
path = local_path
extension = 'csv'
os.chdir(path)
result = glob.glob(f'*.{extension}')
os.rename(f'{path}' + f'{result[0]}', f'{path}' + 'data.csv')
# Uploading file to Cloud
cloud_init.upload_to_cloud(blob_Path = blob_path, local_Path = local_path, file_Name = 'data.csv')
print("file got uploaded to Cloud.")
print("DONE")
