Skip to content

Commit

Permalink
Cast only time of day to nanos to avoid an overflow in Parquet INT96 …
Browse files Browse the repository at this point in the history
…write (#13776)

Rework extraction of nanoseconds of the last day in INT96 write call path to avoid overflow. Contributes to NVIDIA/spark-rapids#8625

Fixes #8070

Authors:
  - Gera Shegalov (https://github.com/gerashegalov)

Approvers:
  - Robert (Bobby) Evans (https://github.com/revans2)
  - MithunR (https://github.com/mythrocks)
  - Karthikeyan (https://github.com/karthikeyann)
  - Nghia Truong (https://github.com/ttnghia)

URL: #13776
  • Loading branch information
gerashegalov authored Aug 2, 2023
1 parent 5e8fd8e commit c412480
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 18 deletions.
39 changes: 21 additions & 18 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -933,22 +933,24 @@ constexpr auto julian_calendar_epoch_diff()
}

/**
* @brief Converts a timestamp_ns into a pair with nanoseconds since midnight and number of Julian
* days. Does not deal with time zones. Used by INT96 code.
* @brief Converts number `v` of periods of type `PeriodT` into a pair with nanoseconds since
* midnight and number of Julian days. Does not deal with time zones. Used by INT96 code.
*
* @param ns number of nanoseconds since epoch
* @return std::pair<nanoseconds,days> where nanoseconds is the number of nanoseconds
* @tparam PeriodT a ratio representing the tick period in duration
* @param v count of ticks since epoch
* @return A pair of (nanoseconds, days) where nanoseconds is the number of nanoseconds
* elapsed in the day and days is the number of days from Julian epoch.
*/
static __device__ std::pair<duration_ns, duration_D> convert_nanoseconds(timestamp_ns const ns)
template <typename PeriodT>
__device__ auto julian_days_with_time(int64_t v)
{
using namespace cuda::std::chrono;
auto const nanosecond_ticks = ns.time_since_epoch();
auto const gregorian_days = floor<days>(nanosecond_ticks);
auto const julian_days = gregorian_days + ceil<days>(julian_calendar_epoch_diff());

auto const last_day_ticks = nanosecond_ticks - gregorian_days;
return {last_day_ticks, julian_days};
auto const dur_total = duration<int64_t, PeriodT>{v};
auto const dur_days = floor<days>(dur_total);
auto const dur_time_of_day = dur_total - dur_days;
auto const dur_time_of_day_nanos = duration_cast<nanoseconds>(dur_time_of_day);
auto const julian_days = dur_days + ceil<days>(julian_calendar_epoch_diff());
return std::make_pair(dur_time_of_day_nanos, julian_days);
}

// blockDim(128, 1, 1)
Expand Down Expand Up @@ -1236,22 +1238,23 @@ __global__ void __launch_bounds__(128, 8)
}
}

auto const ret = convert_nanoseconds([&]() {
auto const [last_day_nanos, julian_days] = [&] {
using namespace cuda::std::chrono;
switch (s->col.leaf_column->type().id()) {
case type_id::TIMESTAMP_SECONDS:
case type_id::TIMESTAMP_MILLISECONDS: {
return timestamp_ns{duration_ms{v}};
return julian_days_with_time<cuda::std::milli>(v);
} break;
case type_id::TIMESTAMP_MICROSECONDS:
case type_id::TIMESTAMP_NANOSECONDS: {
return timestamp_ns{duration_us{v}};
return julian_days_with_time<cuda::std::micro>(v);
} break;
}
return timestamp_ns{duration_ns{0}};
}());
return julian_days_with_time<cuda::std::nano>(0);
}();

// the 12 bytes of fixed length data.
v = ret.first.count();
v = last_day_nanos.count();
dst[pos + 0] = v;
dst[pos + 1] = v >> 8;
dst[pos + 2] = v >> 16;
Expand All @@ -1260,7 +1263,7 @@ __global__ void __launch_bounds__(128, 8)
dst[pos + 5] = v >> 40;
dst[pos + 6] = v >> 48;
dst[pos + 7] = v >> 56;
uint32_t w = ret.second.count();
uint32_t w = julian_days.count();
dst[pos + 8] = w;
dst[pos + 9] = w >> 8;
dst[pos + 10] = w >> 16;
Expand Down
25 changes: 25 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <cudf/transform.hpp>
#include <cudf/unary.hpp>
#include <cudf/utilities/span.hpp>
#include <cudf/wrappers/timestamps.hpp>

#include <src/io/parquet/compact_protocol_reader.hpp>
#include <src/io/parquet/parquet.hpp>
Expand Down Expand Up @@ -6411,4 +6412,28 @@ TEST_F(ParquetReaderTest, FilterFloatNAN)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected1->view(), result1);
}

TEST_F(ParquetWriterTest, TimestampMicrosINT96NoOverflow)
{
using namespace cuda::std::chrono;
using namespace cudf::io;

column_wrapper<cudf::timestamp_us> big_ts_col{
sys_days{year{3023} / month{7} / day{14}} + 7h + 38min + 45s + 418688us,
sys_days{year{723} / month{3} / day{21}} + 14h + 20min + 13s + microseconds{781ms}};

table_view expected({big_ts_col});
auto filepath = temp_env->get_temp_filepath("BigINT96Timestamp.parquet");

auto const out_opts =
parquet_writer_options::builder(sink_info{filepath}, expected).int96_timestamps(true).build();
write_parquet(out_opts);

auto const in_opts = parquet_reader_options::builder(source_info(filepath))
.timestamp_type(cudf::data_type(cudf::type_id::TIMESTAMP_MICROSECONDS))
.build();
auto const result = read_parquet(in_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit c412480

Please sign in to comment.