From b22c6096b54a50b9ffceb27b7154312d1622f362 Mon Sep 17 00:00:00 2001 From: Chao Pang Date: Fri, 25 Oct 2024 22:14:58 -0400 Subject: [PATCH 1/6] added time_field_options to specify a list of datetime fields to extract from for the event time --- src/meds_etl/omop.py | 47 +++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/src/meds_etl/omop.py b/src/meds_etl/omop.py index 96d3f39..e448bbb 100644 --- a/src/meds_etl/omop.py +++ b/src/meds_etl/omop.py @@ -179,6 +179,12 @@ def write_event_data( ), ) else: + # User defined time field options + time_field_options = [ + cast_to_datetime(schema, option, move_to_end_of_day=True) + for option in table_details.get("time_field_options", []) + if option in schema.names() + ] # Use the OMOP table name + `_start_datetime` as the `time` column # if it's available otherwise `_start_date`, `_datetime`, `_date` # in that order of preference @@ -188,6 +194,8 @@ def write_event_data( for option in options if table_name + option in schema.names() ] + # We prefer user defined time field options over the default time options if available + options = time_field_options + options assert len(options) > 0, f"Could not find the time column {schema.names()}" time = pl.coalesce(options) @@ -286,7 +294,8 @@ def write_event_data( unit_columns.append(pl.col("unit_source_value")) if "unit_concept_id" in schema.names(): unit_columns.append( - pl.col("unit_concept_id").replace_strict(concept_id_map, return_dtype=pl.Utf8(), default=None)) + pl.col("unit_concept_id").replace_strict(concept_id_map, return_dtype=pl.Utf8(), default=None) + ) if unit_columns: metadata["unit"] = pl.coalesce(unit_columns) @@ -450,9 +459,12 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v # and use it to generate metadata file as well as populate maps # from (concept ID -> concept code) and (concept ID -> concept name) print("Generating metadata from OMOP `concept` table") - for concept_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept")), - total=len(get_table_files(path_to_src_omop_dir, "concept")[0]) + len(get_table_files(path_to_src_omop_dir, "concept")[1]), - desc="Generating metadata from OMOP `concept` table"): + for concept_file in tqdm( + itertools.chain(*get_table_files(path_to_src_omop_dir, "concept")), + total=len(get_table_files(path_to_src_omop_dir, "concept")[0]) + + len(get_table_files(path_to_src_omop_dir, "concept")[1]), + desc="Generating metadata from OMOP `concept` table", + ): # Note: Concept table is often split into gzipped shards by default if verbose: print(concept_file) @@ -492,9 +504,12 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v # Include map from custom concepts to normalized (ie standard ontology) # parent concepts, where possible, in the code_metadata dictionary - for concept_relationship_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept_relationship")), - total=len(get_table_files(path_to_src_omop_dir, "concept_relationship")[0]) + len(get_table_files(path_to_src_omop_dir, "concept_relationship")[1]), - desc="Generating metadata from OMOP `concept_relationship` table"): + for concept_relationship_file in tqdm( + itertools.chain(*get_table_files(path_to_src_omop_dir, "concept_relationship")), + total=len(get_table_files(path_to_src_omop_dir, "concept_relationship")[0]) + + len(get_table_files(path_to_src_omop_dir, "concept_relationship")[1]), + desc="Generating metadata from OMOP `concept_relationship` table", + ): with load_file(path_to_decompressed_dir, concept_relationship_file) as f: # This table has `concept_id_1`, `concept_id_2`, `relationship_id` columns concept_relationship = read_polars_df(f.name) @@ -521,9 +536,12 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v # Extract dataset metadata e.g., the CDM source name and its release date datasets: List[str] = [] dataset_versions: List[str] = [] - for cdm_source_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "cdm_source")), - total=len(get_table_files(path_to_src_omop_dir, "cdm_source")[0]) + len(get_table_files(path_to_src_omop_dir, "cdm_source")[1]), - desc="Extracting dataset metadata"): + for cdm_source_file in tqdm( + itertools.chain(*get_table_files(path_to_src_omop_dir, "cdm_source")), + total=len(get_table_files(path_to_src_omop_dir, "cdm_source")[0]) + + len(get_table_files(path_to_src_omop_dir, "cdm_source")[1]), + desc="Extracting dataset metadata", + ): with load_file(path_to_decompressed_dir, cdm_source_file) as f: cdm_source = read_polars_df(f.name) cdm_source = cdm_source.rename({c: c.lower() for c in cdm_source.columns}) @@ -668,7 +686,14 @@ def main(): "drug_exposure": { "concept_id_field": "drug_concept_id", }, - "visit": {"fallback_concept_id": DEFAULT_VISIT_CONCEPT_ID, "file_suffix": "occurrence"}, + "visit": [ + {"fallback_concept_id": DEFAULT_VISIT_CONCEPT_ID, "file_suffix": "occurrence"}, + { + "concept_id_field": "discharged_to_concept_id", + "time_field_options": ["visit_end_datetime", "visit_end_date"], + "file_suffix": "occurrence", + }, + ], "condition": { "file_suffix": "occurrence", }, From d7c9d6d5b79a6e8f236b8566173b7f337d77543a Mon Sep 17 00:00:00 2001 From: Chao Pang Date: Fri, 25 Oct 2024 22:46:14 -0400 Subject: [PATCH 2/6] added a unit test for extracting the time stamp of discharged_to_concept_id --- tests/test_omop.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 tests/test_omop.py diff --git a/tests/test_omop.py b/tests/test_omop.py new file mode 100644 index 0000000..a5a42b2 --- /dev/null +++ b/tests/test_omop.py @@ -0,0 +1,61 @@ +import datetime +import tempfile +from pathlib import Path + +import polars as pl + +from meds_etl.omop import DEFAULT_VISIT_CONCEPT_ID, write_event_data + + +def test_discharged_to_concept_id_correct(): + """ + Test the tests. + """ + # Define the schema and a sample record for the OMOP visit table + visit_table_details = [ + {"fallback_concept_id": DEFAULT_VISIT_CONCEPT_ID, "file_suffix": "occurrence"}, + { + "concept_id_field": "discharged_to_concept_id", + "time_field_options": ["visit_end_datetime", "visit_end_date"], + "file_suffix": "occurrence", + }, + ] + visit_occurrence = pl.DataFrame( + { + "visit_occurrence_id": [1], + "person_id": [12345], + "visit_concept_id": [9201], # Example: 9201 for inpatient visit + "visit_start_date": [datetime.date(2024, 10, 25)], + "visit_end_date": [datetime.date(2024, 10, 28)], + "visit_type_concept_id": [44818517], # Example: 44818517 for primary care visit + "provider_id": [56789], + "care_site_id": [101], + "visit_source_value": ["Visit/IP"], + "visit_source_concept_id": [9201], # Use 0 if no mapping exists + "admitting_source_concept_id": [38004294], # Example: 38004294 for Emergency Room + "discharged_to_concept_id": [38004453], # Example: 38004453 for Home + "preceding_visit_occurrence_id": [None], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + write_event_data( + path_to_MEDS_unsorted_dir=tmpdir, + get_batch=lambda: visit_occurrence.lazy(), + table_name="visit", + all_table_details=visit_table_details, + concept_id_map={ + 9201: "Visit/IP", + 38004453: "SNOMED/38004453", + 319835: "Hypertension", + 45763524: "Diabetes", + }, + concept_name_map={9201: "Inpatient Visit", 38004453: "Home", 319835: "Hypertension", 45763524: "Diabetes"}, + ) + expected_meds = pl.read_parquet(Path(tmpdir).glob("*.parquet")) + assert len(expected_meds) == 2 + expected_meds_dicts = expected_meds.sort("time").to_dicts() + assert expected_meds_dicts[0]["code"] == "Visit/IP" + assert expected_meds_dicts[0]["time"] == datetime.datetime(2024, 10, 25, 23, 59, 59) + assert expected_meds_dicts[1]["code"] == "SNOMED/38004453" + assert expected_meds_dicts[1]["time"] == datetime.datetime(2024, 10, 28, 23, 59, 59) From 425b3d8ff014889e716a719e7ee44681b238f369 Mon Sep 17 00:00:00 2001 From: Chao Pang Date: Fri, 25 Oct 2024 22:53:12 -0400 Subject: [PATCH 3/6] restored the lines of code automatically changed by formatter --- src/meds_etl/omop.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/src/meds_etl/omop.py b/src/meds_etl/omop.py index e448bbb..6112411 100644 --- a/src/meds_etl/omop.py +++ b/src/meds_etl/omop.py @@ -294,8 +294,7 @@ def write_event_data( unit_columns.append(pl.col("unit_source_value")) if "unit_concept_id" in schema.names(): unit_columns.append( - pl.col("unit_concept_id").replace_strict(concept_id_map, return_dtype=pl.Utf8(), default=None) - ) + pl.col("unit_concept_id").replace_strict(concept_id_map, return_dtype=pl.Utf8(), default=None)) if unit_columns: metadata["unit"] = pl.coalesce(unit_columns) @@ -459,12 +458,9 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v # and use it to generate metadata file as well as populate maps # from (concept ID -> concept code) and (concept ID -> concept name) print("Generating metadata from OMOP `concept` table") - for concept_file in tqdm( - itertools.chain(*get_table_files(path_to_src_omop_dir, "concept")), - total=len(get_table_files(path_to_src_omop_dir, "concept")[0]) - + len(get_table_files(path_to_src_omop_dir, "concept")[1]), - desc="Generating metadata from OMOP `concept` table", - ): + for concept_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept")), + total=len(get_table_files(path_to_src_omop_dir, "concept")[0]) + len(get_table_files(path_to_src_omop_dir, "concept")[1]), + desc="Generating metadata from OMOP `concept` table"): # Note: Concept table is often split into gzipped shards by default if verbose: print(concept_file) @@ -504,12 +500,9 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v # Include map from custom concepts to normalized (ie standard ontology) # parent concepts, where possible, in the code_metadata dictionary - for concept_relationship_file in tqdm( - itertools.chain(*get_table_files(path_to_src_omop_dir, "concept_relationship")), - total=len(get_table_files(path_to_src_omop_dir, "concept_relationship")[0]) - + len(get_table_files(path_to_src_omop_dir, "concept_relationship")[1]), - desc="Generating metadata from OMOP `concept_relationship` table", - ): + for concept_relationship_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept_relationship")), + total=len(get_table_files(path_to_src_omop_dir, "concept_relationship")[0]) + len(get_table_files(path_to_src_omop_dir, "concept_relationship")[1]), + desc="Generating metadata from OMOP `concept_relationship` table"): with load_file(path_to_decompressed_dir, concept_relationship_file) as f: # This table has `concept_id_1`, `concept_id_2`, `relationship_id` columns concept_relationship = read_polars_df(f.name) @@ -536,12 +529,9 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v # Extract dataset metadata e.g., the CDM source name and its release date datasets: List[str] = [] dataset_versions: List[str] = [] - for cdm_source_file in tqdm( - itertools.chain(*get_table_files(path_to_src_omop_dir, "cdm_source")), - total=len(get_table_files(path_to_src_omop_dir, "cdm_source")[0]) - + len(get_table_files(path_to_src_omop_dir, "cdm_source")[1]), - desc="Extracting dataset metadata", - ): + for cdm_source_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "cdm_source")), + total=len(get_table_files(path_to_src_omop_dir, "cdm_source")[0]) + len(get_table_files(path_to_src_omop_dir, "cdm_source")[1]), + desc="Extracting dataset metadata"): with load_file(path_to_decompressed_dir, cdm_source_file) as f: cdm_source = read_polars_df(f.name) cdm_source = cdm_source.rename({c: c.lower() for c in cdm_source.columns}) From d42f672b0d3a9030875c885256135bcb1cc53ea0 Mon Sep 17 00:00:00 2001 From: Chao Pang Date: Fri, 25 Oct 2024 22:54:07 -0400 Subject: [PATCH 4/6] fixed test_omop.py --- tests/test_omop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_omop.py b/tests/test_omop.py index a5a42b2..607b115 100644 --- a/tests/test_omop.py +++ b/tests/test_omop.py @@ -52,7 +52,7 @@ def test_discharged_to_concept_id_correct(): }, concept_name_map={9201: "Inpatient Visit", 38004453: "Home", 319835: "Hypertension", 45763524: "Diabetes"}, ) - expected_meds = pl.read_parquet(Path(tmpdir).glob("*.parquet")) + expected_meds = pl.read_parquet(list(Path(tmpdir).glob("*.parquet"))) assert len(expected_meds) == 2 expected_meds_dicts = expected_meds.sort("time").to_dicts() assert expected_meds_dicts[0]["code"] == "Visit/IP" From d4803654847eeb46c470f5a3b60cb9a8e57aaa6b Mon Sep 17 00:00:00 2001 From: Chao Pang Date: Wed, 30 Oct 2024 07:04:25 -0400 Subject: [PATCH 5/6] Consolidated the logic to merge user defined time_option_fields and default time options --- src/meds_etl/omop.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/meds_etl/omop.py b/src/meds_etl/omop.py index 6112411..d98e748 100644 --- a/src/meds_etl/omop.py +++ b/src/meds_etl/omop.py @@ -179,23 +179,21 @@ def write_event_data( ), ) else: - # User defined time field options - time_field_options = [ - cast_to_datetime(schema, option, move_to_end_of_day=True) - for option in table_details.get("time_field_options", []) - if option in schema.names() - ] # Use the OMOP table name + `_start_datetime` as the `time` column # if it's available otherwise `_start_date`, `_datetime`, `_date` # in that order of preference - options = ["_start_datetime", "_start_date", "_datetime", "_date"] + # We prefer user defined time field options over the default time options if available + options = table_details.get("time_field_options", []) + [ + table_name + "_start_datetime", + table_name + "_start_date", + table_name + "_datetime", + table_name + "_date", + ] options = [ - cast_to_datetime(schema, table_name + option, move_to_end_of_day=True) + cast_to_datetime(schema, option, move_to_end_of_day=True) for option in options - if table_name + option in schema.names() + if option in schema.names() ] - # We prefer user defined time field options over the default time options if available - options = time_field_options + options assert len(options) > 0, f"Could not find the time column {schema.names()}" time = pl.coalesce(options) From e502251c40acd5b189963f7b09d80aceca4428cc Mon Sep 17 00:00:00 2001 From: Chao Pang Date: Wed, 30 Oct 2024 07:06:25 -0400 Subject: [PATCH 6/6] added more test cases for extracting discharged_to_concept_id --- tests/test_omop.py | 50 +++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/tests/test_omop.py b/tests/test_omop.py index 607b115..3bba698 100644 --- a/tests/test_omop.py +++ b/tests/test_omop.py @@ -22,19 +22,19 @@ def test_discharged_to_concept_id_correct(): ] visit_occurrence = pl.DataFrame( { - "visit_occurrence_id": [1], - "person_id": [12345], - "visit_concept_id": [9201], # Example: 9201 for inpatient visit - "visit_start_date": [datetime.date(2024, 10, 25)], - "visit_end_date": [datetime.date(2024, 10, 28)], - "visit_type_concept_id": [44818517], # Example: 44818517 for primary care visit - "provider_id": [56789], - "care_site_id": [101], - "visit_source_value": ["Visit/IP"], - "visit_source_concept_id": [9201], # Use 0 if no mapping exists - "admitting_source_concept_id": [38004294], # Example: 38004294 for Emergency Room - "discharged_to_concept_id": [38004453], # Example: 38004453 for Home - "preceding_visit_occurrence_id": [None], + "visit_occurrence_id": [1, 2, 3], + "person_id": [12345, 12345, 12345], + "visit_concept_id": [9201, 9202, 9202], # Example: 9201 for inpatient visit + "visit_start_date": [datetime.date(2024, 10, 25), datetime.date(2024, 10, 30), datetime.date(2024, 11, 1)], + "visit_end_date": [datetime.date(2024, 10, 28), datetime.date(2024, 10, 30), datetime.date(2024, 11, 1)], + "visit_type_concept_id": [44818517, 44818517, 44818517], # Example: 44818517 for primary care visit + "provider_id": [56789, 56789, 56789], + "care_site_id": [101, None, None], + "visit_source_value": ["Visit/IP", "Visit/OP", "Visit/OP"], + "visit_source_concept_id": [9201, 9202, 9202], # Use 0 if no mapping exists + "admitting_source_concept_id": [38004294, 0, 0], # Example: 38004294 for Emergency Room + "discharged_to_concept_id": [38004453, 0, None], # Example: 38004453 for Home + "preceding_visit_occurrence_id": [None, None, None], } ) @@ -46,16 +46,26 @@ def test_discharged_to_concept_id_correct(): all_table_details=visit_table_details, concept_id_map={ 9201: "Visit/IP", + 9202: "Visit/OP", 38004453: "SNOMED/38004453", 319835: "Hypertension", 45763524: "Diabetes", }, - concept_name_map={9201: "Inpatient Visit", 38004453: "Home", 319835: "Hypertension", 45763524: "Diabetes"}, + concept_name_map={ + 9201: "Inpatient Visit", + 9202: "Outpatient Visit", + 38004453: "Home", + 319835: "Hypertension", + 45763524: "Diabetes", + }, ) expected_meds = pl.read_parquet(list(Path(tmpdir).glob("*.parquet"))) - assert len(expected_meds) == 2 - expected_meds_dicts = expected_meds.sort("time").to_dicts() - assert expected_meds_dicts[0]["code"] == "Visit/IP" - assert expected_meds_dicts[0]["time"] == datetime.datetime(2024, 10, 25, 23, 59, 59) - assert expected_meds_dicts[1]["code"] == "SNOMED/38004453" - assert expected_meds_dicts[1]["time"] == datetime.datetime(2024, 10, 28, 23, 59, 59) + assert len(expected_meds) == 4 + actual_meds_dicts = expected_meds.sort("time").select("code", "time").to_dicts() + expected_meds_dicts = [ + {"code": "Visit/IP", "time": datetime.datetime(2024, 10, 25, 23, 59, 59)}, + {"code": "SNOMED/38004453", "time": datetime.datetime(2024, 10, 28, 23, 59, 59)}, + {"code": "Visit/OP", "time": datetime.datetime(2024, 10, 30, 23, 59, 59)}, + {"code": "Visit/OP", "time": datetime.datetime(2024, 11, 1, 23, 59, 59)}, + ] + assert actual_meds_dicts == expected_meds_dicts