Skip to content

Commit

Permalink
move null counting to value encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
etseidl committed Jul 25, 2023
1 parent e1b4632 commit 792077e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 63 deletions.
22 changes: 9 additions & 13 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,6 @@ __global__ void __launch_bounds__(128, 8)
}
}
__syncthreads();
uint32_t num_nulls = 0;
while (s->rle_numvals < s->page.num_rows) {
uint32_t rle_numvals = s->rle_numvals;
uint32_t nrows = min(s->page.num_rows - rle_numvals, 128);
Expand All @@ -1026,8 +1025,6 @@ __global__ void __launch_bounds__(128, 8)
++def;
} else {
// We have found the shallowest level at which this row is null
// TODO: need to test with struct with nulls
num_nulls++;
break;
}
}
Expand All @@ -1047,8 +1044,6 @@ __global__ void __launch_bounds__(128, 8)
__syncthreads();
}

uint32_t const null_count = block_reduce(temp_storage.reduce_storage).Sum(num_nulls);

if (t < 32) {
uint8_t* const cur = s->cur;
uint8_t* const rle_out = s->rle_out;
Expand All @@ -1059,10 +1054,7 @@ __global__ void __launch_bounds__(128, 8)
cur[t] = rle_bytes >> (t * 8);
}
__syncwarp();
if (t == 0) {
s->cur = rle_out;
s->page.num_nulls = null_count;
}
if (t == 0) { s->cur = rle_out; }
}
}
} else if (s->page.page_type != PageType::DICTIONARY_PAGE &&
Expand Down Expand Up @@ -1105,10 +1097,7 @@ __global__ void __launch_bounds__(128, 8)
cur[t] = rle_bytes >> (t * 8);
}
__syncwarp();
if (t == 0) {
s->cur = rle_out;
s->page.num_nulls = s->page.num_values - s->page.num_leaf_values;
}
if (t == 0) { s->cur = rle_out; }
}
};
encode_levels(s->col.rep_values, s->col.num_rep_level_bits(), s->page.rep_lvl_bytes);
Expand Down Expand Up @@ -1144,6 +1133,7 @@ __global__ void __launch_bounds__(128, 8)
s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col);
}
__syncthreads();
uint32_t num_valid = 0;
for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) {
uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, 128);
uint32_t len, pos;
Expand All @@ -1170,6 +1160,8 @@ __global__ void __launch_bounds__(128, 8)
return std::make_tuple(is_valid, val_idx);
}();

if (cur_val_idx + t < s->page.num_leaf_values && is_valid) num_valid++;

cur_val_idx += nvals;
if (dict_bits >= 0) {
// Dictionary encoding
Expand Down Expand Up @@ -1342,7 +1334,11 @@ __global__ void __launch_bounds__(128, 8)
__syncthreads();
}
}

uint32_t const valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid);

if (t == 0) {
s->page.num_nulls = s->page.num_values - valid_count;
uint8_t* base = s->page.page_data + s->page.max_hdr_size;
auto actual_data_size = static_cast<uint32_t>(s->cur - base);
if (actual_data_size > s->page.max_data_size) {
Expand Down
77 changes: 27 additions & 50 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4127,6 +4127,8 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndex)
{
constexpr auto num_rows = 100000;
auto const is_v2 = GetParam();
auto const expected_hdr_type =
is_v2 ? cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE;

// fixed length strings
auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) {
Expand Down Expand Up @@ -4186,15 +4188,9 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndex)
for (size_t o = 0; o < oi.page_locations.size(); o++) {
auto const& page_loc = oi.page_locations[o];
auto const ph = read_page_header(source, page_loc);
if (is_v2) {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE_V2);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += ph.data_page_header_v2.num_values;
} else {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += ph.data_page_header.num_values;
}
EXPECT_EQ(ph.type, expected_hdr_type);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += is_v2 ? ph.data_page_header_v2.num_rows : ph.data_page_header.num_values;
}

// loop over page stats from the column index. check that stats.min <= page.min
Expand Down Expand Up @@ -4222,6 +4218,8 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexNulls)
{
constexpr auto num_rows = 100000;
auto const is_v2 = GetParam();
auto const expected_hdr_type =
is_v2 ? cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE;

// fixed length strings
auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) {
Expand Down Expand Up @@ -4291,15 +4289,9 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexNulls)
for (size_t o = 0; o < oi.page_locations.size(); o++) {
auto const& page_loc = oi.page_locations[o];
auto const ph = read_page_header(source, page_loc);
if (is_v2) {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE_V2);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += ph.data_page_header_v2.num_values;
} else {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += ph.data_page_header.num_values;
}
EXPECT_EQ(ph.type, expected_hdr_type);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += is_v2 ? ph.data_page_header_v2.num_rows : ph.data_page_header.num_values;
}

// loop over page stats from the column index. check that stats.min <= page.min
Expand Down Expand Up @@ -4333,6 +4325,8 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexNullColumn)
{
constexpr auto num_rows = 100000;
auto const is_v2 = GetParam();
auto const expected_hdr_type =
is_v2 ? cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE;

// fixed length strings
auto str1_elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) {
Expand Down Expand Up @@ -4387,15 +4381,9 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexNullColumn)
for (size_t o = 0; o < oi.page_locations.size(); o++) {
auto const& page_loc = oi.page_locations[o];
auto const ph = read_page_header(source, page_loc);
if (is_v2) {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE_V2);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += ph.data_page_header_v2.num_values;
} else {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += ph.data_page_header.num_values;
}
EXPECT_EQ(ph.type, expected_hdr_type);
EXPECT_EQ(page_loc.first_row_index, num_vals);
num_vals += is_v2 ? ph.data_page_header_v2.num_rows : ph.data_page_header.num_values;
}

// loop over page stats from the column index. check that stats.min <= page.min
Expand Down Expand Up @@ -4432,6 +4420,8 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexNullColumn)
TEST_P(ParquetV2Test, CheckColumnOffsetIndexStruct)
{
auto const is_v2 = GetParam();
auto const expected_hdr_type =
is_v2 ? cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE;

auto c0 = testdata::ascending<uint32_t>();

Expand Down Expand Up @@ -4487,17 +4477,11 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexStruct)
for (size_t o = 0; o < oi.page_locations.size(); o++) {
auto const& page_loc = oi.page_locations[o];
auto const ph = read_page_header(source, page_loc);
if (is_v2) {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE_V2);
// last column has 2 values per row
EXPECT_EQ(page_loc.first_row_index * (c == rg.columns.size() - 1 ? 2 : 1), num_vals);
num_vals += ph.data_page_header_v2.num_values;
} else {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE);
// last column has 2 values per row
EXPECT_EQ(page_loc.first_row_index * (c == rg.columns.size() - 1 ? 2 : 1), num_vals);
num_vals += ph.data_page_header.num_values;
}
EXPECT_EQ(ph.type, expected_hdr_type);
EXPECT_EQ(page_loc.first_row_index, num_vals);
// last column has 2 values per row
num_vals += is_v2 ? ph.data_page_header_v2.num_rows
: ph.data_page_header.num_values / (c == rg.columns.size() - 1 ? 2 : 1);
}

// loop over page stats from the column index. check that stats.min <= page.min
Expand All @@ -4520,6 +4504,8 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexStruct)
TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls)
{
auto const is_v2 = GetParam();
auto const expected_hdr_type =
is_v2 ? cudf::io::parquet::PageType::DATA_PAGE_V2 : cudf::io::parquet::PageType::DATA_PAGE;

using cudf::test::iterators::null_at;
using cudf::test::iterators::nulls_at;
Expand Down Expand Up @@ -4625,21 +4611,12 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls)
// the first row index is correct
auto const oi = read_offset_index(source, chunk);

int64_t num_vals = 0;
for (size_t o = 0; o < oi.page_locations.size(); o++) {
auto const& page_loc = oi.page_locations[o];
auto const ph = read_page_header(source, page_loc);
if (is_v2) {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE_V2);
// last column has 2 values per row
EXPECT_EQ(page_loc.first_row_index * (c == rg.columns.size() - 1 ? 2 : 1), num_vals);
num_vals += ph.data_page_header_v2.num_values;
} else {
EXPECT_EQ(ph.type, cudf::io::parquet::PageType::DATA_PAGE);
// last column has 2 values per row
EXPECT_EQ(page_loc.first_row_index * (c == rg.columns.size() - 1 ? 2 : 1), num_vals);
num_vals += ph.data_page_header.num_values;
}
EXPECT_EQ(ph.type, expected_hdr_type);
// check null counts in V2 header
if (is_v2) { EXPECT_EQ(ph.data_page_header_v2.num_nulls, expected_null_counts[c]); }
}

// check null counts in column chunk stats and page indexes
Expand Down

0 comments on commit 792077e

Please sign in to comment.