From d3adea54a8698911e961854f8520facb706824b2 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 29 May 2024 22:39:19 +0100 Subject: [PATCH] NiFi: updated cohort export script (chunking issue with low num of cpu threads). --- .../cogstack_cohort_generate_data.py | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/nifi/user-scripts/cogstack_cohort_generate_data.py b/nifi/user-scripts/cogstack_cohort_generate_data.py index f056c7ca..020d6de9 100644 --- a/nifi/user-scripts/cogstack_cohort_generate_data.py +++ b/nifi/user-scripts/cogstack_cohort_generate_data.py @@ -4,13 +4,12 @@ from datetime import datetime import os import traceback -import regex import multiprocess from multiprocess import Pool, Queue from collections import defaultdict, Counter from utils.ethnicity_map import ethnicity_map -from utils.generic import chunk, dict2json_file, dict2json_truncate_add_to_file +from utils.generic import chunk, dict2json_truncate_add_to_file # default values from /deploy/nifi.env USER_SCRIPT_LOGS_DIR = os.getenv("USER_SCRIPT_LOGS_DIR", "") @@ -138,7 +137,7 @@ def _process_annotation_records(annotation_records: list, _doc2ptt: dict): _cui2ptt_tsp = defaultdict(lambda: defaultdict(int)) try: - + # for each part of the MedCAT output (e.g., part_0.pickle) for annotation_record in annotation_records: annotation_entity = annotation_record @@ -150,7 +149,10 @@ def _process_annotation_records(annotation_records: list, _doc2ptt: dict): patient_id = _doc2ptt[str(docid)] cui = annotation_entity["nlp.cui"] - if annotation_entity["nlp.meta_anns"]["Subject"]["value"] == "Patient" and annotation_entity["nlp.meta_anns"]["Presence"]["value"] == "True" and annotation_entity["nlp.meta_anns"]["Time"]["value"] != "Future": + if annotation_entity["nlp.meta_anns"]["Subject"]["value"] == "Patient" and \ + annotation_entity["nlp.meta_anns"]["Presence"]["value"] == "True" and \ + annotation_entity["nlp.meta_anns"]["Time"]["value"] != "Future": + _cui2ptt_pos[cui][patient_id] += 1 if "timestamp" in annotation_entity.keys(): @@ -185,7 +187,7 @@ def multiprocess_patient_records(input_patient_record_data: dict): with Pool(processes=CPU_THREADS) as patient_process_pool: rec_que = Queue() - record_chunks = list(chunk(input_patient_record_data, CPU_THREADS)) + record_chunks = list(chunk(input_patient_record_data, int(len(input_patient_record_data) / CPU_THREADS))) counter = 0 for record_chunk in record_chunks: @@ -197,7 +199,7 @@ def multiprocess_patient_records(input_patient_record_data: dict): for result in patient_process_pool_results: result_data = result.get(timeout=TIMEOUT) _ptt2sex, _ptt2eth, _ptt2dob, _ptt2age, _ptt2dod, _doc2ptt = result_data[0][0], result_data[0][1], result_data[0][2], result_data[0][3], result_data[0][4], result_data[0][5] - + ptt2sex.update(_ptt2sex) ptt2eth.update(_ptt2eth) ptt2dob.update(_ptt2dob) @@ -206,13 +208,14 @@ def multiprocess_patient_records(input_patient_record_data: dict): doc2ptt.update(_doc2ptt) except Exception as exception: - time = datetime.datetime.now() + time = datetime.now() with open(log_file_path, "a+") as log_file: log_file.write("\n" + str(time) + ": " + str(exception)) log_file.write("\n" + str(time) + ": " + traceback.format_exc()) return doc2ptt, ptt2dod, ptt2age, ptt2dob, ptt2eth, ptt2sex + def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict): # cui2ptt_pos.jsonl each line is a dictionary of cui and the value is a dictionary of patients with a count {: {:, ...}}\n... @@ -224,10 +227,9 @@ def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict): annotation_process_pool_results = [] with Pool(processes=CPU_THREADS) as annotations_process_pool: - rec_que = Queue() - record_chunks = list(chunk(input_annotations, CPU_THREADS)) + record_chunks = list(chunk(input_annotations, int(len(input_annotations) / CPU_THREADS))) counter = 0 for record_chunk in record_chunks: @@ -255,12 +257,11 @@ def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict): ############################################# - # for testing -# OUTPUT_FOLDER_PATH = "../../data/cogstack-cohort/" -# INPUT_FOLDER_PATH = "../../data/cogstack-cohort/" -# INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN = "medical_reports_anns_" -# INPUT_PATIENT_RECORD_FILE_NAME_PATTERN = "medical_reports_text__" +#OUTPUT_FOLDER_PATH = "../../data/cogstack-cohort/" +#INPUT_FOLDER_PATH = "../../data/cogstack-cohort/" +#INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN = "medical_reports_anns_" +#INPUT_PATIENT_RECORD_FILE_NAME_PATTERN = "medical_reports_text__" global_doc2ptt = {}