Skip to content

Commit

Permalink
Add support for no metadata
Browse files: browse the repository at this point in the history
  • Loading branch information
EthanSteinberg committed Apr 11, 2024
1 parent b228adb commit a7ff3db
Showing 1 changed file with 66 additions and 42 deletions.
108 changes: 66 additions & 42 deletions native/perform_etl.cc
Original file line number | Diff line number | Diff line change
Expand Up @@ -969,7 +969,12 @@ void join_and_write_single(
}
}

auto metadata_type = std::make_shared<arrow::StructType>(metadata_fields);
std::shared_ptr<arrow::DataType> metadata_type;
if (metadata_columns.size() != 0) {
metadata_type = std::make_shared<arrow::StructType>(metadata_fields);
} else {
metadata_type = std::make_shared<arrow::FloatType>();
}

auto timestamp_type =
std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
Expand All @@ -984,6 +989,7 @@ void join_and_write_single(

arrow::field("metadata", metadata_type),
};

auto measurement_type =
std::make_shared<arrow::StructType>(measurement_type_fields);

Expand Down Expand Up @@ -1074,29 +1080,41 @@ void join_and_write_single(
metadata_columns.size());
std::vector<std::shared_ptr<arrow::FixedSizeBinaryBuilder>>
primitive_metadata_builders(metadata_columns.size());
std::vector<std::shared_ptr<arrow::ArrayBuilder>> metadata_builders(
metadata_columns.size());
for (size_t i = 0; i < metadata_columns.size(); i++) {
if (is_text_metadata[i]) {
auto builder = std::make_shared<arrow::StringBuilder>(pool);
text_metadata_builders[i] = builder;
metadata_builders[i] = builder;
} else {
auto builder = std::make_shared<arrow::FixedSizeBinaryBuilder>(
std::make_shared<arrow::FixedSizeBinaryType>(
metadata_columns[i].second->byte_width()));
primitive_metadata_builders[i] = builder;
metadata_builders[i] = builder;

std::shared_ptr<arrow::StructBuilder> metadata_builder;
std::shared_ptr<arrow::FloatBuilder> null_metadata_builder;
std::shared_ptr<arrow::ArrayBuilder> metadata_builder_holder;

if (metadata_columns.size() != 0) {
std::vector<std::shared_ptr<arrow::ArrayBuilder>> metadata_builders(
metadata_columns.size());
for (size_t i = 0; i < metadata_columns.size(); i++) {
if (is_text_metadata[i]) {
auto builder = std::make_shared<arrow::StringBuilder>(pool);
text_metadata_builders[i] = builder;
metadata_builders[i] = builder;
} else {
auto builder = std::make_shared<arrow::FixedSizeBinaryBuilder>(
std::make_shared<arrow::FixedSizeBinaryType>(
metadata_columns[i].second->byte_width()));
primitive_metadata_builders[i] = builder;
metadata_builders[i] = builder;
}
}
}

auto metadata_builder = std::make_shared<arrow::StructBuilder>(
metadata_type, pool, metadata_builders);
metadata_builder = std::make_shared<arrow::StructBuilder>(
metadata_type, pool, metadata_builders);
metadata_builder_holder = metadata_builder;
} else {
null_metadata_builder = std::make_shared<arrow::FloatBuilder>(pool);
metadata_builder_holder = null_metadata_builder;
}

std::vector<std::shared_ptr<arrow::ArrayBuilder>>
measurement_builder_fields{code_builder, text_value_builder,
numeric_value_builder,
datetime_value_builder, metadata_builder};
measurement_builder_fields{
code_builder, text_value_builder, numeric_value_builder,
datetime_value_builder, metadata_builder_holder};

auto measurement_builder = std::make_shared<arrow::StructBuilder>(
measurement_type, pool, measurement_builder_fields);

Expand Down Expand Up @@ -1125,6 +1143,7 @@ void join_and_write_single(

std::shared_ptr<arrow::Table> table =
arrow::Table::Make(schema, columns);

PARQUET_THROW_NOT_OK(writer->WriteTable(*table));

amount_written = 0;
Expand Down Expand Up @@ -1211,29 +1230,34 @@ void join_and_write_single(
PARQUET_THROW_NOT_OK(text_value_builder->AppendNull());
}

PARQUET_THROW_NOT_OK(metadata_builder->Append());
for (size_t j = 0; j < metadata_columns.size(); j++) {
if (non_null[3 + j]) {
size_t size = *reinterpret_cast<const size_t*>(
patient_record.substr(offset).data());
offset += sizeof(size);
auto entry = patient_record.substr(offset, size);

if (is_text_metadata[j]) {
PARQUET_THROW_NOT_OK(
text_metadata_builders[j]->Append(entry));
} else {
PARQUET_THROW_NOT_OK(
primitive_metadata_builders[j]->Append(entry));
}
offset += size;
} else {
if (is_text_metadata[j]) {
PARQUET_THROW_NOT_OK(
text_metadata_builders[j]->AppendNull());
if (metadata_columns.size() == 0) {
PARQUET_THROW_NOT_OK(null_metadata_builder->AppendNull());
} else {
PARQUET_THROW_NOT_OK(metadata_builder->Append());

for (size_t j = 0; j < metadata_columns.size(); j++) {
if (non_null[3 + j]) {
size_t size = *reinterpret_cast<const size_t*>(
patient_record.substr(offset).data());
offset += sizeof(size);
auto entry = patient_record.substr(offset, size);

if (is_text_metadata[j]) {
PARQUET_THROW_NOT_OK(
text_metadata_builders[j]->Append(entry));
} else {
PARQUET_THROW_NOT_OK(
primitive_metadata_builders[j]->Append(entry));
}
offset += size;
} else {
PARQUET_THROW_NOT_OK(
primitive_metadata_builders[j]->AppendNull());
if (is_text_metadata[j]) {
PARQUET_THROW_NOT_OK(
text_metadata_builders[j]->AppendNull());
} else {
PARQUET_THROW_NOT_OK(
primitive_metadata_builders[j]->AppendNull());
}
}
}
}
Expand Down

0 comments on commit a7ff3db

Please sign in to comment.