diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 5a77c6749fe..edcc140b191 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -116,7 +116,7 @@ jobs: build_type: pull-request script: ci/test_wheel_cudf.sh wheel-build-dask-cudf: - needs: wheel-tests-cudf + needs: wheel-build-cudf secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.02 with: diff --git a/ci/build_wheel.sh b/ci/build_wheel.sh index ae1d9c3fb1a..9c674518810 100755 --- a/ci/build_wheel.sh +++ b/ci/build_wheel.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. set -euo pipefail @@ -23,7 +23,7 @@ pyproject_file="${package_dir}/pyproject.toml" sed -i "s/^name = \"${package_name}\"/name = \"${package_name}${PACKAGE_CUDA_SUFFIX}\"/g" ${pyproject_file} echo "${version}" > VERSION -sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_dir}/${package_name}/_version.py" +sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_dir}/${package_name//-/_}/_version.py" # For nightlies we want to ensure that we're pulling in alphas as well. The # easiest way to do so is to augment the spec with a constraint containing a @@ -34,7 +34,7 @@ if ! rapids-is-release-build; then alpha_spec=',>=0.0.0a0' fi -if [[ ${package_name} == "dask_cudf" ]]; then +if [[ ${package_name} == "dask-cudf" ]]; then sed -r -i "s/cudf==(.*)\"/cudf${PACKAGE_CUDA_SUFFIX}==\1${alpha_spec}\"/g" ${pyproject_file} sed -r -i "s/dask-cuda==(.*)\"/dask-cuda==\1${alpha_spec}\"/g" ${pyproject_file} sed -r -i "s/rapids-dask-dependency==(.*)\"/rapids-dask-dependency==\1${alpha_spec}\"/g" ${pyproject_file} diff --git a/ci/build_wheel_dask_cudf.sh b/ci/build_wheel_dask_cudf.sh index 47e35c46004..b09c1e51271 100755 --- a/ci/build_wheel_dask_cudf.sh +++ b/ci/build_wheel_dask_cudf.sh @@ -1,11 +1,11 @@ #!/bin/bash -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. set -euo pipefail package_dir="python/dask_cudf" -./ci/build_wheel.sh dask_cudf ${package_dir} +./ci/build_wheel.sh dask-cudf ${package_dir} RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})" RAPIDS_PY_WHEEL_NAME="dask_cudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 ${package_dir}/dist diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index 2c723146f35..21b540e24ab 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -51,7 +51,6 @@ dependencies: - librdkafka>=1.9.0,<1.10.0a0 - librmm==24.2.* - make -- mimesis>=4.1.0 - moto>=4.0.8 - msgpack-python - myst-nb diff --git a/conda/environments/all_cuda-120_arch-x86_64.yaml b/conda/environments/all_cuda-120_arch-x86_64.yaml index 52ef95c335a..c109dcca625 100644 --- a/conda/environments/all_cuda-120_arch-x86_64.yaml +++ b/conda/environments/all_cuda-120_arch-x86_64.yaml @@ -50,7 +50,6 @@ dependencies: - librdkafka>=1.9.0,<1.10.0a0 - librmm==24.2.* - make -- mimesis>=4.1.0 - moto>=4.0.8 - msgpack-python - myst-nb diff --git a/cpp/include/cudf/detail/offsets_iterator.cuh b/cpp/include/cudf/detail/offsets_iterator.cuh index 3eb77b32353..15b334245ff 100644 --- a/cpp/include/cudf/detail/offsets_iterator.cuh +++ b/cpp/include/cudf/detail/offsets_iterator.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,10 +63,11 @@ struct input_offsetalator : base_normalator { * * Use the indexalator_factory to create an iterator instance. * - * @param data Pointer to an integer array in device memory. - * @param dtype Type of data in data + * @param data Pointer to an integer array in device memory + * @param dtype Type of data in data + * @param offset Index value within `offsets` to use as the beginning of the iterator */ - CUDF_HOST_DEVICE input_offsetalator(void const* data, data_type dtype) + CUDF_HOST_DEVICE input_offsetalator(void const* data, data_type dtype, size_type offset = 0) : base_normalator( dtype, dtype.id() == type_id::INT32 ? sizeof(int32_t) : sizeof(int64_t)), p_{static_cast(data)} @@ -78,6 +79,7 @@ struct input_offsetalator : base_normalator { cudf_assert((dtype.id() == type_id::INT32 || dtype.id() == type_id::INT64) && "Unexpected offsets type"); #endif + p_ += (this->width_ * offset); } protected: diff --git a/cpp/include/cudf/detail/offsets_iterator_factory.cuh b/cpp/include/cudf/detail/offsets_iterator_factory.cuh index 5b4c6b825d2..e234f9ec627 100644 --- a/cpp/include/cudf/detail/offsets_iterator_factory.cuh +++ b/cpp/include/cudf/detail/offsets_iterator_factory.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,14 +28,19 @@ namespace detail { struct offsetalator_factory { /** * @brief Create an input offsetalator instance from an offsets column + * + * @param offsets Column to wrap with an offsetalator + * @param offset Index value within `offsets` to use as the beginning of the iterator */ - static input_offsetalator make_input_iterator(column_view const& offsets) + static input_offsetalator make_input_iterator(column_view const& offsets, size_type offset = 0) { - return input_offsetalator(offsets.head(), offsets.type()); + return input_offsetalator(offsets.head(), offsets.type(), offset); } /** * @brief Create an output offsetalator instance from an offsets column + * + * @param offsets Column to wrap with an offsetalator */ static output_offsetalator make_output_iterator(mutable_column_view const& offsets) { diff --git a/cpp/include/cudf/io/detail/orc.hpp b/cpp/include/cudf/io/detail/orc.hpp index 4ec8d2e8c2a..a0bf8b24b80 100644 --- a/cpp/include/cudf/io/detail/orc.hpp +++ b/cpp/include/cudf/io/detail/orc.hpp @@ -124,6 +124,14 @@ class writer { * @brief Finishes the chunked/streamed write process. */ void close(); + + /** + * @brief Skip work done in `close()`; should be called if `write()` failed. + * + * Calling skip_close() prevents the writer from writing the (invalid) file footer and the + * postscript. + */ + void skip_close(); }; } // namespace orc::detail } // namespace cudf::io diff --git a/cpp/include/cudf/strings/detail/gather.cuh b/cpp/include/cudf/strings/detail/gather.cuh index 1523a81d63f..e681373e6e0 100644 --- a/cpp/include/cudf/strings/detail/gather.cuh +++ b/cpp/include/cudf/strings/detail/gather.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -79,7 +80,7 @@ __forceinline__ __device__ uint4 load_uint4(char const* ptr) template __global__ void gather_chars_fn_string_parallel(StringIterator strings_begin, char* out_chars, - cudf::device_span const out_offsets, + cudf::detail::input_offsetalator const out_offsets, MapIterator string_indices, size_type total_out_strings) { @@ -109,28 +110,25 @@ __global__ void gather_chars_fn_string_parallel(StringIterator strings_begin, // between `[out_start_aligned, out_end_aligned)` will be copied using uint4. // `out_start + 4` and `out_end - 4` are used instead of `out_start` and `out_end` to avoid // `load_uint4` reading beyond string boundaries. - int32_t out_start_aligned = + int64_t const out_start_aligned = (out_start + in_datatype_size + alignment_offset + out_datatype_size - 1) / out_datatype_size * out_datatype_size - alignment_offset; - int32_t out_end_aligned = + int64_t const out_end_aligned = (out_end - in_datatype_size + alignment_offset) / out_datatype_size * out_datatype_size - alignment_offset; - for (size_type ichar = out_start_aligned + warp_lane * out_datatype_size; - ichar < out_end_aligned; + for (int64_t ichar = out_start_aligned + warp_lane * out_datatype_size; ichar < out_end_aligned; ichar += cudf::detail::warp_size * out_datatype_size) { *(out_chars_aligned + (ichar + alignment_offset) / out_datatype_size) = load_uint4(in_start + ichar - out_start); } - // Tail logic: copy characters of the current string outside `[out_start_aligned, - // out_end_aligned)`. + // Copy characters of the current string outside [out_start_aligned, out_end_aligned) if (out_end_aligned <= out_start_aligned) { // In this case, `[out_start_aligned, out_end_aligned)` is an empty set, and we copy the // entire string. - for (int32_t ichar = out_start + warp_lane; ichar < out_end; - ichar += cudf::detail::warp_size) { + for (auto ichar = out_start + warp_lane; ichar < out_end; ichar += cudf::detail::warp_size) { out_chars[ichar] = in_start[ichar - out_start]; } } else { @@ -139,7 +137,7 @@ __global__ void gather_chars_fn_string_parallel(StringIterator strings_begin, out_chars[out_start + warp_lane] = in_start[warp_lane]; } // Copy characters in range `[out_end_aligned, out_end)`. - int32_t ichar = out_end_aligned + warp_lane; + auto const ichar = out_end_aligned + warp_lane; if (ichar < out_end) { out_chars[ichar] = in_start[ichar - out_start]; } } } @@ -164,11 +162,11 @@ __global__ void gather_chars_fn_string_parallel(StringIterator strings_begin, template __global__ void gather_chars_fn_char_parallel(StringIterator strings_begin, char* out_chars, - cudf::device_span const out_offsets, + cudf::detail::input_offsetalator const out_offsets, MapIterator string_indices, size_type total_out_strings) { - __shared__ int32_t out_offsets_threadblock[strings_per_threadblock + 1]; + __shared__ int64_t out_offsets_threadblock[strings_per_threadblock + 1]; // Current thread block will process output strings starting at `begin_out_string_idx`. size_type begin_out_string_idx = blockIdx.x * strings_per_threadblock; @@ -185,7 +183,7 @@ __global__ void gather_chars_fn_char_parallel(StringIterator strings_begin, } __syncthreads(); - for (int32_t out_ibyte = threadIdx.x + out_offsets_threadblock[0]; + for (int64_t out_ibyte = threadIdx.x + out_offsets_threadblock[0]; out_ibyte < out_offsets_threadblock[strings_current_threadblock]; out_ibyte += blockDim.x) { // binary search for the string index corresponding to out_ibyte @@ -197,7 +195,7 @@ __global__ void gather_chars_fn_char_parallel(StringIterator strings_begin, size_type string_idx = thrust::distance(out_offsets_threadblock, string_idx_iter); // calculate which character to load within the string - int32_t icharacter = out_ibyte - out_offsets_threadblock[string_idx]; + auto const icharacter = out_ibyte - out_offsets_threadblock[string_idx]; size_type in_string_idx = string_indices[begin_out_string_idx + string_idx]; out_chars[out_ibyte] = strings_begin[in_string_idx].data()[icharacter]; @@ -227,7 +225,7 @@ template std::unique_ptr gather_chars(StringIterator strings_begin, MapIterator map_begin, MapIterator map_end, - cudf::device_span const offsets, + cudf::detail::input_offsetalator const offsets, size_type chars_bytes, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) @@ -300,7 +298,9 @@ std::unique_ptr gather(strings_column_view const& strings, // build offsets column auto const d_strings = column_device_view::create(strings.parent(), stream); - auto const d_in_offsets = !strings.is_empty() ? strings.offsets_begin() : nullptr; + auto const d_in_offsets = cudf::detail::offsetalator_factory::make_input_iterator( + strings.is_empty() ? make_empty_column(type_id::INT32)->view() : strings.offsets(), + strings.offset()); auto offsets_itr = thrust::make_transform_iterator( begin, @@ -308,14 +308,15 @@ std::unique_ptr gather(strings_column_view const& strings, [d_strings = *d_strings, d_in_offsets] __device__(size_type idx) { if (NullifyOutOfBounds && (idx < 0 || idx >= d_strings.size())) { return 0; } if (not d_strings.is_valid(idx)) { return 0; } - return d_in_offsets[idx + 1] - d_in_offsets[idx]; + return static_cast(d_in_offsets[idx + 1] - d_in_offsets[idx]); })); auto [out_offsets_column, total_bytes] = cudf::detail::make_offsets_child_column(offsets_itr, offsets_itr + output_count, stream, mr); // build chars column - auto const offsets_view = out_offsets_column->view(); - auto out_chars_column = gather_chars( + auto const offsets_view = + cudf::detail::offsetalator_factory::make_input_iterator(out_offsets_column->view()); + auto out_chars_column = gather_chars( d_strings->begin(), begin, end, offsets_view, total_bytes, stream, mr); return make_strings_column(output_count, diff --git a/cpp/include/cudf/strings/detail/strings_column_factories.cuh b/cpp/include/cudf/strings/detail/strings_column_factories.cuh index 15b1c2bfec4..de7db4ce47b 100644 --- a/cpp/include/cudf/strings/detail/strings_column_factories.cuh +++ b/cpp/include/cudf/strings/detail/strings_column_factories.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,9 +103,8 @@ std::unique_ptr make_strings_column(IndexPairIterator begin, auto const avg_bytes_per_row = bytes / std::max(strings_count - null_count, 1); // use a character-parallel kernel for long string lengths if (avg_bytes_per_row > FACTORY_BYTES_PER_ROW_THRESHOLD) { - auto const d_data = offsets_view.template data(); auto const d_offsets = - device_span{d_data, static_cast(offsets_view.size())}; + cudf::detail::offsetalator_factory::make_input_iterator(offsets_view); auto const str_begin = thrust::make_transform_iterator( begin, cuda::proclaim_return_type([] __device__(auto ip) { return string_view{ip.first, ip.second}; diff --git a/cpp/include/cudf/strings/detail/utilities.hpp b/cpp/include/cudf/strings/detail/utilities.hpp index 41a2654dce3..e279ee2eb65 100644 --- a/cpp/include/cudf/strings/detail/utilities.hpp +++ b/cpp/include/cudf/strings/detail/utilities.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,20 @@ rmm::device_uvector create_string_vector_from_column( rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); +/** + * @brief Return a normalized offset value from a strings offsets column + * + * @throw std::invalid_argument if `offsets` is neither INT32 nor INT64 + * + * @param offsets Input column of type INT32 or INT64 + * @param index Row value to retrieve + * @param stream CUDA stream used for device memory operations and kernel launches + * @return Value at `offsets[index]` + */ +int64_t get_offset_value(cudf::column_view const& offsets, + size_type index, + rmm::cuda_stream_view stream); + } // namespace detail } // namespace strings } // namespace cudf diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index dea8bdaef79..a9049d5640e 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -436,7 +436,16 @@ void write_orc(orc_writer_options const& options) auto writer = std::make_unique( std::move(sinks[0]), options, io_detail::single_write_mode::YES, cudf::get_default_stream()); - writer->write(options.get_table()); + try { + writer->write(options.get_table()); + } catch (...) { + // If an exception is thrown, the output is incomplete/corrupted. + // Make sure the writer will not close with such corrupted data. + // In addition, the writer may throw an exception while trying to close, which would terminate + // the process. + writer->skip_close(); + throw; + } } /** diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index 7436e22e125..1afc0200bfa 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -282,7 +282,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint64 maximum = 2; // optional sint64 sum = 3; // } - if (s->chunk.has_minmax || s->chunk.has_sum) { + { *cur = 2 * 8 + ProtofType::FIXEDLEN; cur += 2; if (s->chunk.has_minmax) { @@ -301,7 +301,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional double maximum = 2; // optional double sum = 3; // } - if (s->chunk.has_minmax || s->chunk.has_sum) { + { *cur = 3 * 8 + ProtofType::FIXEDLEN; cur += 2; if (s->chunk.has_minmax) { @@ -319,14 +319,14 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional string maximum = 2; // optional sint64 sum = 3; // sum will store the total length of all strings // } - if (s->chunk.has_minmax || s->chunk.has_sum) { + { uint32_t sz = 0; if (s->chunk.has_minmax) { sz += (pb_put_uint(cur, 1, s->chunk.min_value.str_val.length) - cur) + (pb_put_uint(cur, 2, s->chunk.max_value.str_val.length) - cur) + s->chunk.min_value.str_val.length + s->chunk.max_value.str_val.length; } - if (s->chunk.has_sum) { sz += pb_put_int(cur, 3, s->chunk.sum.i_val) - cur; } + sz += pb_put_int(cur, 3, s->chunk.sum.i_val) - cur; cur[0] = 4 * 8 + ProtofType::FIXEDLEN; cur = pb_encode_uint(cur + 1, sz); @@ -337,7 +337,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) cur = pb_put_binary( cur, 2, s->chunk.max_value.str_val.ptr, s->chunk.max_value.str_val.length); } - if (s->chunk.has_sum) { cur = pb_put_int(cur, 3, s->chunk.sum.i_val); } + cur = pb_put_int(cur, 3, s->chunk.sum.i_val); } break; case dtype_bool: @@ -345,7 +345,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // message BucketStatistics { // repeated uint64 count = 1 [packed=true]; // } - if (s->chunk.has_sum) { + { cur[0] = 5 * 8 + ProtofType::FIXEDLEN; // count is equal to the number of 'true' values, despite what specs say cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.u_val); @@ -360,7 +360,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional string maximum = 2; // optional string sum = 3; // } - if (s->chunk.has_minmax or s->chunk.has_sum) { + { auto const scale = s->group.col_dtype.scale(); uint32_t sz = 0; @@ -373,9 +373,8 @@ __global__ void __launch_bounds__(encode_threads_per_block) sz += (pb_put_uint(cur, 1, min_size) - cur) + min_size + (pb_put_uint(cur, 1, max_size) - cur) + max_size; } - auto const sum_size = - s->chunk.has_sum ? fixed_point_string_size(s->chunk.sum.d128_val, scale) : 0; - if (s->chunk.has_sum) { sz += (pb_put_uint(cur, 1, sum_size) - cur) + sum_size; } + auto const sum_size = fixed_point_string_size(s->chunk.sum.d128_val, scale); + sz += (pb_put_uint(cur, 1, sum_size) - cur) + sum_size; cur[0] = 6 * 8 + ProtofType::FIXEDLEN; cur = pb_encode_uint(cur + 1, sz); @@ -384,9 +383,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) cur = pb_put_decimal(cur, 1, s->chunk.min_value.d128_val, scale, min_size); // minimum cur = pb_put_decimal(cur, 2, s->chunk.max_value.d128_val, scale, max_size); // maximum } - if (s->chunk.has_sum) { - cur = pb_put_decimal(cur, 3, s->chunk.sum.d128_val, scale, sum_size); // sum - } + cur = pb_put_decimal(cur, 3, s->chunk.sum.d128_val, scale, sum_size); // sum } break; case dtype_date32: @@ -395,11 +392,13 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint32 minimum = 1; // optional sint32 maximum = 2; // } - if (s->chunk.has_minmax) { + { cur[0] = 7 * 8 + ProtofType::FIXEDLEN; cur += 2; - cur = pb_put_int(cur, 1, s->chunk.min_value.i_val); - cur = pb_put_int(cur, 2, s->chunk.max_value.i_val); + if (s->chunk.has_minmax) { + cur = pb_put_int(cur, 1, s->chunk.min_value.i_val); + cur = pb_put_int(cur, 2, s->chunk.max_value.i_val); + } fld_start[1] = cur - (fld_start + 2); } break; @@ -414,31 +413,32 @@ __global__ void __launch_bounds__(encode_threads_per_block) // precision // optional int32 maximumNanos = 6; // } - if (s->chunk.has_minmax) { + { cur[0] = 9 * 8 + ProtofType::FIXEDLEN; cur += 2; - auto const [min_ms, min_ns_remainder] = - split_nanosecond_timestamp(s->chunk.min_value.i_val); - auto const [max_ms, max_ns_remainder] = - split_nanosecond_timestamp(s->chunk.max_value.i_val); - - // minimum/maximum are the same as minimumUtc/maximumUtc as we always write files in UTC - cur = pb_put_int(cur, 1, min_ms); // minimum - cur = pb_put_int(cur, 2, max_ms); // maximum - cur = pb_put_int(cur, 3, min_ms); // minimumUtc - cur = pb_put_int(cur, 4, max_ms); // maximumUtc - - if constexpr (enable_nanosecond_statistics) { - if (min_ns_remainder != DEFAULT_MIN_NANOS) { - // using uint because positive values are not zigzag encoded - cur = pb_put_uint(cur, 5, min_ns_remainder + 1); // minimumNanos - } - if (max_ns_remainder != DEFAULT_MAX_NANOS) { - // using uint because positive values are not zigzag encoded - cur = pb_put_uint(cur, 6, max_ns_remainder + 1); // maximumNanos + if (s->chunk.has_minmax) { + auto const [min_ms, min_ns_remainder] = + split_nanosecond_timestamp(s->chunk.min_value.i_val); + auto const [max_ms, max_ns_remainder] = + split_nanosecond_timestamp(s->chunk.max_value.i_val); + + // minimum/maximum are the same as minimumUtc/maximumUtc as we always write files in UTC + cur = pb_put_int(cur, 1, min_ms); // minimum + cur = pb_put_int(cur, 2, max_ms); // maximum + cur = pb_put_int(cur, 3, min_ms); // minimumUtc + cur = pb_put_int(cur, 4, max_ms); // maximumUtc + + if constexpr (enable_nanosecond_statistics) { + if (min_ns_remainder != DEFAULT_MIN_NANOS) { + // using uint because positive values are not zigzag encoded + cur = pb_put_uint(cur, 5, min_ns_remainder + 1); // minimumNanos + } + if (max_ns_remainder != DEFAULT_MAX_NANOS) { + // using uint because positive values are not zigzag encoded + cur = pb_put_uint(cur, 6, max_ns_remainder + 1); // maximumNanos + } } } - fld_start[1] = cur - (fld_start + 2); } break; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 42eea37e695..cef4915e0c9 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -379,11 +379,26 @@ __global__ void copy_string_data(char* string_pool, } // namespace +intermediate_statistics::intermediate_statistics(orc_table_view const& table, + rmm::cuda_stream_view stream) + : stripe_stat_chunks(0, stream) +{ + std::transform( + table.columns.begin(), table.columns.end(), std::back_inserter(col_types), [](auto const& col) { + return col.type(); + }); +} + void persisted_statistics::persist(int num_table_rows, single_write_mode write_mode, intermediate_statistics&& intermediate_stats, rmm::cuda_stream_view stream) { + stats_dtypes = std::move(intermediate_stats.stats_dtypes); + col_types = std::move(intermediate_stats.col_types); + num_rows = num_table_rows; + if (num_rows == 0) { return; } + if (write_mode == single_write_mode::NO) { // persist the strings in the chunks into a string pool and update pointers auto const num_chunks = static_cast(intermediate_stats.stripe_stat_chunks.size()); @@ -417,9 +432,6 @@ void persisted_statistics::persist(int num_table_rows, stripe_stat_chunks.emplace_back(std::move(intermediate_stats.stripe_stat_chunks)); stripe_stat_merge.emplace_back(std::move(intermediate_stats.stripe_stat_merge)); - stats_dtypes = std::move(intermediate_stats.stats_dtypes); - col_types = std::move(intermediate_stats.col_types); - num_rows = num_table_rows; } namespace { @@ -1127,7 +1139,7 @@ void set_stat_desc_leaf_cols(device_span columns, cudf::detail::hostdevice_vector allocate_and_encode_blobs( cudf::detail::hostdevice_vector& stats_merge_groups, - rmm::device_uvector& stat_chunks, + device_span stat_chunks, int num_stat_blobs, rmm::cuda_stream_view stream) { @@ -1148,6 +1160,24 @@ cudf::detail::hostdevice_vector allocate_and_encode_blobs( return blobs; } +[[nodiscard]] statistics_dtype kind_to_stats_type(TypeKind kind) +{ + switch (kind) { + case TypeKind::BOOLEAN: return dtype_bool; + case TypeKind::BYTE: return dtype_int8; + case TypeKind::SHORT: return dtype_int16; + case TypeKind::INT: return dtype_int32; + case TypeKind::LONG: return dtype_int64; + case TypeKind::FLOAT: return dtype_float32; + case TypeKind::DOUBLE: return dtype_float64; + case TypeKind::STRING: return dtype_string; + case TypeKind::DATE: return dtype_int32; + case TypeKind::TIMESTAMP: return dtype_timestamp64; + case TypeKind::DECIMAL: return dtype_decimal64; + default: return dtype_none; + } +} + /** * @brief Returns column statistics in an intermediate format. * @@ -1166,7 +1196,7 @@ intermediate_statistics gather_statistic_blobs(statistics_freq const stats_freq, auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns(); auto const are_statistics_enabled = stats_freq != statistics_freq::STATISTICS_NONE; if (not are_statistics_enabled or num_rowgroup_blobs + num_stripe_blobs == 0) { - return intermediate_statistics{stream}; + return intermediate_statistics{orc_table, stream}; } cudf::detail::hostdevice_vector stat_desc(orc_table.num_columns(), stream); @@ -1180,22 +1210,9 @@ intermediate_statistics gather_statistic_blobs(statistics_freq const stats_freq, for (auto const& column : orc_table.columns) { stats_column_desc* desc = &stat_desc[column.index()]; - switch (column.orc_kind()) { - case TypeKind::BYTE: desc->stats_dtype = dtype_int8; break; - case TypeKind::SHORT: desc->stats_dtype = dtype_int16; break; - case TypeKind::INT: desc->stats_dtype = dtype_int32; break; - case TypeKind::LONG: desc->stats_dtype = dtype_int64; break; - case TypeKind::FLOAT: desc->stats_dtype = dtype_float32; break; - case TypeKind::DOUBLE: desc->stats_dtype = dtype_float64; break; - case TypeKind::BOOLEAN: desc->stats_dtype = dtype_bool; break; - case TypeKind::DATE: desc->stats_dtype = dtype_int32; break; - case TypeKind::DECIMAL: desc->stats_dtype = dtype_decimal64; break; - case TypeKind::TIMESTAMP: desc->stats_dtype = dtype_timestamp64; break; - case TypeKind::STRING: desc->stats_dtype = dtype_string; break; - default: desc->stats_dtype = dtype_none; break; - } - desc->num_rows = column.size(); - desc->num_values = column.size(); + desc->stats_dtype = kind_to_stats_type(column.orc_kind()); + desc->num_rows = column.size(); + desc->num_values = column.size(); if (desc->stats_dtype == dtype_timestamp64) { // Timestamp statistics are in milliseconds switch (column.scale()) { @@ -1293,20 +1310,53 @@ intermediate_statistics gather_statistic_blobs(statistics_freq const stats_freq, * @param stream CUDA stream used for device memory operations and kernel launches * @return The encoded statistic blobs */ -encoded_footer_statistics finish_statistic_blobs(int num_stripes, +encoded_footer_statistics finish_statistic_blobs(FileFooter const& file_footer, persisted_statistics& per_chunk_stats, rmm::cuda_stream_view stream) { auto stripe_size_iter = thrust::make_transform_iterator(per_chunk_stats.stripe_stat_merge.begin(), - [](auto const& i) { return i.size(); }); + [](auto const& s) { return s.size(); }); + + auto const num_columns = file_footer.types.size() - 1; + auto const num_stripes = file_footer.stripes.size(); - auto const num_columns = per_chunk_stats.col_types.size(); auto const num_stripe_blobs = thrust::reduce(stripe_size_iter, stripe_size_iter + per_chunk_stats.stripe_stat_merge.size()); auto const num_file_blobs = num_columns; auto const num_blobs = static_cast(num_stripe_blobs + num_file_blobs); - if (num_stripe_blobs == 0) { return {}; } + if (num_stripe_blobs == 0) { + if (num_file_blobs == 0) { return {}; } + + // Create empty file stats and merge groups + std::vector h_stat_chunks(num_file_blobs); + cudf::detail::hostdevice_vector stats_merge(num_file_blobs, stream); + // Fill in stats_merge and stat_chunks on the host + for (auto i = 0u; i < num_file_blobs; ++i) { + stats_merge[i].col_dtype = per_chunk_stats.col_types[i]; + stats_merge[i].stats_dtype = kind_to_stats_type(file_footer.types[i + 1].kind); + // Write the sum for empty columns, equal to zero + h_stat_chunks[i].has_sum = true; + } + // Copy to device + auto const d_stat_chunks = cudf::detail::make_device_uvector_async( + h_stat_chunks, stream, rmm::mr::get_current_device_resource()); + stats_merge.host_to_device_async(stream); + + // Encode and return + cudf::detail::hostdevice_vector hd_file_blobs = + allocate_and_encode_blobs(stats_merge, d_stat_chunks, num_file_blobs, stream); + + // Copy blobs to host (actual size) + std::vector file_blobs(num_file_blobs); + for (auto i = 0u; i < num_file_blobs; i++) { + auto const stat_begin = hd_file_blobs.host_ptr(stats_merge[i].start_chunk); + auto const stat_end = stat_begin + stats_merge[i].num_chunks; + file_blobs[i].assign(stat_begin, stat_end); + } + + return {{}, std::move(file_blobs)}; + } // merge the stripe persisted data and add file data rmm::device_uvector stat_chunks(num_blobs, stream); @@ -2277,7 +2327,7 @@ auto convert_table_to_orc_data(table_view const& input, rmm::device_uvector{0, stream}, // compressed_data cudf::detail::hostdevice_vector{}, // comp_results std::move(strm_descs), - intermediate_statistics{stream}, + intermediate_statistics{orc_table, stream}, std::optional{}, std::move(streams), std::move(stripes), @@ -2488,10 +2538,8 @@ void writer::impl::update_statistics( intermediate_statistics&& intermediate_stats, std::optional const& compression_stats) { - if (intermediate_stats.stripe_stat_chunks.size() > 0) { - _persisted_stripe_statistics.persist( - num_rows, _single_write_mode, std::move(intermediate_stats), _stream); - } + _persisted_stripe_statistics.persist( + num_rows, _single_write_mode, std::move(intermediate_stats), _stream); if (compression_stats.has_value() and _compression_statistics != nullptr) { *_compression_statistics += compression_stats.value(); @@ -2637,36 +2685,50 @@ void writer::impl::close() _closed = true; PostScript ps; - auto const statistics = - finish_statistic_blobs(_ffooter.stripes.size(), _persisted_stripe_statistics, _stream); - - // File-level statistics - if (not statistics.file_level.empty()) { - ProtobufWriter pbw; - pbw.put_uint(encode_field_number(1)); - pbw.put_uint(_persisted_stripe_statistics.num_rows); - // First entry contains total number of rows - _ffooter.statistics.reserve(_ffooter.types.size()); - _ffooter.statistics.emplace_back(pbw.release()); - // Add file stats, stored after stripe stats in `column_stats` - _ffooter.statistics.insert(_ffooter.statistics.end(), - std::make_move_iterator(statistics.file_level.begin()), - std::make_move_iterator(statistics.file_level.end())); - } - - // Stripe-level statistics - if (not statistics.stripe_level.empty()) { - _orc_meta.stripeStats.resize(_ffooter.stripes.size()); - for (size_t stripe_id = 0; stripe_id < _ffooter.stripes.size(); stripe_id++) { - _orc_meta.stripeStats[stripe_id].colStats.resize(_ffooter.types.size()); + if (_stats_freq != statistics_freq::STATISTICS_NONE) { + // Write column statistics + auto statistics = finish_statistic_blobs(_ffooter, _persisted_stripe_statistics, _stream); + + // File-level statistics + { + _ffooter.statistics.reserve(_ffooter.types.size()); ProtobufWriter pbw; + + // Root column: number of rows pbw.put_uint(encode_field_number(1)); - pbw.put_uint(_ffooter.stripes[stripe_id].numberOfRows); - _orc_meta.stripeStats[stripe_id].colStats[0] = pbw.release(); - for (size_t col_idx = 0; col_idx < _ffooter.types.size() - 1; col_idx++) { - size_t idx = _ffooter.stripes.size() * col_idx + stripe_id; - _orc_meta.stripeStats[stripe_id].colStats[1 + col_idx] = - std::move(statistics.stripe_level[idx]); + pbw.put_uint(_persisted_stripe_statistics.num_rows); + // Root column: has nulls + pbw.put_uint(encode_field_number(10)); + pbw.put_uint(0); + _ffooter.statistics.emplace_back(pbw.release()); + + // Add file stats, stored after stripe stats in `column_stats` + _ffooter.statistics.insert(_ffooter.statistics.end(), + std::make_move_iterator(statistics.file_level.begin()), + std::make_move_iterator(statistics.file_level.end())); + } + + // Stripe-level statistics + if (_stats_freq == statistics_freq::STATISTICS_ROWGROUP or + _stats_freq == statistics_freq::STATISTICS_PAGE) { + _orc_meta.stripeStats.resize(_ffooter.stripes.size()); + for (size_t stripe_id = 0; stripe_id < _ffooter.stripes.size(); stripe_id++) { + _orc_meta.stripeStats[stripe_id].colStats.resize(_ffooter.types.size()); + ProtobufWriter pbw; + + // Root column: number of rows + pbw.put_uint(encode_field_number(1)); + pbw.put_uint(_ffooter.stripes[stripe_id].numberOfRows); + // Root column: has nulls + pbw.put_uint(encode_field_number(10)); + pbw.put_uint(0); + _orc_meta.stripeStats[stripe_id].colStats[0] = pbw.release(); + + for (size_t col_idx = 0; col_idx < _ffooter.types.size() - 1; col_idx++) { + size_t idx = _ffooter.stripes.size() * col_idx + stripe_id; + _orc_meta.stripeStats[stripe_id].colStats[1 + col_idx] = + std::move(statistics.stripe_level[idx]); + } } } } @@ -2733,6 +2795,9 @@ writer::~writer() = default; // Forward to implementation void writer::write(table_view const& table) { _impl->write(table); } +// Forward to implementation +void writer::skip_close() { _impl->skip_close(); } + // Forward to implementation void writer::close() { _impl->close(); } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 5e5caa30873..f8ac5515f2e 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -167,6 +167,8 @@ struct stripe_size_limits { struct intermediate_statistics { explicit intermediate_statistics(rmm::cuda_stream_view stream) : stripe_stat_chunks(0, stream) {} + intermediate_statistics(orc_table_view const& table, rmm::cuda_stream_view stream); + intermediate_statistics(std::vector rb, rmm::device_uvector sc, cudf::detail::hostdevice_vector smg, @@ -282,6 +284,11 @@ class writer::impl { */ void close(); + /** + * @brief Skip writing the footer when closing/deleting the writer. + */ + void skip_close() { _closed = true; } + private: /** * @brief Write the intermediate ORC data into the data sink. diff --git a/cpp/src/strings/copying/concatenate.cu b/cpp/src/strings/copying/concatenate.cu index 26cd4fff09b..027466ef13c 100644 --- a/cpp/src/strings/copying/concatenate.cu +++ b/cpp/src/strings/copying/concatenate.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,8 @@ #include #include -#include #include +#include #include #include #include @@ -60,8 +60,8 @@ struct chars_size_transform { __device__ size_t operator()(column_device_view const& col) const { if (col.size() > 0) { - constexpr auto offsets_index = strings_column_view::offsets_column_index; - auto d_offsets = col.child(offsets_index).data(); + auto const offsets = col.child(strings_column_view::offsets_column_index); + auto const d_offsets = cudf::detail::input_offsetalator(offsets.head(), offsets.type()); return d_offsets[col.size() + col.offset()] - d_offsets[col.offset()]; } else { return 0; @@ -112,14 +112,15 @@ auto create_strings_device_views(host_span views, rmm::cuda_s } template -__global__ void fused_concatenate_string_offset_kernel(column_device_view const* input_views, - size_t const* input_offsets, - size_t const* partition_offsets, - size_type const num_input_views, - size_type const output_size, - int32_t* output_data, - bitmask_type* output_mask, - size_type* out_valid_count) +__global__ void fused_concatenate_string_offset_kernel( + column_device_view const* input_views, + size_t const* input_offsets, + size_t const* partition_offsets, + size_type const num_input_views, + size_type const output_size, + cudf::detail::output_offsetalator output_data, + bitmask_type* output_mask, + size_type* out_valid_count) { cudf::thread_index_type output_index = threadIdx.x + blockIdx.x * blockDim.x; size_type warp_valid_count = 0; @@ -132,10 +133,11 @@ __global__ void fused_concatenate_string_offset_kernel(column_device_view const* thrust::seq, input_offsets, input_offsets + num_input_views, output_index)); size_type const partition_index = offset_it - input_offsets; - auto const offset_index = output_index - *offset_it; - auto const& input_view = input_views[partition_index]; - constexpr auto offsets_child = strings_column_view::offsets_column_index; - auto const* input_data = input_view.child(offsets_child).data(); + auto const offset_index = output_index - *offset_it; + auto const& input_view = input_views[partition_index]; + auto const offsets_child = input_view.child(strings_column_view::offsets_column_index); + auto const input_data = + cudf::detail::input_offsetalator(offsets_child.head(), offsets_child.type()); output_data[output_index] = input_data[offset_index + input_view.offset()] // handle parent offset - input_data[input_view.offset()] // subtract first offset if non-zero @@ -186,8 +188,9 @@ __global__ void fused_concatenate_string_chars_kernel(column_device_view const* auto const offset_index = output_index - *offset_it; auto const& input_view = input_views[partition_index]; - constexpr auto offsets_child = strings_column_view::offsets_column_index; - auto const* input_offsets_data = input_view.child(offsets_child).data(); + auto const offsets_child = input_view.child(strings_column_view::offsets_column_index); + auto const input_offsets_data = + cudf::detail::input_offsetalator(offsets_child.head(), offsets_child.type()); constexpr auto chars_child = strings_column_view::chars_column_index; auto const* input_chars_data = input_view.child(chars_child).data(); @@ -225,16 +228,16 @@ std::unique_ptr concatenate(host_span columns, bool const has_nulls = std::any_of(columns.begin(), columns.end(), [](auto const& col) { return col.has_nulls(); }); - // create chars column + // create output chars column auto chars_column = create_chars_child_column(total_bytes, stream, mr); auto d_new_chars = chars_column->mutable_view().data(); chars_column->set_null_count(0); - // create offsets column + // create output offsets column auto offsets_column = make_numeric_column( data_type{type_id::INT32}, offsets_count, mask_state::UNALLOCATED, stream, mr); - auto d_new_offsets = offsets_column->mutable_view().data(); - offsets_column->set_null_count(0); + auto itr_new_offsets = + cudf::detail::offsetalator_factory::make_output_iterator(offsets_column->mutable_view()); rmm::device_buffer null_mask{0, stream, mr}; size_type null_count{}; @@ -256,7 +259,7 @@ std::unique_ptr concatenate(host_span columns, d_partition_offsets.data(), static_cast(columns.size()), strings_count, - d_new_offsets, + itr_new_offsets, reinterpret_cast(null_mask.data()), d_valid_count.data()); @@ -286,14 +289,11 @@ std::unique_ptr concatenate(host_span columns, column_view offsets_child = column->child(strings_column_view::offsets_column_index); column_view chars_child = column->child(strings_column_view::chars_column_index); - auto bytes_offset = - cudf::detail::get_value(offsets_child, column_offset, stream); - + auto const bytes_offset = get_offset_value(offsets_child, column_offset, stream); + auto const bytes_end = get_offset_value(offsets_child, column_size + column_offset, stream); // copy the chars column data - auto d_chars = chars_child.data() + bytes_offset; - auto const bytes = - cudf::detail::get_value(offsets_child, column_size + column_offset, stream) - - bytes_offset; + auto d_chars = chars_child.data() + bytes_offset; + auto const bytes = bytes_end - bytes_offset; CUDF_CUDA_TRY( cudaMemcpyAsync(d_new_chars, d_chars, bytes, cudaMemcpyDefault, stream.value())); diff --git a/cpp/src/strings/utilities.cu b/cpp/src/strings/utilities.cu index c8c68d19ce6..13f4776ca33 100644 --- a/cpp/src/strings/utilities.cu +++ b/cpp/src/strings/utilities.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -128,6 +129,18 @@ special_case_mapping const* get_special_case_mapping_table() }); } +int64_t get_offset_value(cudf::column_view const& offsets, + size_type index, + rmm::cuda_stream_view stream) +{ + auto const otid = offsets.type().id(); + CUDF_EXPECTS(otid == type_id::INT64 || otid == type_id::INT32, + "Offsets must be of type INT32 or INT64", + std::invalid_argument); + return otid == type_id::INT64 ? cudf::detail::get_value(offsets, index, stream) + : cudf::detail::get_value(offsets, index, stream); +} + } // namespace detail } // namespace strings } // namespace cudf diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 234716749ff..5124ac579fd 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1021,6 +1021,8 @@ TEST_F(OrcStatisticsTest, Basic) ASSERT_EQ(stats.size(), expected.num_columns() + 1); auto& s0 = stats[0]; EXPECT_EQ(*s0.number_of_values, 9ul); + EXPECT_TRUE(s0.has_null.has_value()); + EXPECT_FALSE(*s0.has_null); auto& s1 = stats[1]; EXPECT_EQ(*s1.number_of_values, 4ul); @@ -1960,4 +1962,95 @@ TEST_F(OrcWriterTest, UnorderedDictionary) CUDF_TEST_EXPECT_TABLES_EQUAL(*from_sorted, *from_unsorted); } +TEST_F(OrcStatisticsTest, Empty) +{ + int32_col col0{}; + float64_col col1{}; + str_col col2{}; + dec64_col col3{}; + column_wrapper col4; + bool_col col5{}; + table_view expected({col0, col1, col2, col3, col4, col5}); + + std::vector out_buffer; + + cudf::io::orc_writer_options out_opts = + cudf::io::orc_writer_options::builder(cudf::io::sink_info{&out_buffer}, expected); + cudf::io::write_orc(out_opts); + + auto const stats = cudf::io::read_parsed_orc_statistics( + cudf::io::source_info{out_buffer.data(), out_buffer.size()}); + + auto expected_column_names = std::vector{""}; + std::generate_n( + std::back_inserter(expected_column_names), + expected.num_columns(), + [starting_index = 0]() mutable { return "_col" + std::to_string(starting_index++); }); + EXPECT_EQ(stats.column_names, expected_column_names); + + EXPECT_EQ(stats.column_names.size(), 7); + EXPECT_EQ(stats.stripes_stats.size(), 0); + + auto const& fstats = stats.file_stats; + ASSERT_EQ(fstats.size(), 7); + auto& s0 = fstats[0]; + EXPECT_TRUE(s0.number_of_values.has_value()); + EXPECT_EQ(*s0.number_of_values, 0ul); + EXPECT_TRUE(s0.has_null.has_value()); + EXPECT_FALSE(*s0.has_null); + + auto& s1 = fstats[1]; + EXPECT_EQ(*s1.number_of_values, 0ul); + EXPECT_FALSE(*s1.has_null); + auto& ts1 = std::get(s1.type_specific_stats); + EXPECT_FALSE(ts1.minimum.has_value()); + EXPECT_FALSE(ts1.maximum.has_value()); + EXPECT_TRUE(ts1.sum.has_value()); + EXPECT_EQ(*ts1.sum, 0); + + auto& s2 = fstats[2]; + EXPECT_EQ(*s2.number_of_values, 0ul); + EXPECT_FALSE(*s2.has_null); + auto& ts2 = std::get(s2.type_specific_stats); + EXPECT_FALSE(ts2.minimum.has_value()); + EXPECT_FALSE(ts2.maximum.has_value()); + EXPECT_TRUE(ts2.sum.has_value()); + EXPECT_EQ(*ts2.sum, 0); + + auto& s3 = fstats[3]; + EXPECT_EQ(*s3.number_of_values, 0ul); + EXPECT_FALSE(*s3.has_null); + auto& ts3 = std::get(s3.type_specific_stats); + EXPECT_FALSE(ts3.minimum.has_value()); + EXPECT_FALSE(ts3.maximum.has_value()); + EXPECT_TRUE(ts3.sum.has_value()); + EXPECT_EQ(*ts3.sum, 0); + + auto& s4 = fstats[4]; + EXPECT_EQ(*s4.number_of_values, 0ul); + EXPECT_FALSE(*s4.has_null); + auto& ts4 = std::get(s4.type_specific_stats); + EXPECT_FALSE(ts4.minimum.has_value()); + EXPECT_FALSE(ts4.maximum.has_value()); + EXPECT_TRUE(ts4.sum.has_value()); + EXPECT_EQ(*ts4.sum, "0"); + + auto& s5 = fstats[5]; + EXPECT_EQ(*s5.number_of_values, 0ul); + EXPECT_FALSE(*s5.has_null); + auto& ts5 = std::get(s5.type_specific_stats); + EXPECT_FALSE(ts5.minimum.has_value()); + EXPECT_FALSE(ts5.maximum.has_value()); + EXPECT_FALSE(ts5.minimum_utc.has_value()); + EXPECT_FALSE(ts5.maximum_utc.has_value()); + EXPECT_FALSE(ts5.minimum_nanos.has_value()); + EXPECT_FALSE(ts5.maximum_nanos.has_value()); + + auto& s6 = fstats[6]; + EXPECT_EQ(*s6.number_of_values, 0ul); + EXPECT_FALSE(*s6.has_null); + auto& ts6 = std::get(s6.type_specific_stats); + EXPECT_EQ(ts6.count[0], 0); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/dependencies.yaml b/dependencies.yaml index 91ac8371308..94f31240797 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -603,7 +603,6 @@ dependencies: - cramjam - fastavro>=0.22.9 - hypothesis - - mimesis>=4.1.0 - pytest-benchmark - pytest-cases - python-snappy>=0.6.0 @@ -755,7 +754,12 @@ dependencies: - ipython - openpyxl notebook_cuda_version: - common: - - output_types: conda - packages: - - cuda-version=12.0 + specific: + - output_types: conda + matrices: + - matrix: {cuda: "12.0"} + packages: + - cuda-version=12.0 + - matrix: {cuda: "11.8"} + packages: + - cuda-version=11.8 diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index fcfe8a21f05..8d2506403d4 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. from __future__ import annotations @@ -1836,7 +1836,7 @@ def __array_function__(self, func, types, args, kwargs): return NotImplemented @classmethod - def from_pandas(cls, index, nan_as_null=no_default): + def from_pandas(cls, index: pd.Index, nan_as_null=no_default): """ Convert from a Pandas Index. diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index e1b2f7d674d..123f13f8733 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. from __future__ import annotations @@ -99,9 +99,14 @@ def _has_nulls(self): @_cudf_nvtx_annotate def serialize(self): + # TODO: See if self._data can be serialized outright header = { "type-serialized": pickle.dumps(type(self)), "column_names": pickle.dumps(tuple(self._data.names)), + "column_rangeindex": pickle.dumps(self._data.rangeindex), + "column_multiindex": pickle.dumps(self._data.multiindex), + "column_label_dtype": pickle.dumps(self._data.label_dtype), + "column_level_names": pickle.dumps(self._data._level_names), } header["columns"], frames = serialize_columns(self._columns) return header, frames @@ -112,7 +117,20 @@ def deserialize(cls, header, frames): cls_deserialize = pickle.loads(header["type-serialized"]) column_names = pickle.loads(header["column_names"]) columns = deserialize_columns(header["columns"], frames) - return cls_deserialize._from_data(dict(zip(column_names, columns))) + kwargs = {} + for metadata in [ + "rangeindex", + "multiindex", + "label_dtype", + "level_names", + ]: + key = f"column_{metadata}" + if key in header: + kwargs[metadata] = pickle.loads(header[key]) + col_accessor = ColumnAccessor( + data=dict(zip(column_names, columns)), **kwargs + ) + return cls_deserialize._from_data(col_accessor) @classmethod @_cudf_nvtx_annotate diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index 4f98a878792..489f0e74dd6 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. from __future__ import annotations @@ -1601,7 +1601,7 @@ def to_pandas(self, *, nullable: bool = False) -> pd.MultiIndex: @classmethod @_cudf_nvtx_annotate - def from_pandas(cls, multiindex, nan_as_null=no_default): + def from_pandas(cls, multiindex: pd.MultiIndex, nan_as_null=no_default): """ Convert from a Pandas MultiIndex diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 3562c83e797..fcb4e77f6a5 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. from __future__ import annotations @@ -57,7 +57,6 @@ TimeDeltaColumn, arange, as_column, - column, full, ) from cudf.core.column.categorical import ( @@ -202,7 +201,6 @@ def __getitem__(self, arg): @_cudf_nvtx_annotate def __setitem__(self, key, value): - from cudf.core.column import column if isinstance(key, tuple): key = list(key) @@ -264,7 +262,7 @@ def __setitem__(self, key, value): self._frame._column.dtype, (cudf.ListDtype, cudf.StructDtype) ) ): - value = column.as_column(value) + value = as_column(value) if ( ( @@ -568,7 +566,7 @@ def from_masked_array(cls, data, mask, null_count=None): 4 14 dtype: int64 """ - col = column.as_column(data).set_mask(mask) + col = as_column(data).set_mask(mask) return cls(data=col) @_cudf_nvtx_annotate @@ -593,73 +591,33 @@ def __init__( "to silence this warning.", FutureWarning, ) - if isinstance(data, pd.Series): - if name is None: - name = data.name - if isinstance(data.index, pd.MultiIndex): - index = cudf.from_pandas(data.index) - else: - index = as_index(data.index) - elif isinstance(data, pd.Index): - if name is None: - name = data.name - data = as_column(data, nan_as_null=nan_as_null, dtype=dtype) - elif isinstance(data, BaseIndex): - if name is None: - name = data.name - data = data._values - if dtype is not None: - data = data.astype(dtype) + index_from_data = None + name_from_data = None + if data is None: + data = {} + + if isinstance(data, (pd.Series, pd.Index, BaseIndex, Series)): + if copy: + data = data.copy(deep=True) + name_from_data = data.name + column = as_column(data, nan_as_null=nan_as_null, dtype=dtype) + if isinstance(data, (pd.Series, Series)): + index_from_data = as_index(data.index) elif isinstance(data, ColumnAccessor): raise TypeError( "Use cudf.Series._from_data for constructing a Series from " "ColumnAccessor" ) - - if isinstance(data, Series): - if index is not None: - data = data.reindex(index) - else: - index = data._index - if name is None: - name = data.name - data = data._column - if copy: - data = data.copy(deep=True) - if dtype is not None: - data = data.astype(dtype) - - if isinstance(data, dict): + elif isinstance(data, dict): if not data: - current_index = RangeIndex(0) + column = as_column(data, nan_as_null=nan_as_null, dtype=dtype) + index_from_data = RangeIndex(0) else: - current_index = data.keys() - if index is not None: - series = Series( - list(data.values()), - nan_as_null=nan_as_null, - dtype=dtype, - index=current_index, - ) - new_index = as_index(index) - if not series.index.equals(new_index): - series = series.reindex(new_index) - data = series._column - index = series._index - else: - data = column.as_column( + column = as_column( list(data.values()), nan_as_null=nan_as_null, dtype=dtype ) - index = current_index - if data is None: - if index is not None: - data = column.column_empty( - row_count=len(index), dtype=None, masked=True - ) - else: - data = {} - - if not isinstance(data, ColumnBase): + index_from_data = as_index(list(data.keys())) + else: # Using `getattr_static` to check if # `data` is on device memory and perform # a deep copy later. This is different @@ -677,25 +635,42 @@ def __init__( ) is property ) - data = column.as_column( + column = as_column( data, nan_as_null=nan_as_null, dtype=dtype, length=len(index) if index is not None else None, ) if copy and has_cai: - data = data.copy(deep=True) - else: - if dtype is not None: - data = data.astype(dtype) + column = column.copy(deep=True) - if index is not None and not isinstance(index, BaseIndex): - index = as_index(index) + assert isinstance(column, ColumnBase) + + if dtype is not None: + column = column.astype(dtype) - assert isinstance(data, ColumnBase) + if name_from_data is not None and name is None: + name = name_from_data - super().__init__({name: data}) - self._index = RangeIndex(len(data)) if index is None else index + if index is not None: + index = as_index(index) + + if index_from_data is not None: + first_index = index_from_data + second_index = index + elif index is None: + first_index = RangeIndex(len(column)) + second_index = None + else: + first_index = index + second_index = None + + super().__init__({name: column}, index=first_index) + if second_index is not None: + # TODO: This there a better way to do this? + reindexed = self.reindex(index=second_index, copy=False) + self._data = reindexed._data + self._index = second_index self._check_data_index_length_match() @classmethod @@ -717,7 +692,7 @@ def __contains__(self, item): @classmethod @_cudf_nvtx_annotate - def from_pandas(cls, s, nan_as_null=no_default): + def from_pandas(cls, s: pd.Series, nan_as_null=no_default): """ Convert from a Pandas Series. @@ -760,7 +735,7 @@ def from_pandas(cls, s, nan_as_null=no_default): False if cudf.get_option("mode.pandas_compatible") else None ) with warnings.catch_warnings(): - warnings.simplefilter("ignore") + warnings.simplefilter("ignore", FutureWarning) result = cls(s, nan_as_null=nan_as_null) return result @@ -5250,16 +5225,16 @@ def isclose(a, b, rtol=1e-05, atol=1e-08, equal_nan=False): b = b.reindex(a.index) index = as_index(a.index) - a_col = column.as_column(a) + a_col = as_column(a) a_array = cupy.asarray(a_col.data_array_view(mode="read")) - b_col = column.as_column(b) + b_col = as_column(b) b_array = cupy.asarray(b_col.data_array_view(mode="read")) result = cupy.isclose( a=a_array, b=b_array, rtol=rtol, atol=atol, equal_nan=equal_nan ) - result_col = column.as_column(result) + result_col = as_column(result) if a_col.null_count and b_col.null_count: a_nulls = a_col.isnull() diff --git a/python/cudf/cudf/core/tools/numeric.py b/python/cudf/cudf/core/tools/numeric.py index b1bcf4b98c5..a28c679b8be 100644 --- a/python/cudf/cudf/core/tools/numeric.py +++ b/python/cudf/cudf/core/tools/numeric.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. import warnings @@ -161,7 +161,7 @@ def to_numeric(arg, errors="raise", downcast=None): break if isinstance(arg, (cudf.Series, pd.Series)): - return cudf.Series(col) + return cudf.Series(col, index=arg.index, name=arg.name) else: if col.has_nulls(): # To match pandas, always return a floating type filled with nan. diff --git a/python/cudf/cudf/testing/dataset_generator.py b/python/cudf/cudf/testing/dataset_generator.py index 1ba205275f3..13c194d6be0 100644 --- a/python/cudf/cudf/testing/dataset_generator.py +++ b/python/cudf/cudf/testing/dataset_generator.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # This module is for generating "synthetic" datasets. It was originally # designed for testing filtered reading. Generally, it should be useful @@ -11,11 +11,9 @@ import uuid from multiprocessing import Pool -import mimesis import numpy as np import pandas as pd import pyarrow as pa -from mimesis import Generic from pyarrow import parquet as pq import cudf @@ -35,8 +33,7 @@ class ColumnParameters: null_frequency : 0.1 Probability of a generated value being null generator : Callable - Function for generating random data. It is passed a Mimesis Generic - provider and returns an Iterable that generates data. + Function for generating random data. is_sorted : bool Sort this column. Columns are sorted in same order as ColumnParameters instances stored in column_params of Parameters. If there are one or @@ -51,7 +48,10 @@ def __init__( self, cardinality=100, null_frequency=0.1, - generator=lambda g: [g.address.country for _ in range(100)], + generator=lambda: [ + _generate_string(string.ascii_letters, random.randint(4, 8)) + for _ in range(100) + ], is_sorted=True, dtype=None, ): @@ -235,15 +235,9 @@ def get_dataframe(parameters, use_threads): if parameters.seed is not None: np.random.seed(parameters.seed) - # For each column, use a generic Mimesis producer to create an Iterable - # for generating data - for i, column_params in enumerate(parameters.column_parameters): - if column_params.dtype is None: - column_params.generator = column_params.generator( - Generic("en", seed=parameters.seed) - ) - else: - column_params.generator = column_params.generator() + # For each column, invoke the data generator + for column_params in parameters.column_parameters: + column_params.generator = column_params.generator() # Get schema for each column table_fields = [] @@ -343,7 +337,6 @@ def rand_dataframe( # Apply seed random.seed(seed) np.random.seed(seed) - mimesis.random.random.seed(seed) column_params = [] for meta in dtypes_meta: diff --git a/python/cudf/cudf/tests/test_numerical.py b/python/cudf/cudf/tests/test_numerical.py index 5bb55c164fe..fee5cc0ad21 100644 --- a/python/cudf/cudf/tests/test_numerical.py +++ b/python/cudf/cudf/tests/test_numerical.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. import numpy as np import pandas as pd @@ -425,3 +425,11 @@ def test_series_to_numeric_bool(data, downcast): got = cudf.to_numeric(gs, downcast=downcast) assert_eq(expect, got) + + +@pytest.mark.parametrize("klass", [cudf.Series, pd.Series]) +def test_series_to_numeric_preserve_index_name(klass): + ser = klass(["1"] * 8, index=range(2, 10), name="name") + result = cudf.to_numeric(ser) + expected = cudf.Series([1] * 8, index=range(2, 10), name="name") + assert_eq(result, expected) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 5c9e3aa3d9f..007349ab551 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. import datetime import glob @@ -6,6 +6,7 @@ import os import pathlib import random +import string from contextlib import contextmanager from io import BytesIO from string import ascii_letters @@ -432,13 +433,20 @@ def test_parquet_read_filtered(tmpdir, rdg_seed): dg.ColumnParameters( cardinality=40, null_frequency=0.05, - generator=lambda g: [g.address.city() for _ in range(40)], + generator=lambda: [ + "".join( + random.sample( + string.ascii_letters, random.randint(4, 8) + ) + ) + for _ in range(40) + ], is_sorted=False, ), dg.ColumnParameters( 40, 0.2, - lambda g: [g.person.age() for _ in range(40)], + lambda: np.random.default_rng().integers(0, 100, size=40), True, ), ], diff --git a/python/cudf/cudf/tests/test_serialize.py b/python/cudf/cudf/tests/test_serialize.py index e2788e4f03b..cac170cce55 100644 --- a/python/cudf/cudf/tests/test_serialize.py +++ b/python/cudf/cudf/tests/test_serialize.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. import pickle @@ -8,7 +8,6 @@ import pytest import cudf -from cudf.core._compat import PANDAS_GE_200 from cudf.testing import _utils as utils from cudf.testing._utils import assert_eq @@ -302,12 +301,9 @@ def test_serialize_string(): "frames", [ (cudf.Series([], dtype="str"), pd.Series([], dtype="str")), - pytest.param( - (cudf.DataFrame([]), pd.DataFrame([])), - marks=pytest.mark.xfail( - not PANDAS_GE_200, reason=".column returns Index[object]" - ), - ), + (cudf.DataFrame(), pd.DataFrame()), + (cudf.DataFrame([]), pd.DataFrame([])), + (cudf.DataFrame({}), pd.DataFrame({})), (cudf.DataFrame([1]).head(0), pd.DataFrame([1]).head(0)), (cudf.DataFrame({"a": []}), pd.DataFrame({"a": []})), ( @@ -401,3 +397,19 @@ def test_serialize_sliced_string(): recreated = cudf.Series.deserialize(*sliced.serialize()) assert_eq(recreated.to_pandas(nullable=True), pd_series) + + +@pytest.mark.parametrize( + "columns", + [ + cudf.RangeIndex(2), + cudf.Index([1, 2], dtype="int8"), + cudf.MultiIndex( + levels=[["a", "b"], [1, 2]], codes=[[0, 1], [0, 1]], names=["a", 0] + ), + ], +) +def test_serialize_column_types_preserved(columns): + expected = cudf.DataFrame([[10, 11]], columns=columns) + result = cudf.DataFrame.deserialize(*expected.serialize()) + assert_eq(result, expected) diff --git a/python/cudf/cudf/tests/test_series.py b/python/cudf/cudf/tests/test_series.py index 39da34fa89c..248ac201e12 100644 --- a/python/cudf/cudf/tests/test_series.py +++ b/python/cudf/cudf/tests/test_series.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. import decimal import hashlib @@ -2683,3 +2683,42 @@ def test_series_duplicate_index_reindex(): lfunc_args_and_kwargs=([10, 11, 12, 13], {}), rfunc_args_and_kwargs=([10, 11, 12, 13], {}), ) + + +@pytest.mark.parametrize( + "klass", [cudf.Series, cudf.Index, pd.Series, pd.Index] +) +def test_series_from_named_object_name_priority(klass): + result = cudf.Series(klass([1], name="a"), name="b") + assert result.name == "b" + + +@pytest.mark.parametrize( + "data", + [ + {"a": 1, "b": 2, "c": 3}, + cudf.Series([1, 2, 3], index=list("abc")), + pd.Series([1, 2, 3], index=list("abc")), + ], +) +def test_series_from_object_with_index_index_arg_reindex(data): + result = cudf.Series(data, index=list("bca")) + expected = cudf.Series([2, 3, 1], index=list("bca")) + assert_eq(result, expected) + + +@pytest.mark.parametrize( + "data", + [ + {0: 1, 1: 2, 2: 3}, + cudf.Series([1, 2, 3]), + cudf.Index([1, 2, 3]), + pd.Series([1, 2, 3]), + pd.Index([1, 2, 3]), + [1, 2, 3], + ], +) +def test_series_dtype_astypes(data): + result = cudf.Series(data, dtype="float64") + expected = cudf.Series([1.0, 2.0, 3.0]) + assert_eq(result, expected) diff --git a/python/cudf/pyproject.toml b/python/cudf/pyproject.toml index 17c8ba02d3a..7c3f4a97a5e 100644 --- a/python/cudf/pyproject.toml +++ b/python/cudf/pyproject.toml @@ -56,7 +56,6 @@ test = [ "cramjam", "fastavro>=0.22.9", "hypothesis", - "mimesis>=4.1.0", "msgpack", "pytest", "pytest-benchmark", diff --git a/python/dask_cudf/pyproject.toml b/python/dask_cudf/pyproject.toml index e5237d206d4..33065da6e8d 100644 --- a/python/dask_cudf/pyproject.toml +++ b/python/dask_cudf/pyproject.toml @@ -8,7 +8,7 @@ requires = [ ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. [project] -name = "dask_cudf" +name = "dask-cudf" dynamic = ["version"] description = "Utilities for Dask and cuDF interactions" readme = { file = "README.md", content-type = "text/markdown" }