Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional column_order in JSON reader #17029

Merged
merged 27 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1b6ca58
add optional column_order to schema_element
karthikeyann Oct 9, 2024
e0c373a
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Oct 21, 2024
732f234
doc fixes
karthikeyann Oct 21, 2024
ffdd817
fix ambiguous std::map call
karthikeyann Oct 21, 2024
02e8ab3
simplify schema_element interface
karthikeyann Oct 22, 2024
ac05ae9
create all null columns
karthikeyann Oct 23, 2024
f10d9c2
metadata for all null non-present columns
karthikeyann Oct 24, 2024
71b4142
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Oct 24, 2024
c8e223d
address review comments, unit test
karthikeyann Oct 24, 2024
1c871a8
Merge branch 'fea-json_column_order' of github.com:karthikeyann/cudf …
karthikeyann Oct 24, 2024
9297b7e
fix empty all-null rows issue at top level
karthikeyann Oct 25, 2024
eb9f8fc
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 4, 2024
9e31b71
add validation for dtypes with column order
karthikeyann Nov 4, 2024
82cf186
cleanup
karthikeyann Nov 4, 2024
105250b
address review comments
karthikeyann Nov 4, 2024
1a7a99c
add docs to dtype_variant
karthikeyann Nov 4, 2024
fcd8e3c
fix docs
karthikeyann Nov 4, 2024
15ef1d5
Merge branch 'branch-24.12' into fea-json_column_order
ttnghia Nov 5, 2024
b2dd7cd
moved dtype_variant alias to public
karthikeyann Nov 5, 2024
2272f72
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 5, 2024
e8e1c28
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 6, 2024
0bfe85e
remove chars in string column metadata
karthikeyann Nov 6, 2024
da24f1d
fix string col metadata in unit test
karthikeyann Nov 6, 2024
7e31f91
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 6, 2024
83b979c
add missing doc
karthikeyann Nov 7, 2024
a5442b9
Merge branch 'branch-24.12' into fea-json_column_order
karthikeyann Nov 7, 2024
4b82cf6
address review comments
karthikeyann Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ struct schema_element {
* @brief Allows specifying this column's child columns target type
*/
std::map<std::string, schema_element> child_types;

/** @brief Allows specifying the order of the columns
*/
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
std::optional<std::vector<std::string>> column_order;

/**
 * @brief Returns the number of entries in the child_types map
 *
 * @return size_t The number of entries currently stored in child_types
 */
size_t size() const
{
  return child_types.size();
}
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down Expand Up @@ -92,7 +102,8 @@ class json_reader_options {
// Data types of the column; empty to infer dtypes
std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>>
std::map<std::string, schema_element>,
schema_element>
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
_dtypes;
// Specify the compression format of the source or infer from file extension
compression_type _compression = compression_type::AUTO;
Expand Down Expand Up @@ -180,7 +191,8 @@ class json_reader_options {
*/
[[nodiscard]] std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>> const&
std::map<std::string, schema_element>,
schema_element> const&
get_dtypes() const
{
return _dtypes;
Expand Down Expand Up @@ -390,6 +402,13 @@ class json_reader_options {
*/
void set_dtypes(std::map<std::string, schema_element> types) { _dtypes = std::move(types); }

/**
 * @brief Set data types for a potentially nested column hierarchy.
 *
 * @param types Root schema_element; its child_types map the column names to their (possibly
 * nested) schema_element, and its optional column_order lists the column names in order
 */
void set_dtypes(schema_element types)
{
  _dtypes = std::move(types);
}

/**
* @brief Set the compression type.
*
Expand Down Expand Up @@ -624,6 +643,18 @@ class json_reader_options_builder {
return *this;
}

/**
 * @brief Set data types for columns to be read.
 *
 * @param types Root schema_element holding the column name -> schema_element map
 * (child_types) and the optional column order (column_order)
 * @return this for chaining
 */
json_reader_options_builder& dtypes(schema_element types)
{
options._dtypes = std::move(types);
return *this;
}

/**
* @brief Set the compression type.
*
Expand Down
16 changes: 14 additions & 2 deletions cpp/src/io/json/host_tree_algorithms.cu
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ std::map<std::string, schema_element> unified_schema(cudf::io::json_reader_optio
});
return dnew;
},
[](std::map<std::string, schema_element> const& user_dtypes) { return user_dtypes; }},
[](std::map<std::string, schema_element> const& user_dtypes) { return user_dtypes; },
[](schema_element const& user_dtypes) { return user_dtypes.child_types; }},
options.get_dtypes());
}

Expand Down Expand Up @@ -596,6 +597,15 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
if (user_dtypes.count(name))
mark_is_pruned(first_child_id, user_dtypes.at(name));
}
},
[&root_list_col_id, &adj, &mark_is_pruned, &column_names](
schema_element const& user_dtypes) -> void {
for (size_t i = 0; i < adj[root_list_col_id].size(); i++) {
auto const first_child_id = adj[root_list_col_id][i];
auto name = column_names[first_child_id];
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
if (user_dtypes.child_types.count(name))
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
mark_is_pruned(first_child_id, user_dtypes.child_types.at(name));
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
}
}},
options.get_dtypes());
} else {
Expand Down Expand Up @@ -629,7 +639,9 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
[&root_struct_col_id, &adj, &mark_is_pruned, &u_schema](
std::map<std::string, schema_element> const& user_dtypes) -> void {
mark_is_pruned(root_struct_col_id, u_schema);
}},
},
[&root_struct_col_id, &adj, &mark_is_pruned, &u_schema](schema_element const& user_dtypes)
-> void { mark_is_pruned(root_struct_col_id, u_schema); }},
options.get_dtypes());
}
// Useful for array of arrays
Expand Down
68 changes: 62 additions & 6 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,31 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
std::vector<std::unique_ptr<column>> child_columns;
std::vector<column_name_info> column_names{};
size_type num_rows{json_col.num_rows};
if (schema.has_value() and schema.value().column_order.has_value()) {
CUDF_EXPECTS(schema.value().child_types.size() == schema.value().column_order->size(),
"Input schema column order size mismatch with input schema child types");
}
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
// Create children columns
for (auto const& col_name : json_col.column_order) {
auto const& col = json_col.child_columns.find(col_name);
column_names.emplace_back(col->first);
auto& child_col = col->second;
auto const& col_order = schema.has_value() and schema.value().column_order.has_value() and
not schema.value().column_order->empty()
? schema.value().column_order.value()
: json_col.column_order;
for (auto const& col_name : col_order) {
auto child_schema_element = get_child_schema(col_name);
auto const& found_col = json_col.child_columns.find(col_name);
if (prune_columns and found_col == std::end(json_col.child_columns)) {
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
column_names.emplace_back(make_column_name_info(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}), col_name));
auto all_null_column = make_all_nulls_column(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}),
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
num_rows,
stream,
mr);
child_columns.emplace_back(std::move(all_null_column));
continue;
}
column_names.emplace_back(found_col->first);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above we have `if (prune_columns and found_col == std::end(json_col.child_columns))`, so when `prune_columns` is false we can still reach this line with `found_col == std::end(json_col.child_columns)`.

Copy link
Contributor Author

@karthikeyann karthikeyann Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added prune_columns condition to col_order decision. This case won't happen.

auto& child_col = found_col->second;
if (!prune_columns or child_schema_element.has_value()) {
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, prune_columns, child_schema_element, stream, mr);
Expand Down Expand Up @@ -580,9 +599,24 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,

// Iterate over the struct's child columns and convert to cudf column
size_type column_index = 0;
for (auto const& col_name : root_struct_col.column_order) {
auto& json_col = root_struct_col.child_columns.find(col_name)->second;

bool const has_column_order =
options.is_enabled_prune_columns() and
std::holds_alternative<schema_element>(options.get_dtypes()) and
std::get<schema_element>(options.get_dtypes()).column_order.has_value() and
not std::get<schema_element>(options.get_dtypes()).column_order->empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we throw if options.is_enabled_prune_columns() is false but column order is given? The users may want to specify column order but they may forget to enable prune column.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that the user wants to prune columns but keep the order as in the JSON file; that's why that throw was not added.

auto const& col_order = has_column_order
? std::get<schema_element>(options.get_dtypes()).column_order.value()
: root_struct_col.column_order;
if (has_column_order) {
CUDF_EXPECTS(
std::get<schema_element>(options.get_dtypes()).child_types.size() == col_order.size(),
"Input schema column order size mismatch with input schema child types");
}
auto root_col_size = root_struct_col.child_columns.empty()
? device_json_column::row_offset_t{0}
: root_struct_col.child_columns.begin()->second.num_rows;
for (auto const& col_name : col_order) {
std::optional<schema_element> child_schema_element = std::visit(
cudf::detail::visitor_overload{
[column_index](std::vector<data_type> const& user_dtypes) -> std::optional<schema_element> {
Expand All @@ -601,6 +635,11 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
return (user_dtypes.find(col_name) != std::end(user_dtypes))
? user_dtypes.find(col_name)->second
: std::optional<schema_element>{};
},
[col_name](schema_element const& user_dtypes) -> std::optional<schema_element> {
return (user_dtypes.child_types.find(col_name) != std::end(user_dtypes.child_types))
? user_dtypes.child_types.find(col_name)->second
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
: std::optional<schema_element>{};
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
}},
options.get_dtypes());
#ifdef NJP_DEBUG_PRINT
Expand All @@ -624,6 +663,23 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
debug_schema_print(child_schema_element);
#endif

auto found_col = root_struct_col.child_columns.find(col_name);
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
if (options.is_enabled_prune_columns() and
found_col == std::end(root_struct_col.child_columns)) {
// inserts empty null column
out_column_names.emplace_back(make_column_name_info(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}), col_name));
auto all_null_column = make_all_nulls_column(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}),
root_col_size,
stream,
mr);
out_columns.emplace_back(std::move(all_null_column));
column_index++;
continue;
}
auto& json_col = found_col->second;

if (!options.is_enabled_prune_columns() or child_schema_element.has_value()) {
// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] =
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,23 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
 * @brief Create an all-null column of a given nested schema
 *
 * @param schema The schema of the column to create
 * @param num_rows The number of rows in the column
 * @param stream The CUDA stream to which kernels are dispatched
 * @param mr Resource with which to allocate
 * @return The all null column
 */
std::unique_ptr<column> make_all_nulls_column(schema_element const& schema,
size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
 * @brief Create metadata for a column of a given schema
 *
 * @param schema The schema of the column for which metadata is created
 * @param col_name The name stored in the returned metadata
 * @return The column_name_info describing the column
 */
column_name_info make_column_name_info(schema_element const& schema, std::string const& col_name);

/**
* @brief Get the path data type of a column by path if present in input schema
*
Expand Down
148 changes: 148 additions & 0 deletions cpp/src/io/json/parser_features.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,157 @@

#include "nested_json.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
#include <cudf/strings/string_view.hpp>
#include <cudf/utilities/traits.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

#include <optional>
#include <string>
#include <vector>

namespace cudf::io::json::detail {

/// Created an empty column of the specified schema
struct empty_column_functor {
rmm::cuda_stream_view stream;
rmm::device_async_resource_ref mr;

template <typename T, std::enable_if_t<!cudf::is_nested<T>()>* = nullptr>
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<column> operator()(schema_element const& schema) const
{
return make_empty_column(schema.type);
}

template <typename T, std::enable_if_t<std::is_same_v<T, cudf::list_view>>* = nullptr>
std::unique_ptr<column> operator()(schema_element const& schema) const
{
CUDF_EXPECTS(schema.child_types.size() == 1, "List column should have only one child");
auto const& child_name = schema.child_types.begin()->first;
std::unique_ptr<column> child = cudf::type_dispatcher(
schema.child_types.at(child_name).type, *this, schema.child_types.at(child_name));
auto offsets = make_empty_column(data_type(type_to_id<size_type>()));
return make_lists_column(0, std::move(offsets), std::move(child), 0, {}, stream, mr);
}

template <typename T, std::enable_if_t<std::is_same_v<T, cudf::struct_view>>* = nullptr>
std::unique_ptr<column> operator()(schema_element const& schema) const
{
std::vector<std::unique_ptr<column>> child_columns;
for (auto child_name : schema.column_order.value_or(std::vector<std::string>{})) {
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
child_columns.push_back(cudf::type_dispatcher(
schema.child_types.at(child_name).type, *this, schema.child_types.at(child_name)));
}
return make_structs_column(0, std::move(child_columns), 0, {}, stream, mr);
}
};

/// Creates an all-null column of the specified schema
struct allnull_column_functor {
  rmm::cuda_stream_view stream;
  rmm::device_async_resource_ref mr;

  /// Returns a column holding `size + 1` zero-initialized size_type values.
  auto make_zeroed_offsets(size_type size) const
  {
    auto offsets_buff =
      cudf::detail::make_zeroed_device_uvector_async<size_type>(size + 1, stream, mr);
    return std::make_unique<column>(std::move(offsets_buff), rmm::device_buffer{}, 0);
  }

  /// Fixed-width types: a column of `size` rows with an ALL_NULL mask.
  template <typename T, std::enable_if_t<cudf::is_fixed_width<T>()>* = nullptr>
  std::unique_ptr<column> operator()(schema_element const& schema, size_type size) const
  {
    return make_fixed_width_column(schema.type, size, mask_state::ALL_NULL, stream, mr);
  }

  /// Dictionaries: empty keys from the single child schema, zeroed indices, ALL_NULL mask.
  template <typename T, std::enable_if_t<cudf::is_dictionary<T>()>* = nullptr>
  std::unique_ptr<column> operator()(schema_element const& schema, size_type size) const
  {
    CUDF_EXPECTS(schema.child_types.size() == 1, "Dictionary column should have only one child");
    // Exactly one entry exists, so use it directly instead of looking it up again by name.
    auto const& keys_schema = schema.child_types.begin()->second;
    std::unique_ptr<column> keys =
      cudf::type_dispatcher(keys_schema.type, empty_column_functor{stream, mr}, keys_schema);
    // A stray early `return make_fixed_width_column(...)` used to sit here; it discarded `keys`
    // and made the dictionary construction below unreachable.
    // make_zeroed_offsets(n) yields n + 1 elements, so ask for size - 1 to get `size` indices.
    // NOTE(review): the indices are of size_type; confirm this is an index type that
    // make_dictionary_column accepts.
    auto indices   = make_zeroed_offsets(size - 1);
    auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr);
    return make_dictionary_column(
      std::move(keys), std::move(indices), std::move(null_mask), size, stream, mr);
  }

  /// Strings: zeroed offsets, no character data, and an ALL_NULL mask.
  template <typename T, std::enable_if_t<std::is_same_v<T, cudf::string_view>>* = nullptr>
  std::unique_ptr<column> operator()(schema_element const& schema, size_type size) const
  {
    auto offsets   = make_zeroed_offsets(size);
    auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr);
    return make_strings_column(
      size, std::move(offsets), rmm::device_buffer{}, size, std::move(null_mask));
  }

  /// Lists: zeroed offsets, an empty child from the single child schema, and an ALL_NULL mask.
  template <typename T, std::enable_if_t<std::is_same_v<T, cudf::list_view>>* = nullptr>
  std::unique_ptr<column> operator()(schema_element const& schema, size_type size) const
  {
    CUDF_EXPECTS(schema.child_types.size() == 1, "List column should have only one child");
    auto const& child_schema = schema.child_types.begin()->second;
    std::unique_ptr<column> child =
      cudf::type_dispatcher(child_schema.type, empty_column_functor{stream, mr}, child_schema);
    auto offsets   = make_zeroed_offsets(size);
    auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr);
    return make_lists_column(
      size, std::move(offsets), std::move(child), size, std::move(null_mask), stream, mr);
  }

  /// Structs: one all-null child of `size` rows per name in column_order, plus an ALL_NULL mask.
  /// No children are created when column_order is unset.
  template <typename T, std::enable_if_t<std::is_same_v<T, cudf::struct_view>>* = nullptr>
  std::unique_ptr<column> operator()(schema_element const& schema, size_type size) const
  {
    std::vector<std::unique_ptr<column>> child_columns;
    // Iterate the stored order in place; value_or() would copy the whole vector of names.
    if (schema.column_order.has_value()) {
      child_columns.reserve(schema.column_order->size());
      for (auto const& child_name : *schema.column_order) {
        // at() throws if column_order names a child that is missing from child_types.
        auto const& child_schema = schema.child_types.at(child_name);
        child_columns.push_back(
          cudf::type_dispatcher(child_schema.type, *this, child_schema, size));
      }
    }
    auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr);
    return make_structs_column(
      size, std::move(child_columns), size, std::move(null_mask), stream, mr);
  }
};

/// Builds an all-null column of `num_rows` rows by dispatching allnull_column_functor on the
/// schema's type.
std::unique_ptr<column> make_all_nulls_column(schema_element const& schema,
                                              size_type num_rows,
                                              rmm::cuda_stream_view stream,
                                              rmm::device_async_resource_ref mr)
{
  auto const null_column_maker = allnull_column_functor{stream, mr};
  return cudf::type_dispatcher(schema.type, null_column_maker, schema, num_rows);
}

/**
 * @brief Create metadata for a column of a given schema
 *
 * The returned info carries `col_name` plus child entries that depend on the schema's type:
 * STRUCT lists its children in column_order, LIST adds "offsets" followed by its child types,
 * DICTIONARY32 adds "indices" followed by its child types, and STRING adds only "offsets".
 *
 * @param schema The schema of the column for which metadata is created
 * @param col_name The name stored in the returned metadata
 * @return The column_name_info describing the column and its children
 */
column_name_info make_column_name_info(schema_element const& schema, std::string const& col_name)
{
  column_name_info info;
  info.name = col_name;
  switch (schema.type.id()) {
    case type_id::STRUCT:
      // Iterate the stored order in place; value_or() would copy the whole vector of names.
      if (schema.column_order.has_value()) {
        for (auto const& child_name : *schema.column_order) {
          info.children.push_back(
            make_column_name_info(schema.child_types.at(child_name), child_name));
        }
      }
      break;
    case type_id::LIST:
      info.children.emplace_back("offsets");
      for (auto const& [child_name, child_schema] : schema.child_types) {
        info.children.push_back(make_column_name_info(child_schema, child_name));
      }
      break;
    case type_id::DICTIONARY32:
      info.children.emplace_back("indices");
      for (auto const& [child_name, child_schema] : schema.child_types) {
        info.children.push_back(make_column_name_info(child_schema, child_name));
      }
      break;
    case type_id::STRING:
      // Strings columns no longer have a "chars" child, so only "offsets" is reported.
      info.children.emplace_back("offsets");
      break;
    default: break;
  }
  return info;
}

std::optional<schema_element> child_schema_element(std::string const& col_name,
cudf::io::json_reader_options const& options)
{
Expand All @@ -46,6 +189,11 @@ std::optional<schema_element> child_schema_element(std::string const& col_name,
return (user_dtypes.find(col_name) != std::end(user_dtypes))
? user_dtypes.find(col_name)->second
: std::optional<schema_element>{};
},
[col_name](schema_element const& user_dtypes) -> std::optional<schema_element> {
return (user_dtypes.child_types.find(col_name) != std::end(user_dtypes.child_types))
? user_dtypes.child_types.find(col_name)->second
: std::optional<schema_element>{};
}},
options.get_dtypes());
}
Expand Down
Loading
Loading