Skip to content

Commit

Permalink
NiFi: reverted cohort export script (ann section).
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Aug 11, 2024
1 parent 7ea2ff7 commit f59090e
Showing 1 changed file with 22 additions and 66 deletions.
88 changes: 22 additions & 66 deletions nifi/user-scripts/cogstack_cohort_generate_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,34 +269,18 @@ def multiprocess_annotation_records(input_annotations: dict):
else:
record_chunks = input_annotations

counter = 0
for record_chunk in record_chunks:
rec_que.put(record_chunk)
annotation_process_pool_results.append(annotations_process_pool.starmap_async(_process_annotation_records, [(rec_que.get(),)], error_callback=logging.error))
counter += 1

for result in annotation_process_pool_results:
result_data = result.get(timeout=TIMEOUT)
for result in annotation_process_pool_results:
result_data = result.get(timeout=TIMEOUT)

_cui2ptt_pos, _cui2ptt_tsp = result_data[0][0], result_data[0][1]

for cui, patient_id_count_vals in _cui2ptt_pos.items():
if cui not in cui2ptt_pos.keys():
cui2ptt_pos[cui] = patient_id_count_vals
else:
for patient_id, count in patient_id_count_vals.items():
if patient_id not in cui2ptt_pos[cui].keys():
cui2ptt_pos[cui][patient_id] = count
else:
cui2ptt_pos[cui][patient_id] += count

for cui, patient_id_timestamps in _cui2ptt_tsp.items():
if cui not in cui2ptt_tsp.keys():
cui2ptt_tsp[cui] = patient_id_timestamps
else:
for patient_id, timestamp in patient_id_timestamps.items():
if patient_id not in cui2ptt_tsp[cui].keys():
cui2ptt_tsp[cui][patient_id] = timestamp
else:
cui2ptt_tsp[cui][patient_id] = timestamp
_cui2ptt_pos, _cui2ptt_tsp = result_data[0][0], result_data[0][1]
cui2ptt_pos.update(_cui2ptt_pos)
cui2ptt_tsp.update(_cui2ptt_tsp)

except Exception as exception:
time = datetime.now()
Expand Down Expand Up @@ -372,13 +356,6 @@ def multiprocess_annotation_records(input_annotations: dict):
global_doc2ptt = json.loads(global_doc2ptt)

if INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN:

# cui2ptt_pos.jsonl each line is a dictionary of cui and the value is a dictionary of patients with a count {<cui>: {<patient_id>:<count>, ...}}\n...
cui2ptt_pos = defaultdict(Counter) # store the count of a SNOMED term for a patient

# cui2ptt_tsp.jsonl each line is a dictionary of cui and the value is a dictionary of patients with a timestamp {<cui>: {<patient_id>:<tsp>, ...}}\n...
cui2ptt_tsp = defaultdict(lambda: defaultdict(int)) # store the first mention timestamp of a SNOMED term for a patient

# read each of the patient record files one by one
for root, sub_directories, files in os.walk(INPUT_FOLDER_PATH):
for file_name in files:
Expand All @@ -390,42 +367,21 @@ def multiprocess_annotation_records(input_annotations: dict):
with open(f_path, mode="r+") as f:
contents = json.loads(f.read())

_cui2ptt_pos, _cui2ptt_tsp = multiprocess_annotation_records(contents)
cui2ptt_pos, cui2ptt_tsp = multiprocess_annotation_records(contents)
with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_pos.jsonl"), "a+", encoding="utf-8") as outfile:
for k,v in cui2ptt_pos.items():
o = {k: v}
json_obj = json.loads(json.dumps(o))
json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':'))
print('', file=outfile)

with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_tsp.jsonl"), "a+", encoding="utf-8") as outfile:
for k,v in cui2ptt_tsp.items():
o = {k: v}
json_obj = json.loads(json.dumps(o))
json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':'))
print('', file=outfile)

with open(log_file_path, "a+") as log_file:
time = datetime.now()
log_file.write("\n" + str(time) + ": processed file " + str(file_name))

for cui, patient_id_count_vals in _cui2ptt_pos.items():
if cui not in cui2ptt_pos.keys():
cui2ptt_pos[cui] = patient_id_count_vals
else:
for patient_id, count in patient_id_count_vals.items():
if patient_id not in cui2ptt_pos[cui]:
cui2ptt_pos[cui][patient_id] = count
else:
cui2ptt_pos[cui][patient_id] += count

for cui, patient_id_timestamps in _cui2ptt_tsp.items():
if cui not in cui2ptt_tsp.keys():
cui2ptt_tsp[cui] = patient_id_timestamps
else:
for patient_id, timestamp in patient_id_timestamps.items():
if patient_id not in cui2ptt_pos[cui].keys():
cui2ptt_tsp[cui][patient_id] = timestamp
else:
cui2ptt_tsp[cui][patient_id] = timestamp

with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_pos.jsonl"), "a+", encoding="utf-8") as outfile:
for k,v in cui2ptt_pos.items():
o = {k: v}
json_obj = json.loads(json.dumps(o))
json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':'))
print('', file=outfile)

with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_tsp.jsonl"), "a+", encoding="utf-8") as outfile:
for k,v in cui2ptt_tsp.items():
o = {k: v}
json_obj = json.loads(json.dumps(o))
json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':'))
print('', file=outfile)
log_file.write("\n" + str(time) + ": processed file " + str(file_name))

0 comments on commit f59090e

Please sign in to comment.