From 54305a8b8a9836b8ad48bfc9585f71c9383e7c30 Mon Sep 17 00:00:00 2001 From: Ethan Steinberg Date: Wed, 22 May 2024 16:06:32 +0000 Subject: [PATCH] Make more generic --- native/BUILD | 5 +- native/perform_etl.cc | 366 ++++++++++++++---------------------------- 2 files changed, 124 insertions(+), 247 deletions(-) diff --git a/native/BUILD b/native/BUILD index 95e1195..55bc347 100644 --- a/native/BUILD +++ b/native/BUILD @@ -54,12 +54,12 @@ cmake( "ARROW_WITH_ZSTD": "ON", "EP_COMMON_CMAKE_ARGS": "-DWITH_OPENSSL=OFF", "ARROW_DEPENDENCY_SOURCE": "BUNDLED", + "BUILD_WARNING_LEVEL": "PRODUCTION", }, tags = ["requires-network"], generate_args = ["-DCMAKE_RANLIB=/usr/bin/ranlib"], working_directory="cpp", lib_source = "@arrow//:all", - out_lib_dir = "lib64", out_static_libs = ["libparquet.a", "libarrow.a", "libarrow_bundled_dependencies.a"], linkopts = ["-pthread"], ) @@ -72,6 +72,5 @@ cmake( }, working_directory="build/cmake", lib_source = "@zstd//:all", - out_lib_dir = "lib64", out_static_libs = ["libzstd.a"], -) \ No newline at end of file +) diff --git a/native/perform_etl.cc b/native/perform_etl.cc index eda0bda..e9b6880 100644 --- a/native/perform_etl.cc +++ b/native/perform_etl.cc @@ -89,9 +89,10 @@ std::vector> get_fields_for_file( return fields; } -const std::vector known_fields = { - "patient_id", "time", "code", - "numeric_value", "datetime_value", "text_value"}; +const std::vector known_fields = {"patient_id", "time"}; + +const std::vector main_fields = {"code", "numeric_value", + "datetime_value", "text_value"}; std::set>> get_metadata_fields(const std::vector& files) { @@ -214,11 +215,6 @@ void shard_reader( int patient_id_index = -1; int time_index = -1; - int code_index = -1; - - int numeric_value_index = -1; - int datetime_value_index = -1; - int text_value_index = -1; std::vector metadata_indices(metadata_columns.size(), -1); @@ -253,43 +249,6 @@ void shard_reader( schema_field.field->ToString()); } time_index = schema_field.column_index; - } else if (schema_field.field->name() == "code") { - if (!schema_field.field->type()->Equals( - arrow::LargeStringType())) { - throw std::runtime_error( - "The C++ MEDS-Flat ETL requires large_string codes " - "but found " + - schema_field.field->ToString()); - } - - code_index = schema_field.column_index; - } else if (schema_field.field->name() == "numeric_value") { - if (!schema_field.field->type()->Equals( - arrow::FloatType())) { - throw std::runtime_error( - "C++ MEDS-Flat requires Float numeric_value but " - "found " + - schema_field.field->ToString()); - } - numeric_value_index = schema_field.column_index; - } else if (schema_field.field->name() == "datetime_value") { - if (!schema_field.field->type()->Equals( - arrow::TimestampType(arrow::TimeUnit::MICRO))) { - throw std::runtime_error( - "C++ MEDS-Flat requires microsecond timestamp " - "datetime_value but found " + - schema_field.field->ToString()); - } - datetime_value_index = schema_field.column_index; - } else if (schema_field.field->name() == "text_value") { - if (!schema_field.field->type()->Equals( - arrow::LargeStringType())) { - throw std::runtime_error( - "C++ MEDS-Flat requires Float32 numeric_value but " - "found " + - schema_field.field->ToString()); - } - text_value_index = schema_field.column_index; } else { // Must be metadata auto iter = std::find_if( @@ -313,19 +272,6 @@ void shard_reader( int offset = (iter - std::begin(metadata_columns)); - if (iter->second->Equals(arrow::LargeStringType())) { - is_text_metadata[offset] = true; - } else { - is_text_metadata[offset] = false; - - if (iter->second->byte_width() == -1) { - throw std::runtime_error( - "Found non text metadata with unknown byte " - "width? " + - iter->second->ToString()); - } - } - metadata_indices[offset] = schema_field.column_index; } } @@ -339,10 +285,6 @@ void shard_reader( throw std::runtime_error("Could not find time column index"); } - if (code_index == -1) { - throw std::runtime_error("Could not find code column index"); - } - std::shared_ptr<::arrow::RecordBatchReader> rb_reader; PARQUET_THROW_NOT_OK( arrow_reader->GetRecordBatchReader(&rb_reader)); @@ -370,58 +312,39 @@ void shard_reader( throw std::runtime_error("Could not cast time array"); } - auto code_array = - std::dynamic_pointer_cast( - record_batch->column(code_index)); - if (!code_array) { - throw std::runtime_error("Could not cast code array"); - } - - auto numeric_value_array = std::dynamic_pointer_cast< - arrow::NumericArray>( - record_batch->column(numeric_value_index)); - if (!numeric_value_array) { - throw std::runtime_error( - "Could not cast numeric_value array"); - } - - auto datetime_value_array = std::dynamic_pointer_cast< - arrow::NumericArray>( - record_batch->column(datetime_value_index)); - if (!datetime_value_array) { - throw std::runtime_error( - "Could not cast datetime_value array"); - } - - auto text_value_array = - std::dynamic_pointer_cast( - record_batch->column(text_value_index)); - if (!text_value_array) { - throw std::runtime_error("Could not cast text_value array"); - } - + std::vector> + text_metadata_arrays(record_batch->num_columns()); std::vector> - text_metadata_arrays(metadata_columns.size()); + large_text_metadata_arrays(record_batch->num_columns()); std::vector> - primitive_metadata_arrays(metadata_columns.size()); + primitive_metadata_arrays(record_batch->num_columns()); for (size_t i = 0; i < metadata_columns.size(); i++) { if (metadata_indices[i] == -1) { continue; } - if (is_text_metadata[i]) { + { auto metadata_array = std::dynamic_pointer_cast( record_batch->column(metadata_indices[i])); - if (!metadata_array) { - throw std::runtime_error( - "Could not cast metadata array to text" + - metadata_columns[i].first + " " + - metadata_columns[i].second->ToString()); + if (metadata_array) { + large_text_metadata_arrays[i] = metadata_array; + continue; + } + } + + { + auto metadata_array = + std::dynamic_pointer_cast( + record_batch->column(metadata_indices[i])); + if (metadata_array) { + text_metadata_arrays[i] = metadata_array; + continue; } - text_metadata_arrays[i] = metadata_array; - } else { + } + + { std::shared_ptr fixed_size_array; PARQUET_ASSIGN_OR_THROW( fixed_size_array, @@ -432,17 +355,19 @@ void shard_reader( auto metadata_array = std::dynamic_pointer_cast< arrow::FixedSizeBinaryArray>(fixed_size_array); - if (!metadata_array) { - throw std::runtime_error( - "Could not cast metadata array to fixed size " + - metadata_columns[i].first + " " + - metadata_columns[i].second->ToString()); + if (metadata_array) { + primitive_metadata_arrays[i] = metadata_array; + continue; } - primitive_metadata_arrays[i] = metadata_array; } + + throw std::runtime_error( + "Could not cast metadata field " + + metadata_columns[i].first + " " + + metadata_columns[i].second->ToString()); } - for (int64_t i = 0; i < text_value_array->length(); i++) { + for (int64_t i = 0; i < patient_id_array->length(); i++) { if (!patient_id_array->IsValid(i)) { throw std::runtime_error( "patient_id incorrectly has null value " + source); @@ -451,10 +376,6 @@ void shard_reader( throw std::runtime_error( "time incorrectly has null value " + source); } - if (!code_array->IsValid(i)) { - throw std::runtime_error( - "code incorrectly has null value " + source); - } std::vector data; @@ -467,37 +388,23 @@ void shard_reader( add_literal_to_vector(data, patient_id); add_literal_to_vector(data, time); - add_string_to_vector(data, code_array->Value(i)); - - if (numeric_value_array->IsValid(i)) { - non_null[0] = true; - add_literal_to_vector(data, - numeric_value_array->Value(i)); - } - - if (datetime_value_array->IsValid(i)) { - non_null[1] = true; - add_literal_to_vector(data, - datetime_value_array->Value(i)); - } - - if (text_value_array->IsValid(i)) { - non_null[2] = true; - add_string_to_vector(data, text_value_array->Value(i)); - } - for (size_t j = 0; j < metadata_columns.size(); j++) { - if (is_text_metadata[j]) { - if (text_metadata_arrays[j] && - text_metadata_arrays[j]->IsValid(i)) { - non_null[3 + j] = true; + if (text_metadata_arrays[j] != nullptr) { + if (text_metadata_arrays[j]->IsValid(i)) { + non_null[j] = true; add_string_to_vector( data, text_metadata_arrays[j]->Value(i)); } - } else { - if (primitive_metadata_arrays[j] && - primitive_metadata_arrays[j]->IsValid(i)) { - non_null[3 + j] = true; + } else if (large_text_metadata_arrays[j] != nullptr) { + if (large_text_metadata_arrays[j]->IsValid(i)) { + non_null[j] = true; + add_string_to_vector( + data, + large_text_metadata_arrays[j]->Value(i)); + } + } else if (primitive_metadata_arrays[j] != nullptr) { + if (primitive_metadata_arrays[j]->IsValid(i)) { + non_null[j] = true; add_string_to_vector( data, primitive_metadata_arrays[j]->GetView(i)); @@ -954,44 +861,51 @@ void join_and_write_single( const std::vector>>& metadata_columns) { arrow::FieldVector metadata_fields; + arrow::FieldVector measurement_fields; std::bitset::digits> is_text_metadata; + for (size_t i = 0; i < metadata_columns.size(); i++) { + arrow::FieldVector* fields = nullptr; + const auto& metadata_column = metadata_columns[i]; + + if (std::find(std::begin(main_fields), std::end(main_fields), + metadata_column.first) != std::end(main_fields)) { + fields = &measurement_fields; + } else { + fields = &metadata_fields; + } + if (metadata_column.second->Equals(arrow::LargeStringType())) { is_text_metadata[i] = true; - metadata_fields.push_back(arrow::field( + fields->push_back(arrow::field( + metadata_column.first, std::make_shared())); + } else if (metadata_column.second->Equals(arrow::StringType())) { + is_text_metadata[i] = true; + fields->push_back(arrow::field( metadata_column.first, std::make_shared())); } else { is_text_metadata[i] = false; - metadata_fields.push_back( + fields->push_back( arrow::field(metadata_column.first, metadata_column.second)); } } std::shared_ptr metadata_type; - if (metadata_columns.size() != 0) { + if (metadata_fields.size() != 0) { metadata_type = std::make_shared(metadata_fields); } else { metadata_type = std::make_shared(); } + measurement_fields.push_back(arrow::field("metadata", metadata_type)); + auto timestamp_type = std::make_shared(arrow::TimeUnit::MICRO); - auto measurement_type_fields = { - arrow::field("code", std::make_shared()), - - arrow::field("text_value", std::make_shared()), - arrow::field("numeric_value", std::make_shared()), - arrow::field("datetime_value", std::make_shared( - arrow::TimeUnit::MICRO)), - - arrow::field("metadata", metadata_type), - }; - auto measurement_type = - std::make_shared(measurement_type_fields); + std::make_shared(measurement_fields); auto event_type_fields = { arrow::field("time", std::make_shared( @@ -1069,13 +983,6 @@ void join_and_write_single( auto static_measurements_builder = std::make_shared(pool); - auto code_builder = std::make_shared(pool); - - auto text_value_builder = std::make_shared(pool); - auto numeric_value_builder = std::make_shared(pool); - auto datetime_value_builder = - std::make_shared(timestamp_type, pool); - std::vector> text_metadata_builders( metadata_columns.size()); std::vector> @@ -1085,23 +992,32 @@ void join_and_write_single( std::shared_ptr null_metadata_builder; std::shared_ptr metadata_builder_holder; - if (metadata_columns.size() != 0) { - std::vector> metadata_builders( - metadata_columns.size()); - for (size_t i = 0; i < metadata_columns.size(); i++) { - if (is_text_metadata[i]) { - auto builder = std::make_shared(pool); - text_metadata_builders[i] = builder; - metadata_builders[i] = builder; - } else { - auto builder = std::make_shared( - std::make_shared( - metadata_columns[i].second->byte_width())); - primitive_metadata_builders[i] = builder; - metadata_builders[i] = builder; - } + std::vector> metadata_builders; + std::vector> measurement_builders; + + for (size_t i = 0; i < metadata_columns.size(); i++) { + std::vector>* builders; + if (std::find(std::begin(main_fields), std::end(main_fields), + metadata_columns[i].first) != std::end(main_fields)) { + builders = &measurement_builders; + } else { + builders = &metadata_builders; + } + + if (is_text_metadata[i]) { + auto builder = std::make_shared(pool); + text_metadata_builders[i] = builder; + builders->push_back(builder); + } else { + auto builder = std::make_shared( + std::make_shared( + metadata_columns[i].second->byte_width())); + primitive_metadata_builders[i] = builder; + builders->push_back(builder); } + } + if (metadata_builders.size() > 0) { metadata_builder = std::make_shared( metadata_type, pool, metadata_builders); metadata_builder_holder = metadata_builder; @@ -1110,13 +1026,10 @@ void join_and_write_single( metadata_builder_holder = null_metadata_builder; } - std::vector> - measurement_builder_fields{ - code_builder, text_value_builder, numeric_value_builder, - datetime_value_builder, metadata_builder_holder}; + measurement_builders.push_back(metadata_builder_holder); auto measurement_builder = std::make_shared( - measurement_type, pool, measurement_builder_fields); + measurement_type, pool, measurement_builders); auto time_builder = std::make_shared(timestamp_type, pool); @@ -1188,76 +1101,41 @@ void join_and_write_single( } PARQUET_THROW_NOT_OK(measurement_builder->Append()); + + if (metadata_builders.size() != 0) { + PARQUET_THROW_NOT_OK(metadata_builder->Append()); + } else { + PARQUET_THROW_NOT_OK(null_metadata_builder->AppendNull()); + } + std::bitset::digits> non_null( *reinterpret_cast( patient_record.data() + patient_record.size() - sizeof(unsigned long long))); size_t offset = sizeof(int64_t) * 2; - size_t size = *reinterpret_cast( - patient_record.substr(offset).data()); - offset += sizeof(size); - PARQUET_THROW_NOT_OK( - code_builder->Append(patient_record.substr(offset, size))); - offset += size; - - if (non_null[0]) { - PARQUET_THROW_NOT_OK( - numeric_value_builder->Append(*reinterpret_cast( - patient_record.substr(offset).data()))); - offset += sizeof(float); - } else { - PARQUET_THROW_NOT_OK(numeric_value_builder->AppendNull()); - } - - if (non_null[1]) { - PARQUET_THROW_NOT_OK(datetime_value_builder->Append( - *reinterpret_cast( - patient_record.substr(offset).data()))); - offset += sizeof(int64_t); - } else { - PARQUET_THROW_NOT_OK(datetime_value_builder->AppendNull()); - } - - if (non_null[2]) { - size_t size = *reinterpret_cast( - patient_record.substr(offset).data()); - offset += sizeof(size); - PARQUET_THROW_NOT_OK(text_value_builder->Append( - patient_record.substr(offset, size))); - offset += size; - } else { - PARQUET_THROW_NOT_OK(text_value_builder->AppendNull()); - } - - if (metadata_columns.size() == 0) { - PARQUET_THROW_NOT_OK(null_metadata_builder->AppendNull()); - } else { - PARQUET_THROW_NOT_OK(metadata_builder->Append()); + for (size_t j = 0; j < metadata_columns.size(); j++) { + if (non_null[j]) { + size_t size = *reinterpret_cast( + patient_record.substr(offset).data()); + offset += sizeof(size); + auto entry = patient_record.substr(offset, size); - for (size_t j = 0; j < metadata_columns.size(); j++) { - if (non_null[3 + j]) { - size_t size = *reinterpret_cast( - patient_record.substr(offset).data()); - offset += sizeof(size); - auto entry = patient_record.substr(offset, size); - - if (is_text_metadata[j]) { - PARQUET_THROW_NOT_OK( - text_metadata_builders[j]->Append(entry)); - } else { - PARQUET_THROW_NOT_OK( - primitive_metadata_builders[j]->Append(entry)); - } - offset += size; + if (is_text_metadata[j]) { + PARQUET_THROW_NOT_OK( + text_metadata_builders[j]->Append(entry)); } else { - if (is_text_metadata[j]) { - PARQUET_THROW_NOT_OK( - text_metadata_builders[j]->AppendNull()); - } else { - PARQUET_THROW_NOT_OK( - primitive_metadata_builders[j]->AppendNull()); - } + PARQUET_THROW_NOT_OK( + primitive_metadata_builders[j]->Append(entry)); + } + offset += size; + } else { + if (is_text_metadata[j]) { + PARQUET_THROW_NOT_OK( + text_metadata_builders[j]->AppendNull()); + } else { + PARQUET_THROW_NOT_OK( + primitive_metadata_builders[j]->AppendNull()); } } }