diff --git a/native/perform_etl.cc b/native/perform_etl.cc index eda0bda..f3b4c48 100644 --- a/native/perform_etl.cc +++ b/native/perform_etl.cc @@ -94,7 +94,7 @@ const std::vector known_fields = { "numeric_value", "datetime_value", "text_value"}; std::set>> -get_metadata_fields(const std::vector& files) { +get_properties_fields(const std::vector& files) { arrow::MemoryPool* pool = arrow::default_memory_pool(); std::set>> result; @@ -119,8 +119,8 @@ get_metadata_fields(const std::vector& files) { } std::set>> -get_metadata_fields_multithreaded(const std::vector& files, - size_t num_threads) { +get_properties_fields_multithreaded(const std::vector& files, + size_t num_threads) { std::vector threads; std::vector< std::set>>> @@ -135,7 +135,7 @@ get_metadata_fields_multithreaded(const std::vector& files, j < std::min(files.size(), files_per_thread * (i + 1)); j++) { fraction.push_back(files[j]); } - results[i] = get_metadata_fields(fraction); + results[i] = get_properties_fields(fraction); }); } @@ -175,7 +175,7 @@ void shard_reader( all_write_queues, moodycamel::LightweightSemaphore& all_write_semaphore, const std::vector>>& - metadata_columns) { + properties_columns) { arrow::MemoryPool* pool = arrow::default_memory_pool(); std::vector ptoks; @@ -220,10 +220,10 @@ void shard_reader( int datetime_value_index = -1; int text_value_index = -1; - std::vector metadata_indices(metadata_columns.size(), -1); + std::vector properties_indices(properties_columns.size(), -1); std::bitset::digits> - is_text_metadata; + is_text_properties; const auto& manifest = arrow_reader->manifest(); for (const auto& schema_field : manifest.schema_fields) { @@ -291,42 +291,43 @@ void shard_reader( } text_value_index = schema_field.column_index; } else { - // Must be metadata + // Must be properties auto iter = std::find_if( - std::begin(metadata_columns), - std::end(metadata_columns), [&](const auto& entry) { + std::begin(properties_columns), + std::end(properties_columns), [&](const auto& entry) { return entry.first == schema_field.field->name(); }); - if (iter == std::end(metadata_columns)) { + if (iter == std::end(properties_columns)) { throw std::runtime_error( - "Had an extra column in the metadata that " + "Had an extra column in the properties that " "shouldn't exist? " + schema_field.field->ToString()); } if (!schema_field.field->type()->Equals(iter->second)) { throw std::runtime_error( - "C++ MEDS-Flat requires large_string metadata but " + "C++ MEDS-Flat requires large_string properties " + "but " "found " + schema_field.field->ToString()); } - int offset = (iter - std::begin(metadata_columns)); + int offset = (iter - std::begin(properties_columns)); if (iter->second->Equals(arrow::LargeStringType())) { - is_text_metadata[offset] = true; + is_text_properties[offset] = true; } else { - is_text_metadata[offset] = false; + is_text_properties[offset] = false; if (iter->second->byte_width() == -1) { throw std::runtime_error( - "Found non text metadata with unknown byte " + "Found non text properties with unknown byte " "width? " + iter->second->ToString()); } } - metadata_indices[offset] = schema_field.column_index; + properties_indices[offset] = schema_field.column_index; } } @@ -401,44 +402,46 @@ void shard_reader( } std::vector> - text_metadata_arrays(metadata_columns.size()); + text_properties_arrays(properties_columns.size()); std::vector> - primitive_metadata_arrays(metadata_columns.size()); + primitive_properties_arrays(properties_columns.size()); - for (size_t i = 0; i < metadata_columns.size(); i++) { - if (metadata_indices[i] == -1) { + for (size_t i = 0; i < properties_columns.size(); i++) { + if (properties_indices[i] == -1) { continue; } - if (is_text_metadata[i]) { - auto metadata_array = + if (is_text_properties[i]) { + auto properties_array = std::dynamic_pointer_cast( - record_batch->column(metadata_indices[i])); - if (!metadata_array) { + record_batch->column(properties_indices[i])); + if (!properties_array) { throw std::runtime_error( - "Could not cast metadata array to text" + - metadata_columns[i].first + " " + - metadata_columns[i].second->ToString()); + "Could not cast properties array to text" + + properties_columns[i].first + " " + + properties_columns[i].second->ToString()); } - text_metadata_arrays[i] = metadata_array; + text_properties_arrays[i] = properties_array; } else { std::shared_ptr fixed_size_array; PARQUET_ASSIGN_OR_THROW( fixed_size_array, - record_batch->column(metadata_indices[i]) + record_batch->column(properties_indices[i]) ->View(std::make_shared< arrow::FixedSizeBinaryType>( - metadata_columns[i].second->byte_width()))); + properties_columns[i] + .second->byte_width()))); - auto metadata_array = std::dynamic_pointer_cast< + auto properties_array = std::dynamic_pointer_cast< arrow::FixedSizeBinaryArray>(fixed_size_array); - if (!metadata_array) { + if (!properties_array) { throw std::runtime_error( - "Could not cast metadata array to fixed size " + - metadata_columns[i].first + " " + - metadata_columns[i].second->ToString()); + "Could not cast properties array to fixed " + "size " + + properties_columns[i].first + " " + + properties_columns[i].second->ToString()); } - primitive_metadata_arrays[i] = metadata_array; + primitive_properties_arrays[i] = properties_array; } } @@ -486,21 +489,21 @@ void shard_reader( add_string_to_vector(data, text_value_array->Value(i)); } - for (size_t j = 0; j < metadata_columns.size(); j++) { - if (is_text_metadata[j]) { - if (text_metadata_arrays[j] && - text_metadata_arrays[j]->IsValid(i)) { + for (size_t j = 0; j < properties_columns.size(); j++) { + if (is_text_properties[j]) { + if (text_properties_arrays[j] && + text_properties_arrays[j]->IsValid(i)) { non_null[3 + j] = true; add_string_to_vector( - data, text_metadata_arrays[j]->Value(i)); + data, text_properties_arrays[j]->Value(i)); } } else { - if (primitive_metadata_arrays[j] && - primitive_metadata_arrays[j]->IsValid(i)) { + if (primitive_properties_arrays[j] && + primitive_properties_arrays[j]->IsValid(i)) { non_null[3 + j] = true; add_string_to_vector( data, - primitive_metadata_arrays[j]->GetView(i)); + primitive_properties_arrays[j]->GetView(i)); } } } @@ -857,39 +860,39 @@ sort_and_shard(const std::filesystem::path& source_directory, paths.push_back(entry.path()); } - auto set_metadata_fields = - get_metadata_fields_multithreaded(paths, num_shards); + auto set_properties_fields = + get_properties_fields_multithreaded(paths, num_shards); std::vector>> - metadata_columns(std::begin(set_metadata_fields), - std::end(set_metadata_fields)); - std::sort(std::begin(metadata_columns), std::end(metadata_columns)); - - metadata_columns.erase( - std::unique(std::begin(metadata_columns), std::end(metadata_columns), - [](const auto& a, const auto& b) { - return (a.first == b.first) && - a.second->Equals(b.second); - }), - std::end(metadata_columns)); - - for (ssize_t i = 0; i < static_cast(metadata_columns.size()) - 1; + properties_columns(std::begin(set_properties_fields), + std::end(set_properties_fields)); + std::sort(std::begin(properties_columns), std::end(properties_columns)); + + properties_columns.erase(std::unique(std::begin(properties_columns), + std::end(properties_columns), + [](const auto& a, const auto& b) { + return (a.first == b.first) && + a.second->Equals(b.second); + }), + std::end(properties_columns)); + + for (ssize_t i = 0; i < static_cast(properties_columns.size()) - 1; i++) { - if (metadata_columns[i].first == metadata_columns[i + 1].first) { + if (properties_columns[i].first == properties_columns[i + 1].first) { throw std::runtime_error( "Got conflicting types for column " + - metadata_columns[i].first + - ", types: " + metadata_columns[i].second->ToString() + " vs " + - metadata_columns[i + 1].second->ToString()); + properties_columns[i].first + + ", types: " + properties_columns[i].second->ToString() + + " vs " + properties_columns[i + 1].second->ToString()); } } - if (metadata_columns.size() + 3 > + if (properties_columns.size() + 3 > std::numeric_limits::digits) { throw std::runtime_error( "C++ MEDS-ETL currently only supports at most " + std::to_string(std::numeric_limits::digits) + - " metadata columns"); + " properties columns"); } moodycamel::BlockingConcurrentQueue> file_queue; @@ -914,9 +917,9 @@ sort_and_shard(const std::filesystem::path& source_directory, for (size_t i = 0; i < num_shards; i++) { threads.emplace_back([i, &file_queue, &write_queues, &write_semaphore, - num_shards, &metadata_columns]() { + num_shards, &properties_columns]() { shard_reader(i, num_shards, file_queue, write_queues, - write_semaphore, metadata_columns); + write_semaphore, properties_columns); }); threads.emplace_back([i, &write_queues, &write_semaphore, num_shards, @@ -945,41 +948,46 @@ sort_and_shard(const std::filesystem::path& source_directory, "Had excess unsorted items. This should not be possible"); } - return metadata_columns; + return properties_columns; } void join_and_write_single( const std::filesystem::path& source_directory, const std::filesystem::path& target_path, const std::vector>>& - metadata_columns) { - arrow::FieldVector metadata_fields; + properties_columns) { + arrow::FieldVector properties_fields; std::bitset::digits> - is_text_metadata; - for (size_t i = 0; i < metadata_columns.size(); i++) { - const auto& metadata_column = metadata_columns[i]; - if (metadata_column.second->Equals(arrow::LargeStringType())) { - is_text_metadata[i] = true; - metadata_fields.push_back(arrow::field( - metadata_column.first, std::make_shared())); + is_text_properties; + for (size_t i = 0; i < properties_columns.size(); i++) { + const auto& properties_column = properties_columns[i]; + if (properties_column.second->Equals(arrow::LargeStringType())) { + is_text_properties[i] = true; + properties_fields.push_back( + arrow::field(properties_column.first, + std::make_shared())); } else { - is_text_metadata[i] = false; - metadata_fields.push_back( - arrow::field(metadata_column.first, metadata_column.second)); + is_text_properties[i] = false; + properties_fields.push_back(arrow::field(properties_column.first, + properties_column.second)); } } - std::shared_ptr metadata_type; - if (metadata_columns.size() != 0) { - metadata_type = std::make_shared(metadata_fields); + std::shared_ptr properties_type; + if (properties_columns.size() != 0) { + properties_type = + std::make_shared(properties_fields); } else { - metadata_type = std::make_shared(); + properties_type = std::make_shared(); } auto timestamp_type = std::make_shared(arrow::TimeUnit::MICRO); - auto measurement_type_fields = { + auto event_type_fields = { + arrow::field("time", std::make_shared( + arrow::TimeUnit::MICRO)), + arrow::field("code", std::make_shared()), arrow::field("text_value", std::make_shared()), @@ -987,25 +995,13 @@ void join_and_write_single( arrow::field("datetime_value", std::make_shared( arrow::TimeUnit::MICRO)), - arrow::field("metadata", metadata_type), - }; - - auto measurement_type = - std::make_shared(measurement_type_fields); - - auto event_type_fields = { - arrow::field("time", std::make_shared( - arrow::TimeUnit::MICRO)), - arrow::field("measurements", - std::make_shared(measurement_type)), + arrow::field("properties", properties_type), }; auto event_type = std::make_shared(event_type_fields); auto events_type = std::make_shared(event_type); auto schema_fields = { arrow::field("patient_id", std::make_shared()), - arrow::field("static_measurements", - std::make_shared()), arrow::field("events", events_type), }; auto schema = std::make_shared(schema_fields); @@ -1066,8 +1062,6 @@ void join_and_write_single( } auto patient_id_builder = std::make_shared(pool); - auto static_measurements_builder = - std::make_shared(pool); auto code_builder = std::make_shared(pool); @@ -1076,55 +1070,48 @@ void join_and_write_single( auto datetime_value_builder = std::make_shared(timestamp_type, pool); - std::vector> text_metadata_builders( - metadata_columns.size()); + std::vector> text_properties_builders( + properties_columns.size()); std::vector> - primitive_metadata_builders(metadata_columns.size()); + primitive_properties_builders(properties_columns.size()); - std::shared_ptr metadata_builder; - std::shared_ptr null_metadata_builder; - std::shared_ptr metadata_builder_holder; + std::shared_ptr properties_builder; + std::shared_ptr null_properties_builder; + std::shared_ptr properties_builder_holder; - if (metadata_columns.size() != 0) { - std::vector> metadata_builders( - metadata_columns.size()); - for (size_t i = 0; i < metadata_columns.size(); i++) { - if (is_text_metadata[i]) { + if (properties_columns.size() != 0) { + std::vector> properties_builders( + properties_columns.size()); + for (size_t i = 0; i < properties_columns.size(); i++) { + if (is_text_properties[i]) { auto builder = std::make_shared(pool); - text_metadata_builders[i] = builder; - metadata_builders[i] = builder; + text_properties_builders[i] = builder; + properties_builders[i] = builder; } else { auto builder = std::make_shared( std::make_shared( - metadata_columns[i].second->byte_width())); - primitive_metadata_builders[i] = builder; - metadata_builders[i] = builder; + properties_columns[i].second->byte_width())); + primitive_properties_builders[i] = builder; + properties_builders[i] = builder; } } - metadata_builder = std::make_shared( - metadata_type, pool, metadata_builders); - metadata_builder_holder = metadata_builder; + properties_builder = std::make_shared( + properties_type, pool, properties_builders); + properties_builder_holder = properties_builder; } else { - null_metadata_builder = std::make_shared(pool); - metadata_builder_holder = null_metadata_builder; + null_properties_builder = std::make_shared(pool); + properties_builder_holder = null_properties_builder; } - std::vector> - measurement_builder_fields{ - code_builder, text_value_builder, numeric_value_builder, - datetime_value_builder, metadata_builder_holder}; - - auto measurement_builder = std::make_shared( - measurement_type, pool, measurement_builder_fields); - auto time_builder = std::make_shared(timestamp_type, pool); - auto measurements_builder = - std::make_shared(pool, measurement_builder); std::vector> event_builder_fields{ - time_builder, measurements_builder}; + time_builder, code_builder, + text_value_builder, numeric_value_builder, + datetime_value_builder, properties_builder_holder}; + auto event_builder = std::make_shared( event_type, pool, event_builder_fields); @@ -1132,14 +1119,12 @@ void join_and_write_single( std::make_shared(pool, event_builder); auto flush_arrays = [&]() { - std::vector> columns(3); + std::vector> columns(2); PARQUET_THROW_NOT_OK(patient_id_builder->Finish(columns.data() + 0)); - PARQUET_THROW_NOT_OK( - static_measurements_builder->Finish(columns.data() + 1)); std::shared_ptr events_array; PARQUET_THROW_NOT_OK(events_builder->Finish(&events_array)); - PARQUET_ASSIGN_OR_THROW(columns[2], events_array->View(events_type)); + PARQUET_ASSIGN_OR_THROW(columns[1], events_array->View(events_type)); std::shared_ptr table = arrow::Table::Make(schema, columns); @@ -1151,7 +1136,6 @@ void join_and_write_single( bool is_first = true; int64_t last_patient_id = -1; - int64_t last_time = -1; while (!queue.empty()) { auto next = std::move(queue.top()); @@ -1170,24 +1154,14 @@ void join_and_write_single( } last_patient_id = patient_id; - last_time = time; PARQUET_THROW_NOT_OK(patient_id_builder->Append(patient_id)); - PARQUET_THROW_NOT_OK(static_measurements_builder->AppendNull()); PARQUET_THROW_NOT_OK(events_builder->Append()); - - PARQUET_THROW_NOT_OK(event_builder->Append()); - PARQUET_THROW_NOT_OK(time_builder->Append(time)); - PARQUET_THROW_NOT_OK(measurements_builder->Append()); - } else if (time != last_time) { - last_time = time; - - PARQUET_THROW_NOT_OK(event_builder->Append()); - PARQUET_THROW_NOT_OK(time_builder->Append(time)); - PARQUET_THROW_NOT_OK(measurements_builder->Append()); } - PARQUET_THROW_NOT_OK(measurement_builder->Append()); + PARQUET_THROW_NOT_OK(event_builder->Append()); + PARQUET_THROW_NOT_OK(time_builder->Append(time)); + std::bitset::digits> non_null( *reinterpret_cast( patient_record.data() + patient_record.size() - @@ -1230,33 +1204,33 @@ void join_and_write_single( PARQUET_THROW_NOT_OK(text_value_builder->AppendNull()); } - if (metadata_columns.size() == 0) { - PARQUET_THROW_NOT_OK(null_metadata_builder->AppendNull()); + if (properties_columns.size() == 0) { + PARQUET_THROW_NOT_OK(null_properties_builder->AppendNull()); } else { - PARQUET_THROW_NOT_OK(metadata_builder->Append()); + PARQUET_THROW_NOT_OK(properties_builder->Append()); - for (size_t j = 0; j < metadata_columns.size(); j++) { + for (size_t j = 0; j < properties_columns.size(); j++) { if (non_null[3 + j]) { size_t size = *reinterpret_cast( patient_record.substr(offset).data()); offset += sizeof(size); auto entry = patient_record.substr(offset, size); - if (is_text_metadata[j]) { + if (is_text_properties[j]) { PARQUET_THROW_NOT_OK( - text_metadata_builders[j]->Append(entry)); + text_properties_builders[j]->Append(entry)); } else { PARQUET_THROW_NOT_OK( - primitive_metadata_builders[j]->Append(entry)); + primitive_properties_builders[j]->Append(entry)); } offset += size; } else { - if (is_text_metadata[j]) { + if (is_text_properties[j]) { PARQUET_THROW_NOT_OK( - text_metadata_builders[j]->AppendNull()); + text_properties_builders[j]->AppendNull()); } else { PARQUET_THROW_NOT_OK( - primitive_metadata_builders[j]->AppendNull()); + primitive_properties_builders[j]->AppendNull()); } } } @@ -1283,7 +1257,7 @@ void join_and_write( const std::filesystem::path& source_directory, const std::filesystem::path& target_directory, const std::vector>>& - metadata_columns) { + properties_columns) { std::filesystem::create_directory(target_directory); std::vector shards; @@ -1295,12 +1269,12 @@ void join_and_write( std::vector threads; for (const auto& shard : shards) { - threads.emplace_back( - [shard, &source_directory, &target_directory, &metadata_columns]() { - join_and_write_single(source_directory / shard, - target_directory / (shard + ".parquet"), - metadata_columns); - }); + threads.emplace_back([shard, &source_directory, &target_directory, + &properties_columns]() { + join_and_write_single(source_directory / shard, + target_directory / (shard + ".parquet"), + properties_columns); + }); } for (auto& thread : threads) { @@ -1323,9 +1297,9 @@ void perform_etl(const std::string& source_directory, std::filesystem::path shard_path = target_path / "shards"; std::filesystem::path data_path = target_path / "data"; - auto metadata_columns = + auto properties_columns = sort_and_shard(source_path / "flat_data", shard_path, num_shards); - join_and_write(shard_path, data_path, metadata_columns); + join_and_write(shard_path, data_path, properties_columns); fs::remove_all(shard_path); }