From a7ff3dbb52ce58f685bf7605a1c7e313af8c3f23 Mon Sep 17 00:00:00 2001 From: Ethan Steinberg Date: Wed, 10 Apr 2024 10:17:25 -0700 Subject: [PATCH] Add support for no metadata --- native/perform_etl.cc | 108 ++++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 42 deletions(-) diff --git a/native/perform_etl.cc b/native/perform_etl.cc index 7be5396..eda0bda 100644 --- a/native/perform_etl.cc +++ b/native/perform_etl.cc @@ -969,7 +969,12 @@ void join_and_write_single( } } - auto metadata_type = std::make_shared(metadata_fields); + std::shared_ptr metadata_type; + if (metadata_columns.size() != 0) { + metadata_type = std::make_shared(metadata_fields); + } else { + metadata_type = std::make_shared(); + } auto timestamp_type = std::make_shared(arrow::TimeUnit::MICRO); @@ -984,6 +989,7 @@ void join_and_write_single( arrow::field("metadata", metadata_type), }; + auto measurement_type = std::make_shared(measurement_type_fields); @@ -1074,29 +1080,41 @@ void join_and_write_single( metadata_columns.size()); std::vector> primitive_metadata_builders(metadata_columns.size()); - std::vector> metadata_builders( - metadata_columns.size()); - for (size_t i = 0; i < metadata_columns.size(); i++) { - if (is_text_metadata[i]) { - auto builder = std::make_shared(pool); - text_metadata_builders[i] = builder; - metadata_builders[i] = builder; - } else { - auto builder = std::make_shared( - std::make_shared( - metadata_columns[i].second->byte_width())); - primitive_metadata_builders[i] = builder; - metadata_builders[i] = builder; + + std::shared_ptr metadata_builder; + std::shared_ptr null_metadata_builder; + std::shared_ptr metadata_builder_holder; + + if (metadata_columns.size() != 0) { + std::vector> metadata_builders( + metadata_columns.size()); + for (size_t i = 0; i < metadata_columns.size(); i++) { + if (is_text_metadata[i]) { + auto builder = std::make_shared(pool); + text_metadata_builders[i] = builder; + metadata_builders[i] = builder; + } else { + auto builder = std::make_shared( + std::make_shared( + metadata_columns[i].second->byte_width())); + primitive_metadata_builders[i] = builder; + metadata_builders[i] = builder; + } } - } - auto metadata_builder = std::make_shared( - metadata_type, pool, metadata_builders); + metadata_builder = std::make_shared( + metadata_type, pool, metadata_builders); + metadata_builder_holder = metadata_builder; + } else { + null_metadata_builder = std::make_shared(pool); + metadata_builder_holder = null_metadata_builder; + } std::vector> - measurement_builder_fields{code_builder, text_value_builder, - numeric_value_builder, - datetime_value_builder, metadata_builder}; + measurement_builder_fields{ + code_builder, text_value_builder, numeric_value_builder, + datetime_value_builder, metadata_builder_holder}; + auto measurement_builder = std::make_shared( measurement_type, pool, measurement_builder_fields); @@ -1125,6 +1143,7 @@ void join_and_write_single( std::shared_ptr table = arrow::Table::Make(schema, columns); + PARQUET_THROW_NOT_OK(writer->WriteTable(*table)); amount_written = 0; @@ -1211,29 +1230,34 @@ void join_and_write_single( PARQUET_THROW_NOT_OK(text_value_builder->AppendNull()); } - PARQUET_THROW_NOT_OK(metadata_builder->Append()); - for (size_t j = 0; j < metadata_columns.size(); j++) { - if (non_null[3 + j]) { - size_t size = *reinterpret_cast( - patient_record.substr(offset).data()); - offset += sizeof(size); - auto entry = patient_record.substr(offset, size); - - if (is_text_metadata[j]) { - PARQUET_THROW_NOT_OK( - text_metadata_builders[j]->Append(entry)); - } else { - PARQUET_THROW_NOT_OK( - primitive_metadata_builders[j]->Append(entry)); - } - offset += size; - } else { - if (is_text_metadata[j]) { - PARQUET_THROW_NOT_OK( - text_metadata_builders[j]->AppendNull()); + if (metadata_columns.size() == 0) { + PARQUET_THROW_NOT_OK(null_metadata_builder->AppendNull()); + } else { + PARQUET_THROW_NOT_OK(metadata_builder->Append()); + + for (size_t j = 0; j < metadata_columns.size(); j++) { + if (non_null[3 + j]) { + size_t size = *reinterpret_cast( + patient_record.substr(offset).data()); + offset += sizeof(size); + auto entry = patient_record.substr(offset, size); + + if (is_text_metadata[j]) { + PARQUET_THROW_NOT_OK( + text_metadata_builders[j]->Append(entry)); + } else { + PARQUET_THROW_NOT_OK( + primitive_metadata_builders[j]->Append(entry)); + } + offset += size; } else { - PARQUET_THROW_NOT_OK( - primitive_metadata_builders[j]->AppendNull()); + if (is_text_metadata[j]) { + PARQUET_THROW_NOT_OK( + text_metadata_builders[j]->AppendNull()); + } else { + PARQUET_THROW_NOT_OK( + primitive_metadata_builders[j]->AppendNull()); + } } } }