diff --git a/nifi/user-scripts/cogstack_cohort_generate_data.py b/nifi/user-scripts/cogstack_cohort_generate_data.py index 64d67105..9e8dbbcf 100644 --- a/nifi/user-scripts/cogstack_cohort_generate_data.py +++ b/nifi/user-scripts/cogstack_cohort_generate_data.py @@ -269,34 +269,18 @@ def multiprocess_annotation_records(input_annotations: dict): else: record_chunks = input_annotations + counter = 0 for record_chunk in record_chunks: rec_que.put(record_chunk) annotation_process_pool_results.append(annotations_process_pool.starmap_async(_process_annotation_records, [(rec_que.get(),)], error_callback=logging.error)) + counter += 1 - for result in annotation_process_pool_results: - result_data = result.get(timeout=TIMEOUT) + for result in annotation_process_pool_results: + result_data = result.get(timeout=TIMEOUT) - _cui2ptt_pos, _cui2ptt_tsp = result_data[0][0], result_data[0][1] - - for cui, patient_id_count_vals in _cui2ptt_pos.items(): - if cui not in cui2ptt_pos.keys(): - cui2ptt_pos[cui] = patient_id_count_vals - else: - for patient_id, count in patient_id_count_vals.items(): - if patient_id not in cui2ptt_pos[cui].keys(): - cui2ptt_pos[cui][patient_id] = count - else: - cui2ptt_pos[cui][patient_id] += count - - for cui, patient_id_timestamps in _cui2ptt_tsp.items(): - if cui not in cui2ptt_tsp.keys(): - cui2ptt_tsp[cui] = patient_id_timestamps - else: - for patient_id, timestamp in patient_id_timestamps.items(): - if patient_id not in cui2ptt_tsp[cui].keys(): - cui2ptt_tsp[cui][patient_id] = timestamp - else: - cui2ptt_tsp[cui][patient_id] = timestamp + _cui2ptt_pos, _cui2ptt_tsp = result_data[0][0], result_data[0][1] + cui2ptt_pos.update(_cui2ptt_pos) + cui2ptt_tsp.update(_cui2ptt_tsp) except Exception as exception: time = datetime.now() @@ -372,13 +356,6 @@ def multiprocess_annotation_records(input_annotations: dict): global_doc2ptt = json.loads(global_doc2ptt) if INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN: - - # cui2ptt_pos.jsonl each line is a dictionary of cui and the value is a dictionary of patients with a count {: {:, ...}}\n... - cui2ptt_pos = defaultdict(Counter) # store the count of a SNOMED term for a patient - - # cui2ptt_tsp.jsonl each line is a dictionary of cui and the value is a dictionary of patients with a timestamp {: {:, ...}}\n... - cui2ptt_tsp = defaultdict(lambda: defaultdict(int)) # store the first mention timestamp of a SNOMED term for a patient - # read each of the patient record files one by one for root, sub_directories, files in os.walk(INPUT_FOLDER_PATH): for file_name in files: @@ -390,42 +367,21 @@ def multiprocess_annotation_records(input_annotations: dict): with open(f_path, mode="r+") as f: contents = json.loads(f.read()) - _cui2ptt_pos, _cui2ptt_tsp = multiprocess_annotation_records(contents) + cui2ptt_pos, cui2ptt_tsp = multiprocess_annotation_records(contents) + with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_pos.jsonl"), "a+", encoding="utf-8") as outfile: + for k,v in cui2ptt_pos.items(): + o = {k: v} + json_obj = json.loads(json.dumps(o)) + json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':')) + print('', file=outfile) + + with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_tsp.jsonl"), "a+", encoding="utf-8") as outfile: + for k,v in cui2ptt_tsp.items(): + o = {k: v} + json_obj = json.loads(json.dumps(o)) + json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':')) + print('', file=outfile) with open(log_file_path, "a+") as log_file: time = datetime.now() - log_file.write("\n" + str(time) + ": processed file " + str(file_name)) - - for cui, patient_id_count_vals in _cui2ptt_pos.items(): - if cui not in cui2ptt_pos.keys(): - cui2ptt_pos[cui] = patient_id_count_vals - else: - for patient_id, count in patient_id_count_vals.items(): - if patient_id not in cui2ptt_pos[cui]: - cui2ptt_pos[cui][patient_id] = count - else: - cui2ptt_pos[cui][patient_id] += count - - for cui, patient_id_timestamps in _cui2ptt_tsp.items(): - if cui not in cui2ptt_tsp.keys(): - cui2ptt_tsp[cui] = patient_id_timestamps - else: - for patient_id, timestamp in patient_id_timestamps.items(): - if patient_id not in cui2ptt_pos[cui].keys(): - cui2ptt_tsp[cui][patient_id] = timestamp - else: - cui2ptt_tsp[cui][patient_id] = timestamp - - with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_pos.jsonl"), "a+", encoding="utf-8") as outfile: - for k,v in cui2ptt_pos.items(): - o = {k: v} - json_obj = json.loads(json.dumps(o)) - json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':')) - print('', file=outfile) - - with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_tsp.jsonl"), "a+", encoding="utf-8") as outfile: - for k,v in cui2ptt_tsp.items(): - o = {k: v} - json_obj = json.loads(json.dumps(o)) - json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':')) - print('', file=outfile) + log_file.write("\n" + str(time) + ": processed file " + str(file_name)) \ No newline at end of file