Skip to content

Commit

Permalink
Fixing parquet list of struct interpretation (#13715)
Browse files Browse the repository at this point in the history
This change alters how we interpret non-annotated data in a parquet file. Most modern parquet writers would produce something like:
```
message spark_schema {
  required int32 id;
  optional group phoneNumbers (LIST) {
    repeated group phone {
      required int64 number;
      optional binary kind (STRING);
    }
  }
}
```

But the list annotation isn't required. If it didn't exist, we would incorrectly interpret this schema as a struct of struct and not a list of struct. This change alters the code to look at the child and see if it is repeated. If it is, this indicates a list.

closes #13664

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Mark Harris (https://github.com/harrism)

Approvers:
  - Mark Harris (https://github.com/harrism)
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #13715
  • Loading branch information
hyperbolic2346 authored Oct 6, 2023
1 parent 04e2cd6 commit fc36947
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// for nested schemas, it's more complicated. This warp will visit 32 incoming values,
// however not all of them will necessarily represent a value at this nesting level. so
// the validity bit for thread t might actually represent output value t-6. the correct
// position for thread t's bit is cur_value_count. for cuda 11 we could use
// position for thread t's bit is thread_value_count. for cuda 11 we could use
// __reduce_or_sync(), but until then we have to do a warp reduce.
WarpReduceOr32(is_valid << thread_value_count);

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ struct SchemaElement {
{
return type == UNDEFINED_TYPE &&
// this assumption might be a little weak.
((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children == 2));
((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children > 1));
}
};

Expand Down
86 changes: 81 additions & 5 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,81 @@ type_id to_type_id(SchemaElement const& schema,
return type_id::EMPTY;
}

void metadata::sanitize_schema()
{
// Parquet isn't very strict about incoming metadata. Lots of things can and should be inferred.
// There are also a lot of rules that simply aren't followed and are expected to be worked around.
// This step sanitizes the metadata to something that isn't ambiguous.
//
// Take, for example, the following schema:
//
// required group field_id=-1 user {
// required int32 field_id=-1 id;
// optional group field_id=-1 phoneNumbers {
// repeated group field_id=-1 phone {
// required int64 field_id=-1 number;
// optional binary field_id=-1 kind (String);
// }
// }
// }
//
// This real-world example has no annotations telling us what is a list or a struct. On the
// surface this looks like a column of id's and a column of list<struct<int64, string>>, but this
// actually should be interpreted as a struct<list<struct<int64, string>>>. The phoneNumbers field
// has to be a struct because it is a group with no repeated tag and we have no annotation. The
// repeated group is actually BOTH a struct due to the multiple children and a list due to
// repeated.
//
// This code attempts to make this less messy for the code that follows.

std::function<void(size_t)> process = [&](size_t schema_idx) -> void {
if (schema_idx < 0) { return; }
auto& schema_elem = schema[schema_idx];
if (schema_idx != 0 && schema_elem.type == UNDEFINED_TYPE) {
auto const parent_type = schema[schema_elem.parent_idx].converted_type;
if (schema_elem.repetition_type == REPEATED && schema_elem.num_children > 1 &&
parent_type != LIST && parent_type != MAP) {
// This is a list of structs, so we need to mark this as a list, but also
// add a struct child and move this element's children to the struct
schema_elem.converted_type = LIST;
schema_elem.repetition_type = OPTIONAL;
auto const struct_node_idx = schema.size();

SchemaElement struct_elem;
struct_elem.name = "struct_node";
struct_elem.repetition_type = REQUIRED;
struct_elem.num_children = schema_elem.num_children;
struct_elem.type = UNDEFINED_TYPE;
struct_elem.converted_type = UNKNOWN;

// swap children
struct_elem.children_idx = std::move(schema_elem.children_idx);
schema_elem.children_idx = {struct_node_idx};
schema_elem.num_children = 1;

struct_elem.max_definition_level = schema_elem.max_definition_level;
struct_elem.max_repetition_level = schema_elem.max_repetition_level;
schema_elem.max_definition_level--;
schema_elem.max_repetition_level = schema[schema_elem.parent_idx].max_repetition_level;

// change parent index on new node and on children
struct_elem.parent_idx = schema_idx;
for (auto& child_idx : struct_elem.children_idx) {
schema[child_idx].parent_idx = struct_node_idx;
}
// add our struct
schema.push_back(struct_elem);
}
}

for (auto& child_idx : schema_elem.children_idx) {
process(child_idx);
}
};

process(0);
}

metadata::metadata(datasource* source)
{
constexpr auto header_len = sizeof(file_header_s);
Expand All @@ -195,6 +270,7 @@ metadata::metadata(datasource* source)
CompactProtocolReader cp(buffer->data(), ender->footer_len);
CUDF_EXPECTS(cp.read(this), "Cannot parse metadata");
CUDF_EXPECTS(cp.InitSchema(this), "Cannot initialize schema");
sanitize_schema();
}

std::vector<metadata> aggregate_reader_metadata::metadatas_from_sources(
Expand Down Expand Up @@ -445,8 +521,10 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
child_col_name_info, schema_elem.children_idx[0], out_col_array, has_list_parent);
}

auto const one_level_list = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx));

// if we're at the root, this is a new output column
auto const col_type = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))
auto const col_type = one_level_list
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = to_data_type(col_type, schema_elem);
Expand Down Expand Up @@ -485,7 +563,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
input_column_info{schema_idx, schema_elem.name, schema_elem.max_repetition_level > 0});

// set up child output column for one-level encoding list
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
if (one_level_list) {
// determine the element data type
auto const element_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
Expand All @@ -506,9 +584,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));

// pop off the extra nesting element.
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
nesting.pop_back();
}
if (one_level_list) { nesting.pop_back(); }

path_is_valid = true; // If we're able to reach leaf then path is valid
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ using namespace cudf::io::parquet;
*/
struct metadata : public FileMetaData {
explicit metadata(datasource* source);
void sanitize_schema();
};

class aggregate_reader_metadata {
Expand Down
78 changes: 78 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6732,4 +6732,82 @@ TEST_P(ParquetV2Test, CheckEncodings)
}
}

TEST_F(ParquetReaderTest, RepeatedNoAnnotations)
{
constexpr unsigned char repeated_bytes[] = {
0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x30, 0x15, 0x30, 0x4c, 0x15, 0x0c, 0x15, 0x00, 0x12,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x04, 0x00,
0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x15, 0x00, 0x15, 0x0a, 0x15, 0x0a,
0x2c, 0x15, 0x0c, 0x15, 0x10, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x03, 0x03, 0x88, 0xc6, 0x02,
0x26, 0x80, 0x01, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x10, 0x19, 0x18, 0x02, 0x69, 0x64, 0x15,
0x00, 0x16, 0x0c, 0x16, 0x78, 0x16, 0x78, 0x26, 0x54, 0x26, 0x08, 0x00, 0x00, 0x15, 0x04, 0x15,
0x40, 0x15, 0x40, 0x4c, 0x15, 0x08, 0x15, 0x00, 0x12, 0x00, 0x00, 0xe3, 0x0c, 0x23, 0x4b, 0x01,
0x00, 0x00, 0x00, 0xc7, 0x35, 0x3a, 0x42, 0x00, 0x00, 0x00, 0x00, 0x8e, 0x6b, 0x74, 0x84, 0x00,
0x00, 0x00, 0x00, 0x55, 0xa1, 0xae, 0xc6, 0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x15, 0x22, 0x15,
0x22, 0x2c, 0x15, 0x10, 0x15, 0x10, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
0x03, 0xc0, 0x03, 0x00, 0x00, 0x00, 0x03, 0x90, 0xaa, 0x02, 0x03, 0x94, 0x03, 0x26, 0xda, 0x02,
0x1c, 0x15, 0x04, 0x19, 0x25, 0x00, 0x10, 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e,
0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x06, 0x6e, 0x75, 0x6d,
0x62, 0x65, 0x72, 0x15, 0x00, 0x16, 0x10, 0x16, 0xa0, 0x01, 0x16, 0xa0, 0x01, 0x26, 0x96, 0x02,
0x26, 0xba, 0x01, 0x00, 0x00, 0x15, 0x04, 0x15, 0x24, 0x15, 0x24, 0x4c, 0x15, 0x04, 0x15, 0x00,
0x12, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x68, 0x6f, 0x6d, 0x65, 0x06, 0x00, 0x00, 0x00, 0x6d,
0x6f, 0x62, 0x69, 0x6c, 0x65, 0x15, 0x00, 0x15, 0x20, 0x15, 0x20, 0x2c, 0x15, 0x10, 0x15, 0x10,
0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0xc0, 0x03, 0x00, 0x00, 0x00,
0x03, 0x90, 0xef, 0x01, 0x03, 0x04, 0x26, 0xcc, 0x04, 0x1c, 0x15, 0x0c, 0x19, 0x25, 0x00, 0x10,
0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05,
0x70, 0x68, 0x6f, 0x6e, 0x65, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x15, 0x00, 0x16, 0x10, 0x16, 0x82,
0x01, 0x16, 0x82, 0x01, 0x26, 0x8a, 0x04, 0x26, 0xca, 0x03, 0x00, 0x00, 0x15, 0x02, 0x19, 0x6c,
0x48, 0x04, 0x75, 0x73, 0x65, 0x72, 0x15, 0x04, 0x00, 0x15, 0x02, 0x25, 0x00, 0x18, 0x02, 0x69,
0x64, 0x00, 0x35, 0x02, 0x18, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65,
0x72, 0x73, 0x15, 0x02, 0x00, 0x35, 0x04, 0x18, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x15, 0x04,
0x00, 0x15, 0x04, 0x25, 0x00, 0x18, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x00, 0x15, 0x0c,
0x25, 0x02, 0x18, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x25, 0x00, 0x00, 0x16, 0x00, 0x19, 0x1c, 0x19,
0x3c, 0x26, 0x80, 0x01, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x10, 0x19, 0x18, 0x02, 0x69, 0x64,
0x15, 0x00, 0x16, 0x0c, 0x16, 0x78, 0x16, 0x78, 0x26, 0x54, 0x26, 0x08, 0x00, 0x00, 0x26, 0xda,
0x02, 0x1c, 0x15, 0x04, 0x19, 0x25, 0x00, 0x10, 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65,
0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x06, 0x6e, 0x75,
0x6d, 0x62, 0x65, 0x72, 0x15, 0x00, 0x16, 0x10, 0x16, 0xa0, 0x01, 0x16, 0xa0, 0x01, 0x26, 0x96,
0x02, 0x26, 0xba, 0x01, 0x00, 0x00, 0x26, 0xcc, 0x04, 0x1c, 0x15, 0x0c, 0x19, 0x25, 0x00, 0x10,
0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05,
0x70, 0x68, 0x6f, 0x6e, 0x65, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x15, 0x00, 0x16, 0x10, 0x16, 0x82,
0x01, 0x16, 0x82, 0x01, 0x26, 0x8a, 0x04, 0x26, 0xca, 0x03, 0x00, 0x00, 0x16, 0x9a, 0x03, 0x16,
0x0c, 0x00, 0x28, 0x49, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x72, 0x73, 0x20, 0x76,
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x30, 0x2e, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x62, 0x75,
0x69, 0x6c, 0x64, 0x20, 0x62, 0x34, 0x35, 0x63, 0x65, 0x37, 0x63, 0x62, 0x61, 0x32, 0x31, 0x39,
0x39, 0x66, 0x32, 0x32, 0x64, 0x39, 0x33, 0x32, 0x36, 0x39, 0x63, 0x31, 0x35, 0x30, 0x64, 0x38,
0x61, 0x38, 0x33, 0x39, 0x31, 0x36, 0x63, 0x36, 0x39, 0x62, 0x35, 0x65, 0x29, 0x00, 0x32, 0x01,
0x00, 0x00, 0x50, 0x41, 0x52, 0x31};

auto read_opts = cudf::io::parquet_reader_options::builder(
cudf::io::source_info{reinterpret_cast<char const*>(repeated_bytes), sizeof(repeated_bytes)});
auto result = cudf::io::read_parquet(read_opts);

EXPECT_EQ(result.tbl->view().column(0).size(), 6);
EXPECT_EQ(result.tbl->view().num_columns(), 2);

column_wrapper<int32_t> col0{1, 2, 3, 4, 5, 6};
column_wrapper<int64_t> child0{{5555555555l, 1111111111l, 1111111111l, 2222222222l, 3333333333l}};
cudf::test::strings_column_wrapper child1{{"-", "home", "home", "-", "mobile"}, {0, 1, 1, 0, 1}};
auto struct_col = cudf::test::structs_column_wrapper{{child0, child1}};

auto list_offsets_column =
cudf::test::fixed_width_column_wrapper<cudf::size_type>{0, 0, 0, 0, 1, 2, 5}.release();
auto num_list_rows = list_offsets_column->size() - 1;

auto mask = cudf::create_null_mask(6, cudf::mask_state::ALL_VALID);
cudf::set_null_mask(static_cast<cudf::bitmask_type*>(mask.data()), 0, 2, false);

auto list_col = cudf::make_lists_column(
num_list_rows, std::move(list_offsets_column), struct_col.release(), 2, std::move(mask));

std::vector<std::unique_ptr<cudf::column>> struct_children;
struct_children.push_back(std::move(list_col));

auto outer_struct =
cudf::test::structs_column_wrapper{{std::move(struct_children)}, {0, 0, 1, 1, 1, 1}};
table_view expected{{col0, outer_struct}};

CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected);
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit fc36947

Please sign in to comment.