From 8db9871580ca5f55d328b01cef8854a1596fbee4 Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Tue, 11 Jun 2024 19:00:01 +0100
Subject: [PATCH] Fix FOR bug, also fix bench to compile (#341)

@robert3005

1. Fix an issue with FoR compression where nullability was getting stripped,
   causing the `compress` benchmark to fail
2. Fix some random tokio misuse that was also causing the `compress`
   benchmark to fail due to recursive runtime creation
---
 Cargo.lock | 1 +
 bench-vortex/Cargo.toml | 1 +
 bench-vortex/benches/compress_benchmark.rs | 4 +-
 bench-vortex/benches/random_access.rs | 1 +
 bench-vortex/src/bin/compress.rs | 22 +-
 bench-vortex/src/reader.rs | 13 +-
 bench-vortex/src/taxi_data.rs | 31 ++-
 .../src/array/bool/compute/as_contiguous.rs | 27 ---
 vortex-array/src/array/bool/compute/mod.rs | 6 -
 vortex-array/src/array/chunked/compute/mod.rs | 23 +--
 .../src/array/chunked/compute/take.rs | 16 +-
 vortex-array/src/array/chunked/flatten.rs | 192 ++++++++++++++++++
 vortex-array/src/array/chunked/mod.rs | 15 +-
 vortex-array/src/array/constant/as_arrow.rs | 50 +++++
 vortex-array/src/array/constant/compute.rs | 32 +--
 vortex-array/src/array/constant/flatten.rs | 2 +-
 vortex-array/src/array/constant/mod.rs | 1 +
 .../src/array/datetime/localdatetime.rs | 24 ++-
 vortex-array/src/array/extension/compute.rs | 16 --
 .../array/primitive/compute/as_contiguous.rs | 31 ---
 .../src/array/primitive/compute/mod.rs | 6 -
 vortex-array/src/array/sparse/compute/mod.rs | 65 +-----
 vortex-array/src/array/sparse/flatten.rs | 79 +++++--
 vortex-array/src/array/struct/compute.rs | 39 ----
 vortex-array/src/array/struct/mod.rs | 21 +-
 vortex-array/src/array/varbin/compute/mod.rs | 46 +----
 vortex-array/src/array/varbin/mod.rs | 2 +-
 vortex-array/src/compress.rs | 68 +++++--
 vortex-array/src/compute/as_contiguous.rs | 36 ----
 vortex-array/src/compute/mod.rs | 6 -
 vortex-array/src/flatten.rs | 13 ++
 vortex-array/src/lib.rs | 12 ++
 vortex-array/src/typed.rs | 6 +-
 vortex-array/src/validity.rs | 29 +--
 vortex-datetime-parts/src/array.rs | 7 +-
 vortex-datetime-parts/src/compute.rs | 159 ++++++++++++++-
 vortex-fastlanes/src/bitpacking/compress.rs | 4 +-
 vortex-fastlanes/src/for/compress.rs | 9 +-
 vortex-scalar/src/primitive.rs | 8 +
 39 files changed, 684 insertions(+), 439 deletions(-)
 delete mode 100644 vortex-array/src/array/bool/compute/as_contiguous.rs
 create mode 100644 vortex-array/src/array/chunked/flatten.rs
 create mode 100644 vortex-array/src/array/constant/as_arrow.rs
 delete mode 100644 vortex-array/src/array/primitive/compute/as_contiguous.rs
 delete mode 100644 vortex-array/src/compute/as_contiguous.rs

diff --git a/Cargo.lock b/Cargo.lock
index ab3f9a7da2..d0d5358626 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -768,6 +768,7 @@ dependencies = [
  "uuid",
  "vortex-alp",
  "vortex-array",
+ "vortex-buffer",
  "vortex-datetime-parts",
  "vortex-dict",
  "vortex-dtype",
diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml
index 548d2ed325..47228faed0 100644
--- a/bench-vortex/Cargo.toml
+++ b/bench-vortex/Cargo.toml
@@ -37,6 +37,7 @@ tokio = { workspace = true }
 uuid = { workspace = true }
 vortex-alp = { path = "../vortex-alp" }
 vortex-array = { path = "../vortex-array" }
+vortex-buffer = { path = "../vortex-buffer" }
 vortex-datetime-parts = { path = "../vortex-datetime-parts" }
 vortex-dict = { path = "../vortex-dict" }
 vortex-dtype = { path = "../vortex-dtype" }
diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs
index b41bda0499..e586cc404d 100644
--- a/bench-vortex/benches/compress_benchmark.rs
+++ b/bench-vortex/benches/compress_benchmark.rs
@@ -7,7 +7,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
 fn vortex_compress_taxi(c: &mut Criterion) {
     taxi_data_parquet();
-    let mut group = c.benchmark_group("end to end");
+    let mut group = c.benchmark_group("end to end - taxi");
     group.sample_size(10);
     group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
     group.finish()
@@ -16,7 +16,7 @@ fn vortex_compress_taxi(c: &mut Criterion) {
 fn vortex_compress_medicare1(c: &mut Criterion) {
     let dataset = BenchmarkDatasets::PBI(Medicare1);
     dataset.as_uncompressed();
-    let mut group = c.benchmark_group("end to end");
+    let mut group = c.benchmark_group("end to end - medicare");
     group.sample_size(10);
     group.bench_function("compress", |b| {
         b.iter(|| black_box(dataset.compress_to_vortex()))
diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs
index f472683366..eaff19ecf6 100644
--- a/bench-vortex/benches/random_access.rs
+++ b/bench-vortex/benches/random_access.rs
@@ -27,6 +27,7 @@ fn random_access(c: &mut Criterion) {
     });
 
     let dataset = BenchmarkDatasets::PBI(Medicare1);
+    dataset.write_as_parquet();
     dataset.write_as_lance();
     // NB: our parquet benchmarks read from a single file, and we (currently) write each
     // file to an individual lance dataset for comparison parity.
diff --git a/bench-vortex/src/bin/compress.rs b/bench-vortex/src/bin/compress.rs
index 8946496544..050f5fe92f 100644
--- a/bench-vortex/src/bin/compress.rs
+++ b/bench-vortex/src/bin/compress.rs
@@ -4,28 +4,26 @@ use std::path::PathBuf;
 use bench_vortex::data_downloads::BenchmarkDataset;
 use bench_vortex::public_bi_data::BenchmarkDatasets::PBI;
 use bench_vortex::public_bi_data::PBIDataset;
-use bench_vortex::reader::{open_vortex, rewrite_parquet_as_vortex};
+use bench_vortex::reader::{open_vortex_async, rewrite_parquet_as_vortex};
 use bench_vortex::taxi_data::taxi_data_parquet;
 use bench_vortex::{setup_logger, IdempotentPath};
-use futures::executor::block_on;
 use log::{info, LevelFilter};
 use tokio::fs::File;
+use vortex_error::VortexResult;
 
-pub fn main() {
+#[tokio::main]
+pub async fn main() {
     setup_logger(LevelFilter::Info);
     // compress_pbi(PBIDataset::Medicare1);
-    compress_taxi();
+    compress_taxi().await.unwrap();
 }
 
-fn compress_taxi() {
+async fn compress_taxi() -> VortexResult<()> {
     let path: PathBuf = "taxi_data.vortex".to_data_path();
-    block_on(async {
-        let output_file = File::create(&path).await?;
-        rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await
-    })
-    .unwrap();
+    let output_file = File::create(&path).await?;
+    rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;
 
-    let taxi_vortex = open_vortex(&path).unwrap();
+    let taxi_vortex = open_vortex_async(&path).await?;
     info!("{}", taxi_vortex.tree_display());
 
     let pq_size = taxi_data_parquet().metadata().unwrap().size();
@@ -33,6 +31,8 @@ fn compress_taxi() {
     info!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
     info!("Compression ratio: {}", vx_size as f32 / pq_size as f32);
+
+    Ok(())
 }
 
 #[allow(dead_code)]
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index 3a27c1d9aa..b032070eca 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -47,6 +47,17 @@ pub fn open_vortex(path: &Path) -> VortexResult<Array> {
         .map(|a| a.into_array())
 }
 
+pub async fn open_vortex_async(path: &Path) -> VortexResult<Array> {
+    let file = tokio::fs::File::open(path).await.unwrap();
+    let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap();
+    msgs.array_stream_from_messages(&CTX)
+        .await
+        .unwrap()
+        .collect_chunked()
+        .await
+        .map(|a| a.into_array())
+}
+
 pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
     parquet_path: PathBuf,
     write: W,
 )
@@ -103,7 +114,7 @@ pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<Array> {
     let array = open_vortex(path)?;
     let taken = take(&array, &indices.to_vec().into_array())?;
     // For equivalence.... we flatten to make sure we're not cheating too much.
-    taken.flatten().map(|x| x.into_array())
+    Ok(taken.flatten()?.into_array())
 }
 
 pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs
index f392b67ddd..10a1e0a0f5 100644
--- a/bench-vortex/src/taxi_data.rs
+++ b/bench-vortex/src/taxi_data.rs
@@ -1,8 +1,11 @@
+use std::future::{ready, Future};
+use std::io::Write;
 use std::path::PathBuf;
 
 use futures::executor::block_on;
-use tokio::fs::File;
+use vortex_buffer::io_buf::IoBuf;
 use vortex_error::VortexError;
+use vortex_ipc::io::VortexWrite;
 
 use crate::data_downloads::{data_vortex_uncompressed, download_data, parquet_to_lance};
 use crate::reader::rewrite_parquet_as_vortex;
@@ -33,10 +36,34 @@ pub fn taxi_data_vortex_uncompressed() -> PathBuf {
 pub fn taxi_data_vortex() -> PathBuf {
     idempotent("taxi.vortex", |output_fname| {
         block_on(async {
-            let output_file = File::create(output_fname).await?;
+            let output_file = std::fs::File::create(output_fname)?;
+            let output_file = StdFile(output_file);
             rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;
             Ok::<PathBuf, VortexError>(output_fname.to_path_buf())
         })
     })
     .unwrap()
 }
+
+//
+// Test code uses futures_executor with a local pool, and nothing in VortexWrite ties us to Tokio,
+// so this is a simple bridge to allow us to use a `std::fs::File` as a `VortexWrite`.
+//
+
+struct StdFile(std::fs::File);
+
+impl VortexWrite for StdFile {
+    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> std::io::Result<B> {
+        self.0.write_all(buffer.as_slice())?;
+        Ok(buffer)
+    }
+
+    async fn flush(&mut self) -> std::io::Result<()> {
+        self.0.flush()?;
+        Ok(())
+    }
+
+    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
+        ready(Ok(()))
+    }
+}
diff --git a/vortex-array/src/array/bool/compute/as_contiguous.rs b/vortex-array/src/array/bool/compute/as_contiguous.rs
deleted file mode 100644
index 64bb11b0c5..0000000000
--- a/vortex-array/src/array/bool/compute/as_contiguous.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-use arrow_buffer::BooleanBuffer;
-use vortex_error::VortexResult;
-
-use crate::array::bool::BoolArray;
-use crate::compute::as_contiguous::AsContiguousFn;
-use crate::validity::Validity;
-use crate::{Array, ArrayDType, IntoArray};
-
-impl AsContiguousFn for BoolArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        let validity = if self.dtype().is_nullable() {
-            Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity())))
-        } else {
-            Validity::NonNullable
-        };
-
-        let mut bools = Vec::with_capacity(arrays.iter().map(|a| a.len()).sum());
-        for buffer in arrays
-            .iter()
-            .map(|a| Self::try_from(a.clone()).unwrap().boolean_buffer())
-        {
-            bools.extend(buffer.iter())
-        }
-
-        Ok(Self::try_new(BooleanBuffer::from(bools), validity)?.into_array())
-    }
-}
diff --git a/vortex-array/src/array/bool/compute/mod.rs b/vortex-array/src/array/bool/compute/mod.rs
index b8832b8113..36970c63c6 100644
--- a/vortex-array/src/array/bool/compute/mod.rs
+++ b/vortex-array/src/array/bool/compute/mod.rs
@@ -1,6 +1,5 @@
 use crate::array::bool::BoolArray;
 use crate::compute::as_arrow::AsArrowArray;
-use crate::compute::as_contiguous::AsContiguousFn;
 use crate::compute::compare::CompareFn;
 use crate::compute::fill::FillForwardFn;
 use crate::compute::scalar_at::ScalarAtFn;
@@ -9,7 +8,6 @@ use crate::compute::take::TakeFn;
 use crate::compute::ArrayCompute;
 
 mod as_arrow;
-mod as_contiguous;
 mod compare;
 mod fill;
 mod flatten;
@@ -22,10 +20,6 @@ impl ArrayCompute for BoolArray {
         Some(self)
     }
 
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        Some(self)
-    }
-
     fn compare(&self) -> Option<&dyn CompareFn> {
         Some(self)
     }
diff --git a/vortex-array/src/array/chunked/compute/mod.rs b/vortex-array/src/array/chunked/compute/mod.rs
index 7d2950a583..0469a11b97 100644
--- a/vortex-array/src/array/chunked/compute/mod.rs
+++ b/vortex-array/src/array/chunked/compute/mod.rs
@@ -2,23 +2,21 @@ use vortex_error::VortexResult;
 use vortex_scalar::Scalar;
 
 use crate::array::chunked::ChunkedArray;
-use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
 use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
 use crate::compute::scalar_subtract::SubtractScalarFn;
 use crate::compute::slice::SliceFn;
 use crate::compute::take::TakeFn;
 use crate::compute::ArrayCompute;
-use crate::Array;
 
 mod slice;
 mod take;
 
 impl ArrayCompute for ChunkedArray {
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
+    fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
         Some(self)
     }
 
-    fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
+    fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
         Some(self)
     }
 
@@ -29,23 +27,6 @@ impl ArrayCompute for ChunkedArray {
     fn take(&self) -> Option<&dyn TakeFn> {
         Some(self)
     }
-
-    fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
-        Some(self)
-    }
-}
-
-impl AsContiguousFn for ChunkedArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        // Combine all the chunks into one, then call as_contiguous again.
-        let mut chunks = Vec::with_capacity(self.nchunks());
-        for array in arrays {
-            for chunk in Self::try_from(array).unwrap().chunks() {
-                chunks.push(chunk);
-            }
-        }
-        as_contiguous(&chunks)
-    }
-}
 
 impl ScalarAtFn for ChunkedArray {
diff --git a/vortex-array/src/array/chunked/compute/take.rs b/vortex-array/src/array/chunked/compute/take.rs
index 96b357af7c..a4c6e5671b 100644
--- a/vortex-array/src/array/chunked/compute/take.rs
+++ b/vortex-array/src/array/chunked/compute/take.rs
@@ -52,10 +52,7 @@ impl TakeFn for ChunkedArray {
 
 #[cfg(test)]
 mod test {
-    use itertools::Itertools;
-
     use crate::array::chunked::ChunkedArray;
-    use crate::compute::as_contiguous::as_contiguous;
     use crate::compute::take::take;
     use crate::{ArrayDType, ArrayTrait, AsArray, IntoArray};
 
@@ -68,14 +65,11 @@ mod test {
         assert_eq!(arr.len(), 9);
 
         let indices = vec![0, 0, 6, 4].into_array();
-        let result = as_contiguous(
-            &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap())
-                .unwrap()
-                .chunks()
-                .collect_vec(),
-        )
-        .unwrap()
-        .into_primitive();
+        let result = &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap())
+            .unwrap()
+            .into_array()
+            .flatten_primitive()
+            .unwrap();
         assert_eq!(result.typed_data::<u64>(), &[1, 1, 1, 2]);
     }
 }
diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs
new file mode 100644
index 0000000000..c70b5083d6
--- /dev/null
+++ b/vortex-array/src/array/chunked/flatten.rs
@@ -0,0 +1,192 @@
+use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer};
+use itertools::Itertools;
+use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType, StructDType};
+use vortex_error::{vortex_bail, ErrString, VortexResult};
+use vortex_scalar::Scalar;
+
+use crate::accessor::ArrayAccessor;
+use crate::array::bool::BoolArray;
+use crate::array::chunked::ChunkedArray;
+use crate::array::constant::ConstantArray;
+use crate::array::extension::ExtensionArray;
+use crate::array::primitive::PrimitiveArray;
+use crate::array::r#struct::StructArray;
+use crate::array::varbin::builder::VarBinBuilder;
+use crate::array::varbin::VarBinArray;
+use crate::validity::Validity;
+use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray};
+
+impl ArrayFlatten for ChunkedArray {
+    fn flatten(self) -> VortexResult<Flattened> {
+        try_flatten_chunks(self.chunks().collect(), self.dtype().clone())
+    }
+}
+
+pub(crate) fn try_flatten_chunks(chunks: Vec<Array>, dtype: DType) -> VortexResult<Flattened> {
+    let mismatched = chunks
+        .iter()
+        .filter(|chunk| !chunk.dtype().eq(&dtype))
+        .collect::<Vec<_>>();
+    if !mismatched.is_empty() {
+        vortex_bail!(MismatchedTypes: dtype, ErrString::from(format!("{:?}", mismatched)))
+    }
+
+    match &dtype {
+        // Structs can have their internal field pointers swizzled to push the chunking down
+        // one level internally without copying or decompressing any data.
+        DType::Struct(struct_dtype, _) => {
+            let struct_array = swizzle_struct_chunks(chunks.as_slice(), struct_dtype)?;
+            Ok(Flattened::Struct(struct_array))
+        }
+
+        // Extension arrays wrap an internal storage array, which can hold a ChunkedArray until
+        // it is safe to unpack them.
+        DType::Extension(ext_dtype, _) => {
+            let ext_array = ExtensionArray::new(
+                ext_dtype.clone(),
+                ChunkedArray::try_new(chunks, dtype.clone())?.into_array(),
+            );
+
+            Ok(Flattened::Extension(ext_array))
+        }
+
+        // Lists just flatten into their inner PType
+        DType::List(..) => {
+            todo!()
+        }
+
+        DType::Bool(nullability) => {
+            let bool_array = pack_bools(chunks.as_slice(), *nullability)?;
+            Ok(Flattened::Bool(bool_array))
+        }
+        DType::Primitive(ptype, nullability) => {
+            let prim_array = pack_primitives(chunks.as_slice(), *ptype, *nullability)?;
+            Ok(Flattened::Primitive(prim_array))
+        }
+        DType::Utf8(nullability) => {
+            let varbin_array = pack_varbin(chunks.as_slice(), &dtype, *nullability)?;
+            Ok(Flattened::VarBin(varbin_array))
+        }
+        DType::Binary(nullability) => {
+            let varbin_array = pack_varbin(chunks.as_slice(), &dtype, *nullability)?;
+            Ok(Flattened::VarBin(varbin_array))
+        }
+        DType::Null => {
+            let len = chunks.iter().map(|chunk| chunk.len()).sum();
+            let const_array = ConstantArray::new(Scalar::null(DType::Null), len);
+            Ok(Flattened::Null(const_array))
+        }
+    }
+}
+
+/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
+/// StructArray, where the Array for each Field is a ChunkedArray.
+///
+/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have
+/// been checked to have the same DType already.
+fn swizzle_struct_chunks(
+    chunks: &[Array],
+    struct_dtype: &StructDType,
+) -> VortexResult<StructArray> {
+    let chunks: Vec<StructArray> = chunks.iter().map(StructArray::try_from).try_collect()?;
+
+    let len = chunks.iter().map(|chunk| chunk.len()).sum();
+    let validity = chunks
+        .iter()
+        .map(|chunk| chunk.logical_validity())
+        .collect::<Validity>();
+
+    let mut field_arrays = Vec::new();
+
+    for (field_idx, field_dtype) in struct_dtype.dtypes().iter().enumerate() {
+        let mut field_chunks = Vec::new();
+        for chunk in &chunks {
+            field_chunks.push(
+                chunk
+                    .field(field_idx)
+                    .expect("all chunks must have same dtype"),
+            );
+        }
+        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
+        field_arrays.push(field_array.into_array());
+    }
+
+    StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
+}
+
+/// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array.
+///
+/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have
+/// been checked to have the same DType already.
+fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult<BoolArray> {
+    let len = chunks.iter().map(|chunk| chunk.len()).sum();
+    let validity = validity_from_chunks(chunks, nullability);
+    let mut bools = Vec::with_capacity(len);
+    for chunk in chunks {
+        let chunk = chunk.clone().flatten_bool()?;
+        bools.extend(chunk.boolean_buffer().iter());
+    }
+
+    BoolArray::try_new(BooleanBuffer::from(bools), validity)
+}
+
+/// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single
+/// contiguous array.
+///
+/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have
+/// been checked to have the same DType already.
+fn pack_primitives(
+    chunks: &[Array],
+    ptype: PType,
+    nullability: Nullability,
+) -> VortexResult<PrimitiveArray> {
+    let len: usize = chunks.iter().map(|chunk| chunk.len()).sum();
+    let validity = validity_from_chunks(chunks, nullability);
+    let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width());
+    for chunk in chunks {
+        let chunk = chunk.clone().flatten_primitive()?;
+        buffer.extend_from_slice(chunk.buffer());
+    }
+
+    match_each_native_ptype!(ptype, |$T| {
+        Ok(PrimitiveArray::try_new(
+            ScalarBuffer::<$T>::from(buffer),
+            validity)?)
+    })
+}
+
+/// Builds a new [VarBinArray] by repacking the values from the chunks into a single
+/// contiguous array.
+///
+/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have
+/// been checked to have the same DType already.
+fn pack_varbin(
+    chunks: &[Array],
+    dtype: &DType,
+    _nullability: Nullability,
+) -> VortexResult<VarBinArray> {
+    let len = chunks.iter().map(|chunk| chunk.len()).sum();
+    let mut builder = VarBinBuilder::<i64>::with_capacity(len);
+
+    for chunk in chunks {
+        let chunk = chunk.clone().flatten_varbin()?;
+        chunk.with_iterator(|iter| {
+            for datum in iter {
+                builder.push(datum);
+            }
+        })?;
+    }
+
+    Ok(builder.finish(dtype.clone()))
+}
+
+fn validity_from_chunks(chunks: &[Array], nullability: Nullability) -> Validity {
+    if nullability == Nullability::NonNullable {
+        Validity::NonNullable
+    } else {
+        chunks
+            .iter()
+            .map(|chunk| chunk.with_dyn(|a| a.logical_validity()))
+            .collect()
+    }
+}
diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs
index 25d5eb360e..8d13e642aa 100644
--- a/vortex-array/src/array/chunked/mod.rs
+++ b/vortex-array/src/array/chunked/mod.rs
@@ -6,7 +6,6 @@ use vortex_error::vortex_bail;
 use vortex_scalar::Scalar;
 
 use crate::array::primitive::PrimitiveArray;
-use crate::compute::as_contiguous::as_contiguous;
 use crate::compute::scalar_at::scalar_at;
 use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn};
 use crate::compute::search_sorted::{search_sorted, SearchSortedSide};
@@ -15,9 +14,10 @@ use crate::stream::{ArrayStream, ArrayStreamAdapter};
 use crate::validity::Validity::NonNullable;
 use crate::validity::{ArrayValidity, LogicalValidity};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
-use crate::{impl_encoding, ArrayDType, ArrayFlatten, IntoArrayData, ToArrayData};
+use crate::{impl_encoding, ArrayDType, IntoArrayData, ToArrayData};
 
 mod compute;
+mod flatten;
 mod stats;
 
 impl_encoding!("vortex.chunked", Chunked);
@@ -114,17 +114,6 @@ impl FromIterator<Array> for ChunkedArray {
     }
 }
 
-impl ArrayFlatten for ChunkedArray {
-    fn flatten(self) -> VortexResult<Flattened> {
-        let chunks = self.chunks().collect_vec();
-        if chunks.is_empty() {
-            // TODO(ngates): return an empty FlattenedArray with the correct DType.
-            panic!("Cannot yet flatten an empty chunked array");
-        }
-        as_contiguous(chunks.as_slice())?.flatten()
-    }
-}
-
 impl AcceptArrayVisitor for ChunkedArray {
     fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
         visitor.visit_child("chunk_ends", &self.chunk_ends())?;
diff --git a/vortex-array/src/array/constant/as_arrow.rs b/vortex-array/src/array/constant/as_arrow.rs
new file mode 100644
index 0000000000..f2e75cb0e0
--- /dev/null
+++ b/vortex-array/src/array/constant/as_arrow.rs
@@ -0,0 +1,50 @@
+//! Implementation of the [AsArrowArray] trait for [ConstantArray] that represents
+//! [DType::Null] values.
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef as ArrowArrayRef, NullArray};
+use vortex_dtype::DType;
+use vortex_error::{vortex_bail, VortexResult};
+
+use crate::array::constant::ConstantArray;
+use crate::compute::as_arrow::AsArrowArray;
+use crate::{ArrayDType, ArrayTrait};
+
+impl AsArrowArray for ConstantArray {
+    fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
+        if self.dtype() != &DType::Null {
+            vortex_bail!(InvalidArgument: "only null ConstantArrays convert to arrow");
+        }
+
+        let arrow_null = NullArray::new(self.len());
+        Ok(Arc::new(arrow_null))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_array::{Array, NullArray};
+
+    use crate::array::constant::ConstantArray;
+    use crate::arrow::FromArrowArray;
+    use crate::compute::as_arrow::AsArrowArray;
+    use crate::{ArrayData, IntoArray};
+
+    #[test]
+    fn test_round_trip() {
+        let arrow_nulls = NullArray::new(10);
+        let vortex_nulls = ArrayData::from_arrow(&arrow_nulls, true).into_array();
+
+        assert_eq!(
+            *ConstantArray::try_from(vortex_nulls)
+                .unwrap()
+                .as_arrow()
+                .unwrap()
+                .as_any()
+                .downcast_ref::<NullArray>()
+                .unwrap(),
+            arrow_nulls
+        );
+    }
+}
diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs
index 4183e7e8ba..103b1693f7 100644
--- a/vortex-array/src/array/constant/compute.rs
+++ b/vortex-array/src/array/constant/compute.rs
@@ -1,19 +1,13 @@
-use itertools::Itertools;
-use vortex_error::{vortex_err, VortexResult};
+use vortex_error::VortexResult;
 use vortex_scalar::Scalar;
 
 use crate::array::constant::ConstantArray;
-use crate::compute::as_contiguous::AsContiguousFn;
 use crate::compute::scalar_at::ScalarAtFn;
 use crate::compute::take::TakeFn;
 use crate::compute::ArrayCompute;
-use crate::{Array, ArrayTrait, IntoArray};
+use crate::{Array, IntoArray};
 
 impl ArrayCompute for ConstantArray {
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        Some(self)
-    }
-
     fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
         Some(self)
     }
@@ -23,28 +17,6 @@ impl ArrayCompute for ConstantArray {
     }
 }
 
-impl AsContiguousFn for ConstantArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        let chunks = arrays
-            .iter()
-            .map(|a| Self::try_from(a).unwrap())
-            .collect_vec();
-
-        if chunks.iter().map(|c| c.scalar()).all_equal() {
-            Ok(Self::new(
-                chunks.first().unwrap().scalar().clone(),
-                chunks.iter().map(|c| c.len()).sum(),
-            )
-            .into_array())
-        } else {
-            // TODO(ngates): we need to flatten the constant arrays and then concatenate them
-            Err(vortex_err!(
-                "Cannot concatenate constant arrays with differing scalars"
-            ))
-        }
-    }
-}
-
 impl ScalarAtFn for ConstantArray {
     fn scalar_at(&self, _index: usize) -> VortexResult<Scalar> {
         Ok(self.scalar().clone())
diff --git a/vortex-array/src/array/constant/flatten.rs b/vortex-array/src/array/constant/flatten.rs
index ef9beb1036..cde9ec3ba1 100644
--- a/vortex-array/src/array/constant/flatten.rs
+++ b/vortex-array/src/array/constant/flatten.rs
@@ -29,7 +29,7 @@ impl ArrayFlatten for ConstantArray {
         if let Ok(ptype) = PType::try_from(self.scalar().dtype()) {
             return match_each_native_ptype!(ptype, |$P| {
                 Ok(Flattened::Primitive(PrimitiveArray::from_vec::<$P>(
-                    vec![$P::try_from(self.scalar())?; self.len()],
+                    vec![$P::try_from(self.scalar()).unwrap_or_else(|_| $P::default()); self.len()],
                     validity,
                 )))
             });
diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs
index 0292398166..d3344fe6c9 100644
--- a/vortex-array/src/array/constant/mod.rs
+++ b/vortex-array/src/array/constant/mod.rs
@@ -7,6 +7,7 @@ use crate::impl_encoding;
 use crate::stats::Stat;
 use crate::validity::{ArrayValidity, LogicalValidity};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
 
+mod as_arrow;
 mod compute;
 mod flatten;
 mod stats;
diff --git a/vortex-array/src/array/datetime/localdatetime.rs b/vortex-array/src/array/datetime/localdatetime.rs
index a3aa000278..5d854731bc 100644
--- a/vortex-array/src/array/datetime/localdatetime.rs
+++ b/vortex-array/src/array/datetime/localdatetime.rs
@@ -16,7 +16,7 @@ use crate::validity::ArrayValidity;
 use crate::{Array, ArrayDType, ArrayData, IntoArrayData};
 
 lazy_static! {
-    static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID);
+    pub static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID);
 }
 
 pub struct LocalDateTimeArray {
@@ -54,6 +54,26 @@ impl LocalDateTimeArray {
     }
 }
 
+impl TryFrom<LocalDateTimeArray> for ExtensionArray {
+    type Error = VortexError;
+
+    fn try_from(value: LocalDateTimeArray) -> Result<Self, Self::Error> {
+        Self::try_from(&value)
+    }
+}
+
+impl TryFrom<&LocalDateTimeArray> for ExtensionArray {
+    type Error = VortexError;
+
+    fn try_from(value: &LocalDateTimeArray) -> Result<Self, Self::Error> {
+        let DType::Extension(ext_dtype, _) = value.dtype().clone() else {
+            vortex_bail!(ComputeError: "expected dtype to be Extension variant");
+        };
+
+        Ok(Self::new(ext_dtype, value.ext.storage()))
+    }
+}
+
 impl TryFrom<&ExtensionArray> for LocalDateTimeArray {
     type Error = VortexError;
 
@@ -93,7 +113,7 @@ impl IntoArrayData for LocalDateTimeArray {
     }
 }
 
-fn try_parse_time_unit(ext_dtype: &ExtDType) -> VortexResult<TimeUnit> {
+pub fn try_parse_time_unit(ext_dtype: &ExtDType) -> VortexResult<TimeUnit> {
     let byte: [u8; 1] = ext_dtype
         .metadata()
         .ok_or_else(|| vortex_err!("Missing metadata"))?
diff --git a/vortex-array/src/array/extension/compute.rs b/vortex-array/src/array/extension/compute.rs
index 70ab9706c3..d7611d3f73 100644
--- a/vortex-array/src/array/extension/compute.rs
+++ b/vortex-array/src/array/extension/compute.rs
@@ -5,7 +5,6 @@ use vortex_scalar::Scalar;
 use crate::array::datetime::LocalDateTimeArray;
 use crate::array::extension::ExtensionArray;
 use crate::compute::as_arrow::AsArrowArray;
-use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
 use crate::compute::cast::CastFn;
 use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
 use crate::compute::slice::{slice, SliceFn};
@@ -18,10 +17,6 @@ impl ArrayCompute for ExtensionArray {
         Some(self)
     }
 
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        Some(self)
-    }
-
     fn cast(&self) -> Option<&dyn CastFn> {
         // It's not possible to cast an extension array to another type.
         // TODO(ngates): we should allow some extension arrays to implement a callback
@@ -54,17 +49,6 @@ impl AsArrowArray for ExtensionArray {
     }
 }
 
-impl AsContiguousFn for ExtensionArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        let storage_arrays = arrays
-            .iter()
-            .map(|a| Self::try_from(a).expect("not an extension array").storage())
-            .collect::<Vec<_>>();
-
-        Ok(Self::new(self.ext_dtype().clone(), as_contiguous(&storage_arrays)?).into_array())
-    }
-}
-
 impl ScalarAtFn for ExtensionArray {
     fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
         Ok(Scalar::extension(
diff --git a/vortex-array/src/array/primitive/compute/as_contiguous.rs b/vortex-array/src/array/primitive/compute/as_contiguous.rs
deleted file mode 100644
index 150849ef32..0000000000
--- a/vortex-array/src/array/primitive/compute/as_contiguous.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use arrow_buffer::{MutableBuffer, ScalarBuffer};
-use vortex_dtype::match_each_native_ptype;
-use vortex_error::VortexResult;
-
-use crate::array::primitive::PrimitiveArray;
-use crate::compute::as_contiguous::AsContiguousFn;
-use crate::validity::Validity;
-use crate::ArrayDType;
-use crate::{Array, IntoArray};
-
-impl AsContiguousFn for PrimitiveArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        let validity = if self.dtype().is_nullable() {
-            Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity())))
-        } else {
-            Validity::NonNullable
-        };
-
-        let mut buffer = MutableBuffer::with_capacity(
-            arrays.iter().map(|a| a.len()).sum::<usize>() * self.ptype().byte_width(),
-        );
-        for array in arrays {
-            buffer.extend_from_slice(array.as_primitive().buffer())
-        }
-        match_each_native_ptype!(self.ptype(), |$T| {
-            Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity)
-                .unwrap()
-                .into_array())
-        })
-    }
-}
diff --git a/vortex-array/src/array/primitive/compute/mod.rs b/vortex-array/src/array/primitive/compute/mod.rs
index 8b87128a11..637f63f366 100644
--- a/vortex-array/src/array/primitive/compute/mod.rs
+++ b/vortex-array/src/array/primitive/compute/mod.rs
@@ -1,6 +1,5 @@
 use crate::array::primitive::PrimitiveArray;
 use crate::compute::as_arrow::AsArrowArray;
-use crate::compute::as_contiguous::AsContiguousFn;
 use crate::compute::cast::CastFn;
 use crate::compute::compare::CompareFn;
 use crate::compute::fill::FillForwardFn;
@@ -13,7 +12,6 @@ use crate::compute::take::TakeFn;
 use crate::compute::ArrayCompute;
 
 mod as_arrow;
-mod as_contiguous;
 mod cast;
 mod compare;
 mod fill;
@@ -29,10 +27,6 @@ impl ArrayCompute for PrimitiveArray {
         Some(self)
     }
 
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        Some(self)
-    }
-
     fn cast(&self) -> Option<&dyn CastFn> {
         Some(self)
     }
diff --git a/vortex-array/src/array/sparse/compute/mod.rs b/vortex-array/src/array/sparse/compute/mod.rs
index 722ba3e082..bb772bd0bd 100644
--- a/vortex-array/src/array/sparse/compute/mod.rs
+++ b/vortex-array/src/array/sparse/compute/mod.rs
@@ -2,25 +2,20 @@ use std::collections::HashMap;
 
 use itertools::Itertools;
 use vortex_dtype::match_each_integer_ptype;
-use vortex_error::{vortex_bail, VortexResult};
+use vortex_error::VortexResult;
 use vortex_scalar::Scalar;
 
 use crate::array::primitive::PrimitiveArray;
 use crate::array::sparse::SparseArray;
-use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
 use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
 use crate::compute::slice::SliceFn;
 use crate::compute::take::{take, TakeFn};
 use crate::compute::ArrayCompute;
-use crate::{Array, ArrayDType, ArrayTrait, IntoArray};
+use crate::{Array, ArrayDType, IntoArray};
 
 mod slice;
 
 impl ArrayCompute for SparseArray {
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        Some(self)
-    }
-
     fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
         Some(self)
     }
@@ -34,27 +29,6 @@ impl ArrayCompute for SparseArray {
     }
 }
 
-impl AsContiguousFn for SparseArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        let sparse = arrays
-            .iter()
-            .map(|a| Self::try_from(a).unwrap())
-            .collect_vec();
-
-        if !sparse.iter().map(|a| a.fill_value()).all_equal() {
-            vortex_bail!("Cannot concatenate SparseArrays with differing fill values");
-        }
-
-        Ok(Self::new(
-            as_contiguous(&sparse.iter().map(|a| a.indices()).collect_vec())?,
-            as_contiguous(&sparse.iter().map(|a| a.values()).collect_vec())?,
-            sparse.iter().map(|a| a.len()).sum(),
-            self.fill_value().clone(),
-        )
-        .into_array())
-    }
-}
-
 impl ScalarAtFn for SparseArray {
     fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
         match self.find_index(index)? {
@@ -144,8 +118,6 @@ mod test {
     use crate::array::primitive::PrimitiveArray;
     use crate::array::sparse::compute::take_map;
     use crate::array::sparse::SparseArray;
-    use crate::compute::as_contiguous::as_contiguous;
-    use crate::compute::slice::slice;
     use crate::compute::take::take;
     use crate::validity::Validity;
     use crate::{Array, ArrayTrait, IntoArray};
@@ -206,39 +178,6 @@ mod test {
         assert_eq!(taken.len(), 2);
     }
 
-    #[test]
-    fn take_slices_and_reassemble() {
-        let sparse = sparse_array();
-        let slices = (0..10)
-            .map(|i| slice(&sparse, i * 10, (i + 1) * 10).unwrap())
-            .collect_vec();
-
-        let taken = slices
-            .iter()
-            .map(|s| take(s, &(0u64..10).collect_vec().into_array()).unwrap())
-            .collect_vec();
-        for i in [1, 2, 5, 6, 7, 8] {
-            assert_eq!(SparseArray::try_from(&taken[i]).unwrap().indices().len(), 0);
-        }
-        for i in [0, 3, 4, 9] {
-            assert_eq!(SparseArray::try_from(&taken[i]).unwrap().indices().len(), 1);
-        }
-
-        let contiguous = SparseArray::try_from(as_contiguous(&taken).unwrap()).unwrap();
-        assert_eq!(
-            contiguous.indices().into_primitive().typed_data::<u64>(),
-            [0u64, 7, 7, 9] // relative offsets
-        );
-        assert_eq!(
-            contiguous.values().into_primitive().typed_data::<u64>(),
-            SparseArray::try_from(sparse)
-                .unwrap()
-                .values()
-                .into_primitive()
-                .typed_data::<u64>()
-        );
-    }
-
     #[test]
     fn test_take_map() {
         let sparse = SparseArray::try_from(sparse_array()).unwrap();
diff --git a/vortex-array/src/array/sparse/flatten.rs b/vortex-array/src/array/sparse/flatten.rs
index 8d13350d8b..0ebbef70cb 100644
--- a/vortex-array/src/array/sparse/flatten.rs
+++ b/vortex-array/src/array/sparse/flatten.rs
@@ -1,13 +1,14 @@
-use arrow_buffer::BooleanBufferBuilder;
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use itertools::Itertools;
-use vortex_dtype::{match_each_native_ptype, NativePType};
+use vortex_dtype::{match_each_native_ptype, DType, NativePType};
 use vortex_error::{VortexError, VortexResult};
 use vortex_scalar::Scalar;
 
+use crate::array::bool::BoolArray;
 use crate::array::primitive::PrimitiveArray;
 use crate::array::sparse::SparseArray;
 use crate::validity::Validity;
-use crate::{ArrayFlatten, ArrayTrait, Flattened};
+use crate::{ArrayDType, ArrayFlatten, ArrayTrait, Flattened};
 
 impl ArrayFlatten for SparseArray {
     fn flatten(self) -> VortexResult<Flattened> {
@@ -16,20 +17,50 @@ impl ArrayFlatten for SparseArray {
         let mut validity = BooleanBufferBuilder::new(self.len());
         validity.append_n(self.len(), false);
-        let values = self.values().flatten_primitive()?;
-        match_each_native_ptype!(values.ptype(), |$P| {
-            flatten_sparse_values(
-                values.typed_data::<$P>(),
-                &indices,
-                self.len(),
-                self.fill_value(),
-                validity
-            )
-        })
+
+        if matches!(self.dtype(), DType::Bool(_)) {
+            let values = self.values().flatten_bool()?.boolean_buffer();
+            flatten_sparse_bools(values, &indices, self.len(), self.fill_value(), validity)
+        } else {
+            let values = self.values().flatten_primitive()?;
+            match_each_native_ptype!(values.ptype(), |$P| {
+                flatten_sparse_primitives(
+                    values.typed_data::<$P>(),
+                    &indices,
+                    self.len(),
+                    self.fill_value(),
+                    validity
+                )
+            })
+        }
+    }
+}
+
+fn flatten_sparse_bools(
+    values: BooleanBuffer,
+    indices: &[usize],
+    len: usize,
+    fill_value: &Scalar,
+    mut validity: BooleanBufferBuilder,
+) -> VortexResult<Flattened> {
+    let fill_bool: bool = if fill_value.is_null() {
+        bool::default()
+    } else {
+        fill_value.try_into()?
+    };
+    let mut flat_bools = vec![fill_bool; len];
+    for idx in indices {
+        flat_bools[*idx] = values.value(*idx);
+        validity.set_bit(*idx, true);
     }
+
+    let validity = Validity::from(validity.finish());
+    let bool_values = BoolArray::from_vec(flat_bools, validity);
+
+    Ok(Flattened::Bool(bool_values))
 }
 
-fn flatten_sparse_values<T: NativePType + for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
+fn flatten_sparse_primitives<T: NativePType + for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
     values: &[T],
     indices: &[usize],
     len: usize,
@@ -56,3 +87,23 @@ fn flatten_sparse_values<T: NativePType + for<'a> TryFrom<&'a Scalar, Error = Vo
     };
     Ok(Flattened::Primitive(array))
 }
+
+#[cfg(test)]
+mod test {
+    use vortex_dtype::{DType, Nullability};
+
+    use crate::array::bool::BoolArray;
+    use crate::array::sparse::SparseArray;
+    use crate::validity::Validity;
+    use crate::{ArrayDType, ArrayFlatten, Flattened, IntoArray};
+
+    #[test]
+    fn test_sparse_bool() {
+        let indices = vec![0u64].into_array();
+        let values = BoolArray::from_vec(vec![true], Validity::NonNullable).into_array();
+        let sparse_bools = SparseArray::new(indices, values, 10, true.into());
+        assert_eq!(*sparse_bools.dtype(), DType::Bool(Nullability::NonNullable));
+        let flat_bools = sparse_bools.flatten().unwrap();
+        assert!(matches!(flat_bools, Flattened::Bool(_)));
+    }
+}
diff --git a/vortex-array/src/array/struct/compute.rs b/vortex-array/src/array/struct/compute.rs
index 3af6e1bcd9..7b46317b5b 100644
--- a/vortex-array/src/array/struct/compute.rs
+++ b/vortex-array/src/array/struct/compute.rs
@@ -10,13 +10,10 @@ use vortex_scalar::Scalar;
 
 use crate::array::r#struct::StructArray;
 use crate::compute::as_arrow::{as_arrow, AsArrowArray};
-use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
 use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
 use crate::compute::slice::{slice, SliceFn};
 use crate::compute::take::{take, TakeFn};
 use crate::compute::ArrayCompute;
-use crate::validity::Validity;
-use crate::ArrayTrait;
 use crate::{Array, ArrayDType, IntoArray};
 
 impl ArrayCompute for StructArray {
@@ -24,10 +21,6 @@ impl ArrayCompute for StructArray {
         Some(self)
     }
 
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        Some(self)
-    }
-
     fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
         Some(self)
     }
@@ -69,38 +62,6 @@ impl AsArrowArray for StructArray {
     }
 }
 
-impl AsContiguousFn for StructArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        let struct_arrays = arrays
-            .iter()
-            .map(Self::try_from)
-            .collect::<VortexResult<Vec<_>>>()?;
-        let mut fields = vec![Vec::new(); self.dtypes().len()];
-        for array in struct_arrays.iter() {
-            for (f, field) in fields.iter_mut().enumerate() {
-                field.push(array.field(f).unwrap());
-            }
-        }
-
-        let validity = if self.dtype().is_nullable() {
-            Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity())))
-        } else {
-            Validity::NonNullable
-        };
-
-        Self::try_new(
-            self.names().clone(),
-            fields
-                .iter()
-                .map(|field_arrays| as_contiguous(field_arrays))
-                .try_collect()?,
-            self.len(),
-            validity,
-        )
-        .map(|a| a.into_array())
-    }
-}
-
 impl ScalarAtFn for StructArray {
     fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
         Ok(Scalar::r#struct(
diff --git a/vortex-array/src/array/struct/mod.rs b/vortex-array/src/array/struct/mod.rs
index 901926dbb1..f25ddcc0ba 100644
--- a/vortex-array/src/array/struct/mod.rs
+++ b/vortex-array/src/array/struct/mod.rs
@@ -99,20 +99,9 @@ impl StructArray {
 }
 
 impl ArrayFlatten for StructArray {
+    /// StructEncoding is the canonical form for a [DType::Struct] array, so return self.
     fn flatten(self) -> VortexResult<Flattened> {
-        Ok(Flattened::Struct(Self::try_new(
-            self.names().clone(),
-            (0..self.nfields())
-                .map(|i| {
-                    self.field(i)
-                        .expect("Missing child")
-                        .flatten()
-                        .map(|f| f.into_array())
-                })
-                .collect::<VortexResult<Vec<_>>>()?,
-            self.len(),
-            self.validity(),
-        )?))
+        Ok(Flattened::Struct(self))
     }
 }
 
@@ -123,12 +112,12 @@ impl ArrayTrait for StructArray {
     }
 }
 
 impl ArrayValidity for StructArray {
-    fn is_valid(&self, _index: usize) -> bool {
-        todo!()
+    fn is_valid(&self, index: usize) -> bool {
+        self.validity().is_valid(index)
     }
 
     fn logical_validity(&self) -> LogicalValidity {
-        todo!()
+        self.validity().to_logical(self.len())
     }
 }
diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs
index fbf94b6e77..7d3c31401b 100644
--- a/vortex-array/src/array/varbin/compute/mod.rs
+++ b/vortex-array/src/array/varbin/compute/mod.rs
@@ -3,24 +3,21 @@ use std::sync::Arc;
 use arrow_array::{
     ArrayRef as ArrowArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray,
 };
-use itertools::Itertools;
 use vortex_dtype::DType;
 use vortex_dtype::PType;
 use vortex_error::{vortex_bail, VortexResult};
 use vortex_scalar::Scalar;
 
-use crate::array::primitive::PrimitiveArray;
 use crate::array::varbin::{varbin_scalar, VarBinArray};
 use crate::arrow::wrappers::as_offset_buffer;
 use crate::compute::as_arrow::AsArrowArray;
-use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
 use crate::compute::cast::cast;
 use crate::compute::scalar_at::ScalarAtFn;
 use crate::compute::slice::SliceFn;
 use crate::compute::take::TakeFn;
 use crate::compute::ArrayCompute;
-use crate::validity::{ArrayValidity, Validity};
-use crate::{Array, ArrayDType, IntoArray, ToArray};
+use crate::validity::ArrayValidity;
+use crate::{ArrayDType, ToArray};
 
 mod slice;
 mod take;
@@ -30,10 +27,6 @@ impl ArrayCompute for VarBinArray {
         Some(self)
     }
 
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        Some(self)
-    }
-
     fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
         Some(self)
     }
@@ -47,41 +40,6 @@ impl ArrayCompute for VarBinArray {
     }
 }
 
-impl AsContiguousFn for VarBinArray {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
-        let bytes_chunks: Vec<Array> = arrays
-            .iter()
-            .map(|a| Self::try_from(a).unwrap().sliced_bytes())
-            .try_collect()?;
-        let bytes = as_contiguous(&bytes_chunks)?;
-
-        let validity = if self.dtype().is_nullable() {
-            Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity())))
-        } else {
-            Validity::NonNullable
-        };
-
-        let mut offsets = Vec::new();
-        offsets.push(0);
-        for a in arrays.iter().map(|a| Self::try_from(a).unwrap()) {
-            let first_offset: u64 = a.first_offset()?;
-            let offsets_array = cast(&a.offsets(), PType::U64.into())?.flatten_primitive()?;
-            let shift = offsets.last().copied().unwrap_or(0);
-            offsets.extend(
-                offsets_array
-                    .typed_data::<u64>()
-                    .iter()
-                    .skip(1) // Ignore the zero offset for each array
-                    .map(|o| o + shift - first_offset),
-            );
-        }
-
-        let offsets_array = PrimitiveArray::from(offsets).into_array();
-
-        Self::try_new(offsets_array, bytes, self.dtype().clone(), validity).map(|a| a.into_array())
-    }
-}
-
 impl AsArrowArray for VarBinArray {
     fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
         // Ensure the offsets are either i32 or i64
diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs
index 4a7ec6afab..cfd8de1337 100644
--- a/vortex-array/src/array/varbin/mod.rs
+++ b/vortex-array/src/array/varbin/mod.rs
@@ -41,7 +41,7 @@ impl VarBinArray {
         if !offsets.dtype().is_int() || offsets.dtype().is_nullable() {
             vortex_bail!(MismatchedTypes: "non nullable int", offsets.dtype());
         }
-        if !matches!(bytes.dtype(), &DType::BYTES,) {
+        if !matches!(bytes.dtype(), &DType::BYTES) {
             vortex_bail!(MismatchedTypes: "u8", bytes.dtype());
         }
         if !matches!(dtype, DType::Binary(_) | DType::Utf8(_)) {
diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs
index 7fa5263f88..8e1d0843d2 100644
--- a/vortex-array/src/compress.rs
+++ b/vortex-array/src/compress.rs
@@ -13,7 +13,7 @@ use crate::encoding::{ArrayEncoding, EncodingRef};
 use crate::sampling::stratified_slices;
 use crate::stats::ArrayStatistics;
 use crate::validity::Validity;
-use crate::{compute, Array, ArrayDType, ArrayDef, ArrayTrait, Context, IntoArray};
+use crate::{Array, ArrayDType, ArrayDef, ArrayFlatten, ArrayTrait, Context, IntoArray};
 
 pub trait EncodingCompression: ArrayEncoding {
     fn cost(&self) -> u8 {
@@ -146,14 +146,10 @@ impl<'a> Compressor<'a> {
             .map(|c| c.compress(arr, Some(l), self.for_encoding(l.encoding().compression())))
         {
             let compressed = compressed?;
-            if compressed.dtype() != arr.dtype() {
-                panic!(
-                    "Compression changed dtype: {:?} -> {:?} for {}",
-                    arr.dtype(),
-                    compressed.dtype(),
-                    compressed.tree_display(),
-                );
-            }
+
+            check_validity_unchanged(arr, &compressed);
+            check_dtype_unchanged(arr, &compressed);
+
             return Ok(compressed);
         } else {
             warn!(
@@ -165,14 +161,6 @@ impl<'a> Compressor<'a> {
 
         // Otherwise, attempt to compress the array
         let compressed = self.compress_array(arr)?;
-        if compressed.dtype() != arr.dtype() {
-            panic!(
-                "Compression changed dtype: {:?} -> {:?} for {}",
-                arr.dtype(),
-                compressed.dtype(),
-                compressed.tree_display(),
-            );
-        }
         Ok(compressed)
     }
@@ -226,6 +214,39 @@ impl<'a> Compressor<'a> {
     }
 }
 
+/// Check that compression did not alter the length of the validity array.
+fn check_validity_unchanged(arr: &Array, compressed: &Array) {
+    let _ = arr;
+    let _ = compressed;
+    #[cfg(debug_assertions)]
+    {
+        let old_validity = arr.with_dyn(|a| a.logical_validity().len());
+        let new_validity = compressed.with_dyn(|a| a.logical_validity().len());
+
+        debug_assert!(
+            old_validity == new_validity,
+            "validity length changed after compression: {old_validity} -> {new_validity} for {}",
+            compressed.tree_display()
+        );
+    }
+}
+
+/// Check that compression did not alter the dtype.
+fn check_dtype_unchanged(arr: &Array, compressed: &Array) {
+    let _ = arr;
+    let _ = compressed;
+    #[cfg(debug_assertions)]
+    {
+        debug_assert!(
+            arr.dtype() == compressed.dtype(),
+            "Compression changed dtype: {:?} -> {:?} for {}",
+            arr.dtype(),
+            compressed.dtype(),
+            compressed.tree_display(),
+        );
+    }
+}
+
 pub fn sampled_compression(array: &Array, compressor: &Compressor) -> VortexResult<Option<Array>> {
     // First, we try constant compression and shortcut any sampling.
     if !array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false) {
@@ -290,16 +311,19 @@ pub fn sampled_compression(array: &Array, compressor: &Compressor) -> VortexResu
     }
 
     // Take a sample of the array, then ask codecs for their best compression estimate.
-    let sample = compute::as_contiguous::as_contiguous(
-        &stratified_slices(
+    let sample = ChunkedArray::try_new(
+        stratified_slices(
             array.len(),
             compressor.options.sample_size,
             compressor.options.sample_count,
         )
         .into_iter()
-        .map(|(start, stop)| slice(array, start, stop).unwrap())
-        .collect::<Vec<_>>(),
-    )?;
+        .map(|(start, stop)| slice(array, start, stop))
+        .collect::<VortexResult<Vec<_>>>()?,
+        array.dtype().clone(),
+    )?
+    .flatten()?
+    .into_array();
 
     find_best_compression(candidates, &sample, compressor)?
         .map(|(compression, best)| {
diff --git a/vortex-array/src/compute/as_contiguous.rs b/vortex-array/src/compute/as_contiguous.rs
deleted file mode 100644
index fc88d86920..0000000000
--- a/vortex-array/src/compute/as_contiguous.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-use itertools::Itertools;
-use vortex_error::{vortex_bail, vortex_err, VortexResult};
-
-use crate::{Array, ArrayDType};
-
-pub trait AsContiguousFn {
-    fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array>;
-}
-
-pub fn as_contiguous(arrays: &[Array]) -> VortexResult<Array> {
-    if arrays.is_empty() {
-        vortex_bail!(ComputeError: "No arrays to concatenate");
-    }
-    if !arrays.iter().map(|chunk| chunk.encoding().id()).all_equal() {
-        vortex_bail!(ComputeError:
-            "Chunks have differing encodings",
-        );
-    }
-    if !arrays.iter().map(|chunk| chunk.dtype()).all_equal() {
-        vortex_bail!(ComputeError:
-            "Chunks have differing dtypes",
-        );
-    }
-
-    let first = arrays.first().unwrap();
-    first.with_dyn(|a| {
-        a.as_contiguous()
-            .map(|f| f.as_contiguous(arrays))
-            .unwrap_or_else(|| {
-                Err(vortex_err!(
-                    NotImplemented: "as_contiguous",
-                    first.encoding().id()
-                ))
-            })
-    })
-}
diff --git a/vortex-array/src/compute/mod.rs b/vortex-array/src/compute/mod.rs
index 8ca8fd1815..9f2f18df63 100644
--- a/vortex-array/src/compute/mod.rs
+++ b/vortex-array/src/compute/mod.rs
@@ -1,5 +1,4 @@
 use as_arrow::AsArrowArray;
-use as_contiguous::AsContiguousFn;
 use cast::CastFn;
 use compare::CompareFn;
 use fill::FillForwardFn;
@@ -13,7 +12,6 @@ use crate::compute::filter_indices::FilterIndicesFn;
 use crate::compute::scalar_subtract::SubtractScalarFn;
 
 pub mod as_arrow;
-pub mod as_contiguous;
 pub mod cast;
 pub mod compare;
 pub mod fill;
@@ -30,10 +28,6 @@ pub trait ArrayCompute {
         None
     }
 
-    fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
-        None
-    }
-
     fn cast(&self) -> Option<&dyn CastFn> {
         None
     }
diff --git a/vortex-array/src/flatten.rs b/vortex-array/src/flatten.rs
index 42fcbbcb3d..da991117a7 100644
--- a/vortex-array/src/flatten.rs
+++ b/vortex-array/src/flatten.rs
@@ -1,6 +1,7 @@
 use vortex_error::VortexResult;
 
 use crate::array::bool::BoolArray;
+use crate::array::constant::ConstantArray;
 use crate::array::extension::ExtensionArray;
 use crate::array::primitive::PrimitiveArray;
 use crate::array::r#struct::StructArray;
@@ -11,6 +12,7 @@ use crate::{Array, IntoArray};
 
 /// The set of encodings that can be converted to Arrow with zero-copy.
 pub enum Flattened {
+    Null(ConstantArray),
     Bool(BoolArray),
     Primitive(PrimitiveArray),
     Struct(StructArray),
@@ -19,6 +21,12 @@ pub enum Flattened {
     Extension(ExtensionArray),
 }
 
+/// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding.
+///
+/// Flattening an Array ensures that the array's encoding matches one of the builtin canonical
+/// encodings, each of which has a corresponding [Flattened] variant.
+///
+/// **Important**: DType remains the same before and after a flatten operation.
 pub trait ArrayFlatten {
     fn flatten(self) -> VortexResult<Flattened>;
 }
@@ -28,6 +36,10 @@ impl Array {
         ArrayEncoding::flatten(self.encoding(), self)
     }
 
+    pub fn flatten_extension(self) -> VortexResult<ExtensionArray> {
+        ExtensionArray::try_from(self.flatten()?.into_array())
+    }
+
     pub fn flatten_bool(self) -> VortexResult<BoolArray> {
         BoolArray::try_from(self.flatten()?.into_array())
     }
@@ -44,6 +56,7 @@ impl Array {
 impl IntoArray for Flattened {
     fn into_array(self) -> Array {
         match self {
+            Self::Null(a) => a.into_array(),
            Self::Bool(a) => a.into_array(),
             Self::Primitive(a) => a.into_array(),
             Self::Struct(a) => a.into_array(),
diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs
index 134d91781f..b60e554d7e 100644
--- a/vortex-array/src/lib.rs
+++ b/vortex-array/src/lib.rs
@@ -1,3 +1,13 @@
+//! Vortex crate containing core logic for encoding and memory representation of [arrays](Array).
+//!
+//! At the heart of Vortex are [arrays](Array) and [encodings](crate::encoding::EncodingCompression).
+//! Arrays are typed views of memory buffers that hold [scalars](vortex_scalar::Scalar). These
+//! buffers can be held in a number of physical encodings to perform lightweight compression that
+//! exploits the particular data distribution of the array's values.
+//!
+//! Every data type recognized by Vortex also has a canonical physical encoding format, which
+//! arrays can be [flattened](Flattened) into for ease of access in compute functions.
+//!
 pub mod accessor;
 pub mod array;
 pub mod arrow;
@@ -59,6 +69,7 @@ pub mod flatbuffers {
         #[allow(unused_imports)]
         pub use vortex_dtype::flatbuffers as dtype;
     }
+
     pub mod scalar {
         #[allow(unused_imports)]
         pub use vortex_scalar::flatbuffers as scalar;
@@ -177,6 +188,7 @@ pub trait ArrayDType {
 }
 
 struct NBytesVisitor(usize);
+
 impl ArrayVisitor for NBytesVisitor {
     fn visit_child(&mut self, _name: &str, array: &Array) -> VortexResult<()> {
         self.0 += array.with_dyn(|a| a.nbytes());
diff --git a/vortex-array/src/typed.rs b/vortex-array/src/typed.rs
index 49184e8c5e..4284284d6d 100644
--- a/vortex-array/src/typed.rs
+++ b/vortex-array/src/typed.rs
@@ -48,7 +48,11 @@ impl<D: ArrayDef> TryFrom<Array> for TypedArray<D> {
 
     fn try_from(array: Array) -> Result<Self, Self::Error> {
         if array.encoding().id() != D::ENCODING.id() {
-            vortex_bail!("incorrect encoding");
+            vortex_bail!(
+                "incorrect encoding {}, expected {}",
+                array.encoding().id().as_ref(),
+                D::ENCODING.id().as_ref(),
+            );
         }
         let metadata = match &array {
             Array::Data(d) => d
diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs
index bf0aa4967b..a88fa66714 100644
--- a/vortex-array/src/validity.rs
+++ b/vortex-array/src/validity.rs
@@ -1,10 +1,9 @@
-use arrow_buffer::{BooleanBuffer, NullBuffer};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer};
 use serde::{Deserialize, Serialize};
 use vortex_dtype::{DType, Nullability};
 use vortex_error::{vortex_bail, VortexResult};
 
 use crate::array::bool::BoolArray;
-use crate::compute::as_contiguous::as_contiguous;
 use crate::compute::scalar_at::scalar_at;
 use crate::compute::slice::slice;
 use crate::compute::take::take;
@@ -196,17 +195,23 @@ impl FromIterator<LogicalValidity> for Validity {
             return Self::AllInvalid;
         }
 
-        // Otherwise, map each to a bool array and concatenate them.
-        let arrays = validities
-            .iter()
-            .map(|v| {
-                v.to_present_null_buffer()
-                    .unwrap()
-                    .into_array_data()
+        // Else, construct the boolean buffer
+        let mut buffer = BooleanBufferBuilder::new(validities.iter().map(|v| v.len()).sum());
+        for validity in validities {
+            let present = match validity {
+                LogicalValidity::AllValid(count) => BooleanBuffer::new_set(count),
+                LogicalValidity::AllInvalid(count) => BooleanBuffer::new_unset(count),
+                LogicalValidity::Array(array) => array
                     .into_array()
-            })
-            .collect::<Vec<_>>();
-        Self::Array(as_contiguous(&arrays).unwrap())
+                    .flatten_bool()
+                    .expect("validity must flatten to BoolArray")
+                    .boolean_buffer(),
+            };
+            buffer.append_buffer(&present);
+        }
+        let bool_array = BoolArray::try_new(buffer.finish(), Validity::NonNullable)
+            .expect("BoolArray::try_new from BooleanBuffer should always succeed");
+        Self::Array(bool_array.into_array())
     }
 }
diff --git a/vortex-datetime-parts/src/array.rs b/vortex-datetime-parts/src/array.rs
index 94e7f6a18e..32abbe2443 100644
--- a/vortex-datetime-parts/src/array.rs
+++ b/vortex-datetime-parts/src/array.rs
@@ -5,6 +5,8 @@ use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
 use vortex::{impl_encoding, ArrayDType, ArrayFlatten, ToArrayData};
 use vortex_error::vortex_bail;
 
+use crate::compute::decode_to_localdatetime;
+
 impl_encoding!("vortex.datetimeparts", DateTimeParts);
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -81,8 +83,9 @@ impl DateTimePartsArray {
 
 impl ArrayFlatten for DateTimePartsArray {
     fn flatten(self) -> VortexResult<Flattened> {
-        // TODO(ngates): flatten into vortex.localdatetime or appropriate per dtype
-        todo!()
+        Ok(Flattened::Extension(
+            decode_to_localdatetime(&self.into_array())?.try_into()?,
+        ))
     }
 }
 
diff --git a/vortex-datetime-parts/src/compute.rs b/vortex-datetime-parts/src/compute.rs
index 31d4e5f003..e19241a29f 100644
--- a/vortex-datetime-parts/src/compute.rs
+++ b/vortex-datetime-parts/src/compute.rs
@@ -1,12 +1,22 @@
+use vortex::array::datetime::{try_parse_time_unit, LocalDateTimeArray, TimeUnit};
+use vortex::array::primitive::PrimitiveArray;
+use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
 use vortex::compute::slice::{slice, SliceFn};
 use vortex::compute::take::{take, TakeFn};
 use vortex::compute::ArrayCompute;
+use vortex::validity::ArrayValidity;
 use vortex::{Array, ArrayDType, IntoArray};
-use vortex_error::VortexResult;
+use vortex_dtype::DType;
+use vortex_error::{vortex_bail, VortexResult};
+use vortex_scalar::Scalar;
 
 use crate::DateTimePartsArray;
 
 impl ArrayCompute for DateTimePartsArray {
+    fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
+        Some(self)
+    }
+
     fn slice(&self) -> Option<&dyn SliceFn> {
         Some(self)
     }
@@ -39,3 +49,150 @@ impl SliceFn for DateTimePartsArray {
         .into_array())
     }
 }
+
+impl ScalarAtFn for DateTimePartsArray {
+    fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
+        let DType::Extension(ext, nullability) = self.dtype().clone() else {
+            panic!("DateTimePartsArray must have extension dtype");
+        };
+
+        match ext.id().as_ref() {
+            LocalDateTimeArray::ID => {
+                let time_unit = try_parse_time_unit(&ext)?;
+                let divisor = match time_unit {
+                    TimeUnit::Ns => 1_000_000_000,
+                    TimeUnit::Us => 1_000_000,
+                    TimeUnit::Ms => 1_000,
+                    TimeUnit::S => 1,
+                };
+
+                let days: i64 = scalar_at(&self.days(), index)?.try_into()?;
+                let seconds: i64 = scalar_at(&self.seconds(), index)?.try_into()?;
+                let subseconds: i64 = scalar_at(&self.subsecond(), index)?.try_into()?;
+
+                let scalar = days * 86_400 * divisor + seconds * divisor + subseconds;
+
+                Ok(Scalar::primitive(scalar, nullability))
+            }
+            _ => {
+                vortex_bail!(MismatchedTypes: LocalDateTimeArray::ID.to_string(), ext.id().as_ref().to_string())
+            }
+        }
+    }
+}
+
+/// Decode an [Array] to a [LocalDateTimeArray].
+///
+/// Enforces that the passed array is actually a [DateTimePartsArray] with proper metadata.
+pub fn decode_to_localdatetime(array: &Array) -> VortexResult<LocalDateTimeArray> {
+    // Ensure we can process it
+    let array = DateTimePartsArray::try_from(array)?;
+
+    let DType::Extension(ext, _) = array.dtype().clone() else {
+        vortex_bail!(ComputeError: "expected dtype to be DType::Extension variant")
+    };
+
+    if ext.id().as_ref() != LocalDateTimeArray::ID {
+        vortex_bail!(ComputeError: "DateTimeParts extension type must be vortex.localdatetime")
+    }
+
+    let time_unit = try_parse_time_unit(&ext)?;
+    let divisor = match time_unit {
+        TimeUnit::Ns => 1_000_000_000,
+        TimeUnit::Us => 1_000_000,
+        TimeUnit::Ms => 1_000,
+        TimeUnit::S => 1,
+    };
+
+    let days_buf = array
+        .days()
+        .flatten()?
+        .into_array()
+        .as_primitive()
+        .scalar_buffer::<i64>();
+    let seconds_buf = array
+        .seconds()
+        .flatten()?
+        .into_array()
+        .as_primitive()
+        .scalar_buffer::<i64>();
+    let subsecond_buf = array
+        .subsecond()
+        .flatten()?
+        .into_array()
+        .as_primitive()
+        .scalar_buffer::<i64>();
+
+    // TODO(aduffy): replace with vectorized implementation?
+    let values = days_buf
+        .iter()
+        .zip(seconds_buf.iter())
+        .zip(subsecond_buf.iter())
+        .map(|((d, s), ss)| d * 86_400 * divisor + s * divisor + ss)
+        .collect::<Vec<_>>();
+
+    LocalDateTimeArray::try_new(
+        time_unit,
+        PrimitiveArray::from_vec(values, array.logical_validity().into_validity()).into_array(),
+    )
+}
+
+#[cfg(test)]
+mod test {
+    use vortex::array::datetime::{LocalDateTimeArray, TimeUnit};
+    use vortex::array::primitive::PrimitiveArray;
+    use vortex::compute::scalar_at::scalar_at;
+    use vortex::validity::Validity;
+    use vortex::IntoArray;
+    use vortex_dtype::{DType, ExtDType, ExtID, Nullability};
+
+    use crate::compute::decode_to_localdatetime;
+    use crate::DateTimePartsArray;
+
+    #[test]
+    fn test_decode_to_localdatetime() {
+        let nanos = TimeUnit::Ns;
+
+        let days = PrimitiveArray::from_vec(vec![2i64, 3], Validity::NonNullable).into_array();
+        let seconds = PrimitiveArray::from_vec(vec![2i64, 3], Validity::NonNullable).into_array();
+        let subsecond = PrimitiveArray::from_vec(vec![2i64, 3], Validity::NonNullable).into_array();
+
+        let date_times = DateTimePartsArray::try_new(
+            DType::Extension(
+                ExtDType::new(
+                    ExtID::from(LocalDateTimeArray::ID),
+                    Some(nanos.metadata().clone()),
+                ),
+                Nullability::NonNullable,
+            ),
+            days,
+            seconds,
+            subsecond,
+        )
+        .unwrap();
+
+        let local = decode_to_localdatetime(&date_times.into_array()).unwrap();
+
+        let elem0: i64 = scalar_at(&local.timestamps(), 0)
+            .unwrap()
+            .try_into()
+            .unwrap();
+        let elem1: i64 = scalar_at(&local.timestamps(), 1)
+            .unwrap()
+            .try_into()
+            .unwrap();
+
+        assert_eq!(
+            elem0,
+            vec![(2i64 * 86_400 * 1_000_000_000), 2i64 * 1_000_000_000, 2i64]
+                .into_iter()
+                .sum::<i64>(),
+        );
+        assert_eq!(
+            elem1,
+            vec![(3i64 * 86_400 * 1_000_000_000), 3i64 * 1_000_000_000, 3i64]
+                .into_iter()
+                .sum::<i64>(),
+        );
+    }
+}
diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs
index 62c2cd7fed..bde98d42cf 100644
--- a/vortex-fastlanes/src/bitpacking/compress.rs
+++ b/vortex-fastlanes/src/bitpacking/compress.rs
@@ -300,7 +300,9 @@ pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult<Scal
 (
     packed: &[u8],
     bit_width: usize,
diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs
index cbb05a2151..e7a3cc82f0 100644
--- a/vortex-fastlanes/src/for/compress.rs
+++ b/vortex-fastlanes/src/for/compress.rs
@@ -4,9 +4,11 @@ use vortex::array::constant::ConstantArray;
 use vortex::array::primitive::PrimitiveArray;
 use vortex::compress::{CompressConfig, Compressor, EncodingCompression};
 use vortex::stats::{ArrayStatistics, Stat};
+use vortex::validity::ArrayValidity;
 use vortex::{Array, ArrayDType, ArrayTrait, IntoArray};
 use vortex_dtype::{match_each_integer_ptype, NativePType, PType};
 use vortex_error::{vortex_err, VortexResult};
+use vortex_scalar::Scalar;
 
 use crate::{FoRArray, FoREncoding};
 
@@ -28,6 +30,11 @@ impl EncodingCompression for FoREncoding {
             return None;
         }
 
+        // For all-null, cannot encode.
+        if parray.logical_validity().all_invalid() {
+            return None;
+        }
+
         // Nothing for us to do if the min is already zero and tz == 0
         let shift = trailing_zeros(array);
         let min = parray.statistics().compute_as_cast::<i64>(Stat::Min)?;
@@ -53,7 +60,7 @@
 
         let child = match_each_integer_ptype!(parray.ptype(), |$T| {
             if shift == <$T>::PTYPE.bit_width() as u8 {
-                ConstantArray::new($T::default(), parray.len()).into_array()
+                ConstantArray::new(Scalar::zero::<$T>(parray.dtype().nullability()), parray.len()).into_array()
             } else {
                 compress_primitive::<$T>(parray, shift, $T::try_from(&min)?).into_array()
             }
diff --git a/vortex-scalar/src/primitive.rs b/vortex-scalar/src/primitive.rs
index 71a16d52c6..f1e25f3958 100644
--- a/vortex-scalar/src/primitive.rs
+++ b/vortex-scalar/src/primitive.rs
@@ -123,6 +123,14 @@ macro_rules! primitive_scalar {
                 .ok_or_else(|| vortex_err!("Can't extract present value from null scalar"))
             }
         }
+
+        impl TryFrom<Scalar> for $T {
+            type Error = VortexError;
+
+            fn try_from(value: Scalar) -> Result<Self, Self::Error> {
+                <$T>::try_from(&value)
+            }
+        }
     };
 }