diff --git a/Makefile b/Makefile
index 25511e1..618f791 100644
--- a/Makefile
+++ b/Makefile
@@ -32,6 +32,7 @@ check:
 export:
 	psql $(MIMIC) --set=OMOP_SCHEMA="$(OMOP_SCHEMA)" -f export/export_mimic_omop.sql &&\
 	cp import/import_mimic_omop.sql etl/Result/ &&\
+	cp import/import_mimic_omop_duckdb.py etl/Result/ &&\
 	cp omop/build-omop/postgresql/* etl/Result/
 #	tar -cf $(MIMIC_SCHEMA)-omop.tar etl/Result/
 
diff --git a/import/import_mimic_omop_duckdb.py b/import/import_mimic_omop_duckdb.py
new file mode 100644
index 0000000..f033a2c
--- /dev/null
+++ b/import/import_mimic_omop_duckdb.py
@@ -0,0 +1,64 @@
+"""Import the exported OMOP CSV files into a DuckDB database.
+
+Reads the gzipped CSV files (``*.csv.gz``) from the current working
+directory and creates one table per file in ``mimic-omop.db``.
+"""
+
+import duckdb
+
+# Note: presorting the data (ORDER BY) improves performance for queries that filter on the sort columns.
+
+# A temporary database is used to sort the measurement table by source concept id.
+# The table is written out in a few sorted parts so that 64 GB of RAM is enough;
+# adjust the number of temporary CSV files (and the id boundaries) to your resources.
+with duckdb.connect("temp_mimic-omop.db") as con:
+    con.sql(""" CREATE TABLE IF NOT EXISTS measurement_tmp AS SELECT(*) FROM read_csv('measurement.csv.gz', delim=',', header=true, columns={'measurement_id': 'INTEGER', 'person_id': 'INTEGER', 'measurement_concept_id': 'INTEGER', 'measurement_date': 'DATE', 'measurement_datetime': 'TIMESTAMP', 'measurement_time': 'VARCHAR', 'measurement_type_concept_id': 'INTEGER', 'operator_concept_id': 'INTEGER', 'value_as_number': 'NUMERIC', 'value_as_concept_id': 'INTEGER', 'unit_concept_id': 'INTEGER', 'range_low': 'NUMERIC', 'range_high': 'NUMERIC', 'provider_id': 'INTEGER', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'INTEGER', 'measurement_source_value': 'VARCHAR', 'measurement_source_concept_id': 'INTEGER', 'unit_source_value': 'VARCHAR', 'value_source_value': 'VARCHAR'})""")
+    con.sql("""copy (SELECT(*) FROM measurement_tmp where measurement_source_concept_id < 2001020788 order by measurement_source_concept_id, person_id) to 'temp_meas1.csv' (HEADER false, DELIMITER ',')""")
+    con.sql("""copy (SELECT(*) FROM measurement_tmp where measurement_source_concept_id between 2001020788 and 2001028088 order by measurement_source_concept_id, person_id) to 'temp_meas2.csv' (HEADER false, DELIMITER ',' )""")
+    con.sql("""copy (SELECT(*) FROM measurement_tmp where measurement_source_concept_id between 2001028089 and 2001030688 order by measurement_source_concept_id, person_id) to 'temp_meas3.csv' (HEADER false, DELIMITER ',')""")
+    # Rows with a NULL source concept id go into the last part.
+    con.sql("""copy (SELECT(*) FROM measurement_tmp where measurement_source_concept_id is null or measurement_source_concept_id > 2001030688 order by measurement_source_concept_id, person_id) to 'temp_meas4.csv' (HEADER false, DELIMITER ',')""")
+
+# Final database: measurement is rebuilt from the sorted temp_meas*.csv parts, every other table is
+# read directly from its export. CREATE TABLE IF NOT EXISTS leaves an already existing table untouched.
+with duckdb.connect("mimic-omop.db") as con:
+    con.sql("""CREATE TABLE IF NOT EXISTS measurement AS SELECT(*) FROM read_csv('temp_meas*.csv', delim=',', header=false, columns={'measurement_id': 'INTEGER', 'person_id': 'INTEGER', 'measurement_concept_id': 'INTEGER', 'measurement_date': 'DATE', 'measurement_datetime': 'TIMESTAMP', 'measurement_time': 'VARCHAR', 'measurement_type_concept_id': 'INTEGER', 'operator_concept_id': 'INTEGER', 'value_as_number': 'NUMERIC', 'value_as_concept_id': 'INTEGER', 'unit_concept_id': 'INTEGER', 'range_low': 'NUMERIC', 'range_high': 'NUMERIC', 'provider_id': 'INTEGER', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'INTEGER', 'measurement_source_value': 'VARCHAR', 'measurement_source_concept_id': 'INTEGER', 'unit_source_value': 'VARCHAR', 'value_source_value': 'VARCHAR'})""")
+    con.sql("""CREATE TABLE IF NOT EXISTS concept AS SELECT(*) FROM read_csv('concept.csv.gz', auto_detect=false, delim=',', header=true, columns={'concept_id': 'INTEGER', 'concept_name': 'VARCHAR', 'domain_id': 'VARCHAR', 'vocabulary_id': 'VARCHAR', 'concept_class_id': 'VARCHAR', 'standard_concept': 'VARCHAR', 'concept_code': 'VARCHAR', 'valid_start_date': 'DATE', 'valid_end_date': 'DATE', 'invalid_reason': 'VARCHAR'}) order by domain_id, vocabulary_id, concept_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS vocabulary AS SELECT(*) FROM read_csv('vocabulary.csv.gz', delim=',', header=true, columns={'vocabulary_id': 'VARCHAR', 'vocabulary_name': 'VARCHAR', 'vocabulary_reference': 'VARCHAR', 'vocabulary_version': 'VARCHAR', 'vocabulary_concept_id': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS domain AS SELECT(*) FROM read_csv('domain.csv.gz', delim=',', header=true, columns={'domain_id': 'VARCHAR', 'domain_name': 'VARCHAR', 'domain_concept_id': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS concept_class AS SELECT(*) FROM read_csv('concept_class.csv.gz', delim=',', header=true, columns={'concept_class_id': 'VARCHAR', 'concept_class_name': 'VARCHAR', 'concept_class_concept_id': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS concept_relationship AS SELECT(*) FROM read_csv('concept_relationship.csv.gz', delim=',', header=true, columns={'concept_id_1': 'INTEGER', 'concept_id_2': 'INTEGER', 'relationship_id': 'VARCHAR', 'valid_start_date': 'DATE', 'valid_end_date': 'DATE', 'invalid_reason': 'VARCHAR'}) order by relationship_id, concept_id_1, concept_id_2""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS relationship AS SELECT(*) FROM read_csv('relationship.csv.gz', delim=',', header=true, columns={'relationship_id': 'VARCHAR', 'relationship_name': 'VARCHAR', 'is_hierarchical': 'VARCHAR', 'defines_ancestry': 'VARCHAR', 'reverse_relationship_id': 'VARCHAR', 'relationship_concept_id': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS concept_synonym AS SELECT(*) FROM read_csv('concept_synonym.csv.gz', delim=',', header=true, columns={'concept_id': 'INTEGER', 'concept_synonym_name': 'VARCHAR', 'language_concept_id': 'INTEGER'}) order by language_concept_id, concept_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS concept_ancestor AS SELECT(*) FROM read_csv('concept_ancestor.csv.gz', delim=',', header=true, columns={'ancestor_concept_id': 'INTEGER', 'descendant_concept_id': 'INTEGER', 'min_levels_of_separation': 'INTEGER', 'max_levels_of_separation': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS source_to_concept_map AS SELECT(*) FROM read_csv('source_to_concept_map.csv.gz', delim=',', header=true, columns={'source_code': 'VARCHAR', 'source_concept_id': 'INTEGER', 'source_vocabulary_id': 'VARCHAR', 'source_code_description': 'VARCHAR', 'target_concept_id': 'INTEGER', 'target_vocabulary_id': 'VARCHAR', 'valid_start_date': 'DATE', 'valid_end_date': 'DATE', 'invalid_reason': 'VARCHAR'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS cohort_definition AS SELECT(*) FROM read_csv('cohort_definition.csv.gz', delim=',', header=true, columns={'cohort_definition_id': 'INTEGER', 'cohort_definition_name': 'VARCHAR', 'cohort_definition_description': 'TEXT', 'definition_type_concept_id': 'INTEGER', 'cohort_definition_syntax': 'TEXT', 'subject_concept_id': 'INTEGER', 'cohort_initiation_date': 'DATE'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS attribute_definition AS SELECT(*) FROM read_csv('attribute_definition.csv.gz', delim=',', header=true, columns={'attribute_definition_id': 'INTEGER', 'attribute_name': 'VARCHAR', 'attribute_description': 'TEXT', 'attribute_type_concept_id': 'INTEGER', 'attribute_syntax': 'TEXT'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS cdm_source AS SELECT(*) FROM read_csv('cdm_source.csv.gz', delim=',', header=true, columns={'cdm_source_name': 'VARCHAR', 'cdm_source_abbreviation': 'VARCHAR', 'cdm_holder': 'VARCHAR', 'source_description': 'TEXT', 'source_documentation_reference': 'VARCHAR', 'cdm_etl_reference': 'VARCHAR', 'source_release_date': 'DATE', 'cdm_release_date': 'DATE', 'cdm_version': 'VARCHAR', 'vocabulary_version': 'VARCHAR'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS person AS SELECT(*) FROM read_csv('person.csv.gz', delim=',', header=true, columns={'person_id': 'INTEGER', 'gender_concept_id': 'INTEGER', 'year_of_birth': 'INTEGER', 'month_of_birth': 'INTEGER', 'day_of_birth': 'INTEGER', 'birth_datetime': 'TIMESTAMP', 'race_concept_id': 'INTEGER', 'ethnicity_concept_id': 'INTEGER', 'location_id': 'INTEGER', 'provider_id': 'INTEGER', 'care_site_id': 'INTEGER', 'person_source_value': 'VARCHAR', 'gender_source_value': 'VARCHAR', 'gender_source_concept_id': 'INTEGER', 'race_source_value': 'VARCHAR', 'race_source_concept_id': 'INTEGER', 'ethnicity_source_value': 'VARCHAR', 'ethnicity_source_concept_id': 'INTEGER'}) order by gender_source_concept_id, person_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS provider AS SELECT(*) FROM read_csv('provider.csv.gz', delim=',', header=true, columns={'provider_id': 'INTEGER', 'provider_name': 'VARCHAR', 'NPI': 'VARCHAR', 'DEA': 'VARCHAR', 'specialty_concept_id': 'INTEGER', 'care_site_id': 'INTEGER', 'year_of_birth': 'INTEGER', 'gender_concept_id': 'INTEGER', 'provider_source_value': 'VARCHAR', 'specialty_source_value': 'VARCHAR', 'specialty_source_concept_id': 'INTEGER', 'gender_source_value': 'VARCHAR', 'gender_source_concept_id': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS observation AS SELECT(*) FROM read_csv('observation.csv.gz', delim=',', header=true, columns={'observation_id': 'INTEGER', 'person_id': 'INTEGER', 'observation_concept_id': 'INTEGER', 'observation_date': 'DATE', 'observation_datetime': 'TIMESTAMP', 'observation_type_concept_id': 'INTEGER', 'value_as_number': 'NUMERIC', 'value_as_string': 'VARCHAR', 'value_as_concept_id': 'INTEGER', 'qualifier_concept_id': 'INTEGER', 'unit_concept_id': 'INTEGER', 'provider_id': 'INTEGER', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'INTEGER', 'observation_source_value': 'VARCHAR', 'observation_source_concept_id': 'INTEGER', 'unit_source_value': 'VARCHAR', 'qualifier_source_value': 'VARCHAR'}) order by observation_source_concept_id, person_id, visit_occurrence_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS payer_plan_period AS SELECT(*) FROM read_csv('payer_plan_period.csv.gz', delim=',', header=true, columns={'payer_plan_period_id': 'INTEGER', 'person_id': 'INTEGER', 'payer_plan_period_start_date': 'DATE', 'payer_plan_period_end_date': 'DATE', 'payer_concept_id': 'INTEGER', 'payer_source_value': 'VARCHAR', 'payer_source_concept_id': 'INTEGER', 'plan_concept_id': 'INTEGER', 'plan_source_value': 'VARCHAR', 'plan_source_concept_id': 'INTEGER', 'sponsor_concept_id': 'INTEGER', 'sponsor_source_value': 'VARCHAR', 'sponsor_source_concept_id': 'INTEGER', 'family_source_value': 'VARCHAR', 'stop_reason_concept_id': 'INTEGER', 'stop_reason_source_value': 'VARCHAR', 'stop_reason_source_concept_id': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS condition_occurrence AS SELECT(*) FROM read_csv('condition_occurrence.csv.gz', delim=',', header=true, columns={'condition_occurrence_id': 'INTEGER', 'person_id': 'INTEGER', 'condition_concept_id': 'INTEGER', 'condition_start_date': 'DATE', 'condition_start_datetime': 'TIMESTAMP', 'condition_end_date': 'DATE', 'condition_end_datetime': 'TIMESTAMP', 'condition_type_concept_id': 'INTEGER', 'stop_reason': 'VARCHAR', 'provider_id': 'INTEGER', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'INTEGER', 'condition_source_value': 'VARCHAR', 'condition_source_concept_id': 'INTEGER', 'condition_status_source_value': 'VARCHAR', 'condition_status_concept_id': 'INTEGER'}) order by condition_source_concept_id, person_id, visit_occurrence_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS fact_relationship AS SELECT(*) FROM read_csv('fact_relationship.csv.gz', delim=',', header=true, columns={'domain_concept_id_1': 'INTEGER', 'fact_id_1': 'INTEGER', 'domain_concept_id_2': 'INTEGER', 'fact_id_2': 'INTEGER', 'relationship_concept_id': 'INTEGER'}) order by relationship_concept_id, domain_concept_id_1, fact_id_1""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS location AS SELECT(*) FROM read_csv('location.csv.gz', delim=',', header=true, columns={'location_id': 'INTEGER', 'address_1': 'VARCHAR', 'address_2': 'VARCHAR', 'city': 'VARCHAR', 'state': 'VARCHAR', 'zip': 'VARCHAR', 'county': 'VARCHAR', 'location_source_value': 'VARCHAR'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS condition_era AS SELECT(*) FROM read_csv('condition_era.csv.gz', delim=',', header=true, columns={'condition_era_id': 'INTEGER', 'person_id': 'INTEGER', 'condition_concept_id': 'INTEGER', 'condition_era_start_date': 'DATE', 'condition_era_end_date': 'DATE', 'condition_occurrence_count': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS cohort_attribute AS SELECT(*) FROM read_csv('cohort_attribute.csv.gz', delim=',', header=true, columns={'cohort_definition_id': 'INTEGER', 'subject_id': 'INTEGER', 'cohort_start_date': 'DATE', 'cohort_end_date': 'DATE', 'attribute_definition_id': 'INTEGER', 'value_as_number': 'NUMERIC', 'value_as_concept_id': 'INTEGER'}) order by attribute_definition_id, subject_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS care_site AS SELECT(*) FROM read_csv('care_site.csv.gz', delim=',', header=true, columns={'care_site_id': 'INTEGER', 'care_site_name': 'VARCHAR', 'place_of_service_concept_id': 'INTEGER', 'location_id': 'INTEGER', 'care_site_source_value': 'VARCHAR', 'place_of_service_source_value': 'VARCHAR'}) order by place_of_service_concept_id, care_site_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS procedure_occurrence AS SELECT(*) FROM read_csv('procedure_occurrence.csv.gz', delim=',', header=true, columns={'procedure_occurrence_id': 'INTEGER', 'person_id': 'INTEGER', 'procedure_concept_id': 'INTEGER', 'procedure_date': 'DATE', 'procedure_datetime': 'TIMESTAMP', 'procedure_type_concept_id': 'INTEGER', 'modifier_concept_id': 'INTEGER', 'quantity': 'INTEGER', 'provider_id': 'INTEGER', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'INTEGER', 'procedure_source_value': 'VARCHAR', 'procedure_source_concept_id': 'INTEGER', 'modifier_source_value': 'VARCHAR'}) ORDER BY procedure_source_concept_id, person_id, visit_occurrence_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS drug_era AS SELECT(*) FROM read_csv('drug_era.csv.gz', delim=',', header=true, columns={'drug_era_id': 'INTEGER', 'person_id': 'INTEGER', 'drug_concept_id': 'INTEGER', 'drug_era_start_date': 'DATE', 'drug_era_end_date': 'DATE', 'drug_exposure_count': 'INTEGER', 'gap_days': 'INTEGER'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS device_exposure AS SELECT(*) FROM read_csv('device_exposure.csv.gz', delim=',', header=true, columns={'device_exposure_id': 'INTEGER', 'person_id': 'INTEGER', 'device_concept_id': 'INTEGER', 'device_exposure_start_date': 'DATE', 'device_exposure_start_datetime': 'TIMESTAMP', 'device_exposure_end_date': 'DATE', 'device_exposure_end_datetime': 'TIMESTAMP', 'device_type_concept_id': 'INTEGER', 'unique_device_id': 'VARCHAR', 'quantity': 'INTEGER', 'provider_id': 'INTEGER', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'INTEGER', 'device_source_value': 'VARCHAR', 'device_source_concept_id': 'INTEGER'}) order by device_source_concept_id, person_id, visit_occurrence_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS note AS SELECT(*) FROM read_csv('note.csv.gz', delim=',', header=true, columns={'note_id': 'INTEGER', 'person_id': 'INTEGER', 'note_date': 'DATE', 'note_datetime': 'TIMESTAMP', 'note_type_concept_id': 'INTEGER', 'note_class_concept_id': 'INTEGER', 'note_title': 'VARCHAR', 'note_text': 'TEXT', 'encoding_concept_id': 'INTEGER', 'language_concept_id': 'INTEGER', 'provider_id': 'INTEGER', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'INTEGER', 'note_source_value': 'VARCHAR'}) order by note_type_concept_id, person_id, visit_occurrence_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS death AS SELECT(*) FROM read_csv('death.csv.gz', delim=',', header=true, columns={'person_id': 'INTEGER', 'death_date': 'DATE', 'death_datetime': 'TIMESTAMP', 'death_type_concept_id': 'INTEGER', 'cause_concept_id': 'INTEGER', 'cause_source_value': 'VARCHAR', 'cause_source_concept_id': 'INTEGER'}) order by death_type_concept_id, person_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS cost AS SELECT(*) FROM read_csv('cost.csv.gz', delim=',', header=true, columns={'cost_id': 'INTEGER', 'cost_event_id': 'INTEGER', 'cost_domain_id': 'VARCHAR', 'cost_type_concept_id': 'INTEGER', 'currency_concept_id': 'INTEGER', 'total_charge': 'NUMERIC', 'total_cost': 'NUMERIC', 'total_paid': 'NUMERIC', 'paid_by_payer': 'NUMERIC', 'paid_by_patient': 'NUMERIC', 'paid_patient_copay': 'NUMERIC', 'paid_patient_coinsurance': 'NUMERIC', 'paid_patient_deductible': 'NUMERIC', 'paid_by_primary': 'NUMERIC', 'paid_ingredient_cost': 'NUMERIC', 'paid_dispensing_fee': 'NUMERIC', 'payer_plan_period_id': 'INTEGER', 'amount_allowed': 'NUMERIC', 'revenue_code_concept_id': 'INTEGER', 'revenue_code_source_value': 'VARCHAR', 'drg_concept_id': 'INTEGER', 'drg_source_value': 'VARCHAR'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS specimen AS SELECT(*) FROM read_csv('specimen.csv.gz', delim=',', header=true, columns={'specimen_id': 'INTEGER', 'person_id': 'INTEGER', 'specimen_concept_id': 'INTEGER', 'specimen_type_concept_id': 'INTEGER', 'specimen_date': 'DATE', 'specimen_datetime': 'TIMESTAMP', 'quantity': 'NUMERIC', 'unit_concept_id': 'INTEGER', 'anatomic_site_concept_id': 'INTEGER', 'disease_status_concept_id': 'INTEGER', 'specimen_source_id': 'VARCHAR', 'specimen_source_value': 'VARCHAR', 'unit_source_value': 'VARCHAR', 'anatomic_site_source_value': 'VARCHAR', 'disease_status_source_value': 'VARCHAR'}) order by specimen_concept_id, person_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS cohort AS SELECT(*) FROM read_csv('cohort.csv.gz', delim=',', header=true, columns={'cohort_definition_id': 'INTEGER', 'subject_id': 'INTEGER', 'cohort_start_date': 'DATE', 'cohort_end_date': 'DATE'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS dose_era AS SELECT(*) FROM read_csv('dose_era.csv.gz', delim=',', header=true, columns={'dose_era_id': 'INTEGER', 'person_id': 'INTEGER', 'drug_concept_id': 'INTEGER', 'unit_concept_id': 'INTEGER', 'dose_value': 'NUMERIC', 'dose_era_start_date': 'DATE', 'dose_era_end_date': 'DATE', 'temporal_unit_concept_id': 'INTEGER', 'temporal_value': 'VARCHAR'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS drug_strength AS SELECT(*) FROM read_csv('drug_strength.csv.gz', delim=',', header=true, columns={'drug_concept_id': 'INTEGER', 'ingredient_concept_id': 'INTEGER', 'amount_value': 'DOUBLE', 'amount_unit_concept_id': 'INTEGER', 'numerator_value': 'DOUBLE', 'numerator_unit_concept_id': 'INTEGER', 'denominator_value': 'DOUBLE', 'denominator_unit_concept_id': 'INTEGER', 'box_size': 'INTEGER', 'valid_start_date': 'DATE', 'valid_end_date': 'DATE', 'invalid_reason': 'VARCHAR'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS visit_occurrence AS SELECT(*) FROM read_csv('visit_occurrence.csv.gz', delim=',', header=true, columns={'visit_occurrence_id': 'INTEGER', 'person_id': 'INTEGER', 'visit_concept_id': 'INTEGER', 'visit_start_date': 'DATE', 'visit_start_datetime': 'TIMESTAMP', 'visit_end_date': 'DATE', 'visit_end_datetime': 'TIMESTAMP', 'visit_type_concept_id': 'INTEGER', 'provider_id': 'VARCHAR', 'care_site_id': 'INTEGER', 'visit_source_value': 'VARCHAR', 'visit_source_concept_id': 'INTEGER', 'admitting_source_concept_id': 'INTEGER', 'admitting_source_value': 'VARCHAR', 'discharge_to_concept_id': 'INTEGER', 'discharge_to_source_value': 'VARCHAR', 'preceding_visit_occurrence_id': 'INTEGER', 'admitting_concept_id': 'INTEGER', 'discharge_to_source_concept_id': 'INTEGER'}) order by visit_concept_id, person_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS observation_period AS SELECT(*) FROM read_csv('observation_period.csv.gz', delim=',', header=true, columns={'observation_period_id': 'INTEGER', 'person_id': 'INTEGER', 'observation_period_start_date': 'DATE', 'observation_period_end_date': 'DATE', 'period_type_concept_id': 'INTEGER', 'observation_period_start_datetime': 'TIMESTAMP', 'observation_period_end_datetime': 'TIMESTAMP'})""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS visit_detail AS SELECT(*) FROM read_csv('visit_detail.csv.gz', delim=',', header=true, columns={'visit_detail_id': 'INTEGER', 'person_id': 'INTEGER', 'visit_detail_concept_id': 'INTEGER', 'visit_start_date': 'DATE', 'visit_start_datetime': 'TIMESTAMP', 'visit_end_date': 'DATE', 'visit_end_datetime': 'TIMESTAMP', 'visit_type_concept_id': 'INTEGER', 'provider_id': 'VARCHAR', 'care_site_id': 'INTEGER', 'admitting_source_concept_id': 'VARCHAR', 'discharge_to_concept_id': 'INTEGER', 'preceding_visit_detail_id': 'INTEGER', 'visit_source_value': 'VARCHAR', 'visit_source_concept_id': 'VARCHAR', 'admitting_source_value': 'VARCHAR', 'discharge_to_source_value': 'VARCHAR', 'visit_detail_parent_id': 'VARCHAR', 'visit_occurrence_id': 'INTEGER', 'visit_detail_source_value': 'VARCHAR', 'visit_detail_source_concept_id': 'VARCHAR', 'admitting_concept_id': 'INTEGER', 'discharge_to_source_concept_id': 'VARCHAR'}) order by visit_detail_concept_id, person_id, visit_occurrence_id""")
+    con.sql(""" CREATE TABLE IF NOT EXISTS drug_exposure AS SELECT(*) FROM read_csv('drug_exposure.csv.gz', delim=',', header=true, columns={'drug_exposure_id': 'INTEGER', 'person_id': 'INTEGER', 'drug_concept_id': 'INTEGER', 'drug_exposure_start_date': 'DATE', 'drug_exposure_start_datetime': 'TIMESTAMP', 'drug_exposure_end_date': 'DATE', 'drug_exposure_end_datetime': 'TIMESTAMP', 'verbatim_end_date': 'VARCHAR', 'drug_type_concept_id': 'INTEGER', 'stop_reason': 'VARCHAR', 'refills': 'VARCHAR', 'quantity': 'DOUBLE', 'days_supply': 'VARCHAR', 'sig': 'VARCHAR', 'route_concept_id': 'INTEGER', 'lot_number': 'VARCHAR', 'provider_id': 'VARCHAR', 'visit_occurrence_id': 'INTEGER', 'visit_detail_id': 'VARCHAR', 'drug_source_value': 'VARCHAR', 'drug_source_concept_id': 'VARCHAR', 'route_source_value': 'VARCHAR', 'dose_unit_source_value': 'VARCHAR', 'quantity_source_value': 'VARCHAR'})""")
+    # The note_nlp import below is currently disabled.
+    # con.sql(""" CREATE TABLE IF NOT EXISTS note_nlp AS SELECT(*) FROM read_csv('note_nlp.csv.gz', delim=',', header=true, columns={'note_nlp_id': 'INTEGER', 'note_id': 'INTEGER', 'section_concept_id': 'INTEGER', 'snippet': 'VARCHAR', '"offset"': 'VARCHAR', 'lexical_variant': 'VARCHAR', 'note_nlp_concept_id': 'INTEGER', 'note_nlp_source_concept_id': 'INTEGER', 'nlp_system': 'VARCHAR', 'nlp_date': 'DATE', 'nlp_datetime': 'TIMESTAMP', 'term_exists': 'VARCHAR', 'term_temporal': 'VARCHAR', 'term_modifiers': 'VARCHAR'})""")
+