Skip to content

Commit

Permalink
stub in new SizeStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
etseidl committed Aug 29, 2023
1 parent 1452200 commit 9589fc3
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 72 deletions.
24 changes: 22 additions & 2 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,12 @@ bool CompactProtocolReader::read(ColumnChunkMetaData* c)
ParquetFieldInt64(5, c->num_values),
ParquetFieldInt64(6, c->total_uncompressed_size),
ParquetFieldInt64(7, c->total_compressed_size),
ParquetFieldStructList(8, c->key_value_metadata),
ParquetFieldInt64(9, c->data_page_offset),
ParquetFieldInt64(10, c->index_page_offset),
ParquetFieldInt64(11, c->dictionary_page_offset),
ParquetFieldStruct(12, c->statistics));
ParquetFieldStruct(12, c->statistics),
OptionalParquetFieldStruct(16, c->size_estimate_statistics));
return function_builder(this, op);
}

Expand Down Expand Up @@ -308,13 +310,31 @@ bool CompactProtocolReader::read(OffsetIndex* o)
return function_builder(this, op);
}

/**
 * @brief Reads a RepetitionDefinitionLevelHistogram struct
 *
 * Field 1 maps to `repetition_level_histogram` and field 2 to
 * `definition_level_histogram`; both destinations are optionals and are read
 * through the optional int64-list functor.
 *
 * @param r Struct to populate
 * @return Whatever `function_builder` returns for this set of field handlers
 */
bool CompactProtocolReader::read(RepetitionDefinitionLevelHistogram* r)
{
  auto field_handlers =
    std::make_tuple(OptionalParquetFieldInt64List{1, r->repetition_level_histogram},
                    OptionalParquetFieldInt64List{2, r->definition_level_histogram});
  return function_builder(this, field_handlers);
}

/**
 * @brief Reads a SizeStatistics struct
 *
 * Field 1 maps to `unencoded_variable_width_stored_bytes` (optional int64) and
 * field 2 to `repetition_definition_level_histogram` (optional nested struct).
 *
 * @param s Struct to populate
 * @return Whatever `function_builder` returns for this set of field handlers
 */
bool CompactProtocolReader::read(SizeStatistics* s)
{
  auto field_handlers =
    std::make_tuple(OptionalParquetFieldInt64{1, s->unencoded_variable_width_stored_bytes},
                    OptionalParquetFieldStruct(2, s->repetition_definition_level_histogram));
  return function_builder(this, field_handlers);
}

/**
 * @brief Reads a ColumnIndex struct
 *
 * Fields 1-5 are read with the non-optional functors; field 6 (`size_statistics`)
 * is read through the optional struct functor.
 *
 * @param c Struct to populate
 * @return Whatever `function_builder` returns for this set of field handlers
 */
bool CompactProtocolReader::read(ColumnIndex* c)
{
  // Field 5 must appear exactly once and the tuple must stay open until the
  // optional field 6 handler has been added.
  auto op = std::make_tuple(ParquetFieldBoolList(1, c->null_pages),
                            ParquetFieldBinaryList(2, c->min_values),
                            ParquetFieldBinaryList(3, c->max_values),
                            ParquetFieldEnum<BoundaryOrder>(4, c->boundary_order),
                            ParquetFieldInt64List(5, c->null_counts),
                            // FIXME(ets): this will likely be RepetitionDefinitionLevelHistogram
                            // https://github.com/apache/parquet-format/pull/197
                            OptionalParquetFieldStruct(6, c->size_statistics));
  return function_builder(this, op);
}

Expand Down
73 changes: 73 additions & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class CompactProtocolReader {
bool read(KeyValue* k);
bool read(PageLocation* p);
bool read(OffsetIndex* o);
bool read(RepetitionDefinitionLevelHistogram* r);
bool read(SizeStatistics* s);
bool read(ColumnIndex* c);
bool read(Statistics* s);

Expand Down Expand Up @@ -299,6 +301,27 @@ class ParquetFieldInt64 {
int field() { return field_val; }
};

/**
 * @brief Functor to read an optional 64-bit integer field from CompactProtocolReader
 *
 * The field type is validated before anything is read, so the destination
 * optional is only given a value when the type lies within
 * [ST_FLD_I16, ST_FLD_I64]. On a type mismatch the optional is left untouched.
 *
 * @return True if the field type is outside [ST_FLD_I16, ST_FLD_I64]
 */
class OptionalParquetFieldInt64 {
  int field_val;
  std::optional<int64_t>& val;

 public:
  OptionalParquetFieldInt64(int f, std::optional<int64_t>& v) : field_val(f), val(v) {}

  inline bool operator()(CompactProtocolReader* cpr, int field_type)
  {
    // Reject unexpected field types up front rather than storing a value decoded
    // from a field that is not an integer of the accepted widths.
    if (field_type < ST_FLD_I16 || field_type > ST_FLD_I64) { return true; }
    val = cpr->get_i64();
    return false;
  }

  int field() { return field_val; }
};

/**
* @brief Functor to read a vector of 64-bit integers from CompactProtocolReader
*
Expand Down Expand Up @@ -327,6 +350,32 @@ class ParquetFieldInt64List {
int field() { return field_val; }
};

/**
 * @brief Functor to read an optional vector of 64-bit integers from CompactProtocolReader
 *
 * The destination optional is only given a value once the field is known to be
 * a list whose element type is ST_FLD_I64 and whose size is non-negative;
 * otherwise it is left untouched.
 *
 * @return True if the field type is not ST_FLD_LIST, the element type is not
 * ST_FLD_I64, or the list header reports a negative size
 */
class OptionalParquetFieldInt64List {
  int field_val;
  std::optional<std::vector<int64_t>>& val;

 public:
  OptionalParquetFieldInt64List(int f, std::optional<std::vector<int64_t>>& v)
    : field_val(f), val(v)
  {
  }

  inline bool operator()(CompactProtocolReader* cpr, int field_type)
  {
    if (field_type != ST_FLD_LIST) { return true; }

    uint8_t elem_type = 0;
    int32_t const n   = cpr->get_listh(&elem_type);
    // A negative count would wrap to an enormous size when converted for the
    // allocation below, so treat it as a malformed field.
    if (elem_type != ST_FLD_I64 || n < 0) { return true; }

    // Engage the optional only after the header has been validated.
    auto& values = val.emplace(static_cast<std::vector<int64_t>::size_type>(n));
    for (auto& v : values) {
      v = cpr->get_i64();
    }
    return false;
  }

  int field() { return field_val; }
};

/**
* @brief Functor to read a vector of structures from CompactProtocolReader
*
Expand Down Expand Up @@ -423,6 +472,30 @@ ParquetFieldStructFunctor<T> ParquetFieldStruct(int f, T& v)
return ParquetFieldStructFunctor<T>(f, v);
}

/**
 * @brief Functor to read an optional nested struct from CompactProtocolReader
 *
 * When the field type is ST_FLD_STRUCT the destination optional is reset to a
 * freshly value-initialized `T`, which is then filled in by
 * `CompactProtocolReader::read`.
 *
 * @return True if the field type is not ST_FLD_STRUCT or if reading the struct
 * reports failure
 */
template <typename T>
class OptionalParquetFieldStructFunctor {
  int field_val;
  std::optional<T>& val;

 public:
  OptionalParquetFieldStructFunctor(int f, std::optional<T>& v) : field_val(f), val(v) {}

  inline bool operator()(CompactProtocolReader* cpr, int field_type)
  {
    bool const is_struct = (field_type == ST_FLD_STRUCT);
    if (is_struct) {
      T& target     = val.emplace();
      bool const ok = cpr->read(&target);
      return !ok;
    }
    return true;
  }

  int field() { return field_val; }
};

/**
 * @brief Builds an OptionalParquetFieldStructFunctor, deducing `T` from the destination
 *
 * @param f Field id the functor responds to
 * @param v Optional that receives the struct
 * @return Functor bound to `f` and `v`
 */
template <typename T>
OptionalParquetFieldStructFunctor<T> OptionalParquetFieldStruct(int f, std::optional<T>& v)
{
  using functor_type = OptionalParquetFieldStructFunctor<T>;
  return functor_type(f, v);
}

/**
* @brief Functor to read a union member from CompactProtocolReader
*
Expand Down
39 changes: 39 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,14 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s)
c.field_int(5, s.num_values);
c.field_int(6, s.total_uncompressed_size);
c.field_int(7, s.total_compressed_size);
if (s.key_value_metadata.size() != 0) { c.field_struct_list(8, s.key_value_metadata); }
c.field_int(9, s.data_page_offset);
if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); }
if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); }
c.field_struct(12, s.statistics);
if (s.size_estimate_statistics.has_value()) {
c.field_struct(16, s.size_estimate_statistics.value());
}
return c.value();
}

Expand Down Expand Up @@ -233,6 +237,30 @@ size_t CompactProtocolWriter::write(OffsetIndex const& s)
return c.value();
}

/**
 * @brief Writes a RepetitionDefinitionLevelHistogram struct
 *
 * Each histogram is emitted only when its optional holds a value: the
 * repetition level histogram as field 1, the definition level histogram as
 * field 2.
 *
 * @param r Struct to serialize
 * @return The value reported by the field writer once both fields are handled
 */
size_t CompactProtocolWriter::write(RepetitionDefinitionLevelHistogram const& r)
{
  CompactProtocolFieldWriter c(*this);

  auto const& rep_hist = r.repetition_level_histogram;
  if (rep_hist) { c.field_int_list(1, *rep_hist); }

  auto const& def_hist = r.definition_level_histogram;
  if (def_hist) { c.field_int_list(2, *def_hist); }

  return c.value();
}

/**
 * @brief Writes a SizeStatistics struct
 *
 * Each member is emitted only when its optional holds a value:
 * `unencoded_variable_width_stored_bytes` as integer field 1 and
 * `repetition_definition_level_histogram` as struct field 2.
 *
 * @param s Struct to serialize
 * @return The value reported by the field writer once both fields are handled
 */
size_t CompactProtocolWriter::write(SizeStatistics const& s)
{
  CompactProtocolFieldWriter c(*this);

  auto const& stored_bytes = s.unencoded_variable_width_stored_bytes;
  if (stored_bytes) { c.field_int(1, *stored_bytes); }

  auto const& level_histogram = s.repetition_definition_level_histogram;
  if (level_histogram) { c.field_struct(2, *level_histogram); }

  return c.value();
}

/**
 * @brief Appends a single byte to the end of the owning writer's buffer
 *
 * @param v Byte to append
 */
void CompactProtocolFieldWriter::put_byte(uint8_t v)
{
  auto& out = writer.m_buf;
  out.emplace_back(v);
}

void CompactProtocolFieldWriter::put_byte(uint8_t const* raw, uint32_t len)
Expand Down Expand Up @@ -308,6 +336,17 @@ inline void CompactProtocolFieldWriter::field_int_list(int field, std::vector<En
current_field_value = field;
}

/**
 * @brief Writes a list-of-int64 field
 *
 * Emits the field header, then a list header byte whose upper nibble is the
 * element count saturated at 0xf and whose lower nibble is ST_FLD_I64. Counts
 * of 0xf or more are additionally written in full through `put_uint`. Each
 * element is then written at its full 64-bit width.
 *
 * @param field Field id to write
 * @param val Values to serialize
 */
inline void CompactProtocolFieldWriter::field_int_list(int field, const std::vector<int64_t>& val)
{
  put_field_header(field, current_field_value, ST_FLD_LIST);
  put_byte(
    static_cast<uint8_t>((std::min(val.size(), static_cast<size_t>(0xfu)) << 4) | ST_FLD_I64));
  if (val.size() >= 0xf) { put_uint(val.size()); }
  for (auto const v : val) {
    // The element type is declared as I64, so the value must not be narrowed to
    // 32 bits before encoding; doing so corrupts anything outside the int32 range.
    put_int(v);
  }
  current_field_value = field;
}

template <typename T>
inline void CompactProtocolFieldWriter::field_struct(int field, T const& val)
{
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class CompactProtocolWriter {
size_t write(Statistics const&);
size_t write(PageLocation const&);
size_t write(OffsetIndex const&);
size_t write(RepetitionDefinitionLevelHistogram const&);
size_t write(SizeStatistics const&);

protected:
std::vector<uint8_t>& m_buf;
Expand Down Expand Up @@ -91,6 +93,8 @@ class CompactProtocolFieldWriter {
template <typename Enum>
inline void field_int_list(int field, std::vector<Enum> const& val);

inline void field_int_list(int field, const std::vector<int64_t>& val);

template <typename T>
inline void field_struct(int field, T const& val);

Expand Down
Loading

0 comments on commit 9589fc3

Please sign in to comment.