Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in recovering invalid lines in JSONL inputs #17098

Merged
merged 41 commits into branch-24.12 from json-quote-char-parsing-fix
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9ff3129
add option to nullify empty lines
karthikeyann Oct 9, 2024
624743b
printf debugging
shrshi Oct 11, 2024
bcecb25
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 14, 2024
f9b7e08
Merge branch 'enh-json_nullify_empty_lines' into json-quote-char-pars…
shrshi Oct 15, 2024
55c13a0
added test; fixed small bug in nullifying empty rows
shrshi Oct 16, 2024
9d2a2f0
formatting
shrshi Oct 16, 2024
3d0a51d
removing from modifications to dfa
shrshi Oct 16, 2024
911e065
remove hardcoding of delimiter
shrshi Oct 16, 2024
ab7659b
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
karthikeyann Oct 17, 2024
0ef5108
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
shrshi Oct 18, 2024
1dffbf0
Merge branch 'enh-json_nullify_empty_lines' of github.com:karthikeyan…
shrshi Oct 18, 2024
293521f
Update cpp/tests/io/json/json_test.cpp
shrshi Oct 21, 2024
ca8ee32
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
ttnghia Oct 21, 2024
ebc5275
pre-process concat
shrshi Oct 21, 2024
679833b
formatting
shrshi Oct 21, 2024
b192fd2
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
shrshi Oct 21, 2024
31d5cab
some logic fixes
shrshi Oct 22, 2024
7c3e0f0
formatting
shrshi Oct 22, 2024
35b7177
test
shrshi Oct 22, 2024
9370dc5
formatting
shrshi Oct 22, 2024
6d87031
test cleanup
shrshi Oct 22, 2024
b9005ae
formatting
shrshi Oct 22, 2024
4382ef8
pr reviews
shrshi Oct 22, 2024
f75d8ee
formatting
shrshi Oct 22, 2024
bb9584e
formatting fix
shrshi Oct 22, 2024
6ad06ca
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
shrshi Oct 22, 2024
424f90f
pr reviews
shrshi Oct 24, 2024
8b48297
Merge branch 'enh-json_nullify_empty_lines' of github.com:karthikeyan…
shrshi Oct 24, 2024
f651087
merge
shrshi Oct 24, 2024
dfba4cd
Merge branch 'json-quote-char-parsing-fix' of github.com:shrshi/cudf …
shrshi Oct 24, 2024
eb82450
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
d3193e3
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
18f1a6e
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
96dce9d
pr reviews
shrshi Oct 29, 2024
f8c5de3
formatting
shrshi Oct 29, 2024
c0d0b3e
Merge branch 'json-quote-char-parsing-fix' of github.com:shrshi/cudf …
shrshi Oct 29, 2024
234c19d
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
77b2f99
oops, undoing accidental merge
shrshi Oct 29, 2024
2e37ed4
Merge branch 'json-quote-char-parsing-fix' of github.com:shrshi/cudf …
shrshi Oct 29, 2024
3784be9
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
f351242
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class json_reader_options {
// Normalize unquoted spaces and tabs
bool _normalize_whitespace = false;

bool _nullify_empty_lines = false;

// Whether to recover after an invalid JSON line
json_recovery_mode_t _recovery_mode = json_recovery_mode_t::FAIL;

Expand Down Expand Up @@ -313,6 +315,13 @@ class json_reader_options {
*/
[[nodiscard]] bool is_enabled_normalize_whitespace() const { return _normalize_whitespace; }

/**
 * @brief Queries whether empty lines are turned into null rows (JSON Lines input parsed in
 * recovery mode)
 *
 * @returns true when empty-line nullification is enabled, false otherwise
 */
[[nodiscard]] bool is_nullify_empty_lines() const
{
  return _nullify_empty_lines;
}

/**
* @brief Queries the JSON reader's behavior on invalid JSON lines.
*
Expand Down Expand Up @@ -502,6 +511,14 @@ class json_reader_options {
*/
void enable_normalize_whitespace(bool val) { _normalize_whitespace = val; }

/**
 * @brief Controls whether empty lines in JSON Lines input are turned into null rows when
 * parsing in recovery mode
 *
 * @param val true to emit a null row for every empty input line
 */
void nullify_empty_lines(bool val)
{
  _nullify_empty_lines = val;
}

/**
* @brief Specifies the JSON reader's behavior on invalid JSON lines.
*
Expand Down Expand Up @@ -779,6 +796,19 @@ class json_reader_options_builder {
return *this;
}

/**
 * @brief Sets whether empty lines in JSON Lines input produce null rows when parsing in
 * recovery mode
 *
 * @param val true to emit a null row for every empty input line
 * @return this for chaining
 */
json_reader_options_builder& nullify_empty_lines(bool val)
{
  // Delegate to the setter on json_reader_options rather than touching the member directly.
  options.nullify_empty_lines(val);
  return *this;
}

/**
* @brief Specifies the JSON reader's behavior on invalid JSON lines.
*
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,15 @@ void get_stack_context(device_span<SymbolT const> json_in,
*
* @param tokens The tokens to be post-processed
* @param token_indices The tokens' corresponding indices that are post-processed
* @param nullify_empty_lines Whether to nullify empty lines
* @param stream The cuda stream to dispatch GPU kernels to
* @return Returns the post-processed token stream
*/
CUDF_EXPORT
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
bool nullify_empty_lines,
rmm::cuda_stream_view stream);

/**
Expand Down
41 changes: 26 additions & 15 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,12 @@ using SymbolGroupT = uint8_t;
/**
* @brief Definition of the DFA's states
*/
enum class dfa_states : StateT { VALID, INVALID, NUM_STATES };
enum class dfa_states : StateT { START, VALID, INVALID, NUM_STATES };

// Aliases for readability of the transition table
constexpr auto TT_INV = dfa_states::INVALID;
constexpr auto TT_VLD = dfa_states::VALID;
constexpr auto TT_START = dfa_states::START;
constexpr auto TT_INV = dfa_states::INVALID;
constexpr auto TT_VLD = dfa_states::VALID;

/**
* @brief Definition of the symbol groups
Expand Down Expand Up @@ -239,14 +240,17 @@ struct UnwrapTokenFromSymbolOp {
* invalid lines.
*/
struct TransduceToken {
bool nullify_empty_lines;
template <typename RelativeOffsetT, typename SymbolT>
constexpr CUDF_HOST_DEVICE SymbolT operator()(StateT const state_id,
SymbolGroupT const match_id,
RelativeOffsetT const relative_offset,
SymbolT const read_symbol) const
{
bool const is_empty_invalid =
(nullify_empty_lines && state_id == static_cast<StateT>(TT_START));
bool const is_end_of_invalid_line =
(state_id == static_cast<StateT>(TT_INV) &&
((state_id == static_cast<StateT>(TT_INV) or is_empty_invalid) &&
match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER));

if (is_end_of_invalid_line) {
Expand All @@ -266,14 +270,17 @@ struct TransduceToken {
constexpr int32_t num_inv_tokens = 2;

bool const is_delimiter = match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER);
bool const is_empty_invalid =
(nullify_empty_lines && state_id == static_cast<StateT>(TT_START));

// If state is either invalid or we're entering an invalid state, we discard tokens
bool const is_part_of_invalid_line =
(match_id != static_cast<SymbolGroupT>(dfa_symbol_group_id::ERROR) &&
state_id == static_cast<StateT>(TT_VLD));
(state_id == static_cast<StateT>(TT_VLD) or state_id == static_cast<StateT>(TT_START)));

// Indicates whether we transition from an invalid line to a potentially valid line
bool const is_end_of_invalid_line = (state_id == static_cast<StateT>(TT_INV) && is_delimiter);
bool const is_end_of_invalid_line =
((state_id == static_cast<StateT>(TT_INV) or is_empty_invalid) && is_delimiter);

int32_t const emit_count =
is_end_of_invalid_line ? num_inv_tokens : (is_part_of_invalid_line && !is_delimiter ? 1 : 0);
Expand All @@ -284,8 +291,9 @@ struct TransduceToken {
// Transition table
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const transition_table{
{/* IN_STATE ERROR DELIM OTHER */
/* VALID */ {{TT_INV, TT_VLD, TT_VLD}},
/* INVALID */ {{TT_INV, TT_VLD, TT_INV}}}};
/* START */ {{TT_INV, TT_START, TT_VLD}},
/* VALID */ {{TT_INV, TT_START, TT_VLD}},
/* INVALID */ {{TT_INV, TT_START, TT_INV}}}};

// The DFA's starting state
constexpr auto start_state = static_cast<StateT>(TT_VLD);
Expand Down Expand Up @@ -1507,17 +1515,19 @@ void get_stack_context(device_span<SymbolT const> json_in,
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
bool nullify_empty_lines,
rmm::cuda_stream_view stream)
{
// Instantiate FST for post-processing the token stream to remove all tokens that belong to an
// invalid JSON line
token_filter::UnwrapTokenFromSymbolOp sgid_op{};
using symbol_t = thrust::tuple<PdaTokenT, SymbolOffsetT>;
auto filter_fst = fst::detail::make_fst(
fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
fst::detail::make_transition_table(token_filter::transition_table),
fst::detail::make_translation_functor<symbol_t, 0, 2>(token_filter::TransduceToken{}),
stream);
using symbol_t = thrust::tuple<PdaTokenT, SymbolOffsetT>;
auto filter_fst =
fst::detail::make_fst(fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
fst::detail::make_transition_table(token_filter::transition_table),
fst::detail::make_translation_functor<symbol_t, 0, 2>(
token_filter::TransduceToken{nullify_empty_lines}),
stream);

auto const mr = cudf::get_current_device_resource_ref();
rmm::device_scalar<SymbolOffsetT> d_num_selected_tokens(stream, mr);
Expand Down Expand Up @@ -1664,7 +1674,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
tokens.set_element(0, token_t::LineEnd, stream);
validate_token_stream(json_in, tokens, tokens_indices, options, stream);
auto [filtered_tokens, filtered_tokens_indices] =
process_token_stream(tokens, tokens_indices, stream);
process_token_stream(tokens, tokens_indices, options.is_nullify_empty_lines(), stream);
tokens = std::move(filtered_tokens);
tokens_indices = std::move(filtered_tokens_indices);
}
Expand Down Expand Up @@ -2090,6 +2100,7 @@ cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& opt
std::vector<std::string> na_values{"", "null"};
na_values.insert(na_values.end(), options.get_na_values().begin(), options.get_na_values().end());
parse_opts.trie_na = cudf::detail::create_serialized_trie(na_values, stream);

return parse_opts;
}

Expand Down
41 changes: 35 additions & 6 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

std::size_t const total_source_size = sources_size(sources, 0, 0);
auto constexpr num_delimiter_chars = 1;
auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1);
auto const num_extra_delimiters = num_delimiter_chars * sources.size();
compression_type const reader_compression = reader_opts.get_compression();
std::size_t const chunk_offset = reader_opts.get_byte_range_offset();
std::size_t chunk_size = reader_opts.get_byte_range_size();

CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
"Invalid offsetting",
std::invalid_argument);
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
auto should_load_till_last_source = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_till_last_source ? total_source_size - chunk_offset : chunk_size;

int num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced;
std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
Expand All @@ -165,7 +165,7 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
// return empty owning datasource buffer
auto empty_buf = rmm::device_buffer(0, stream);
return datasource::owning_buffer<rmm::device_buffer>(std::move(empty_buf));
} else if (!should_load_all_sources) {
} else if (!should_load_till_last_source) {
// Find next delimiter
std::int64_t next_delim_pos = -1;
std::size_t next_subchunk_start = chunk_offset + chunk_size;
Expand Down Expand Up @@ -209,10 +209,37 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
next_delim_pos - first_delim_pos - shift_for_nonzero_offset);
}

// Add a delimiter to the end of the buffer iff
// (i) we are reading till the end of the last source, i.e. should_load_till_last_source is
// true, and (ii) the last character in bufspan is not a newline.
// For (ii), in the case of Spark, if the last character is not a newline, it could be the case
// that there are characters after the newline in the last record. We then consider those
// characters to be a part of a new (possibly empty) line.
size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset;
if (num_chars) {
char last_char;
CUDF_CUDA_TRY(cudaMemcpyAsync(&last_char,
reinterpret_cast<char*>(buffer.data()) + readbufspan.size() - 1,
sizeof(char),
cudaMemcpyDeviceToHost,
stream.value()));
stream.synchronize();
shrshi marked this conversation as resolved.
Show resolved Hide resolved
if (last_char != '\n') {
last_char = '\n';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is hardcoded to \n, should it be delimiter mentioned in json reader options instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Qn: Does this force reader to do extra copy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I've changed the hardcoded \n to the reader options delimiter. Also fixed it in a few other places.

Yes, we now have two extra copies between host and device each of size 1 byte. We also perform a stream sync between the copies. I'll run the JSON benchmarks to see what the impact of this change is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we merge #17028, we need to check the last character in the buffer only when both nullify_empty_rows and recover_with_null are enabled. Otherwise, I think we can always add a delimiter since empty rows are anyway ignored.

CUDF_CUDA_TRY(cudaMemcpyAsync(reinterpret_cast<char*>(buffer.data()) + readbufspan.size(),
&last_char,
sizeof(char),
cudaMemcpyHostToDevice,
stream.value()));
num_chars++;
}
}

return datasource::owning_buffer<rmm::device_buffer>(
std::move(buffer),
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
readbufspan.size() - first_delim_pos - shift_for_nonzero_offset);
num_chars);
}

// Helper function to read the current batch using byte range offsets and size
Expand All @@ -223,6 +250,7 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();

datasource::owning_buffer<rmm::device_buffer> bufview =
get_record_range_raw_input(sources, reader_opts, stream);

Expand All @@ -235,6 +263,7 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
auto buffer =
cudf::device_span<char const>(reinterpret_cast<char const*>(bufview.data()), bufview.size());
stream.synchronize();

return device_parse_nested_json(buffer, reader_opts, stream, mr);
}

Expand Down
45 changes: 45 additions & 0 deletions cpp/tests/io/json/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/debug_utilities.hpp>
shrshi marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>
#include <cudf_test/random.hpp>
Expand Down Expand Up @@ -2975,4 +2976,48 @@ TEST_F(JsonReaderTest, JsonDtypeSchema)
cudf::test::debug_output_level::ALL_ERRORS);
}

/**
 * @brief Test fixture for parametrized JSON reader tests
 *
 * The boolean parameter controls whether empty input lines are nullified
 * (json_reader_options::nullify_empty_lines) in the qualifying tests.
 */
struct JsonReaderEmptyRecordTest : public cudf::test::BaseFixture,
                                   public testing::WithParamInterface<bool> {};

// Parametrize qualifying JSON tests for optionally nullifying empty records
INSTANTIATE_TEST_CASE_P(JsonReaderEmptyRecordTest,
                        JsonReaderEmptyRecordTest,
                        ::testing::Values(true, false));

TEST_P(JsonReaderEmptyRecordTest, HandlingEmptyRecords)
{
  // Three input lines: a leading empty line, a valid record, and an invalid record
  // (the quoted string is never terminated).
  std::string data = R"(
{"key": "1"}
{"key": "})";
  bool const enable_nullify_empty_rows = GetParam();
  std::map<std::string, cudf::io::schema_element> schema{{"key", {dtype<cudf::string_view>()}}};
  // JSON Lines + RECOVER_WITH_NULL so the invalid record becomes a null row instead of failing;
  // nullify_empty_lines is the behavior under test.
  auto opts =
    cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
      .dtypes(schema)
      .lines(true)
      .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL)
      .nullify_empty_lines(enable_nullify_empty_rows)
      .build();
  auto const result = cudf::io::read_json(opts);

  EXPECT_EQ(result.tbl->num_columns(), 1);
  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING);
  EXPECT_EQ(result.metadata.schema_info[0].name, "key");
  auto const result_view = result.tbl->view().column(0);

  if (!enable_nullify_empty_rows) {
    // Empty line is dropped: only the valid record and the recovered (null) invalid record remain.
    EXPECT_EQ(result.tbl->num_rows(), 2);
    cudf::test::strings_column_wrapper expected{{"1", ""}, cudf::test::iterators::nulls_at({1})};
    CUDF_TEST_EXPECT_COLUMNS_EQUAL(result_view, expected);
  } else {
    // Empty line is emitted too, as a null row at index 0.
    EXPECT_EQ(result.tbl->num_rows(), 3);
    cudf::test::strings_column_wrapper expected{{"", "1", ""},
                                                cudf::test::iterators::nulls_at({0, 2})};
    CUDF_TEST_EXPECT_COLUMNS_EQUAL(result_view, expected);
  }
}

CUDF_TEST_PROGRAM_MAIN()
2 changes: 1 addition & 1 deletion cpp/tests/io/json/nested_json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ TEST_F(JsonTest, PostProcessTokenStream)

// Run system-under-test
auto [d_filtered_tokens, d_filtered_indices] =
cuio_json::detail::process_token_stream(d_tokens, d_offsets, stream);
cuio_json::detail::process_token_stream(d_tokens, d_offsets, false, stream);

auto const filtered_tokens = cudf::detail::make_std_vector_async(d_filtered_tokens, stream);
auto const filtered_indices = cudf::detail::make_std_vector_async(d_filtered_indices, stream);
Expand Down
Loading