-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-44010: [C++] Add arrow::RecordBatch::MakeStatisticsArray()
#44252
base: main
Are you sure you want to change the base?
Changes from all commits
089f0a3
031024d
4a21c30
69f00f0
cc41a0f
9c529d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,6 +80,24 @@ struct ArrowArray { | |
void* private_data; | ||
}; | ||
|
||
// Well-known statistics keys.
//
// Every key is a string literal of the form "ARROW:<statistic>:<exact|approximate>".
// Each statistic below has one "exact" and one "approximate" variant.
// NOTE(review): these are macros rather than constexpr std::string_view,
// presumably because this header has to stay usable from C -- confirm.
# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_EXACT "ARROW:average_byte_width:exact" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't know constexpr std::string_view is better or this is better There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't use |
||
# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_APPROXIMATE \ | ||
"ARROW:average_byte_width:approximate" | ||
# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT "ARROW:distinct_count:exact" | ||
# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_APPROXIMATE \ | ||
"ARROW:distinct_count:approximate" | ||
# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_EXACT "ARROW:max_byte_width:exact" | ||
# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_APPROXIMATE \ | ||
"ARROW:max_byte_width:approximate" | ||
# define ARROW_STATISTICS_KEY_MAX_VALUE_EXACT "ARROW:max_value:exact" | ||
# define ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE "ARROW:max_value:approximate" | ||
# define ARROW_STATISTICS_KEY_MIN_VALUE_EXACT "ARROW:min_value:exact" | ||
# define ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE "ARROW:min_value:approximate" | ||
# define ARROW_STATISTICS_KEY_NULL_COUNT_EXACT "ARROW:null_count:exact" | ||
# define ARROW_STATISTICS_KEY_NULL_COUNT_APPROXIMATE "ARROW:null_count:approximate" | ||
# define ARROW_STATISTICS_KEY_ROW_COUNT_EXACT "ARROW:row_count:exact" | ||
# define ARROW_STATISTICS_KEY_ROW_COUNT_APPROXIMATE "ARROW:row_count:approximate" | ||
|
||
#endif // ARROW_C_DATA_INTERFACE | ||
|
||
#ifndef ARROW_C_DEVICE_DATA_INTERFACE | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,8 +26,13 @@ | |
#include <utility> | ||
|
||
#include "arrow/array.h" | ||
#include "arrow/array/builder_binary.h" | ||
#include "arrow/array/builder_dict.h" | ||
#include "arrow/array/builder_nested.h" | ||
#include "arrow/array/builder_union.h" | ||
#include "arrow/array/concatenate.h" | ||
#include "arrow/array/validate.h" | ||
#include "arrow/c/abi.h" | ||
#include "arrow/pretty_print.h" | ||
#include "arrow/status.h" | ||
#include "arrow/table.h" | ||
|
@@ -465,6 +470,220 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::ViewOrCopyTo( | |
return Make(schema_, num_rows(), std::move(copied_columns)); | ||
} | ||
|
||
namespace { | ||
struct EnumeratedStatistics { | ||
int nth_statistics = 0; | ||
bool start_new_column = false; | ||
std::optional<int32_t> nth_column = std::nullopt; | ||
const char* key = nullptr; | ||
std::shared_ptr<DataType> type = nullptr; | ||
ArrayStatistics::ValueType value = false; | ||
}; | ||
using OnStatistics = | ||
std::function<Status(const EnumeratedStatistics& enumerated_statistics)>; | ||
Status EnumerateStatistics(const RecordBatch& record_batch, OnStatistics on_statistics) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So actually this is for a two-phase building, one pass for types, and one-pass for data? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. I think that it's one of complexities. |
||
EnumeratedStatistics statistics; | ||
statistics.nth_statistics = 0; | ||
statistics.start_new_column = true; | ||
statistics.nth_column = std::nullopt; | ||
|
||
statistics.key = ARROW_STATISTICS_KEY_ROW_COUNT_EXACT; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So RowCount is also handled as a stats 🤔? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Statistics array will be passed to consumer before consumer receives a record batch. But DuckDB doesn't have row count in its There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's keep this for now to demonstrate table/record batch level statistics. |
||
statistics.type = int64(); | ||
statistics.value = record_batch.num_rows(); | ||
RETURN_NOT_OK(on_statistics(statistics)); | ||
mapleFU marked this conversation as resolved.
Show resolved
Hide resolved
|
||
statistics.start_new_column = false; | ||
|
||
const auto num_fields = record_batch.schema()->num_fields(); | ||
for (int nth_column = 0; nth_column < num_fields; ++nth_column) { | ||
auto column_statistics = record_batch.column(nth_column)->statistics(); | ||
if (!column_statistics) { | ||
continue; | ||
} | ||
|
||
statistics.start_new_column = true; | ||
statistics.nth_column = nth_column; | ||
if (column_statistics->null_count.has_value()) { | ||
statistics.nth_statistics++; | ||
statistics.key = ARROW_STATISTICS_KEY_NULL_COUNT_EXACT; | ||
statistics.type = int64(); | ||
statistics.value = column_statistics->null_count.value(); | ||
RETURN_NOT_OK(on_statistics(statistics)); | ||
statistics.start_new_column = false; | ||
} | ||
|
||
if (column_statistics->distinct_count.has_value()) { | ||
statistics.nth_statistics++; | ||
statistics.key = ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT; | ||
statistics.type = int64(); | ||
statistics.value = column_statistics->distinct_count.value(); | ||
RETURN_NOT_OK(on_statistics(statistics)); | ||
statistics.start_new_column = false; | ||
} | ||
|
||
if (column_statistics->min.has_value()) { | ||
statistics.nth_statistics++; | ||
if (column_statistics->is_min_exact) { | ||
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_EXACT; | ||
} else { | ||
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE; | ||
} | ||
statistics.type = column_statistics->MinArrowType(); | ||
statistics.value = column_statistics->min.value(); | ||
RETURN_NOT_OK(on_statistics(statistics)); | ||
statistics.start_new_column = false; | ||
} | ||
|
||
if (column_statistics->max.has_value()) { | ||
statistics.nth_statistics++; | ||
if (column_statistics->is_max_exact) { | ||
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_EXACT; | ||
} else { | ||
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE; | ||
} | ||
statistics.type = column_statistics->MaxArrowType(); | ||
statistics.value = column_statistics->max.value(); | ||
RETURN_NOT_OK(on_statistics(statistics)); | ||
statistics.start_new_column = false; | ||
} | ||
} | ||
return Status::OK(); | ||
} | ||
} // namespace | ||
|
||
// Builds an array that describes the statistics attached to this record
// batch.
//
// The returned array has the following type:
//
//   struct<
//     column: int32,
//     statistics: map<
//       key: dictionary<
//         indices: int32,
//         dictionary: utf8,
//       >,
//       items: dense_union<...all needed types...>,
//     >
//   >
//
// One struct element is appended for the record batch itself (its `column`
// is null) and one for every column for which EnumerateStatistics() emits at
// least one entry.
//
// \param memory_pool the pool every builder used here allocates from
// \return the statistics array, or the first error reported by a builder
Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
    MemoryPool* memory_pool) const {
  // Pass 1: collect the value types.
  //
  // The statistics schema doesn't define a static dense union type for
  // values. Each statistics array has a dense union type that contains
  // exactly the needed value types. `values_types` is logically a set keyed
  // by type id: when the same type id shows up again, the first registered
  // type is reused. `values_type_indexes[n]` is the union child index of the
  // n-th enumerated statistics entry.
  std::vector<std::shared_ptr<Field>> values_types;
  std::vector<int8_t> values_type_indexes;
  RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
    int8_t i = 0;
    for (const auto& known_type : values_types) {
      if (known_type->type()->id() == statistics.type->id()) {
        break;
      }
      i++;
    }
    if (i == static_cast<int8_t>(values_types.size())) {
      values_types.push_back(field(statistics.type->name(), statistics.type));
    }
    values_type_indexes.push_back(i);
    return Status::OK();
  }));

  // statistics.key: dictionary<indices: int32, dictionary: utf8>
  auto keys_type = dictionary(int32(), utf8(), false);
  // statistics.items: dense_union<...all needed types...>
  auto values_type = dense_union(values_types);
  // struct<column: int32, statistics: map<key, items>>
  auto statistics_type =
      struct_({field("column", int32()),
               field("statistics", map(keys_type, values_type, false))});

  std::vector<std::shared_ptr<ArrayBuilder>> field_builders;
  // column: int32
  auto columns_builder = std::make_shared<Int32Builder>(memory_pool);
  field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(columns_builder));
  // statistics.key: dictionary<indices: int32, dictionary: utf8>
  // Use the caller's pool here too, like every other builder below.
  auto keys_builder = std::make_shared<StringDictionary32Builder>(memory_pool);
  // statistics.items: dense_union<...all needed types...>
  std::vector<std::shared_ptr<ArrayBuilder>> values_builders;
  for (const auto& needed_type : values_types) {
    std::unique_ptr<ArrayBuilder> child_builder;
    RETURN_NOT_OK(MakeBuilder(memory_pool, needed_type->type(), &child_builder));
    values_builders.push_back(std::shared_ptr<ArrayBuilder>(std::move(child_builder)));
  }
  // `values_builders` is still indexed in pass 2, so it must not be moved
  // from here: hand the union builder its own copy of the shared pointers.
  auto items_builder =
      std::make_shared<DenseUnionBuilder>(memory_pool, values_builders, values_type);
  // statistics: map<key, items>
  auto statistics_builder = std::make_shared<MapBuilder>(
      memory_pool, std::static_pointer_cast<ArrayBuilder>(keys_builder),
      std::static_pointer_cast<ArrayBuilder>(items_builder));
  field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(statistics_builder));
  // struct<column: int32, statistics: map<key, items>>
  StructBuilder builder(statistics_type, memory_pool, std::move(field_builders));

  // Appends a statistics value to the union child builder that matches the
  // value's C++ type. `builder` must be the child registered for that type.
  struct ValueAppender {
    ArrayBuilder* builder;

    Status operator()(const bool& value) {
      return static_cast<BooleanBuilder*>(builder)->Append(value);
    }
    Status operator()(const int64_t& value) {
      return static_cast<Int64Builder*>(builder)->Append(value);
    }
    Status operator()(const uint64_t& value) {
      return static_cast<UInt64Builder*>(builder)->Append(value);
    }
    Status operator()(const double& value) {
      return static_cast<DoubleBuilder*>(builder)->Append(value);
    }
    Status operator()(const std::string& value) {
      // TODO(GH-44579): "bytes" and "utf8" are not distinguished in
      // statistics yet; "utf8" is assumed for now.
      return static_cast<StringBuilder*>(builder)->Append(
          value.data(), static_cast<int32_t>(value.size()));
    }
  };

  // Pass 2: append the statistics. The enumeration order is the same as in
  // pass 1, so `nth_statistics` indexes `values_type_indexes` directly.
  RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
    if (statistics.start_new_column) {
      // Open a new struct element and a new (initially empty) map for it.
      RETURN_NOT_OK(builder.Append());
      if (statistics.nth_column.has_value()) {
        RETURN_NOT_OK(columns_builder->Append(statistics.nth_column.value()));
      } else {
        // Record-batch-level statistics have no column index.
        RETURN_NOT_OK(columns_builder->AppendNull());
      }
      RETURN_NOT_OK(statistics_builder->Append());
    }
    RETURN_NOT_OK(keys_builder->Append(statistics.key,
                                       static_cast<int32_t>(strlen(statistics.key))));
    const auto values_type_index = values_type_indexes[statistics.nth_statistics];
    RETURN_NOT_OK(items_builder->Append(values_type_index));
    ValueAppender appender{values_builders[values_type_index].get()};
    RETURN_NOT_OK(std::visit(appender, statistics.value));
    return Status::OK();
  }));

  return builder.Finish();
}
|
||
// Validates this record batch with full validation disabled.
Status RecordBatch::Validate() const {
  constexpr bool kFullValidation = false;
  return ValidateBatch(*this, kFullValidation);
}
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may have forgotten, but we don't distinguish "bytes" and "utf8" in stats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, we didn't discuss it...
Let's discuss it in #44579.
We can assume "utf8" here for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a
// TODO(GH-44579)
here? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I should have added it...
I've added it.