diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index f6a5c97e059..ad090be99f3 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -355,15 +355,8 @@ ConfigureNVBench( # ################################################################################################## # * strings benchmark ------------------------------------------------------------------- ConfigureBench( - STRINGS_BENCH - string/convert_datetime.cpp - string/convert_durations.cpp - string/factory.cu - string/filter.cpp - string/repeat_strings.cpp - string/replace.cpp - string/translate.cpp - string/url_decode.cu + STRINGS_BENCH string/factory.cu string/filter.cpp string/repeat_strings.cpp string/replace.cpp + string/translate.cpp string/url_decode.cu ) ConfigureNVBench( @@ -372,6 +365,8 @@ ConfigureNVBench( string/char_types.cpp string/combine.cpp string/contains.cpp + string/convert_datetime.cpp + string/convert_durations.cpp string/convert_fixed_point.cpp string/convert_numerics.cpp string/copy.cpp diff --git a/cpp/benchmarks/common/generate_input.cu b/cpp/benchmarks/common/generate_input.cu index bdce8a31176..8bce718c7d8 100644 --- a/cpp/benchmarks/common/generate_input.cu +++ b/cpp/benchmarks/common/generate_input.cu @@ -23,11 +23,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -540,7 +542,7 @@ struct string_generator { // range 32-127 is ASCII; 127-136 will be multi-byte UTF-8 { } - __device__ void operator()(thrust::tuple str_begin_end) + __device__ void operator()(thrust::tuple str_begin_end) { auto begin = thrust::get<0>(str_begin_end); auto end = thrust::get<1>(str_begin_end); @@ -569,6 +571,9 @@ std::unique_ptr create_random_utf8_string_column(data_profile cons distribution_params{1. - profile.get_null_probability().value_or(0)}); auto lengths = len_dist(engine, num_rows + 1); auto null_mask = valid_dist(engine, num_rows + 1); + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + thrust::transform_if( thrust::device, lengths.begin(), @@ -580,28 +585,26 @@ std::unique_ptr create_random_utf8_string_column(data_profile cons auto valid_lengths = thrust::make_transform_iterator( thrust::make_zip_iterator(thrust::make_tuple(lengths.begin(), null_mask.begin())), valid_or_zero{}); - rmm::device_uvector offsets(num_rows + 1, cudf::get_default_stream()); - thrust::exclusive_scan( - thrust::device, valid_lengths, valid_lengths + lengths.size(), offsets.begin()); - // offsets are ready. - auto chars_length = *thrust::device_pointer_cast(offsets.end() - 1); + + // offsets are created as INT32 or INT64 as appropriate + auto [offsets, chars_length] = cudf::strings::detail::make_offsets_child_column( + valid_lengths, valid_lengths + num_rows, stream, mr); + // use the offsetalator to normalize the offset values for use by the string_generator + auto offsets_itr = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view()); rmm::device_uvector chars(chars_length, cudf::get_default_stream()); thrust::for_each_n(thrust::device, - thrust::make_zip_iterator(offsets.begin(), offsets.begin() + 1), + thrust::make_zip_iterator(offsets_itr, offsets_itr + 1), num_rows, string_generator{chars.data(), engine}); + auto [result_bitmask, null_count] = - cudf::detail::valid_if(null_mask.begin(), - null_mask.end() - 1, - thrust::identity{}, - cudf::get_default_stream(), - cudf::get_current_device_resource_ref()); + profile.get_null_probability().has_value() + ? cudf::detail::valid_if( + null_mask.begin(), null_mask.end() - 1, thrust::identity{}, stream, mr) + : std::pair{rmm::device_buffer{}, 0}; + return cudf::make_strings_column( - num_rows, - std::make_unique(std::move(offsets), rmm::device_buffer{}, 0), - chars.release(), - null_count, - profile.get_null_probability().has_value() ? std::move(result_bitmask) : rmm::device_buffer{}); + num_rows, std::move(offsets), chars.release(), null_count, std::move(result_bitmask)); } /** diff --git a/cpp/benchmarks/io/nvbench_helpers.hpp b/cpp/benchmarks/io/nvbench_helpers.hpp index 1e3ab2b7b4f..cc548ccd3de 100644 --- a/cpp/benchmarks/io/nvbench_helpers.hpp +++ b/cpp/benchmarks/io/nvbench_helpers.hpp @@ -28,6 +28,7 @@ enum class data_type : int32_t { INTEGRAL = static_cast(type_group_id::INTEGRAL), INTEGRAL_SIGNED = static_cast(type_group_id::INTEGRAL_SIGNED), FLOAT = static_cast(type_group_id::FLOATING_POINT), + BOOL8 = static_cast(cudf::type_id::BOOL8), DECIMAL = static_cast(type_group_id::FIXED_POINT), TIMESTAMP = static_cast(type_group_id::TIMESTAMP), DURATION = static_cast(type_group_id::DURATION), @@ -44,6 +45,7 @@ NVBENCH_DECLARE_ENUM_TYPE_STRINGS( case data_type::INTEGRAL: return "INTEGRAL"; case data_type::INTEGRAL_SIGNED: return "INTEGRAL_SIGNED"; case data_type::FLOAT: return "FLOAT"; + case data_type::BOOL8: return "BOOL8"; case data_type::DECIMAL: return "DECIMAL"; case data_type::TIMESTAMP: return "TIMESTAMP"; case data_type::DURATION: return "DURATION"; diff --git a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp index ce115fd7723..b14f9cbb67e 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp @@ -114,6 +114,7 @@ void BM_parquet_read_io_compression(nvbench::state& state) { auto const d_type = get_type_or_group({static_cast(data_type::INTEGRAL), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), @@ -298,6 +299,7 @@ void BM_parquet_read_wide_tables_mixed(nvbench::state& state) using d_type_list = nvbench::enum_type_list(data_type::INTEGRAL), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), diff --git a/cpp/benchmarks/io/parquet/parquet_writer.cpp b/cpp/benchmarks/io/parquet/parquet_writer.cpp index 256e50f0e64..84e4b8b93c0 100644 --- a/cpp/benchmarks/io/parquet/parquet_writer.cpp +++ b/cpp/benchmarks/io/parquet/parquet_writer.cpp @@ -89,6 +89,7 @@ void BM_parq_write_io_compression( { auto const data_types = get_type_or_group({static_cast(data_type::INTEGRAL), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), @@ -143,6 +144,7 @@ void BM_parq_write_varying_options( auto const data_types = get_type_or_group({static_cast(data_type::INTEGRAL_SIGNED), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), @@ -181,6 +183,7 @@ void BM_parq_write_varying_options( using d_type_list = nvbench::enum_type_list #include -#include -#include #include #include +#include #include -class StringDateTime : public cudf::benchmark {}; +#include -enum class direction { to, from }; +NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_D, "cudf::timestamp_D", "cudf::timestamp_D"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_s, "cudf::timestamp_s", "cudf::timestamp_s"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_ms, "cudf::timestamp_ms", "cudf::timestamp_ms"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_us, "cudf::timestamp_us", "cudf::timestamp_us"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_ns, "cudf::timestamp_ns", "cudf::timestamp_ns"); -template -void BM_convert_datetime(benchmark::State& state, direction dir) +using Types = nvbench::type_list; + +template +void bench_convert_datetime(nvbench::state& state, nvbench::type_list) { - auto const n_rows = static_cast(state.range(0)); - auto const data_type = cudf::data_type(cudf::type_to_id()); + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const from_ts = state.get_string("dir") == "from"; - auto const column = create_random_column(data_type.id(), row_count{n_rows}); - cudf::column_view input(column->view()); + auto const data_type = cudf::data_type(cudf::type_to_id()); + auto const ts_col = create_random_column(data_type.id(), row_count{num_rows}); - auto source = dir == direction::to ? cudf::strings::from_timestamps(input, "%Y-%m-%d %H:%M:%S") - : make_empty_column(cudf::data_type{cudf::type_id::STRING}); - cudf::strings_column_view source_string(source->view()); + auto format = std::string{"%Y-%m-%d %H:%M:%S"}; + auto s_col = cudf::strings::from_timestamps(ts_col->view(), format); + auto sv = cudf::strings_column_view(s_col->view()); - for (auto _ : state) { - cuda_event_timer raii(state, true); - if (dir == direction::to) - cudf::strings::to_timestamps(source_string, data_type, "%Y-%m-%d %H:%M:%S"); - else - cudf::strings::from_timestamps(input, "%Y-%m-%d %H:%M:%S"); - } + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); - auto const bytes = dir == direction::to ? source_string.chars_size(cudf::get_default_stream()) - : n_rows * sizeof(TypeParam); - state.SetBytesProcessed(state.iterations() * bytes); + if (from_ts) { + state.add_global_memory_reads(num_rows); + state.add_global_memory_writes(sv.chars_size(stream)); + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::strings::from_timestamps(ts_col->view(), format); + }); + } else { + state.add_global_memory_reads(sv.chars_size(stream)); + state.add_global_memory_writes(num_rows); + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::strings::to_timestamps(sv, data_type, format); + }); + } } -#define STR_BENCHMARK_DEFINE(name, type, dir) \ - BENCHMARK_DEFINE_F(StringDateTime, name)(::benchmark::State & state) \ - { \ - BM_convert_datetime(state, dir); \ - } \ - BENCHMARK_REGISTER_F(StringDateTime, name) \ - ->RangeMultiplier(1 << 5) \ - ->Range(1 << 10, 1 << 25) \ - ->UseManualTime() \ - ->Unit(benchmark::kMicrosecond); - -STR_BENCHMARK_DEFINE(from_days, cudf::timestamp_D, direction::from); -STR_BENCHMARK_DEFINE(from_seconds, cudf::timestamp_s, direction::from); -STR_BENCHMARK_DEFINE(from_mseconds, cudf::timestamp_ms, direction::from); -STR_BENCHMARK_DEFINE(from_useconds, cudf::timestamp_us, direction::from); -STR_BENCHMARK_DEFINE(from_nseconds, cudf::timestamp_ns, direction::from); - -STR_BENCHMARK_DEFINE(to_days, cudf::timestamp_D, direction::to); -STR_BENCHMARK_DEFINE(to_seconds, cudf::timestamp_s, direction::to); -STR_BENCHMARK_DEFINE(to_mseconds, cudf::timestamp_ms, direction::to); -STR_BENCHMARK_DEFINE(to_useconds, cudf::timestamp_us, direction::to); -STR_BENCHMARK_DEFINE(to_nseconds, cudf::timestamp_ns, direction::to); +NVBENCH_BENCH_TYPES(bench_convert_datetime, NVBENCH_TYPE_AXES(Types)) + .set_name("datetime") + .set_type_axes_names({"DataType"}) + .add_string_axis("dir", {"to", "from"}) + .add_int64_axis("num_rows", {1 << 16, 1 << 18, 1 << 20, 1 << 22}); diff --git a/cpp/benchmarks/string/convert_durations.cpp b/cpp/benchmarks/string/convert_durations.cpp index f12d292c2e7..9d2377f2d82 100644 --- a/cpp/benchmarks/string/convert_durations.cpp +++ b/cpp/benchmarks/string/convert_durations.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,92 +14,60 @@ * limitations under the License. */ +#include #include -#include - -#include #include #include -#include +#include #include -#include -#include - -class DurationsToString : public cudf::benchmark {}; -template -void BM_convert_from_durations(benchmark::State& state) -{ - cudf::size_type const source_size = state.range(0); - - // Every element is valid - auto data = cudf::detail::make_counting_transform_iterator( - 0, [source_size](auto i) { return TypeParam{i - source_size / 2}; }); +#include - cudf::test::fixed_width_column_wrapper source_durations(data, data + source_size); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::duration_D, "cudf::duration_D", "cudf::duration_D"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::duration_s, "cudf::duration_s", "cudf::duration_s"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::duration_ms, "cudf::duration_ms", "cudf::duration_ms"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::duration_us, "cudf::duration_us", "cudf::duration_us"); +NVBENCH_DECLARE_TYPE_STRINGS(cudf::duration_ns, "cudf::duration_ns", "cudf::duration_ns"); - for (auto _ : state) { - cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 - cudf::strings::from_durations(source_durations, "%D days %H:%M:%S"); - } - - state.SetBytesProcessed(state.iterations() * source_size * sizeof(TypeParam)); -} +using Types = nvbench::type_list; -class StringToDurations : public cudf::benchmark {}; -template -void BM_convert_to_durations(benchmark::State& state) +template +void bench_convert_duration(nvbench::state& state, nvbench::type_list) { - cudf::size_type const source_size = state.range(0); - - // Every element is valid - auto data = cudf::detail::make_counting_transform_iterator( - 0, [source_size](auto i) { return TypeParam{i - source_size / 2}; }); - - cudf::test::fixed_width_column_wrapper source_durations(data, data + source_size); - auto results = cudf::strings::from_durations(source_durations, "%D days %H:%M:%S"); - cudf::strings_column_view source_string(*results); - auto output_type = cudf::data_type(cudf::type_to_id()); - - for (auto _ : state) { - cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 - cudf::strings::to_durations(source_string, output_type, "%D days %H:%M:%S"); + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const data_type = cudf::data_type(cudf::type_to_id()); + auto const from_dur = state.get_string("dir") == "from"; + + auto const ts_col = create_random_column(data_type.id(), row_count{num_rows}); + cudf::column_view input(ts_col->view()); + + auto format = std::string{"%D days %H:%M:%S"}; + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + + if (from_dur) { + state.add_global_memory_reads(num_rows); + state.add_global_memory_writes(format.size() * num_rows); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { cudf::strings::from_durations(input, format); }); + } else { + auto source = cudf::strings::from_durations(input, format); + auto view = cudf::strings_column_view(source->view()); + state.add_global_memory_reads(view.chars_size(stream)); + state.add_global_memory_writes(num_rows); + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::strings::to_durations(view, data_type, format); + }); } - - state.SetBytesProcessed(state.iterations() * source_size * sizeof(TypeParam)); } -#define DSBM_BENCHMARK_DEFINE(name, type) \ - BENCHMARK_DEFINE_F(DurationsToString, name)(::benchmark::State & state) \ - { \ - BM_convert_from_durations(state); \ - } \ - BENCHMARK_REGISTER_F(DurationsToString, name) \ - ->RangeMultiplier(1 << 5) \ - ->Range(1 << 10, 1 << 25) \ - ->UseManualTime() \ - ->Unit(benchmark::kMicrosecond); - -#define SDBM_BENCHMARK_DEFINE(name, type) \ - BENCHMARK_DEFINE_F(StringToDurations, name)(::benchmark::State & state) \ - { \ - BM_convert_to_durations(state); \ - } \ - BENCHMARK_REGISTER_F(StringToDurations, name) \ - ->RangeMultiplier(1 << 5) \ - ->Range(1 << 10, 1 << 25) \ - ->UseManualTime() \ - ->Unit(benchmark::kMicrosecond); - -DSBM_BENCHMARK_DEFINE(from_durations_D, cudf::duration_D); -DSBM_BENCHMARK_DEFINE(from_durations_s, cudf::duration_s); -DSBM_BENCHMARK_DEFINE(from_durations_ms, cudf::duration_ms); -DSBM_BENCHMARK_DEFINE(from_durations_us, cudf::duration_us); -DSBM_BENCHMARK_DEFINE(from_durations_ns, cudf::duration_ns); - -SDBM_BENCHMARK_DEFINE(to_durations_D, cudf::duration_D); -SDBM_BENCHMARK_DEFINE(to_durations_s, cudf::duration_s); -SDBM_BENCHMARK_DEFINE(to_durations_ms, cudf::duration_ms); -SDBM_BENCHMARK_DEFINE(to_durations_us, cudf::duration_us); -SDBM_BENCHMARK_DEFINE(to_durations_ns, cudf::duration_ns); +NVBENCH_BENCH_TYPES(bench_convert_duration, NVBENCH_TYPE_AXES(Types)) + .set_name("duration") + .set_type_axes_names({"DataType"}) + .add_string_axis("dir", {"to", "from"}) + .add_int64_axis("num_rows", {1 << 10, 1 << 15, 1 << 20, 1 << 25}); diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index b662b660557..7cd4697f592 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -18,6 +18,7 @@ #include "types.hpp" +#include #include #include #include @@ -53,6 +54,11 @@ struct schema_element { * @brief Allows specifying this column's child columns target type */ std::map child_types; + + /** + * @brief Allows specifying the order of the columns + */ + std::optional> column_order; }; /** @@ -87,13 +93,18 @@ enum class json_recovery_mode_t { * | `chunksize` | use `byte_range_xxx` for chunking instead | */ class json_reader_options { + public: + using dtype_variant = + std::variant, + std::map, + std::map, + schema_element>; ///< Variant type holding dtypes information for the columns + + private: source_info _source; // Data types of the column; empty to infer dtypes - std::variant, - std::map, - std::map> - _dtypes; + dtype_variant _dtypes; // Specify the compression format of the source or infer from file extension compression_type _compression = compression_type::AUTO; @@ -178,13 +189,7 @@ class json_reader_options { * * @returns Data types of the columns */ - [[nodiscard]] std::variant, - std::map, - std::map> const& - get_dtypes() const - { - return _dtypes; - } + [[nodiscard]] dtype_variant const& get_dtypes() const { return _dtypes; } /** * @brief Returns compression format of the source. @@ -228,7 +233,11 @@ class json_reader_options { */ [[nodiscard]] size_t get_byte_range_padding() const { - auto const num_columns = std::visit([](auto const& dtypes) { return dtypes.size(); }, _dtypes); + auto const num_columns = + std::visit(cudf::detail::visitor_overload{ + [](auto const& dtypes) { return dtypes.size(); }, + [](schema_element const& dtypes) { return dtypes.child_types.size(); }}, + _dtypes); auto const max_row_bytes = 16 * 1024; // 16KB auto const column_bytes = 64; @@ -390,6 +399,14 @@ class json_reader_options { */ void set_dtypes(std::map types) { _dtypes = std::move(types); } + /** + * @brief Set data types for a potentially nested column hierarchy. + * + * @param types schema element with column names and column order to support arbitrary nesting of + * data types + */ + void set_dtypes(schema_element types); + /** * @brief Set the compression type. * @@ -624,6 +641,18 @@ class json_reader_options_builder { return *this; } + /** + * @brief Set data types for columns to be read. + * + * @param types Struct schema_element with Column name -> schema_element with map and order + * @return this for chaining + */ + json_reader_options_builder& dtypes(schema_element types) + { + options.set_dtypes(std::move(types)); + return *this; + } + /** * @brief Set the compression type. * diff --git a/cpp/src/io/json/host_tree_algorithms.cu b/cpp/src/io/json/host_tree_algorithms.cu index 570a00cbfc2..7fafa885c66 100644 --- a/cpp/src/io/json/host_tree_algorithms.cu +++ b/cpp/src/io/json/host_tree_algorithms.cu @@ -269,7 +269,8 @@ std::map unified_schema(cudf::io::json_reader_optio }); return dnew; }, - [](std::map const& user_dtypes) { return user_dtypes; }}, + [](std::map const& user_dtypes) { return user_dtypes; }, + [](schema_element const& user_dtypes) { return user_dtypes.child_types; }}, options.get_dtypes()); } @@ -492,7 +493,7 @@ std::pair, hashmap_of_device_columns> build_tree auto expected_types = cudf::detail::make_host_vector(num_columns, stream); std::fill_n(expected_types.begin(), num_columns, NUM_NODE_CLASSES); - auto lookup_names = [&column_names](auto child_ids, auto name) { + auto lookup_names = [&column_names](auto const& child_ids, auto const& name) { for (auto const& child_id : child_ids) { if (column_names[child_id] == name) return child_id; } @@ -569,7 +570,7 @@ std::pair, hashmap_of_device_columns> build_tree for (size_t i = 0; i < adj[root_list_col_id].size() && i < user_dtypes.size(); i++) { NodeIndexT const first_child_id = adj[root_list_col_id][i]; - auto name = column_names[first_child_id]; + auto const& name = column_names[first_child_id]; auto value_id = std::stol(name); if (value_id >= 0 and value_id < static_cast(user_dtypes.size())) mark_is_pruned(first_child_id, schema_element{user_dtypes[value_id]}); @@ -580,7 +581,7 @@ std::pair, hashmap_of_device_columns> build_tree std::map const& user_dtypes) -> void { for (size_t i = 0; i < adj[root_list_col_id].size(); i++) { auto const first_child_id = adj[root_list_col_id][i]; - auto name = column_names[first_child_id]; + auto const& name = column_names[first_child_id]; if (user_dtypes.count(name)) mark_is_pruned(first_child_id, schema_element{user_dtypes.at(name)}); } @@ -589,10 +590,19 @@ std::pair, hashmap_of_device_columns> build_tree std::map const& user_dtypes) -> void { for (size_t i = 0; i < adj[root_list_col_id].size(); i++) { auto const first_child_id = adj[root_list_col_id][i]; - auto name = column_names[first_child_id]; + auto const& name = column_names[first_child_id]; if (user_dtypes.count(name)) mark_is_pruned(first_child_id, user_dtypes.at(name)); } + }, + [&root_list_col_id, &adj, &mark_is_pruned, &column_names]( + schema_element const& user_dtypes) -> void { + for (size_t i = 0; i < adj[root_list_col_id].size(); i++) { + auto const first_child_id = adj[root_list_col_id][i]; + auto const& name = column_names[first_child_id]; + if (user_dtypes.child_types.count(name) != 0) + mark_is_pruned(first_child_id, user_dtypes.child_types.at(name)); + } }}, options.get_dtypes()); } else { @@ -626,7 +636,9 @@ std::pair, hashmap_of_device_columns> build_tree [&root_struct_col_id, &adj, &mark_is_pruned, &u_schema]( std::map const& user_dtypes) -> void { mark_is_pruned(root_struct_col_id, u_schema); - }}, + }, + [&root_struct_col_id, &adj, &mark_is_pruned, &u_schema](schema_element const& user_dtypes) + -> void { mark_is_pruned(root_struct_col_id, u_schema); }}, options.get_dtypes()); } // Useful for array of arrays @@ -714,7 +726,7 @@ std::pair, hashmap_of_device_columns> build_tree if (expected_category == NC_STRUCT) { // find field column ids, and its children and create columns. for (auto const& field_id : child_ids) { - auto name = column_names[field_id]; + auto const& name = column_names[field_id]; if (is_pruned[field_id]) continue; auto inserted = ref.get().child_columns.try_emplace(name, device_json_column(stream, mr)).second; @@ -745,7 +757,7 @@ std::pair, hashmap_of_device_columns> build_tree std::map> array_values; for (auto const& child_id : child_ids) { if (is_pruned[child_id]) continue; - auto name = column_names[child_id]; + auto const& name = column_names[child_id]; array_values[std::stoi(name)].push_back(child_id); } // diff --git a/cpp/src/io/json/json_column.cu b/cpp/src/io/json/json_column.cu index 7e4d975e431..30a154fdda2 100644 --- a/cpp/src/io/json/json_column.cu +++ b/cpp/src/io/json/json_column.cu @@ -399,9 +399,9 @@ std::pair, std::vector> device_json_co // - String columns will be returned as nullable, iff there's at least one null entry if (col->null_count() == 0) { col->set_null_mask(rmm::device_buffer{0, stream, mr}, 0); } - // For string columns return ["offsets", "char"] schema + // For string columns return ["offsets"] schema if (target_type.id() == type_id::STRING) { - return {std::move(col), std::vector{{"offsets"}, {"chars"}}}; + return {std::move(col), std::vector{{"offsets"}}}; } // Non-string leaf-columns (e.g., numeric) do not have child columns in the schema return {std::move(col), std::vector{}}; @@ -410,12 +410,37 @@ std::pair, std::vector> device_json_co std::vector> child_columns; std::vector column_names{}; size_type num_rows{json_col.num_rows}; + + bool const has_column_order = + prune_columns and not schema.value_or(schema_element{}) + .column_order.value_or(std::vector{}) + .empty(); + + auto const& col_order = + has_column_order ? schema.value().column_order.value() : json_col.column_order; + // Create children columns - for (auto const& col_name : json_col.column_order) { - auto const& col = json_col.child_columns.find(col_name); - column_names.emplace_back(col->first); - auto& child_col = col->second; + for (auto const& col_name : col_order) { auto child_schema_element = get_child_schema(col_name); + auto const found_it = json_col.child_columns.find(col_name); + + if (prune_columns and found_it == std::end(json_col.child_columns)) { + CUDF_EXPECTS(child_schema_element.has_value(), + "Column name not found in input schema map, but present in column order and " + "prune_columns is enabled"); + column_names.emplace_back(make_column_name_info( + child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}), col_name)); + auto all_null_column = make_all_nulls_column( + child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}), + num_rows, + stream, + mr); + child_columns.emplace_back(std::move(all_null_column)); + continue; + } + column_names.emplace_back(found_it->first); + + auto& child_col = found_it->second; if (!prune_columns or child_schema_element.has_value()) { auto [child_column, names] = device_json_column_to_cudf_column( child_col, d_input, options, prune_columns, child_schema_element, stream, mr); @@ -576,11 +601,21 @@ table_with_metadata device_parse_nested_json(device_span d_input, std::vector out_column_names; auto parse_opt = parsing_options(options, stream); - // Iterate over the struct's child columns and convert to cudf column - size_type column_index = 0; - for (auto const& col_name : root_struct_col.column_order) { - auto& json_col = root_struct_col.child_columns.find(col_name)->second; + schema_element const* prune_schema = std::get_if(&options.get_dtypes()); + bool const has_column_order = options.is_enabled_prune_columns() and prune_schema != nullptr and + prune_schema->column_order.has_value() and + not prune_schema->column_order->empty(); + auto const& col_order = + has_column_order ? prune_schema->column_order.value() : root_struct_col.column_order; + if (has_column_order) { + CUDF_EXPECTS(prune_schema->child_types.size() == col_order.size(), + "Input schema column order size mismatch with input schema child types"); + } + auto root_col_size = root_struct_col.num_rows; + // Iterate over the struct's child columns/column_order and convert to cudf column + size_type column_index = 0; + for (auto const& col_name : col_order) { std::optional child_schema_element = std::visit( cudf::detail::visitor_overload{ [column_index](std::vector const& user_dtypes) -> std::optional { @@ -590,17 +625,23 @@ table_with_metadata device_parse_nested_json(device_span d_input, }, [col_name]( std::map const& user_dtypes) -> std::optional { - return (user_dtypes.find(col_name) != std::end(user_dtypes)) - ? std::optional{{user_dtypes.find(col_name)->second}} - : std::optional{}; + if (auto it = user_dtypes.find(col_name); it != std::end(user_dtypes)) + return std::optional{{it->second}}; + return std::nullopt; }, [col_name](std::map const& user_dtypes) -> std::optional { - return (user_dtypes.find(col_name) != std::end(user_dtypes)) - ? user_dtypes.find(col_name)->second - : std::optional{}; + if (auto it = user_dtypes.find(col_name); it != std::end(user_dtypes)) return it->second; + return std::nullopt; + }, + [col_name](schema_element const& user_dtypes) -> std::optional { + if (auto it = user_dtypes.child_types.find(col_name); + it != std::end(user_dtypes.child_types)) + return it->second; + return std::nullopt; }}, options.get_dtypes()); + #ifdef NJP_DEBUG_PRINT auto debug_schema_print = [](auto ret) { std::cout << ", type id: " @@ -608,20 +649,39 @@ table_with_metadata device_parse_nested_json(device_span d_input, << ", with " << (ret.has_value() ? ret->child_types.size() : 0) << " children" << "\n"; }; - std::visit( - cudf::detail::visitor_overload{[column_index](std::vector const&) { - std::cout << "Column by index: #" << column_index; - }, - [col_name](std::map const&) { - std::cout << "Column by flat name: '" << col_name; - }, - [col_name](std::map const&) { - std::cout << "Column by nested name: #" << col_name; - }}, - options.get_dtypes()); + std::visit(cudf::detail::visitor_overload{ + [column_index](std::vector const&) { + std::cout << "Column by index: #" << column_index; + }, + [col_name](std::map const&) { + std::cout << "Column by flat name: '" << col_name; + }, + [col_name](std::map const&) { + std::cout << "Column by nested name: #" << col_name; + }, + [col_name](schema_element const&) { + std::cout << "Column by nested schema with column order: #" << col_name; + }}, + options.get_dtypes()); debug_schema_print(child_schema_element); #endif + auto const found_it = root_struct_col.child_columns.find(col_name); + if (options.is_enabled_prune_columns() and + found_it == std::end(root_struct_col.child_columns)) { + CUDF_EXPECTS(child_schema_element.has_value(), + "Column name not found in input schema map, but present in column order and " + "prune_columns is enabled"); + // inserts all null column + out_column_names.emplace_back(make_column_name_info(child_schema_element.value(), col_name)); + auto all_null_column = + make_all_nulls_column(child_schema_element.value(), root_col_size, stream, mr); + out_columns.emplace_back(std::move(all_null_column)); + column_index++; + continue; + } + auto& json_col = found_it->second; + if (!options.is_enabled_prune_columns() or child_schema_element.has_value()) { // Get this JSON column's cudf column and schema info, (modifies json_col) auto [cudf_col, col_name_info] = diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index 7b3b04dea16..4989fff4b30 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -429,6 +429,29 @@ table_with_metadata device_parse_nested_json(device_span input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +/** + * @brief Create all null column of a given nested schema + * + * @param schema The schema of the column to create + * @param num_rows The number of rows in the column + * @param stream The CUDA stream to which kernels are dispatched + * @param mr resource with which to allocate + * @return The all null column + */ +std::unique_ptr make_all_nulls_column(schema_element const& schema, + size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Create metadata for a column of a given schema + * + * @param schema The schema of the column + * @param col_name The name of the column + * @return column metadata for a given schema + */ +column_name_info make_column_name_info(schema_element const& schema, std::string const& col_name); + /** * @brief Get the path data type of a column by path if present in input schema * diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index 60e78f4763d..f1c2826c62a 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -2198,9 +2198,9 @@ std::pair, std::vector> json_column_to // - String columns will be returned as nullable, iff there's at least one null entry if (col->null_count() == 0) { col->set_null_mask(rmm::device_buffer{0, stream, mr}, 0); } - // For string columns return ["offsets", "char"] schema + // For string columns return ["offsets"] schema if (target_type.id() == type_id::STRING) { - return {std::move(col), std::vector{{"offsets"}, {"chars"}}}; + return {std::move(col), std::vector{{"offsets"}}}; } // Non-string leaf-columns (e.g., numeric) do not have child columns in the schema else { diff --git a/cpp/src/io/json/parser_features.cpp b/cpp/src/io/json/parser_features.cpp index 4caa5cd9e24..401a6e992de 100644 --- a/cpp/src/io/json/parser_features.cpp +++ b/cpp/src/io/json/parser_features.cpp @@ -16,14 +16,201 @@ #include "nested_json.hpp" +#include +#include +#include #include +#include +#include +#include +#include +#include #include #include #include +namespace cudf::io { +namespace { +bool validate_column_order(schema_element const& types) +{ + // For struct types, check if column_order size matches child_types size and all elements in + // column_order are in child_types, in child_types, call this function recursively. + // For list types, check if child_types size is 1 and call this function recursively. + if (types.type.id() == type_id::STRUCT) { + if (types.column_order.has_value()) { + if (types.column_order.value().size() != types.child_types.size()) { return false; } + for (auto const& column_name : types.column_order.value()) { + auto it = types.child_types.find(column_name); + if (it == types.child_types.end()) { return false; } + if (it->second.type.id() == type_id::STRUCT or it->second.type.id() == type_id::LIST) { + if (!validate_column_order(it->second)) { return false; } + } + } + } + } else if (types.type.id() == type_id::LIST) { + if (types.child_types.size() != 1) { return false; } + auto it = types.child_types.begin(); + if (it->second.type.id() == type_id::STRUCT or it->second.type.id() == type_id::LIST) { + if (!validate_column_order(it->second)) { return false; } + } + } + return true; +} +} // namespace + +void json_reader_options::set_dtypes(schema_element types) +{ + CUDF_EXPECTS( + validate_column_order(types), "Column order does not match child types", std::invalid_argument); + _dtypes = std::move(types); +} +} // namespace cudf::io + namespace cudf::io::json::detail { +/// Created an empty column of the specified schema +struct empty_column_functor { + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; + + template ())> + std::unique_ptr operator()(schema_element const& schema) const + { + return make_empty_column(schema.type); + } + + template )> + std::unique_ptr operator()(schema_element const& schema) const + { + CUDF_EXPECTS(schema.child_types.size() == 1, "List column should have only one child"); + auto const& child_name = schema.child_types.begin()->first; + std::unique_ptr child = cudf::type_dispatcher( + schema.child_types.at(child_name).type, *this, schema.child_types.at(child_name)); + auto offsets = make_empty_column(data_type(type_to_id())); + return make_lists_column(0, std::move(offsets), std::move(child), 0, {}, stream, mr); + } + + template )> + std::unique_ptr operator()(schema_element const& schema) const + { + std::vector> child_columns; + for (auto const& child_name : schema.column_order.value_or(std::vector{})) { + child_columns.push_back(cudf::type_dispatcher( + schema.child_types.at(child_name).type, *this, schema.child_types.at(child_name))); + } + return make_structs_column(0, std::move(child_columns), 0, {}, stream, mr); + } +}; + +/// Created all null column of the specified schema +struct allnull_column_functor { + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; + + private: + auto make_zeroed_offsets(size_type size) const + { + auto offsets_buff = + cudf::detail::make_zeroed_device_uvector_async(size + 1, stream, mr); + return std::make_unique(std::move(offsets_buff), rmm::device_buffer{}, 0); + } + + public: + template ())> + std::unique_ptr operator()(schema_element const& schema, size_type size) const + { + return make_fixed_width_column(schema.type, size, mask_state::ALL_NULL, stream, mr); + } + + template ())> + std::unique_ptr operator()(schema_element const& schema, size_type size) const + { + CUDF_EXPECTS(schema.child_types.size() == 1, "Dictionary column should have only one child"); + auto const& child_name = schema.child_types.begin()->first; + std::unique_ptr child = cudf::type_dispatcher(schema.child_types.at(child_name).type, + empty_column_functor{stream, mr}, + schema.child_types.at(child_name)); + return make_fixed_width_column(schema.type, size, mask_state::ALL_NULL, stream, mr); + auto indices = make_zeroed_offsets(size - 1); + auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr); + return make_dictionary_column( + std::move(child), std::move(indices), std::move(null_mask), size, stream, mr); + } + + template )> + std::unique_ptr operator()(schema_element const& schema, size_type size) const + { + auto offsets = make_zeroed_offsets(size); + auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr); + return make_strings_column( + size, std::move(offsets), rmm::device_buffer{}, size, std::move(null_mask)); + } + template )> + std::unique_ptr operator()(schema_element const& schema, size_type size) const + { + CUDF_EXPECTS(schema.child_types.size() == 1, "List column should have only one child"); + auto const& child_name = schema.child_types.begin()->first; + std::unique_ptr child = cudf::type_dispatcher(schema.child_types.at(child_name).type, + empty_column_functor{stream, mr}, + schema.child_types.at(child_name)); + auto offsets = make_zeroed_offsets(size); + auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr); + return make_lists_column( + size, std::move(offsets), std::move(child), size, std::move(null_mask), stream, mr); + } + + template )> + std::unique_ptr operator()(schema_element const& schema, size_type size) const + { + std::vector> child_columns; + for (auto const& child_name : schema.column_order.value_or(std::vector{})) { + child_columns.push_back(cudf::type_dispatcher( + schema.child_types.at(child_name).type, *this, schema.child_types.at(child_name), size)); + } + auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr); + return make_structs_column( + size, std::move(child_columns), size, std::move(null_mask), stream, mr); + } +}; + +std::unique_ptr make_all_nulls_column(schema_element const& schema, + size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return cudf::type_dispatcher(schema.type, allnull_column_functor{stream, mr}, schema, num_rows); +} + +column_name_info make_column_name_info(schema_element const& schema, std::string const& col_name) +{ + column_name_info info; + info.name = col_name; + switch (schema.type.id()) { + case type_id::STRUCT: + for (auto const& child_name : schema.column_order.value_or(std::vector{})) { + info.children.push_back( + make_column_name_info(schema.child_types.at(child_name), child_name)); + } + break; + case type_id::LIST: + info.children.emplace_back("offsets"); + for (auto const& [child_name, child_schema] : schema.child_types) { + info.children.push_back(make_column_name_info(child_schema, child_name)); + } + break; + case type_id::DICTIONARY32: + info.children.emplace_back("indices"); + for (auto const& [child_name, child_schema] : schema.child_types) { + info.children.push_back(make_column_name_info(child_schema, child_name)); + } + break; + case type_id::STRING: info.children.emplace_back("offsets"); break; + default: break; + } + return info; +} + std::optional child_schema_element(std::string const& col_name, cudf::io::json_reader_options const& options) { @@ -46,6 +233,11 @@ std::optional child_schema_element(std::string const& col_name, return (user_dtypes.find(col_name) != std::end(user_dtypes)) ? user_dtypes.find(col_name)->second : std::optional{}; + }, + [col_name](schema_element const& user_dtypes) -> std::optional { + return (user_dtypes.child_types.find(col_name) != std::end(user_dtypes.child_types)) + ? user_dtypes.child_types.find(col_name)->second + : std::optional{}; }}, options.get_dtypes()); } diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 45380e6ea20..aaf5ebfbe7d 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -147,6 +147,8 @@ __device__ void gpuDecodeFixedWidthValues( } break; } + } else if (dtype == BOOLEAN) { + gpuOutputBoolean(sb, src_pos, static_cast(dst)); } else if (dtype == INT96) { gpuOutputInt96Timestamp(s, sb, src_pos, static_cast(dst)); } else if (dtype_len == 8) { @@ -841,6 +843,33 @@ __device__ inline bool maybe_has_nulls(page_state_s* s) return run_val != s->col.max_level[lvl]; } +template +inline __device__ void bool_plain_decode(page_state_s* s, state_buf* sb, int t, int to_decode) +{ + int pos = s->dict_pos; + int const target_pos = pos + to_decode; + + while (pos < target_pos) { + int const batch_len = min(target_pos - pos, decode_block_size_t); + + if (t < batch_len) { + int const bit_pos = pos + t; + int const byte_offset = bit_pos >> 3; + int const bit_in_byte_index = bit_pos & 7; + + uint8_t const* const read_from = s->data_start + byte_offset; + bool const read_bit = (*read_from) & (1 << bit_in_byte_index); + + int const write_to_index = rolling_index(bit_pos); + sb->dict_idx[write_to_index] = read_bit; + } + + pos += batch_len; + } + + if (t == 0) { s->dict_pos = pos; } +} + template __device__ int skip_decode(stream_type& parquet_stream, int num_to_skip, int t) { @@ -872,14 +901,7 @@ __device__ int skip_decode(stream_type& parquet_stream, int num_to_skip, int t) * @param num_rows Maximum number of rows to read * @param error_code Error code to set if an error is encountered */ -template - typename DecodeValuesFunc> +template CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) gpuDecodePageDataGeneric(PageInfo* pages, device_span chunks, @@ -887,12 +909,33 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) size_t num_rows, kernel_error::pointer error_code) { + constexpr bool has_dict_t = (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST); + constexpr bool has_bools_t = (kernel_mask_t == decode_kernel_mask::BOOLEAN) || + (kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) || + (kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST); + constexpr bool has_nesting_t = + (kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED); + constexpr bool has_lists_t = + (kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST); + constexpr bool split_decode_t = + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST); + constexpr int rolling_buf_size = decode_block_size_t * 2; constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); __shared__ __align__(16) page_state_s state_g; using state_buf_t = page_state_buffers_s; __shared__ __align__(16) state_buf_t state_buffers; @@ -920,32 +963,31 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. if (s->num_rows == 0) { return; } - DecodeValuesFunc decode_values; + using value_decoder_type = std::conditional_t< + split_decode_t, + decode_fixed_width_split_values_func, + decode_fixed_width_values_func>; + value_decoder_type decode_values; bool const should_process_nulls = is_nullable(s) && maybe_has_nulls(s); // shared buffer. all shared memory is suballocated out of here - constexpr int shared_rep_size = - has_lists_t - ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}) - : 0; - constexpr int shared_dict_size = - has_dict_t - ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}) - : 0; - constexpr int shared_def_size = - cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}); - constexpr int shared_buf_size = shared_rep_size + shared_dict_size + shared_def_size; + constexpr int rle_run_buffer_bytes = + cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}); + constexpr int shared_buf_size = + rle_run_buffer_bytes * (static_cast(has_dict_t) + static_cast(has_bools_t) + + static_cast(has_lists_t) + 1); __shared__ __align__(16) uint8_t shared_buf[shared_buf_size]; // setup all shared memory buffers - int shared_offset = 0; - rle_run* rep_runs = reinterpret_cast*>(shared_buf + shared_offset); - if constexpr (has_lists_t) { shared_offset += shared_rep_size; } - - rle_run* dict_runs = reinterpret_cast*>(shared_buf + shared_offset); - if constexpr (has_dict_t) { shared_offset += shared_dict_size; } - rle_run* def_runs = reinterpret_cast*>(shared_buf + shared_offset); + int shared_offset = 0; + auto rep_runs = reinterpret_cast(shared_buf + shared_offset); + if constexpr (has_lists_t) { shared_offset += rle_run_buffer_bytes; } + auto dict_runs = reinterpret_cast(shared_buf + shared_offset); + if constexpr (has_dict_t) { shared_offset += rle_run_buffer_bytes; } + auto bool_runs = reinterpret_cast(shared_buf + shared_offset); + if constexpr (has_bools_t) { shared_offset += rle_run_buffer_bytes; } + auto def_runs = reinterpret_cast(shared_buf + shared_offset); // initialize the stream decoders (requires values computed in setupLocalPageInfo) rle_stream def_decoder{def_runs}; @@ -974,6 +1016,16 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) s->dict_bits, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values); } + // Use dictionary stream memory for bools + rle_stream bool_stream{bool_runs}; + bool bools_are_rle_stream = (s->dict_run == 0); + if constexpr (has_bools_t) { + if (bools_are_rle_stream) { + bool_stream.init(1, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values); + } + } + __syncthreads(); + // We use two counters in the loop below: processed_count and valid_count. // - processed_count: number of values out of num_input_values that we have decoded so far. // the definition stream returns the number of total rows it has processed in each call @@ -1041,13 +1093,20 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) } __syncthreads(); - // if we have dictionary data + // if we have dictionary or bool data + // We want to limit the number of dictionary/bool items we decode, that correspond to + // the rows we have processed in this iteration that are valid. + // We know the number of valid rows to process with: next_valid_count - valid_count. if constexpr (has_dict_t) { - // We want to limit the number of dictionary items we decode, that correspond to - // the rows we have processed in this iteration that are valid. - // We know the number of valid rows to process with: next_valid_count - valid_count. dict_stream.decode_next(t, next_valid_count - valid_count); __syncthreads(); + } else if constexpr (has_bools_t) { + if (bools_are_rle_stream) { + bool_stream.decode_next(t, next_valid_count - valid_count); + } else { + bool_plain_decode(s, sb, t, next_valid_count - valid_count); + } + __syncthreads(); } // decode the values themselves @@ -1061,250 +1120,82 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) } // anonymous namespace -void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream) -{ - constexpr int decode_block_size = 128; - - dim3 dim_block(decode_block_size, 1); - dim3 dim_grid(pages.size(), 1); // 1 threadblock per page - - if (level_type_size == 1) { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } else { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } -} +template +using kernel_tag_t = std::integral_constant; -void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream) -{ - constexpr int decode_block_size = 128; - - dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks - - if (level_type_size == 1) { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } else { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } -} +template +using int_tag_t = std::integral_constant; -void __host__ -DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream) +void __host__ DecodePageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + decode_kernel_mask kernel_mask, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) { - constexpr int decode_block_size = 128; - - dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks - - if (level_type_size == 1) { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } else { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric + // No template parameters on lambdas until C++20, so use type tags instead + auto launch_kernel = [&](auto block_size_tag, auto kernel_mask_tag) { + constexpr int decode_block_size = decltype(block_size_tag)::value; + constexpr decode_kernel_mask mask = decltype(kernel_mask_tag)::value; + + dim3 dim_block(decode_block_size, 1); + dim3 dim_grid(pages.size(), 1); // 1 threadblock per page + + if (level_type_size == 1) { + gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodePageDataGeneric + gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); } + }; + + switch (kernel_mask) { + case decode_kernel_mask::FIXED_WIDTH_NO_DICT: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED: + launch_kernel(int_tag_t<128>{}, + kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_DICT: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_DICT_NESTED: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_DICT_LIST: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT: + launch_kernel(int_tag_t<128>{}, + kernel_tag_t{}); + break; + case decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED: + launch_kernel(int_tag_t<128>{}, + kernel_tag_t{}); + break; + case decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST: + launch_kernel(int_tag_t<128>{}, + kernel_tag_t{}); + break; + case decode_kernel_mask::BOOLEAN: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::BOOLEAN_NESTED: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::BOOLEAN_LIST: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + default: CUDF_EXPECTS(false, "Kernel type not handled by this function"); break; } } diff --git a/cpp/src/io/parquet/decode_preprocess.cu b/cpp/src/io/parquet/decode_preprocess.cu index 62f1ee88036..5b9831668e6 100644 --- a/cpp/src/io/parquet/decode_preprocess.cu +++ b/cpp/src/io/parquet/decode_preprocess.cu @@ -343,8 +343,8 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) bool has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0; // the level stream decoders - __shared__ rle_run def_runs[rle_run_buffer_size]; - __shared__ rle_run rep_runs[rle_run_buffer_size]; + __shared__ rle_run def_runs[rle_run_buffer_size]; + __shared__ rle_run rep_runs[rle_run_buffer_size]; rle_stream decoders[level_type::NUM_LEVEL_TYPES] = {{def_runs}, {rep_runs}}; diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 52d53cb8225..a8a8c441a84 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -181,9 +181,13 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, } else if (is_string_col(chunk)) { // check for string before byte_stream_split so FLBA will go to the right kernel return decode_kernel_mask::STRING; + } else if (is_boolean(chunk)) { + return is_list(chunk) ? decode_kernel_mask::BOOLEAN_LIST + : is_nested(chunk) ? decode_kernel_mask::BOOLEAN_NESTED + : decode_kernel_mask::BOOLEAN; } - if (!is_byte_array(chunk) && !is_boolean(chunk)) { + if (!is_byte_array(chunk)) { if (page.encoding == Encoding::PLAIN) { return is_list(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST : is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index ca74a1c2ba0..5ece3a54892 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -618,8 +618,8 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); // the level stream decoders - __shared__ rle_run def_runs[rle_run_buffer_size]; - __shared__ rle_run rep_runs[rle_run_buffer_size]; + __shared__ rle_run def_runs[rle_run_buffer_size]; + __shared__ rle_run rep_runs[rle_run_buffer_size]; rle_stream decoders[level_type::NUM_LEVEL_TYPES] = {{def_runs}, {rep_runs}}; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index dba24b553e6..3b4d0e6dc80 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -224,6 +224,9 @@ enum class decode_kernel_mask { FIXED_WIDTH_NO_DICT_LIST = (1 << 13), // Run decode kernel for fixed width non-dictionary pages BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST = (1 << 14), // Run decode kernel for BYTE_STREAM_SPLIT encoded data for fixed width lists + BOOLEAN = (1 << 15), // Run decode kernel for boolean data + BOOLEAN_NESTED = (1 << 16), // Run decode kernel for nested boolean data + BOOLEAN_LIST = (1 << 17), // Run decode kernel for list boolean data }; // mask representing all the ways in which a string can be encoded @@ -539,7 +542,7 @@ enum class encode_kernel_mask { DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel - BYTE_STREAM_SPLIT = (1 << 5), // Run plain encoding kernel, but split streams + BYTE_STREAM_SPLIT = (1 << 5) // Run plain encoding kernel, but split streams }; /** @@ -911,72 +914,18 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] has_nesting Whether or not the data contains nested (but not list) data. - * @param[in] is_list Whether or not the data contains list data. + * @param[in] kernel_mask Mask indicating the type of decoding kernel to launch. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ -void DecodePageDataFixed(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - std::size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream); - -/** - * @brief Launches kernel for reading dictionary fixed width column data stored in the pages - * - * The page data will be written to the output pointed to in the page's - * associated column chunk. - * - * @param[in,out] pages All pages to be decoded - * @param[in] chunks All chunks to be decoded - * @param[in] num_rows Total number of rows to read - * @param[in] min_row Minimum number of rows to read - * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] has_nesting Whether or not the data contains nested (but not list) data. - * @param[in] is_list Whether or not the data contains list data. - * @param[out] error_code Error code for kernel failures - * @param[in] stream CUDA stream to use - */ -void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - std::size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream); - -/** - * @brief Launches kernel for reading fixed width column data stored in the pages - * - * The page data will be written to the output pointed to in the page's - * associated column chunk. - * - * @param[in,out] pages All pages to be decoded - * @param[in] chunks All chunks to be decoded - * @param[in] num_rows Total number of rows to read - * @param[in] min_row Minimum number of rows to read - * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] has_nesting Whether or not the data contains nested (but not list) data. - * @param[in] is_list Whether or not the data contains list data. - * @param[out] error_code Error code for kernel failures - * @param[in] stream CUDA stream to use - */ -void DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - std::size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream); +void DecodePageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + decode_kernel_mask kernel_mask, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for initializing encoder row group fragments diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 689386b8957..d74ae83b635 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -97,22 +97,37 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num _stream); } + // Compute column string sizes (using page string offsets) for this subpass col_string_sizes = calculate_page_string_offsets(); - // check for overflow + // ensure cumulative column string sizes have been initialized + if (pass.cumulative_col_string_sizes.empty()) { + pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0); + } + + // Add to the cumulative column string sizes of this pass + std::transform(pass.cumulative_col_string_sizes.begin(), + pass.cumulative_col_string_sizes.end(), + col_string_sizes.begin(), + pass.cumulative_col_string_sizes.begin(), + std::plus<>{}); + + // Check for overflow in cumulative column string sizes of this pass so that the page string + // offsets of overflowing (large) string columns are treated as 64-bit. auto const threshold = static_cast(strings::detail::get_offset64_threshold()); - auto const has_large_strings = std::any_of(col_string_sizes.cbegin(), - col_string_sizes.cend(), + auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(), + pass.cumulative_col_string_sizes.cend(), [=](std::size_t sz) { return sz > threshold; }); if (has_large_strings and not strings::detail::is_large_strings_enabled()) { CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } - // mark any chunks that are large string columns + // Mark any chunks for which the cumulative column string size has exceeded the + // large strings threshold if (has_large_strings) { for (auto& chunk : pass.chunks) { auto const idx = chunk.src_col_index; - if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; } + if (pass.cumulative_col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; } } } } @@ -195,7 +210,11 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num // only do string buffer for leaf if (idx == max_depth - 1 and out_buf.string_size() == 0 and col_string_sizes[pass.chunks[c].src_col_index] > 0) { - out_buf.create_string_data(col_string_sizes[pass.chunks[c].src_col_index], _stream); + out_buf.create_string_data( + col_string_sizes[pass.chunks[c].src_col_index], + pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] > + static_cast(strings::detail::get_offset64_threshold()), + _stream); } if (has_strings) { str_data[idx] = out_buf.string_data(); } out_buf.user_data |= @@ -219,8 +238,20 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num int const nkernels = std::bitset<32>(kernel_mask).count(); auto streams = cudf::detail::fork_streams(_stream, nkernels); - // launch string decoder int s_idx = 0; + + auto decode_data = [&](decode_kernel_mask decoder_mask) { + DecodePageData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + decoder_mask, + error_code.data(), + streams[s_idx++]); + }; + + // launch string decoder if (BitAnd(kernel_mask, decode_kernel_mask::STRING) != 0) { DecodeStringPageData(subpass.pages, pass.chunks, @@ -266,41 +297,17 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num // launch byte stream split decoder if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) != 0) { - DecodeSplitPageFixedWidthData(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - false, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT); } // launch byte stream split decoder, for nested columns if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) != 0) { - DecodeSplitPageFixedWidthData(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED); } // launch byte stream split decoder, for list columns if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST) != 0) { - DecodeSplitPageFixedWidthData(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - true, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST); } // launch byte stream split decoder @@ -316,80 +323,47 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num // launch fixed width type decoder if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT) != 0) { - DecodePageDataFixed(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - false, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_NO_DICT); } // launch fixed width type decoder for lists if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) != 0) { - DecodePageDataFixed(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - true, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST); } // launch fixed width type decoder, for nested columns if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) != 0) { - DecodePageDataFixed(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED); + } + + // launch boolean type decoder + if (BitAnd(kernel_mask, decode_kernel_mask::BOOLEAN) != 0) { + decode_data(decode_kernel_mask::BOOLEAN); + } + + // launch boolean type decoder, for nested columns + if (BitAnd(kernel_mask, decode_kernel_mask::BOOLEAN_NESTED) != 0) { + decode_data(decode_kernel_mask::BOOLEAN_NESTED); + } + + // launch boolean type decoder, for nested columns + if (BitAnd(kernel_mask, decode_kernel_mask::BOOLEAN_LIST) != 0) { + decode_data(decode_kernel_mask::BOOLEAN_LIST); } // launch fixed width type decoder with dictionaries if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT) != 0) { - DecodePageDataFixedDict(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - false, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_DICT); } // launch fixed width type decoder with dictionaries for lists if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT_LIST) != 0) { - DecodePageDataFixedDict(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - true, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_DICT_LIST); } // launch fixed width type decoder with dictionaries, for nested columns if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) != 0) { - DecodePageDataFixedDict(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_DICT_NESTED); } // launch the catch-all page decoder diff --git a/cpp/src/io/parquet/reader_impl_chunking.hpp b/cpp/src/io/parquet/reader_impl_chunking.hpp index a0c2dbd3e44..ca46f198bb8 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.hpp +++ b/cpp/src/io/parquet/reader_impl_chunking.hpp @@ -130,6 +130,9 @@ struct pass_intermediate_data { rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()}; rmm::device_uvector str_dict_index{0, cudf::get_default_stream()}; + // cumulative strings column sizes. + std::vector cumulative_col_string_sizes{}; + int level_type_size{0}; // skip_rows / num_rows for this pass. diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index 69e783a89d0..3c49de0c997 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -152,7 +152,6 @@ __device__ inline void decode(level_t* const output, } // a single rle run. may be broken up into multiple rle_batches -template struct rle_run { int size; // total size of the run int output_pos; // absolute position of this run w.r.t output @@ -183,14 +182,14 @@ struct rle_stream { level_t* output; - rle_run* runs; + rle_run* runs; int output_pos; int fill_index; int decode_index; - __device__ rle_stream(rle_run* _runs) : runs(_runs) {} + __device__ rle_stream(rle_run* _runs) : runs(_runs) {} __device__ inline bool is_last_decode_warp(int warp_id) { @@ -217,7 +216,7 @@ struct rle_stream { decode_index = -1; // signals the first iteration. Nothing to decode. } - __device__ inline int get_rle_run_info(rle_run& run) + __device__ inline int get_rle_run_info(rle_run& run) { run.start = cur; run.level_run = get_vlq32(run.start, end); @@ -384,7 +383,7 @@ struct rle_stream { // started basically we're setting up the rle_stream vars necessary to start fill_run_batch for // the first time while (cur < end) { - rle_run run; + rle_run run; int run_bytes = get_rle_run_info(run); if ((output_pos + run.size) > target_count) { diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index 6d954753af8..41ed55cd090 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -63,9 +63,11 @@ void cudf::io::detail::inline_column_buffer::allocate_strings_data(bool memset_d } void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes, + bool is_large_strings_col, rmm::cuda_stream_view stream) { - _string_data = rmm::device_buffer(num_bytes, stream, _mr); + _is_large_strings_col = is_large_strings_col; + _string_data = rmm::device_buffer(num_bytes, stream, _mr); } namespace { diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index 31c8b781e77..da19539f509 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -246,13 +246,17 @@ class inline_column_buffer : public column_buffer_base { [[nodiscard]] size_t data_size_impl() const { return _data.size(); } std::unique_ptr make_string_column_impl(rmm::cuda_stream_view stream); - void create_string_data(size_t num_bytes, rmm::cuda_stream_view stream); + void create_string_data(size_t num_bytes, + bool is_large_strings_col, + rmm::cuda_stream_view stream); void* string_data() { return _string_data.data(); } [[nodiscard]] void const* string_data() const { return _string_data.data(); } [[nodiscard]] size_t string_size() const { return _string_data.size(); } + [[nodiscard]] bool is_large_strings_column() const { return _is_large_strings_col; } private: rmm::device_buffer _string_data{}; + bool _is_large_strings_col{}; }; using column_buffer = gather_column_buffer; diff --git a/cpp/src/io/utilities/column_buffer_strings.cu b/cpp/src/io/utilities/column_buffer_strings.cu index 4bc303a34a5..66d0a644c12 100644 --- a/cpp/src/io/utilities/column_buffer_strings.cu +++ b/cpp/src/io/utilities/column_buffer_strings.cu @@ -27,8 +27,7 @@ std::unique_ptr cudf::io::detail::inline_column_buffer::make_string_colu { // if the size of _string_data is over the threshold for 64bit size_type, _data will contain // sizes rather than offsets. need special handling for that case. - auto const threshold = static_cast(strings::detail::get_offset64_threshold()); - if (_string_data.size() > threshold) { + if (is_large_strings_column()) { if (not strings::detail::is_large_strings_enabled()) { CUDF_FAIL("String column exceeds the column size limit", std::overflow_error); } diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index b58ca56e066..26937c9298a 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -239,7 +239,7 @@ struct JsonValidFixedPointReaderTest : public JsonFixedPointReaderTest(), scale}}) + .dtypes(std::vector{data_type{type_to_id(), scale}}) .lines(true); auto const result = cudf::io::read_json(in_opts); @@ -324,7 +324,7 @@ TEST_P(JsonReaderParamTest, FloatingPoint) cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) - .dtypes({dtype()}) + .dtypes(std::vector{dtype()}) .lines(true); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); @@ -348,7 +348,8 @@ TEST_P(JsonReaderParamTest, JsonLinesStrings) cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) - .dtypes({{"2", dtype()}, {"0", dtype()}, {"1", dtype()}}) + .dtypes(std::map{ + {"2", dtype()}, {"0", dtype()}, {"1", dtype()}}) .lines(true); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); @@ -466,7 +467,7 @@ TEST_P(JsonReaderParamTest, Booleans) cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) - .dtypes({dtype()}) + .dtypes(std::vector{dtype()}) .lines(true); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); @@ -508,7 +509,7 @@ TEST_P(JsonReaderParamTest, Dates) cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) - .dtypes({data_type{type_id::TIMESTAMP_MILLISECONDS}}) + .dtypes(std::vector{data_type{type_id::TIMESTAMP_MILLISECONDS}}) .lines(true) .dayfirst(true); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); @@ -564,7 +565,7 @@ TEST_P(JsonReaderParamTest, Durations) cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) - .dtypes({data_type{type_id::DURATION_NANOSECONDS}}) + .dtypes(std::vector{data_type{type_id::DURATION_NANOSECONDS}}) .lines(true); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); @@ -1022,7 +1023,7 @@ TEST_P(JsonReaderParamTest, InvalidFloatingPoint) cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) - .dtypes({dtype()}) + .dtypes(std::vector{dtype()}) .lines(true); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); @@ -1461,7 +1462,7 @@ TEST_F(JsonReaderTest, ErrorStrings) cudf::io::json_reader_options const in_opts = cudf::io::json_reader_options::builder(cudf::io::source_info{buffer.c_str(), buffer.size()}) - .dtypes({data_type{cudf::type_id::STRING}}) + .dtypes(std::vector{data_type{cudf::type_id::STRING}}) .lines(true); auto const result = cudf::io::read_json(in_opts); @@ -1849,7 +1850,7 @@ TYPED_TEST(JsonFixedPointReaderTest, EmptyValues) cudf::io::json_reader_options const in_opts = cudf::io::json_reader_options::builder(cudf::io::source_info{buffer.c_str(), buffer.size()}) - .dtypes({data_type{type_to_id(), 0}}) + .dtypes(std::vector{data_type{type_to_id(), 0}}) .lines(true); auto const result = cudf::io::read_json(in_opts); @@ -2827,7 +2828,7 @@ TEST_F(JsonReaderTest, JSONMixedTypeChildren) EXPECT_EQ(result.metadata.schema_info[0].name, "Root"); ASSERT_EQ(result.metadata.schema_info[0].children.size(), 1); EXPECT_EQ(result.metadata.schema_info[0].children[0].name, "Key"); - ASSERT_EQ(result.metadata.schema_info[0].children[0].children.size(), 2); + ASSERT_EQ(result.metadata.schema_info[0].children[0].children.size(), 1); EXPECT_EQ(result.metadata.schema_info[0].children[0].children[0].name, "offsets"); // types EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRUCT); @@ -2865,7 +2866,7 @@ TEST_F(JsonReaderTest, JSONMixedTypeChildren) EXPECT_EQ(result.metadata.schema_info[0].name, "Root"); ASSERT_EQ(result.metadata.schema_info[0].children.size(), 1); EXPECT_EQ(result.metadata.schema_info[0].children[0].name, "Key"); - ASSERT_EQ(result.metadata.schema_info[0].children[0].children.size(), 2); + ASSERT_EQ(result.metadata.schema_info[0].children[0].children.size(), 1); EXPECT_EQ(result.metadata.schema_info[0].children[0].children[0].name, "offsets"); // types EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRUCT); @@ -2884,9 +2885,9 @@ TEST_F(JsonReaderTest, MixedTypesWithSchema) std::map data_types; std::map child_types; child_types.insert( - std::pair{"element", cudf::io::schema_element{cudf::data_type{cudf::type_id::STRING, 0}, {}}}); - data_types.insert(std::pair{ - "data", cudf::io::schema_element{cudf::data_type{cudf::type_id::LIST, 0}, child_types}}); + std::pair{"element", cudf::io::schema_element{cudf::data_type{cudf::type_id::STRING}, {}}}); + data_types.insert( + std::pair{"data", cudf::io::schema_element{cudf::data_type{cudf::type_id::LIST}, child_types}}); cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) @@ -2991,4 +2992,264 @@ TEST_F(JsonReaderTest, LastRecordInvalid) CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), cudf::table_view{{expected}}); } +// Test case for dtype pruning with column order +TEST_F(JsonReaderTest, JsonNestedDtypeFilterWithOrder) +{ + std::string json_stringl = R"( + {"a": 1, "b": {"0": "abc", "1": [-1.]}, "c": true} + {"a": 1, "b": {"0": "abc" }, "c": false} + {"a": 1, "b": {}} + {"a": 1, "c": null} + )"; + std::string json_string = R"([ + {"a": 1, "b": {"0": "abc", "1": [-1.]}, "c": true}, + {"a": 1, "b": {"0": "abc" }, "c": false}, + {"a": 1, "b": {}}, + {"a": 1, "c": null} + ])"; + for (auto& [json_string, lines] : {std::pair{json_stringl, true}, {json_string, false}}) { + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{json_string.data(), json_string.size()}) + .prune_columns(true) + .lines(lines); + + // include all columns + //// schema with partial ordering + { + cudf::io::schema_element dtype_schema{ + data_type{cudf::type_id::STRUCT}, + { + {"b", + {data_type{cudf::type_id::STRUCT}, + {{"0", {data_type{cudf::type_id::STRING}}}, + {"1", {data_type{cudf::type_id::LIST}, {{"element", {dtype()}}}}}}, + {{"0", "1"}}}}, + {"a", {dtype()}}, + {"c", {dtype()}}, + }, + {{"b", "a", "c"}}}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // Make sure we have columns "a", "b" and "c" + ASSERT_EQ(result.tbl->num_columns(), 3); + ASSERT_EQ(result.metadata.schema_info.size(), 3); + EXPECT_EQ(result.metadata.schema_info[0].name, "b"); + EXPECT_EQ(result.metadata.schema_info[1].name, "a"); + EXPECT_EQ(result.metadata.schema_info[2].name, "c"); + // "b" children checks + ASSERT_EQ(result.metadata.schema_info[0].children.size(), 2); + EXPECT_EQ(result.metadata.schema_info[0].children[0].name, "0"); + EXPECT_EQ(result.metadata.schema_info[0].children[1].name, "1"); + ASSERT_EQ(result.metadata.schema_info[0].children[1].children.size(), 2); + EXPECT_EQ(result.metadata.schema_info[0].children[1].children[0].name, "offsets"); + EXPECT_EQ(result.metadata.schema_info[0].children[1].children[1].name, "element"); + // types + EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::INT32); + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRUCT); + EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::BOOL8); + EXPECT_EQ(result.tbl->get_column(0).child(0).type().id(), cudf::type_id::STRING); + EXPECT_EQ(result.tbl->get_column(0).child(1).type().id(), cudf::type_id::LIST); + EXPECT_EQ(result.tbl->get_column(0).child(1).child(0).type().id(), cudf::type_id::INT32); + EXPECT_EQ(result.tbl->get_column(0).child(1).child(1).type().id(), cudf::type_id::FLOAT32); + } + //// schema with pruned columns and different order. + { + cudf::io::schema_element dtype_schema{data_type{cudf::type_id::STRUCT}, + { + {"c", {dtype()}}, + {"b", + { + data_type{cudf::type_id::STRUCT}, + }}, + {"a", {dtype()}}, + }, + {{"c", "b", "a"}}}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // "c", "b" and "a" order + ASSERT_EQ(result.tbl->num_columns(), 3); + ASSERT_EQ(result.metadata.schema_info.size(), 3); + EXPECT_EQ(result.metadata.schema_info[0].name, "c"); + EXPECT_EQ(result.metadata.schema_info[1].name, "b"); + EXPECT_EQ(result.metadata.schema_info[2].name, "a"); + // pruned + EXPECT_EQ(result.metadata.schema_info[1].children.size(), 0); + } + //// schema with pruned columns and different sub-order. + { + cudf::io::schema_element dtype_schema{ + data_type{cudf::type_id::STRUCT}, + { + {"c", {dtype()}}, + {"b", + {data_type{cudf::type_id::STRUCT}, + // {}, + {{"0", {data_type{cudf::type_id::STRING}}}, + {"1", {data_type{cudf::type_id::LIST}, {{"element", {dtype()}}}}}}, + {{"1", "0"}}}}, + {"a", {dtype()}}, + }}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // Order of occurance in json + ASSERT_EQ(result.tbl->num_columns(), 3); + ASSERT_EQ(result.metadata.schema_info.size(), 3); + EXPECT_EQ(result.metadata.schema_info[0].name, "a"); + EXPECT_EQ(result.metadata.schema_info[1].name, "b"); + EXPECT_EQ(result.metadata.schema_info[2].name, "c"); + // Sub-order of "b" + EXPECT_EQ(result.metadata.schema_info[1].children.size(), 2); + EXPECT_EQ(result.metadata.schema_info[1].children[0].name, "1"); + EXPECT_EQ(result.metadata.schema_info[1].children[1].name, "0"); + } + //// schema with 1 dtype, but 2 column order + { + cudf::io::schema_element dtype_schema{data_type{cudf::type_id::STRUCT}, + { + {"a", {dtype()}}, + }, + {{"a", "b"}}}; + EXPECT_THROW(in_options.set_dtypes(dtype_schema), std::invalid_argument); + // Input schema column order size mismatch with input schema child types + } + //// repetition, Error + { + cudf::io::schema_element dtype_schema{data_type{cudf::type_id::STRUCT}, + { + {"a", {dtype()}}, + }, + {{"a", "a"}}}; + EXPECT_THROW(in_options.set_dtypes(dtype_schema), std::invalid_argument); + // Input schema column order size mismatch with input schema child types + } + //// different column name in order, Error + { + cudf::io::schema_element dtype_schema{data_type{cudf::type_id::STRUCT}, + { + {"a", {dtype()}}, + }, + {{"b"}}}; + EXPECT_THROW(in_options.set_dtypes(dtype_schema), std::invalid_argument); + // Column name not found in input schema map, but present in column order and + // prune_columns is enabled + } + // include only one column (nested) + { + cudf::io::schema_element dtype_schema{ + data_type{cudf::type_id::STRUCT}, + { + {"b", + {data_type{cudf::type_id::STRUCT}, + {{"1", {data_type{cudf::type_id::LIST}, {{"element", {dtype()}}}}}}, + {{"1"}}}}, + }}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // Make sure we have column "b":"1":[float] + ASSERT_EQ(result.tbl->num_columns(), 1); + ASSERT_EQ(result.metadata.schema_info.size(), 1); + EXPECT_EQ(result.metadata.schema_info[0].name, "b"); + ASSERT_EQ(result.metadata.schema_info[0].children.size(), 1); + EXPECT_EQ(result.metadata.schema_info[0].children[0].name, "1"); + ASSERT_EQ(result.metadata.schema_info[0].children[0].children.size(), 2); + EXPECT_EQ(result.metadata.schema_info[0].children[0].children[0].name, "offsets"); + EXPECT_EQ(result.metadata.schema_info[0].children[0].children[1].name, "element"); + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRUCT); + EXPECT_EQ(result.tbl->get_column(0).child(0).type().id(), cudf::type_id::LIST); + EXPECT_EQ(result.tbl->get_column(0).child(0).child(0).type().id(), cudf::type_id::INT32); + EXPECT_EQ(result.tbl->get_column(0).child(0).child(1).type().id(), cudf::type_id::FLOAT32); + } + // multiple - all present + { + cudf::io::schema_element dtype_schema{data_type{cudf::type_id::STRUCT}, + { + {"a", {dtype()}}, + {"c", {dtype()}}, + }, + {{"a", "c"}}}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // Make sure we have columns "a", and "c" + ASSERT_EQ(result.tbl->num_columns(), 2); + ASSERT_EQ(result.metadata.schema_info.size(), 2); + EXPECT_EQ(result.metadata.schema_info[0].name, "a"); + EXPECT_EQ(result.metadata.schema_info[1].name, "c"); + } + // multiple - not all present + { + cudf::io::schema_element dtype_schema{data_type{cudf::type_id::STRUCT}, + { + {"a", {dtype()}}, + {"d", {dtype()}}, + }, + {{"a", "d"}}}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // Make sure we have column "a" + ASSERT_EQ(result.tbl->num_columns(), 2); + ASSERT_EQ(result.metadata.schema_info.size(), 2); + EXPECT_EQ(result.metadata.schema_info[0].name, "a"); + EXPECT_EQ(result.metadata.schema_info[1].name, "d"); + auto all_null_bools = + cudf::test::fixed_width_column_wrapper{{true, true, true, true}, {0, 0, 0, 0}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), all_null_bools); + } + // test struct, list of string, list of struct. + // multiple - not all present nested + { + cudf::io::schema_element dtype_schema{ + data_type{cudf::type_id::STRUCT}, + { + {"b", + {data_type{cudf::type_id::STRUCT}, + { + {"2", {data_type{cudf::type_id::STRING}}}, + }, + {{"2"}}}}, + {"d", {data_type{cudf::type_id::LIST}, {{"element", {dtype()}}}}}, + {"e", + {data_type{cudf::type_id::LIST}, + {{"element", + { + data_type{cudf::type_id::STRUCT}, + { + {"3", {data_type{cudf::type_id::STRING}}}, + }, //{{"3"}} missing column_order, but output should not have it. + }}}}}, + }, + {{"b", "d", "e"}}}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // Make sure we have columns "b" (empty struct) and "c" + ASSERT_EQ(result.tbl->num_columns(), 3); + ASSERT_EQ(result.metadata.schema_info.size(), 3); + EXPECT_EQ(result.metadata.schema_info[0].name, "b"); + ASSERT_EQ(result.metadata.schema_info[0].children.size(), 1); + ASSERT_EQ(result.metadata.schema_info[0].children[0].name, "2"); + EXPECT_EQ(result.metadata.schema_info[1].name, "d"); + auto all_null_strings = cudf::test::strings_column_wrapper{{"", "", "", ""}, {0, 0, 0, 0}}; + EXPECT_EQ(result.tbl->get_column(0).num_children(), 1); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0).child(0), all_null_strings); + auto const all_null_list = cudf::test::lists_column_wrapper{ + {{0, 0}, {1, 1}, {2, 2}, {3, 3}}, cudf::test::iterators::all_nulls()}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), all_null_list); + EXPECT_EQ(result.metadata.schema_info[2].name, "e"); + ASSERT_EQ(result.metadata.schema_info[2].children.size(), 2); + ASSERT_EQ(result.metadata.schema_info[2].children[1].children.size(), 0); + // ASSERT_EQ(result.metadata.schema_info[2].children[1].children[0].name, "3"); + auto empty_string_col = cudf::test::strings_column_wrapper{}; + cudf::test::structs_column_wrapper expected_structs{{}, cudf::test::iterators::all_nulls()}; + // make all null column of list of struct of string + auto wrapped = make_lists_column( + 4, + cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 0}.release(), + expected_structs.release(), + 4, + cudf::create_null_mask(4, cudf::mask_state::ALL_NULL)); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(2), *wrapped); + } + } +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp index f47782a2d02..39cd783de00 100644 --- a/cpp/tests/large_strings/parquet_tests.cpp +++ b/cpp/tests/large_strings/parquet_tests.cpp @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -69,3 +70,76 @@ TEST_F(ParquetStringsTest, ReadLargeStrings) // go back to normal threshold unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD"); } + +// Disabled as the test is too brittle and depends on empirically set `pass_read_limit`, +// encoding type, and the currently used `ZSTD` scratch space size. +TEST_F(ParquetStringsTest, DISABLED_ChunkedReadLargeStrings) +{ + // Construct a table with one large strings column > 2GB + auto const wide = this->wide_column(); + auto input = cudf::concatenate(std::vector(120000, wide)); ///< 230MB + + int constexpr multiplier = 12; + std::vector input_cols(multiplier, input->view()); + auto col0 = cudf::concatenate(input_cols); ///< 2.70GB + + // Expected table + auto const expected = cudf::table_view{{col0->view()}}; + auto expected_metadata = cudf::io::table_input_metadata{expected}; + + // Needed to get exactly 2 Parquet subpasses: first with large-strings and the second with + // regualar ones. This may change in the future and lead to false failures. + expected_metadata.column_metadata[0].set_encoding( + cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); + + // Host buffer to write Parquet + std::vector buffer; + + // Writer options + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected) + .metadata(expected_metadata); + + // Needed to get exactly 2 Parquet subpasses: first with large-strings and the second with + // regualar ones. This may change in the future and lead to false failures. + out_opts.set_compression(cudf::io::compression_type::ZSTD); + + // Write to Parquet + cudf::io::write_parquet(out_opts); + + // Empirically set pass_read_limit of 8GB so we read almost entire table (>2GB strings) in the + // first subpass and only a small amount in the second subpass. This may change in the future + // and lead to false failures. + size_t constexpr pass_read_limit = size_t{8} * 1024 * 1024 * 1024; + + // Reader options + cudf::io::parquet_reader_options default_in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size())); + + // Chunked parquet reader + auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts); + + // Read chunked + auto tables = std::vector>{}; + while (reader.has_next()) { + tables.emplace_back(reader.read_chunk().tbl); + } + auto table_views = std::vector{}; + std::transform(tables.begin(), tables.end(), std::back_inserter(table_views), [](auto& tbl) { + return tbl->view(); + }); + auto result = cudf::concatenate(table_views); + auto const result_view = result->view(); + + // Verify offsets + for (auto const& cv : result_view) { + auto const offsets = cudf::strings_column_view(cv).offsets(); + EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64}); + } + + // Verify tables to be equal + CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected); + + // Verify that we read exactly two table chunks + EXPECT_EQ(tables.size(), 2); +} diff --git a/python/cudf/cudf/_lib/sort.pyx b/python/cudf/cudf/_lib/sort.pyx index 185552ede82..eefe37d9880 100644 --- a/python/cudf/cudf/_lib/sort.pyx +++ b/python/cudf/cudf/_lib/sort.pyx @@ -5,21 +5,10 @@ from itertools import repeat from cudf.core.buffer import acquire_spill_lock from libcpp cimport bool -from libcpp.memory cimport unique_ptr -from libcpp.utility cimport move -from libcpp.vector cimport vector from pylibcudf.libcudf.aggregation cimport rank_method -from pylibcudf.libcudf.column.column cimport column -from pylibcudf.libcudf.search cimport lower_bound, upper_bound -from pylibcudf.libcudf.table.table_view cimport table_view -from pylibcudf.libcudf.types cimport null_order, order as cpp_order - from cudf._lib.column cimport Column -from cudf._lib.utils cimport ( - columns_from_pylibcudf_table, - table_view_from_columns, -) +from cudf._lib.utils cimport columns_from_pylibcudf_table import pylibcudf @@ -311,44 +300,19 @@ def digitize(list source_columns, list bins, bool right=False): right : Indicating whether the intervals include the right or the left bin edge. """ - - cdef table_view bins_view = table_view_from_columns(bins) - cdef table_view source_table_view = table_view_from_columns( - source_columns - ) - cdef vector[cpp_order] column_order = ( - vector[cpp_order]( - bins_view.num_columns(), - cpp_order.ASCENDING - ) - ) - cdef vector[null_order] null_precedence = ( - vector[null_order]( - bins_view.num_columns(), - null_order.BEFORE + return Column.from_pylibcudf( + getattr(pylibcudf.search, "lower_bound" if right else "upper_bound")( + pylibcudf.Table( + [c.to_pylibcudf(mode="read") for c in bins] + ), + pylibcudf.Table( + [c.to_pylibcudf(mode="read") for c in source_columns] + ), + [pylibcudf.types.Order.ASCENDING]*len(bins), + [pylibcudf.types.NullOrder.BEFORE]*len(bins) ) ) - cdef unique_ptr[column] c_result - if right: - with nogil: - c_result = move(lower_bound( - bins_view, - source_table_view, - column_order, - null_precedence) - ) - else: - with nogil: - c_result = move(upper_bound( - bins_view, - source_table_view, - column_order, - null_precedence) - ) - - return Column.from_unique_ptr(move(c_result)) - @acquire_spill_lock() def rank_columns(list source_columns, rank_method method, str na_option, diff --git a/python/cudf/cudf/_lib/strings/convert/convert_integers.pyx b/python/cudf/cudf/_lib/strings/convert/convert_integers.pyx index 8b6da2bfa1c..50113347ccb 100644 --- a/python/cudf/cudf/_lib/strings/convert/convert_integers.pyx +++ b/python/cudf/cudf/_lib/strings/convert/convert_integers.pyx @@ -1,15 +1,8 @@ # Copyright (c) 2021-2024, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr -from libcpp.utility cimport move - from cudf.core.buffer import acquire_spill_lock -from pylibcudf.libcudf.column.column cimport column -from pylibcudf.libcudf.column.column_view cimport column_view -from pylibcudf.libcudf.strings.convert.convert_integers cimport ( - is_integer as cpp_is_integer, -) +import pylibcudf as plc from cudf._lib.column cimport Column @@ -20,12 +13,8 @@ def is_integer(Column source_strings): Returns a Column of boolean values with True for `source_strings` that have integers. """ - cdef unique_ptr[column] c_result - cdef column_view source_view = source_strings.view() - - with nogil: - c_result = move(cpp_is_integer( - source_view - )) - - return Column.from_unique_ptr(move(c_result)) + return Column.from_pylibcudf( + plc.strings.convert.convert_integers.is_integer( + source_strings.to_pylibcudf(mode="read") + ) + )