Skip to content

Commit

Permalink
NiFi: updated cohort export script (chunking issue with low num of cp…
Browse files Browse the repository at this point in the history
…u threads).
  • Loading branch information
vladd-bit committed May 29, 2024
1 parent 452e998 commit d3adea5
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions nifi/user-scripts/cogstack_cohort_generate_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
from datetime import datetime
import os
import traceback
import regex
import multiprocess

from multiprocess import Pool, Queue
from collections import defaultdict, Counter
from utils.ethnicity_map import ethnicity_map
from utils.generic import chunk, dict2json_file, dict2json_truncate_add_to_file
from utils.generic import chunk, dict2json_truncate_add_to_file

# default values from /deploy/nifi.env
USER_SCRIPT_LOGS_DIR = os.getenv("USER_SCRIPT_LOGS_DIR", "")
Expand Down Expand Up @@ -138,7 +137,7 @@ def _process_annotation_records(annotation_records: list, _doc2ptt: dict):
_cui2ptt_tsp = defaultdict(lambda: defaultdict(int))

try:

# for each part of the MedCAT output (e.g., part_0.pickle)
for annotation_record in annotation_records:
annotation_entity = annotation_record
Expand All @@ -150,7 +149,10 @@ def _process_annotation_records(annotation_records: list, _doc2ptt: dict):
patient_id = _doc2ptt[str(docid)]
cui = annotation_entity["nlp.cui"]

if annotation_entity["nlp.meta_anns"]["Subject"]["value"] == "Patient" and annotation_entity["nlp.meta_anns"]["Presence"]["value"] == "True" and annotation_entity["nlp.meta_anns"]["Time"]["value"] != "Future":
if annotation_entity["nlp.meta_anns"]["Subject"]["value"] == "Patient" and \
annotation_entity["nlp.meta_anns"]["Presence"]["value"] == "True" and \
annotation_entity["nlp.meta_anns"]["Time"]["value"] != "Future":

_cui2ptt_pos[cui][patient_id] += 1

if "timestamp" in annotation_entity.keys():
Expand Down Expand Up @@ -185,7 +187,7 @@ def multiprocess_patient_records(input_patient_record_data: dict):
with Pool(processes=CPU_THREADS) as patient_process_pool:
rec_que = Queue()

record_chunks = list(chunk(input_patient_record_data, CPU_THREADS))
record_chunks = list(chunk(input_patient_record_data, int(len(input_patient_record_data) / CPU_THREADS)))

counter = 0
for record_chunk in record_chunks:
Expand All @@ -197,7 +199,7 @@ def multiprocess_patient_records(input_patient_record_data: dict):
for result in patient_process_pool_results:
result_data = result.get(timeout=TIMEOUT)
_ptt2sex, _ptt2eth, _ptt2dob, _ptt2age, _ptt2dod, _doc2ptt = result_data[0][0], result_data[0][1], result_data[0][2], result_data[0][3], result_data[0][4], result_data[0][5]

ptt2sex.update(_ptt2sex)
ptt2eth.update(_ptt2eth)
ptt2dob.update(_ptt2dob)
Expand All @@ -206,13 +208,14 @@ def multiprocess_patient_records(input_patient_record_data: dict):
doc2ptt.update(_doc2ptt)

except Exception as exception:
time = datetime.datetime.now()
time = datetime.now()
with open(log_file_path, "a+") as log_file:
log_file.write("\n" + str(time) + ": " + str(exception))
log_file.write("\n" + str(time) + ": " + traceback.format_exc())

return doc2ptt, ptt2dod, ptt2age, ptt2dob, ptt2eth, ptt2sex


def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict):

# cui2ptt_pos.jsonl each line is a dictionary of cui and the value is a dictionary of patients with a count {<cui>: {<patient_id>:<count>, ...}}\n...
Expand All @@ -224,10 +227,9 @@ def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict):
annotation_process_pool_results = []

with Pool(processes=CPU_THREADS) as annotations_process_pool:

rec_que = Queue()

record_chunks = list(chunk(input_annotations, CPU_THREADS))
record_chunks = list(chunk(input_annotations, int(len(input_annotations) / CPU_THREADS)))

counter = 0
for record_chunk in record_chunks:
Expand Down Expand Up @@ -255,12 +257,11 @@ def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict):
#############################################



# for testing
# OUTPUT_FOLDER_PATH = "../../data/cogstack-cohort/"
# INPUT_FOLDER_PATH = "../../data/cogstack-cohort/"
# INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN = "medical_reports_anns_"
# INPUT_PATIENT_RECORD_FILE_NAME_PATTERN = "medical_reports_text__"
#OUTPUT_FOLDER_PATH = "../../data/cogstack-cohort/"
#INPUT_FOLDER_PATH = "../../data/cogstack-cohort/"
#INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN = "medical_reports_anns_"
#INPUT_PATIENT_RECORD_FILE_NAME_PATTERN = "medical_reports_text__"

global_doc2ptt = {}

Expand Down

0 comments on commit d3adea5

Please sign in to comment.