Skip to content

Commit

Permalink
Merge pull request #42 from ChaoPang/add_discharge_concept_id_correct…
Browse files Browse the repository at this point in the history
…_time

extract events from the discharged_concept_id field of the visit table with the correct time
  • Loading branch information
ChaoPang authored Nov 1, 2024
2 parents 818ce90 + e502251 commit ba0eec1
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 8 deletions.
29 changes: 21 additions & 8 deletions src/meds_etl/omop.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,17 @@ def write_event_data(
# Use the OMOP table name + `_start_datetime` as the `time` column
# if it's available otherwise `_start_date`, `_datetime`, `_date`
# in that order of preference
options = ["_start_datetime", "_start_date", "_datetime", "_date"]
# We prefer user defined time field options over the default time options if available
options = table_details.get("time_field_options", []) + [
table_name + "_start_datetime",
table_name + "_start_date",
table_name + "_datetime",
table_name + "_date",
]
options = [
cast_to_datetime(schema, table_name + option, move_to_end_of_day=True)
cast_to_datetime(schema, option, move_to_end_of_day=True)
for option in options
if table_name + option in schema.names()
if option in schema.names()
]
assert len(options) > 0, f"Could not find the time column {schema.names()}"
time = pl.coalesce(options)
Expand Down Expand Up @@ -450,8 +456,8 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v
# and use it to generate metadata file as well as populate maps
# from (concept ID -> concept code) and (concept ID -> concept name)
print("Generating metadata from OMOP `concept` table")
for concept_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept")),
total=len(get_table_files(path_to_src_omop_dir, "concept")[0]) + len(get_table_files(path_to_src_omop_dir, "concept")[1]),
for concept_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept")),
total=len(get_table_files(path_to_src_omop_dir, "concept")[0]) + len(get_table_files(path_to_src_omop_dir, "concept")[1]),
desc="Generating metadata from OMOP `concept` table"):
# Note: Concept table is often split into gzipped shards by default
if verbose:
Expand Down Expand Up @@ -492,8 +498,8 @@ def extract_metadata(path_to_src_omop_dir: str, path_to_decompressed_dir: str, v

# Include map from custom concepts to normalized (ie standard ontology)
# parent concepts, where possible, in the code_metadata dictionary
for concept_relationship_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept_relationship")),
total=len(get_table_files(path_to_src_omop_dir, "concept_relationship")[0]) + len(get_table_files(path_to_src_omop_dir, "concept_relationship")[1]),
for concept_relationship_file in tqdm(itertools.chain(*get_table_files(path_to_src_omop_dir, "concept_relationship")),
total=len(get_table_files(path_to_src_omop_dir, "concept_relationship")[0]) + len(get_table_files(path_to_src_omop_dir, "concept_relationship")[1]),
desc="Generating metadata from OMOP `concept_relationship` table"):
with load_file(path_to_decompressed_dir, concept_relationship_file) as f:
# This table has `concept_id_1`, `concept_id_2`, `relationship_id` columns
Expand Down Expand Up @@ -668,7 +674,14 @@ def main():
"drug_exposure": {
"concept_id_field": "drug_concept_id",
},
"visit": {"fallback_concept_id": DEFAULT_VISIT_CONCEPT_ID, "file_suffix": "occurrence"},
"visit": [
{"fallback_concept_id": DEFAULT_VISIT_CONCEPT_ID, "file_suffix": "occurrence"},
{
"concept_id_field": "discharged_to_concept_id",
"time_field_options": ["visit_end_datetime", "visit_end_date"],
"file_suffix": "occurrence",
},
],
"condition": {
"file_suffix": "occurrence",
},
Expand Down
71 changes: 71 additions & 0 deletions tests/test_omop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import datetime
import tempfile
from pathlib import Path

import polars as pl

from meds_etl.omop import DEFAULT_VISIT_CONCEPT_ID, write_event_data


def test_discharged_to_concept_id_correct():
"""
Test the tests.
"""
# Define the schema and a sample record for the OMOP visit table
visit_table_details = [
{"fallback_concept_id": DEFAULT_VISIT_CONCEPT_ID, "file_suffix": "occurrence"},
{
"concept_id_field": "discharged_to_concept_id",
"time_field_options": ["visit_end_datetime", "visit_end_date"],
"file_suffix": "occurrence",
},
]
visit_occurrence = pl.DataFrame(
{
"visit_occurrence_id": [1, 2, 3],
"person_id": [12345, 12345, 12345],
"visit_concept_id": [9201, 9202, 9202], # Example: 9201 for inpatient visit
"visit_start_date": [datetime.date(2024, 10, 25), datetime.date(2024, 10, 30), datetime.date(2024, 11, 1)],
"visit_end_date": [datetime.date(2024, 10, 28), datetime.date(2024, 10, 30), datetime.date(2024, 11, 1)],
"visit_type_concept_id": [44818517, 44818517, 44818517], # Example: 44818517 for primary care visit
"provider_id": [56789, 56789, 56789],
"care_site_id": [101, None, None],
"visit_source_value": ["Visit/IP", "Visit/OP", "Visit/OP"],
"visit_source_concept_id": [9201, 9202, 9202], # Use 0 if no mapping exists
"admitting_source_concept_id": [38004294, 0, 0], # Example: 38004294 for Emergency Room
"discharged_to_concept_id": [38004453, 0, None], # Example: 38004453 for Home
"preceding_visit_occurrence_id": [None, None, None],
}
)

with tempfile.TemporaryDirectory() as tmpdir:
write_event_data(
path_to_MEDS_unsorted_dir=tmpdir,
get_batch=lambda: visit_occurrence.lazy(),
table_name="visit",
all_table_details=visit_table_details,
concept_id_map={
9201: "Visit/IP",
9202: "Visit/OP",
38004453: "SNOMED/38004453",
319835: "Hypertension",
45763524: "Diabetes",
},
concept_name_map={
9201: "Inpatient Visit",
9202: "Outpatient Visit",
38004453: "Home",
319835: "Hypertension",
45763524: "Diabetes",
},
)
expected_meds = pl.read_parquet(list(Path(tmpdir).glob("*.parquet")))
assert len(expected_meds) == 4
actual_meds_dicts = expected_meds.sort("time").select("code", "time").to_dicts()
expected_meds_dicts = [
{"code": "Visit/IP", "time": datetime.datetime(2024, 10, 25, 23, 59, 59)},
{"code": "SNOMED/38004453", "time": datetime.datetime(2024, 10, 28, 23, 59, 59)},
{"code": "Visit/OP", "time": datetime.datetime(2024, 10, 30, 23, 59, 59)},
{"code": "Visit/OP", "time": datetime.datetime(2024, 11, 1, 23, 59, 59)},
]
assert actual_meds_dicts == expected_meds_dicts

0 comments on commit ba0eec1

Please sign in to comment.