From 0a8ba7ebf4ba52a49f544b6ce2906590c64dba37 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 20 Mar 2024 10:40:40 +0000 Subject: [PATCH 1/2] Composite Arrays (#122) --- Cargo.lock | 2 +- bench-vortex/src/lib.rs | 31 ++- vortex-array/Cargo.toml | 2 +- vortex-array/src/array/composite/array.rs | 175 ++++++++++++++++ vortex-array/src/array/composite/as_arrow.rs | 96 --------- vortex-array/src/array/composite/compute.rs | 40 +++- vortex-array/src/array/composite/mod.rs | 190 ++---------------- vortex-array/src/array/composite/serde.rs | 48 +---- vortex-array/src/array/composite/typed.rs | 97 +++++++++ .../src/arrow/{convert.rs => dtypes.rs} | 15 +- vortex-array/src/arrow/mod.rs | 2 +- vortex-array/src/composite_dtypes.rs | 90 --------- vortex-array/src/datetime/README.md | 13 ++ vortex-array/src/datetime/localdatetime.rs | 72 +++++++ vortex-array/src/datetime/mod.rs | 41 ++++ vortex-array/src/dtype.rs | 16 +- vortex-array/src/encode.rs | 31 ++- vortex-array/src/lib.rs | 2 +- vortex-array/src/scalar/mod.rs | 4 +- vortex-array/src/serde/dtype.rs | 20 +- vortex-array/src/serde/mod.rs | 9 + 21 files changed, 546 insertions(+), 450 deletions(-) create mode 100644 vortex-array/src/array/composite/array.rs delete mode 100644 vortex-array/src/array/composite/as_arrow.rs create mode 100644 vortex-array/src/array/composite/typed.rs rename vortex-array/src/arrow/{convert.rs => dtypes.rs} (90%) delete mode 100644 vortex-array/src/composite_dtypes.rs create mode 100644 vortex-array/src/datetime/README.md create mode 100644 vortex-array/src/datetime/localdatetime.rs create mode 100644 vortex-array/src/datetime/mod.rs diff --git a/Cargo.lock b/Cargo.lock index f899fc5db..1e9c15356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2579,7 +2579,7 @@ dependencies = [ "log", "num-traits", "num_enum", - "once_cell", + "paste", "rand", "rayon", "roaring", diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index eb0bd3456..06f9bf137 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -87,13 +87,13 @@ pub fn download_taxi_data() -> PathBuf { pub fn compress_taxi_data() -> ArrayRef { let file = File::open(download_taxi_data()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); - let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]); + let _mask = ProjectionMask::roots(builder.parquet_schema(), [1]); let _no_datetime_mask = ProjectionMask::roots( builder.parquet_schema(), [0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18], ); let reader = builder - //.with_projection(mask) + .with_projection(_mask) //.with_projection(no_datetime_mask) .with_batch_size(65_536) // .with_batch_size(5_000_000) @@ -152,6 +152,7 @@ mod test { use vortex::array::ArrayRef; use vortex::compute::as_arrow::as_arrow; use vortex::encode::FromArrow; + use vortex::serde::{ReadCtx, WriteCtx}; use crate::{compress_ctx, compress_taxi_data, download_taxi_data}; @@ -169,10 +170,32 @@ mod test { #[ignore] #[test] fn compression_ratio() { - setup_logger(LevelFilter::Warn); + setup_logger(LevelFilter::Info); _ = compress_taxi_data(); } + #[ignore] + #[test] + fn round_trip_serde() { + let file = File::open(download_taxi_data()).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let reader = builder.with_limit(1).build().unwrap(); + + for record_batch in reader.map(|batch_result| batch_result.unwrap()) { + let struct_arrow: ArrowStructArray = record_batch.into(); + let arrow_array: ArrowArrayRef = Arc::new(struct_arrow); + let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false); + + let mut buf = Vec::::new(); + let mut write_ctx = WriteCtx::new(&mut buf); + write_ctx.write(vortex_array.as_ref()).unwrap(); + + let mut read = buf.as_slice(); + let mut read_ctx = ReadCtx::new(vortex_array.dtype(), &mut read); + read_ctx.read().unwrap(); + } + } + #[ignore] #[test] fn round_trip_arrow() { @@ -189,6 +212,8 @@ mod test { } } + // Ignoring since Struct arrays don't currently support equality. + // https://github.com/apache/arrow-rs/issues/5199 #[ignore] #[test] fn round_trip_arrow_compressed() { diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 8d6a41287..34258b0b0 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -32,7 +32,7 @@ linkme = "0.3.23" log = "0.4.20" num-traits = "0.2.18" num_enum = "0.7.2" -once_cell = "1.19.0" +paste = "1.0.14" rand = { version = "0.8.5", features = [] } rayon = "1.8.1" roaring = "0.10.3" diff --git a/vortex-array/src/array/composite/array.rs b/vortex-array/src/array/composite/array.rs new file mode 100644 index 000000000..0ab03c624 --- /dev/null +++ b/vortex-array/src/array/composite/array.rs @@ -0,0 +1,175 @@ +use std::any::Any; +use std::fmt::{Debug, Display}; +use std::sync::{Arc, RwLock}; + +use linkme::distributed_slice; + +use crate::array::composite::{ + find_extension, CompositeExtensionRef, CompositeID, TypedCompositeArray, +}; +use crate::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS}; +use crate::compress::EncodingCompression; +use crate::compute::ArrayCompute; +use crate::dtype::DType; +use crate::error::VortexResult; +use crate::formatter::{ArrayDisplay, ArrayFormatter}; +use crate::serde::{ArraySerde, BytesSerde, EncodingSerde}; +use crate::stats::{Stats, StatsCompute, StatsSet}; + +pub trait CompositeMetadata: + 'static + Debug + Display + Send + Sync + Sized + Clone + BytesSerde +{ + fn id(&self) -> CompositeID; +} + +#[derive(Debug, Clone)] +pub struct CompositeArray { + extension: CompositeExtensionRef, + metadata: Arc>, + underlying: ArrayRef, + dtype: DType, + stats: Arc>, +} + +impl CompositeArray { + pub fn new(id: CompositeID, metadata: Arc>, underlying: ArrayRef) -> Self { + let dtype = DType::Composite(id, underlying.dtype().is_nullable().into()); + let extension = find_extension(id.0).expect("Unrecognized composite extension"); + Self { + extension, + metadata, + underlying, + dtype, + stats: Arc::new(RwLock::new(StatsSet::new())), + } + } + + #[inline] + pub fn id(&self) -> CompositeID { + self.extension.id() + } + + #[inline] + pub fn extension(&self) -> CompositeExtensionRef { + self.extension + } + + pub fn metadata(&self) -> Arc> { + self.metadata.clone() + } + + #[inline] + pub fn underlying(&self) -> &dyn Array { + self.underlying.as_ref() + } + + pub fn as_typed(&self) -> TypedCompositeArray { + TypedCompositeArray::new( + M::deserialize(self.metadata().as_slice()).unwrap(), + dyn_clone::clone_box(self.underlying()), + ) + } + + pub fn as_typed_compute(&self) -> Box { + self.extension.as_typed_compute(self) + } +} + +impl Array for CompositeArray { + #[inline] + fn as_any(&self) -> &dyn Any { + self + } + + #[inline] + fn boxed(self) -> ArrayRef { + Box::new(self) + } + + #[inline] + fn into_any(self: Box) -> Box { + self + } + + #[inline] + fn len(&self) -> usize { + self.underlying.len() + } + + #[inline] + fn is_empty(&self) -> bool { + self.underlying.is_empty() + } + + #[inline] + fn dtype(&self) -> &DType { + &self.dtype + } + + #[inline] + fn stats(&self) -> Stats { + Stats::new(&self.stats, self) + } + + fn slice(&self, start: usize, stop: usize) -> VortexResult { + Ok(Self::new( + self.id(), + self.metadata.clone(), + self.underlying.slice(start, stop)?, + ) + .boxed()) + } + + #[inline] + fn encoding(&self) -> EncodingRef { + &CompositeEncoding + } + + #[inline] + fn nbytes(&self) -> usize { + self.underlying.nbytes() + } + + fn serde(&self) -> Option<&dyn ArraySerde> { + Some(self) + } +} + +impl StatsCompute for CompositeArray {} + +impl<'arr> AsRef<(dyn Array + 'arr)> for CompositeArray { + fn as_ref(&self) -> &(dyn Array + 'arr) { + self + } +} + +impl ArrayDisplay for CompositeArray { + fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { + f.property("metadata", format!("{:#?}", self.metadata().as_slice()))?; + f.child("underlying", self.underlying.as_ref()) + } +} + +#[derive(Debug)] +pub struct CompositeEncoding; + +impl CompositeEncoding { + pub const ID: EncodingId = EncodingId::new("vortex.composite"); +} + +#[distributed_slice(ENCODINGS)] +static ENCODINGS_COMPOSITE: EncodingRef = &CompositeEncoding; + +impl Encoding for CompositeEncoding { + fn id(&self) -> &EncodingId { + &Self::ID + } + + fn compression(&self) -> Option<&dyn EncodingCompression> { + Some(self) + } + + fn serde(&self) -> Option<&dyn EncodingSerde> { + Some(self) + } +} diff --git a/vortex-array/src/array/composite/as_arrow.rs b/vortex-array/src/array/composite/as_arrow.rs deleted file mode 100644 index 825c73bbe..000000000 --- a/vortex-array/src/array/composite/as_arrow.rs +++ /dev/null @@ -1,96 +0,0 @@ -use crate::array::composite::CompositeArray; -use crate::array::Array; -use crate::arrow::wrappers::as_nulls; -use crate::composite_dtypes::{TimeUnit, TimeUnitSerializer}; -use crate::compute::as_arrow::AsArrowArray; -use crate::compute::cast::cast; -use crate::compute::flatten::{flatten_bool, flatten_primitive, flatten_struct}; -use crate::compute::scalar_at::scalar_at; -use crate::dtype::DType; -use crate::error::{VortexError, VortexResult}; -use crate::ptype::PType; -use crate::stats::Stat; -use arrow_array::{ - ArrayRef as ArrowArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, -}; -use arrow_buffer::NullBuffer; -use std::sync::Arc; - -impl AsArrowArray for CompositeArray { - fn as_arrow(&self) -> VortexResult { - // Decide based on the DType if we know how to do this or not... - match self.dtype() { - DType::Composite(id, _dtype, metadata) => match id.as_str() { - "zoneddatetime" => hacky_zoneddatetime_as_arrow(self.underlying(), metadata), - &_ => Err(VortexError::InvalidArgument( - format!("Cannot convert composite DType {} to arrow", id).into(), - )), - }, - _ => Err(VortexError::InvalidArgument( - format!("Cannot convert {} into Arrow array", self.dtype().clone()).into(), - )), - } - } -} - -fn hacky_zoneddatetime_as_arrow(array: &dyn Array, metadata: &[u8]) -> VortexResult { - // A ZonedDateTime is currently just a primitive that ignores the timezone... - let array = flatten_primitive(cast(array, &PType::I64.into())?.as_ref())?; - - let values = array.scalar_buffer::(); - let validity = as_nulls(array.validity())?; - - let time_unit = TimeUnitSerializer::deserialize(metadata); - Ok(match time_unit { - TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(values, validity)), - TimeUnit::Us => Arc::new(TimestampMicrosecondArray::new(values, validity)), - TimeUnit::Ms => Arc::new(TimestampMillisecondArray::new(values, validity)), - TimeUnit::S => Arc::new(TimestampSecondArray::new(values, validity)), - }) -} - -// FIXME(ngates): this is what ZonedDateTime should look like, but it's not implemented yet. -#[allow(dead_code)] -fn zoneddatetime_as_arrow(array: &dyn Array, metadata: &[u8]) -> VortexResult { - // A ZonedDateTime is a composite of {instant, timezone}. - // TODO(ngates): make this actually a composite of Instant, instead of directly a primitive. - let array = flatten_struct(array)?; - assert_eq!(array.names()[0].as_str(), "instant"); - assert_eq!(array.names()[1].as_str(), "timezone"); - - // Map the instant into an i64 primitive - let instant = array.fields().first().unwrap(); - let instant = flatten_primitive(cast(instant.as_ref(), &PType::I64.into())?.as_ref())?; - - // Extract the values and validity buffer - let values = instant.scalar_buffer::(); - let validity = instant - .validity() - .map(flatten_bool) - .transpose()? - .map(|b| NullBuffer::new(b.buffer().clone())); - - // Unwrap the constant timezone - let timezone = array.fields().get(1).unwrap(); - if !timezone - .stats() - .get_or_compute_as::(&Stat::IsConstant) - .unwrap_or(false) - { - return Err(VortexError::InvalidArgument( - "Timezone must be constant to convert into Arrow".into(), - )); - } - let _timezone = scalar_at(timezone.as_ref(), 0)?; - - // Extract the instant unit - let time_unit = TimeUnitSerializer::deserialize(metadata); - - Ok(match time_unit { - TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(values, validity)), - TimeUnit::Us => Arc::new(TimestampMicrosecondArray::new(values, validity)), - TimeUnit::Ms => Arc::new(TimestampMillisecondArray::new(values, validity)), - TimeUnit::S => Arc::new(TimestampSecondArray::new(values, validity)), - }) -} diff --git a/vortex-array/src/array/composite/compute.rs b/vortex-array/src/array/composite/compute.rs index 19a452340..fc4a72d75 100644 --- a/vortex-array/src/array/composite/compute.rs +++ b/vortex-array/src/array/composite/compute.rs @@ -1,6 +1,4 @@ -use itertools::Itertools; - -use crate::array::composite::CompositeArray; +use crate::array::composite::array::CompositeArray; use crate::array::downcast::DowncastArrayBuiltin; use crate::array::{Array, ArrayRef}; use crate::compute::as_arrow::AsArrowArray; @@ -8,8 +6,10 @@ use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::flatten::{FlattenFn, FlattenedArray}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::ArrayCompute; -use crate::error::VortexResult; +use crate::error::{VortexError, VortexResult}; use crate::scalar::Scalar; +use arrow_array::ArrayRef as ArrowArrayRef; +use itertools::Itertools; impl ArrayCompute for CompositeArray { fn as_arrow(&self) -> Option<&dyn AsArrowArray> { @@ -29,17 +29,35 @@ impl ArrayCompute for CompositeArray { } } +impl AsArrowArray for CompositeArray { + fn as_arrow(&self) -> VortexResult { + self.extension() + .as_typed_compute(self) + .as_arrow() + .map(|a| a.as_arrow()) + .unwrap_or_else(|| { + Err(VortexError::InvalidArgument( + format!( + "as_arrow not implemented for composite extension {}", + self.id() + ) + .into(), + )) + }) + } +} + impl AsContiguousFn for CompositeArray { fn as_contiguous(&self, arrays: Vec) -> VortexResult { + let composites = arrays + .iter() + .map(|array| array.as_composite().underlying()) + .map(dyn_clone::clone_box) + .collect_vec(); Ok(CompositeArray::new( self.id(), self.metadata().clone(), - as_contiguous( - arrays - .into_iter() - .map(|array| dyn_clone::clone_box(array.as_composite().underlying())) - .collect_vec(), - )?, + as_contiguous(composites)?, ) .boxed()) } @@ -53,6 +71,8 @@ impl FlattenFn for CompositeArray { impl ScalarAtFn for CompositeArray { fn scalar_at(&self, index: usize) -> VortexResult { + // TODO(ngates): this seems wrong... I don't think we just cast scalars like this. + // e.g. how do we know what a datetime is in? let underlying = scalar_at(self.underlying(), index)?; underlying.cast(self.dtype()) } diff --git a/vortex-array/src/array/composite/mod.rs b/vortex-array/src/array/composite/mod.rs index 327b0668f..02c2c0369 100644 --- a/vortex-array/src/array/composite/mod.rs +++ b/vortex-array/src/array/composite/mod.rs @@ -1,187 +1,31 @@ -use std::any::Any; -use std::sync::{Arc, RwLock}; +use std::fmt::{Display, Formatter}; use linkme::distributed_slice; -use crate::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS}; -use crate::compress::EncodingCompression; -use crate::dtype::{DType, Metadata}; -use crate::error::VortexResult; -use crate::formatter::{ArrayDisplay, ArrayFormatter}; -use crate::serde::{ArraySerde, EncodingSerde}; -use crate::stats::{Stats, StatsCompute, StatsSet}; +pub use array::*; +pub use typed::*; -mod as_arrow; +mod array; mod compress; mod compute; mod serde; +mod typed; -#[derive(Debug, Clone)] -pub struct CompositeArray { - underlying: ArrayRef, - dtype: DType, - stats: Arc>, -} - -impl CompositeArray { - pub fn new(id: Arc, metadata: Metadata, underlying: ArrayRef) -> Self { - let dtype = DType::Composite(id, Box::new(underlying.dtype().clone()), metadata); - Self { - underlying, - dtype, - stats: Arc::new(RwLock::new(StatsSet::new())), - } - } - - pub fn id(&self) -> Arc { - let DType::Composite(id, _, _) = &self.dtype else { - unreachable!() - }; - id.clone() - } - - pub fn metadata(&self) -> &Metadata { - let DType::Composite(_, _, metadata) = &self.dtype else { - unreachable!() - }; - metadata - } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] +pub struct CompositeID(pub &'static str); - #[inline] - pub fn underlying(&self) -> &dyn Array { - self.underlying.as_ref() +impl Display for CompositeID { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) } } -impl Array for CompositeArray { - #[inline] - fn as_any(&self) -> &dyn Any { - self - } - - #[inline] - fn boxed(self) -> ArrayRef { - Box::new(self) - } +#[distributed_slice] +pub static COMPOSITE_EXTENSIONS: [&'static dyn CompositeExtension] = [..]; - #[inline] - fn into_any(self: Box) -> Box { - self - } - - #[inline] - fn len(&self) -> usize { - self.underlying.len() - } - - #[inline] - fn is_empty(&self) -> bool { - self.underlying.is_empty() - } - - #[inline] - fn dtype(&self) -> &DType { - &self.dtype - } - - #[inline] - fn stats(&self) -> Stats { - Stats::new(&self.stats, self) - } - - fn slice(&self, start: usize, stop: usize) -> VortexResult { - Ok(Self::new( - self.id().clone(), - self.metadata().clone(), - self.underlying.slice(start, stop)?, - ) - .boxed()) - } - - #[inline] - fn encoding(&self) -> EncodingRef { - &CompositeEncoding - } - - #[inline] - fn nbytes(&self) -> usize { - self.underlying.nbytes() - } - - fn serde(&self) -> Option<&dyn ArraySerde> { - Some(self) - } -} - -impl StatsCompute for CompositeArray {} - -impl<'arr> AsRef<(dyn Array + 'arr)> for CompositeArray { - fn as_ref(&self) -> &(dyn Array + 'arr) { - self - } -} - -#[derive(Debug)] -pub struct CompositeEncoding; - -impl CompositeEncoding { - pub const ID: EncodingId = EncodingId::new("vortex.composite"); -} - -#[distributed_slice(ENCODINGS)] -static ENCODINGS_COMPOSITE: EncodingRef = &CompositeEncoding; - -impl Encoding for CompositeEncoding { - fn id(&self) -> &EncodingId { - &Self::ID - } - - fn compression(&self) -> Option<&dyn EncodingCompression> { - Some(self) - } - - fn serde(&self) -> Option<&dyn EncodingSerde> { - Some(self) - } -} - -impl ArrayDisplay for CompositeArray { - fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { - f.child("composite", self.underlying()) - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::composite_dtypes::{localtime, TimeUnit, TimeUnitSerializer}; - use crate::compute::scalar_at::scalar_at; - use crate::dtype::{IntWidth, Nullability}; - use crate::scalar::{CompositeScalar, PScalar, PrimitiveScalar}; - - #[test] - pub fn scalar() { - let dtype = localtime(TimeUnit::Us, IntWidth::_64, Nullability::NonNullable); - let arr = CompositeArray::new( - Arc::new("localtime".into()), - TimeUnitSerializer::serialize(TimeUnit::Us), - vec![64_799_000_000_i64, 43_000_000_000].into(), - ); - assert_eq!( - scalar_at(arr.as_ref(), 0).unwrap(), - CompositeScalar::new( - dtype.clone(), - Box::new(PrimitiveScalar::some(PScalar::I64(64_799_000_000)).into()), - ) - .into() - ); - assert_eq!( - scalar_at(arr.as_ref(), 1).unwrap(), - CompositeScalar::new( - dtype.clone(), - Box::new(PrimitiveScalar::some(PScalar::I64(43_000_000_000)).into()), - ) - .into() - ); - } +pub fn find_extension(id: &str) -> Option<&'static dyn CompositeExtension> { + COMPOSITE_EXTENSIONS + .iter() + .find(|ext| ext.id().0 == id) + .copied() } diff --git a/vortex-array/src/array/composite/serde.rs b/vortex-array/src/array/composite/serde.rs index ea07bb369..03a3ed86d 100644 --- a/vortex-array/src/array/composite/serde.rs +++ b/vortex-array/src/array/composite/serde.rs @@ -3,54 +3,26 @@ use crate::array::{Array, ArrayRef}; use crate::dtype::DType; use crate::error::VortexResult; use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; +use std::sync::Arc; impl ArraySerde for CompositeArray { fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> { - // TODO(ngates): just write the ID and metadata? - ctx.dtype(self.dtype())?; + ctx.write_slice(self.metadata().as_slice())?; + let underlying = self.underlying(); + ctx.dtype(underlying.dtype())?; ctx.write(self.underlying()) } } impl EncodingSerde for CompositeEncoding { fn read(&self, ctx: &mut ReadCtx) -> VortexResult { - let DType::Composite(id, underlying, metadata) = ctx.dtype()? else { - panic!("Invalid DType") + let DType::Composite(id, _) = *ctx.schema() else { + panic!("Expected composite schema") }; - Ok(CompositeArray::new(id, metadata, ctx.with_schema(&underlying).read()?).boxed()) - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use crate::array::composite::CompositeArray; - use crate::array::downcast::DowncastArrayBuiltin; - use crate::array::Array; - use crate::dtype::Metadata; - use crate::serde::test::roundtrip_array; - - #[test] - fn roundtrip() { - let arr = CompositeArray::new( - Arc::new("test".into()), - Metadata::default(), - vec![7u8, 37, 71, 97].into(), - ); - - let read_arr = roundtrip_array(arr.as_ref()).unwrap(); - - assert_eq!( - arr.underlying().as_primitive().buffer().typed_data::(), - read_arr - .as_composite() - .underlying() - .as_primitive() - .buffer() - .typed_data::() - ); + let metadata = ctx.read_slice()?; + let underling_dtype = ctx.dtype()?; + let underlying = ctx.with_schema(&underling_dtype).read()?; - assert_eq!(arr.dtype(), read_arr.dtype()); + Ok(CompositeArray::new(id, Arc::new(metadata), underlying).boxed()) } } diff --git a/vortex-array/src/array/composite/typed.rs b/vortex-array/src/array/composite/typed.rs new file mode 100644 index 000000000..084c5c064 --- /dev/null +++ b/vortex-array/src/array/composite/typed.rs @@ -0,0 +1,97 @@ +use std::fmt::Debug; +use std::sync::Arc; + +use crate::array::composite::array::CompositeArray; +use crate::array::composite::{CompositeID, CompositeMetadata}; +use crate::array::{Array, ArrayRef}; +use crate::compute::ArrayCompute; + +pub trait CompositeExtension: Debug + Send + Sync + 'static { + fn id(&self) -> CompositeID; + + fn as_typed_compute(&self, array: &CompositeArray) -> Box; +} + +pub type CompositeExtensionRef = &'static dyn CompositeExtension; + +#[derive(Debug, Clone)] +pub struct TypedCompositeArray { + metadata: M, + underlying: ArrayRef, +} + +impl TypedCompositeArray { + pub fn new(metadata: M, underlying: ArrayRef) -> Self { + Self { + metadata, + underlying, + } + } + + #[inline] + pub fn metadata(&self) -> &M { + &self.metadata + } + + #[inline] + pub fn underlying(&self) -> &dyn Array { + self.underlying.as_ref() + } + + pub fn as_composite(&self) -> CompositeArray { + CompositeArray::new( + self.metadata().id(), + Arc::new(self.metadata().serialize()), + dyn_clone::clone_box(self.underlying()), + ) + } +} + +macro_rules! composite_impl { + ($id:expr, $T:ty) => { + use crate::array::composite::{ + CompositeArray, CompositeExtension, CompositeMetadata, COMPOSITE_EXTENSIONS, + }; + use crate::compute::ArrayCompute; + use crate::dtype::{DType, Nullability}; + use linkme::distributed_slice; + use paste::paste; + + paste! { + #[derive(Debug)] + pub struct [<$T Extension>]; + + impl [<$T Extension>] { + pub const ID: CompositeID = CompositeID($id); + + pub fn dtype(nullability: Nullability) -> DType { + DType::Composite(Self::ID, nullability) + } + } + + impl CompositeExtension for [<$T Extension>] { + fn id(&self) -> CompositeID { + Self::ID + } + + fn as_typed_compute(&self, array: &CompositeArray) -> Box { + if array.id() != Self::ID { + panic!("Incorrect CompositeID"); + } + Box::new(array.as_typed::<$T>()) + } + } + + impl CompositeMetadata for $T { + fn id(&self) -> CompositeID { + [<$T Extension>]::ID + } + } + + #[distributed_slice(COMPOSITE_EXTENSIONS)] + static ENCODINGS_COMPOSITE_EXT: &'static dyn CompositeExtension = &[<$T Extension>]; + } + }; +} + +pub(crate) use composite_impl; diff --git a/vortex-array/src/arrow/convert.rs b/vortex-array/src/arrow/dtypes.rs similarity index 90% rename from vortex-array/src/arrow/convert.rs rename to vortex-array/src/arrow/dtypes.rs index f0e3b3f0b..c00c036b2 100644 --- a/vortex-array/src/arrow/convert.rs +++ b/vortex-array/src/arrow/dtypes.rs @@ -6,8 +6,8 @@ use itertools::Itertools; use crate::array::struct_::StructArray; use crate::array::{Array, ArrayRef}; -use crate::composite_dtypes::{localdate, localtime, zoneddatetime, TimeUnit}; use crate::compute::cast::cast; +use crate::datetime::{LocalDateTimeExtension, TimeUnit}; use crate::dtype::DType::*; use crate::dtype::{DType, FloatWidth, IntWidth, Nullability}; use crate::encode::FromArrow; @@ -110,11 +110,14 @@ impl From<&Field> for DType { DataType::Utf8 | DataType::LargeUtf8 => Utf8(nullability), DataType::Binary | DataType::LargeBinary => Binary(nullability), // TODO(robert): what to do about this timezone? - DataType::Timestamp(u, _) => zoneddatetime(u.into(), nullability), - DataType::Date32 => localdate(IntWidth::_32, nullability), - DataType::Date64 => localdate(IntWidth::_64, nullability), - DataType::Time32(u) => localtime(u.into(), IntWidth::_32, nullability), - DataType::Time64(u) => localtime(u.into(), IntWidth::_64, nullability), + DataType::Timestamp(_u, tz) => match tz { + None => LocalDateTimeExtension::dtype(nullability), + Some(_) => unimplemented!("Timezone not yet supported"), + }, + // DataType::Date32 => localdate(IntWidth::_32, nullability), + // DataType::Date64 => localdate(IntWidth::_64, nullability), + // DataType::Time32(u) => localtime(u.into(), IntWidth::_32, nullability), + // DataType::Time64(u) => localtime(u.into(), IntWidth::_64, nullability), DataType::List(e) | DataType::LargeList(e) => { List(Box::new(e.as_ref().into()), nullability) } diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index ad0c08b3d..c87743391 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -1,2 +1,2 @@ -pub mod convert; +pub mod dtypes; pub mod wrappers; diff --git a/vortex-array/src/composite_dtypes.rs b/vortex-array/src/composite_dtypes.rs deleted file mode 100644 index 08ab92de9..000000000 --- a/vortex-array/src/composite_dtypes.rs +++ /dev/null @@ -1,90 +0,0 @@ -use std::fmt::{Display, Formatter}; -use std::sync::Arc; - -use crate::dtype::{DType, IntWidth, Nullability, Signedness}; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] -pub enum TimeUnit { - Ns, - Us, - Ms, - S, -} - -impl Display for TimeUnit { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - TimeUnit::Ns => write!(f, "ns"), - TimeUnit::Us => write!(f, "us"), - TimeUnit::Ms => write!(f, "ms"), - TimeUnit::S => write!(f, "s"), - } - } -} - -pub struct TimeUnitSerializer; - -impl TimeUnitSerializer { - pub fn serialize(unit: TimeUnit) -> Vec { - vec![unit as u8] - } - - pub fn deserialize(bytes: &[u8]) -> TimeUnit { - match bytes[0] { - 0x00 => TimeUnit::Ns, - 0x01 => TimeUnit::Us, - 0x02 => TimeUnit::Ms, - 0x03 => TimeUnit::S, - _ => panic!("Unknown timeunit variant"), - } - } -} - -const LOCALTIME_DTYPE: &str = "localtime"; - -pub fn localtime(unit: TimeUnit, width: IntWidth, nullability: Nullability) -> DType { - DType::Composite( - Arc::new(LOCALTIME_DTYPE.to_string()), - Box::new(DType::Int(width, Signedness::Signed, nullability)), - TimeUnitSerializer::serialize(unit), - ) -} - -const LOCALDATE_DTYPE: &str = "localdate"; - -pub fn localdate(width: IntWidth, nullability: Nullability) -> DType { - DType::Composite( - Arc::new(LOCALDATE_DTYPE.to_string()), - Box::new(DType::Int(width, Signedness::Signed, nullability)), - vec![], - ) -} - -const INSTANT_DTYPE: &str = "instant"; - -pub fn instant(unit: TimeUnit, nullability: Nullability) -> DType { - DType::Composite( - Arc::new(INSTANT_DTYPE.to_string()), - Box::new(DType::Int(IntWidth::_64, Signedness::Signed, nullability)), - TimeUnitSerializer::serialize(unit), - ) -} - -const ZONEDDATETIME_DTYPE: &str = "zoneddatetime"; - -pub fn zoneddatetime(unit: TimeUnit, nullability: Nullability) -> DType { - DType::Composite( - Arc::new(ZONEDDATETIME_DTYPE.to_string()), - Box::new(DType::Struct( - vec![ - Arc::new("instant".to_string()), - Arc::new("timezone".to_string()), - ], - vec![ - DType::Int(IntWidth::_64, Signedness::Signed, nullability), - DType::Utf8(nullability), - ], - )), - TimeUnitSerializer::serialize(unit), - ) -} diff --git a/vortex-array/src/datetime/README.md b/vortex-array/src/datetime/README.md new file mode 100644 index 000000000..8e6bb6bb3 --- /dev/null +++ b/vortex-array/src/datetime/README.md @@ -0,0 +1,13 @@ +# Vortex Datetime Composite Extensions + +This module provides implementations of datetime types using composite arrays. + +## Arrow Conversion + +| Arrow Type | Vortex Type | | +|-----------------------|-----------------|----------------------------------| +| `time32/64` | `LocalTime` | Time since midnight | +| `date32/64` | `LocalDate` | Julian day | +| `timestamp(tz=None)` | `LocalDateTime` | Julian day + time since midnight | +| `timestamp(tz=UTC)` | `Instant` | Time since Unix epoch | +| `timestamp(tz=Other)` | `ZonedDateTime` | TZ aware time since Unix epoch | diff --git a/vortex-array/src/datetime/localdatetime.rs b/vortex-array/src/datetime/localdatetime.rs new file mode 100644 index 000000000..d297100c9 --- /dev/null +++ b/vortex-array/src/datetime/localdatetime.rs @@ -0,0 +1,72 @@ +use std::fmt::{Display, Formatter}; +use std::sync::Arc; + +use arrow_array::{ + ArrayRef as ArrowArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, +}; + +use crate::array::composite::CompositeID; +use crate::array::composite::{composite_impl, TypedCompositeArray}; + +use crate::arrow::wrappers::as_nulls; +use crate::compute::as_arrow::AsArrowArray; +use crate::compute::cast::cast; +use crate::compute::flatten::flatten_primitive; +use crate::datetime::TimeUnit; +use crate::error::VortexResult; +use crate::ptype::PType; +use crate::serde::BytesSerde; + +#[derive(Debug, Clone)] +pub struct LocalDateTime { + time_unit: TimeUnit, +} + +composite_impl!("vortex.localdatetime", LocalDateTime); + +impl LocalDateTime { + pub fn new(time_unit: TimeUnit) -> Self { + Self { time_unit } + } +} + +impl Display for LocalDateTime { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.time_unit) + } +} + +impl BytesSerde for LocalDateTime { + fn serialize(&self) -> Vec { + self.time_unit.serialize() + } + + fn deserialize(metadata: &[u8]) -> VortexResult { + TimeUnit::deserialize(metadata).map(Self::new) + } +} + +pub type LocalDateTimeArray = TypedCompositeArray; + +impl ArrayCompute for LocalDateTimeArray { + fn as_arrow(&self) -> Option<&dyn AsArrowArray> { + Some(self) + } +} + +impl AsArrowArray for LocalDateTimeArray { + fn as_arrow(&self) -> VortexResult { + // A LocalDateTime maps to an Arrow Timestamp array with no timezone. + let timestamps = flatten_primitive(cast(self.underlying(), &PType::I64.into())?.as_ref())?; + let validity = as_nulls(timestamps.validity())?; + let buffer = timestamps.scalar_buffer::(); + + Ok(match self.metadata().time_unit { + TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)), + TimeUnit::Us => Arc::new(TimestampMicrosecondArray::new(buffer, validity)), + TimeUnit::Ms => Arc::new(TimestampMillisecondArray::new(buffer, validity)), + TimeUnit::S => Arc::new(TimestampSecondArray::new(buffer, validity)), + }) + } +} diff --git a/vortex-array/src/datetime/mod.rs b/vortex-array/src/datetime/mod.rs new file mode 100644 index 000000000..b45e95047 --- /dev/null +++ b/vortex-array/src/datetime/mod.rs @@ -0,0 +1,41 @@ +mod localdatetime; + +use crate::error::VortexResult; +use crate::serde::BytesSerde; +pub use localdatetime::*; +use std::fmt::{Display, Formatter}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub enum TimeUnit { + Ns, + Us, + Ms, + S, +} + +impl Display for TimeUnit { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TimeUnit::Ns => write!(f, "ns"), + TimeUnit::Us => write!(f, "us"), + TimeUnit::Ms => write!(f, "ms"), + TimeUnit::S => write!(f, "s"), + } + } +} + +impl BytesSerde for TimeUnit { + fn serialize(&self) -> Vec { + vec![*self as u8] + } + + fn deserialize(data: &[u8]) -> VortexResult { + match data[0] { + 0x00 => Ok(TimeUnit::Ns), + 0x01 => Ok(TimeUnit::Us), + 0x02 => Ok(TimeUnit::Ms), + 0x03 => Ok(TimeUnit::S), + _ => Err("Unknown timeunit variant".into()), + } + } +} diff --git a/vortex-array/src/dtype.rs b/vortex-array/src/dtype.rs index 4a01869b2..1e8aa2b58 100644 --- a/vortex-array/src/dtype.rs +++ b/vortex-array/src/dtype.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use itertools::Itertools; +use crate::array::composite::CompositeID; use DType::*; use crate::ptype::PType; @@ -138,7 +139,7 @@ pub enum DType { Binary(Nullability), Struct(FieldNames, Vec), List(Box, Nullability), - Composite(Arc, Box, Metadata), + Composite(CompositeID, Nullability), } impl DType { @@ -162,7 +163,7 @@ impl DType { Binary(n) => matches!(n, Nullable), Struct(_, fs) => fs.iter().all(|f| f.is_nullable()), List(_, n) => matches!(n, Nullable), - Composite(_, d, _) => d.is_nullable(), + Composite(_, n) => matches!(n, Nullable), } } @@ -188,11 +189,7 @@ impl DType { fs.iter().map(|f| f.with_nullability(nullability)).collect(), ), List(c, _) => List(c.clone(), nullability), - Composite(n, d, m) => Composite( - n.clone(), - Box::new(d.with_nullability(nullability)), - m.clone(), - ), + Composite(id, _) => Composite(*id, nullability), } } @@ -225,8 +222,7 @@ impl Display for DType { .join(", ") ), List(c, n) => write!(f, "list({}){}", c, n), - // TODO(robert): Print metadata - Composite(n, d, _) => write!(f, "composite({}, [{}])", n, d,), + Composite(id, n) => write!(f, "<{}>{}", id, n), } } } @@ -281,6 +277,6 @@ mod test { #[test] fn size_of() { - assert_eq!(mem::size_of::(), 56); + assert_eq!(mem::size_of::(), 48); } } diff --git a/vortex-array/src/encode.rs b/vortex-array/src/encode.rs index 1d964037f..570f0da1e 100644 --- a/vortex-array/src/encode.rs +++ b/vortex-array/src/encode.rs @@ -19,15 +19,15 @@ use arrow_array::types::{ }; use arrow_buffer::buffer::{NullBuffer, OffsetBuffer}; use arrow_buffer::Buffer; -use arrow_schema::{DataType, Field, TimeUnit}; +use arrow_schema::{DataType, TimeUnit}; use crate::array::bool::BoolArray; -use crate::array::composite::CompositeArray; use crate::array::constant::ConstantArray; use crate::array::primitive::PrimitiveArray; use crate::array::struct_::StructArray; use crate::array::varbin::VarBinArray; use crate::array::{Array, ArrayRef}; +use crate::datetime::{LocalDateTime, LocalDateTimeArray}; use crate::dtype::DType; use crate::ptype::PType; use crate::scalar::NullScalar; @@ -64,14 +64,27 @@ impl FromArrow<&ArrowPrimitiveArray> for ArrayRef { nulls(value.nulls(), nullable, value.len()), ) .boxed(); + if T::DATA_TYPE.is_numeric() { - arr - } else { - let DType::Composite(id, _, metadata) = (&Field::new("_", T::DATA_TYPE, false)).into() - else { - panic!("Expected composite DType") - }; - CompositeArray::new(id, metadata, arr).boxed() + return arr; + } + + match T::DATA_TYPE { + DataType::Timestamp(time_unit, tz) => match tz { + // A timestamp with no timezone is the equivalent of an "unknown" timezone. + // Therefore, we must treat it as a LocalDateTime and not an Instant. + None => LocalDateTimeArray::new(LocalDateTime::new((&time_unit).into()), arr) + .as_composite() + .boxed(), + Some(_tz) => todo!(), + }, + DataType::Date32 => todo!(), + DataType::Date64 => todo!(), + DataType::Time32(_) => todo!(), + DataType::Time64(_) => todo!(), + DataType::Duration(_) => todo!(), + DataType::Interval(_) => todo!(), + _ => panic!("Invalid data type for PrimitiveArray"), } } } diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index fa8474344..c97210cdc 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -3,9 +3,9 @@ pub mod arrow; pub mod scalar; pub mod accessor; -pub mod composite_dtypes; pub mod compress; pub mod compute; +pub mod datetime; pub mod dtype; pub mod encode; pub mod error; diff --git a/vortex-array/src/scalar/mod.rs b/vortex-array/src/scalar/mod.rs index 935194173..d39799be0 100644 --- a/vortex-array/src/scalar/mod.rs +++ b/vortex-array/src/scalar/mod.rs @@ -123,7 +123,7 @@ impl Scalar { DType::Binary(_) => BinaryScalar::new(None).into(), DType::Struct(_, _) => StructScalar::new(dtype.clone(), vec![]).into(), DType::List(_, _) => ListScalar::new(dtype.clone(), None).into(), - DType::Composite(_, _, _) => unimplemented!("CompositeScalar"), + DType::Composite(_, _) => unimplemented!("CompositeScalar"), } } } @@ -172,6 +172,6 @@ mod test { #[test] fn size_of() { - assert_eq!(mem::size_of::(), 88); + assert_eq!(mem::size_of::(), 80); } } diff --git a/vortex-array/src/serde/dtype.rs b/vortex-array/src/serde/dtype.rs index d53793f01..2fd9ecee1 100644 --- a/vortex-array/src/serde/dtype.rs +++ b/vortex-array/src/serde/dtype.rs @@ -2,6 +2,7 @@ use leb128::read::Error; use std::io::Read; use std::sync::Arc; +use crate::array::composite::find_extension; use num_enum::{IntoPrimitive, TryFromPrimitive}; use crate::dtype::DType::*; @@ -91,10 +92,12 @@ impl<'a> DTypeReader<'a> { Ok(Struct(names, fields)) } DTypeTag::Composite => { - let name = unsafe { String::from_utf8_unchecked(self.read_slice()?) }; - let dtype = self.read()?; - let metadata = self.read_slice()?; - Ok(Composite(Arc::new(name), Box::new(dtype), metadata)) + let nullability = self.read_nullability()?; + let id = unsafe { String::from_utf8_unchecked(self.read_slice()?) }; + let extension = find_extension(id.as_str()).ok_or(VortexError::InvalidArgument( + "Failed to find extension".into(), + ))?; + Ok(Composite(extension.id(), nullability)) } } } @@ -167,10 +170,9 @@ impl<'a, 'b> DTypeWriter<'a, 'b> { self.write_nullability(*n)?; self.write(e.as_ref())? } - Composite(n, d, m) => { - self.writer.write_slice(n.as_bytes())?; - self.writer.dtype(d)?; - self.writer.write_slice(m)? + Composite(id, n) => { + self.write_nullability(*n)?; + self.writer.write_slice(id.0.as_bytes())?; } } @@ -225,7 +227,7 @@ impl From<&DType> for DTypeTag { Decimal(_, _, _) => DTypeTag::Decimal, List(_, _) => DTypeTag::List, Struct(_, _) => DTypeTag::Struct, - Composite(_, _, _) => DTypeTag::Composite, + Composite(_, _) => DTypeTag::Composite, } } } diff --git a/vortex-array/src/serde/mod.rs b/vortex-array/src/serde/mod.rs index c5d300b0c..331e862c4 100644 --- a/vortex-array/src/serde/mod.rs +++ b/vortex-array/src/serde/mod.rs @@ -22,6 +22,15 @@ pub trait EncodingSerde { fn read(&self, ctx: &mut ReadCtx) -> VortexResult; } +pub trait BytesSerde +where + Self: Sized, +{ + fn serialize(&self) -> Vec; + + fn deserialize(data: &[u8]) -> VortexResult; +} + pub struct ReadCtx<'a> { schema: &'a DType, encodings: Vec<&'static EncodingId>, From c6674ee46f4db74a4af28c5f650ad0ad43ab4881 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Wed, 20 Mar 2024 13:09:35 +0000 Subject: [PATCH 2/2] flatten ALP arrays (#123) * wip * flatten * consolidate * asdfasd * fixes * tests and fixes * remove extra * lints --- vortex-alp/src/compress.rs | 94 ++++++++++++++++++++++--- vortex-alp/src/compute.rs | 45 ++++++------ vortex-array/src/array/primitive/mod.rs | 6 +- vortex-array/src/ptype.rs | 16 ----- 4 files changed, 110 insertions(+), 51 deletions(-) diff --git a/vortex-alp/src/compress.rs b/vortex-alp/src/compress.rs index e58336b26..1d4ba36a4 100644 --- a/vortex-alp/src/compress.rs +++ b/vortex-alp/src/compress.rs @@ -1,16 +1,35 @@ -use crate::alp::ALPFloat; +use itertools::Itertools; + use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::PrimitiveArray; use vortex::array::sparse::SparseArray; use vortex::array::{Array, ArrayRef}; use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compute::flatten::flatten_primitive; +use vortex::compute::patch::patch; use vortex::error::{VortexError, VortexResult}; use vortex::ptype::{NativePType, PType}; +use crate::alp::ALPFloat; use crate::array::{ALPArray, ALPEncoding}; use crate::downcast::DowncastALP; use crate::Exponents; +#[macro_export] +macro_rules! match_each_alp_float_ptype { + ($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({ + macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )} + use vortex::error::VortexError; + use vortex::ptype::PType; + let ptype = $self; + match ptype { + PType::F32 => Ok(__with__! { f32 }), + PType::F64 => Ok(__with__! { f64 }), + _ => Err(VortexError::InvalidPType(ptype)) + } + }) +} + impl EncodingCompression for ALPEncoding { fn can_compress( &self, @@ -39,15 +58,10 @@ impl EncodingCompression for ALPEncoding { // TODO(ngates): fill forward nulls let parray = array.as_primitive(); - let (exponents, encoded, patches) = match parray.ptype() { - PType::F32 => { - encode_to_array(parray.typed_data::(), like_alp.map(|l| l.exponents())) - } - PType::F64 => { - encode_to_array(parray.typed_data::(), like_alp.map(|l| l.exponents())) - } - _ => panic!("Unsupported ptype"), - }; + let (exponents, encoded, patches) = match_each_alp_float_ptype!( + *parray.ptype(), |$T| { + encode_to_array(parray.typed_data::<$T>(), like_alp.map(|l| l.exponents())) + })?; let compressed_encoded = ctx .named("packed") @@ -90,7 +104,7 @@ where ) } -pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult { +pub(crate) fn alp_encode(parray: &PrimitiveArray) -> VortexResult { let (exponents, encoded, patches) = match parray.ptype() { PType::F32 => encode_to_array(parray.typed_data::(), None), PType::F64 => encode_to_array(parray.typed_data::(), None), @@ -99,6 +113,39 @@ pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult { Ok(ALPArray::new(encoded, exponents, patches)) } +pub fn decompress(array: &ALPArray) -> VortexResult { + let encoded = flatten_primitive(array.encoded())?; + let decoded = match_each_alp_float_ptype!(array.dtype().try_into().unwrap(), |$T| { + use vortex::array::CloneOptionalArray; + PrimitiveArray::from_nullable( + decompress_primitive::<$T>(encoded.typed_data(), array.exponents()), + encoded.validity().clone_optional(), + ) + })?; + if let Some(patches) = array.patches() { + // TODO(#121): right now, applying patches forces an extraneous copy of the array data + let patched = patch(&decoded, patches)?; + let patched_encoding_id = patched.encoding().id().clone(); + patched + .into_any() + .downcast::() + .map_err(|_| VortexError::InvalidEncoding(patched_encoding_id)) + .map(|boxed| *boxed) + } else { + Ok(decoded) + } +} + +fn decompress_primitive( + values: &[T::ALPInt], + exponents: &Exponents, +) -> Vec { + values + .iter() + .map(|&v| T::decode_single(v, exponents)) + .collect_vec() +} + #[cfg(test)] mod tests { use super::*; @@ -113,6 +160,9 @@ mod tests { vec![1234; 1025] ); assert_eq!(encoded.exponents(), &Exponents { e: 4, f: 1 }); + + let decoded = decompress(&encoded).unwrap(); + assert_eq!(array.typed_data::(), decoded.typed_data::()); } #[test] @@ -126,5 +176,27 @@ mod tests { vec![0, 1234, 0] ); assert_eq!(encoded.exponents(), &Exponents { e: 4, f: 1 }); + + let decoded = decompress(&encoded).unwrap(); + let expected = vec![0f32, 1.234f32, 0f32]; + assert_eq!(decoded.typed_data::(), expected.as_slice()); + } + + #[test] + #[allow(clippy::approx_constant)] + fn test_patched_compress() { + let values = vec![1.234f64, 2.718, std::f64::consts::PI, 4.0]; + let array = PrimitiveArray::from(values.clone()); + let encoded = alp_encode(&array).unwrap(); + println!("Encoded {:?}", encoded); + assert!(encoded.patches().is_some()); + assert_eq!( + encoded.encoded().as_primitive().typed_data::(), + vec![1234i64, 2718, 2718, 4000] // fill forward + ); + assert_eq!(encoded.exponents(), &Exponents { e: 3, f: 0 }); + + let decoded = decompress(&encoded).unwrap(); + assert_eq!(values, decoded.typed_data::()); } } diff --git a/vortex-alp/src/compute.rs b/vortex-alp/src/compute.rs index 68867d26c..714239805 100644 --- a/vortex-alp/src/compute.rs +++ b/vortex-alp/src/compute.rs @@ -1,19 +1,29 @@ -use crate::alp::ALPFloat; -use crate::ALPArray; -use std::f32; use vortex::array::Array; +use vortex::compute::flatten::{FlattenFn, FlattenedArray}; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::ArrayCompute; -use vortex::dtype::{DType, FloatWidth}; -use vortex::error::{VortexError, VortexResult}; +use vortex::error::VortexResult; use vortex::scalar::Scalar; +use crate::compress::decompress; +use crate::{match_each_alp_float_ptype, ALPArray, ALPFloat}; + impl ArrayCompute for ALPArray { + fn flatten(&self) -> Option<&dyn FlattenFn> { + Some(self) + } + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } } +impl FlattenFn for ALPArray { + fn flatten(&self) -> VortexResult { + decompress(self).map(FlattenedArray::Primitive) + } +} + impl ScalarAtFn for ALPArray { fn scalar_at(&self, index: usize) -> VortexResult { if let Some(patch) = self.patches().and_then(|p| scalar_at(p, index).ok()) { @@ -21,23 +31,12 @@ impl ScalarAtFn for ALPArray { } let encoded_val = scalar_at(self.encoded(), index)?; - - match self.dtype() { - DType::Float(FloatWidth::_32, _) => { - let encoded_val: i32 = encoded_val.try_into().unwrap(); - Ok(Scalar::from(::decode_single( - encoded_val, - self.exponents(), - ))) - } - DType::Float(FloatWidth::_64, _) => { - let encoded_val: i64 = encoded_val.try_into().unwrap(); - Ok(Scalar::from(::decode_single( - encoded_val, - self.exponents(), - ))) - } - _ => Err(VortexError::InvalidDType(self.dtype().clone())), - } + match_each_alp_float_ptype!(self.dtype().try_into().unwrap(), |$T| { + let encoded_val: <$T as ALPFloat>::ALPInt = encoded_val.try_into().unwrap(); + Scalar::from(<$T as ALPFloat>::decode_single( + encoded_val, + self.exponents(), + )) + }) } } diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index ae122cf40..166def627 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -131,7 +131,11 @@ impl PrimitiveArray { pub fn typed_data(&self) -> &[T] { if self.ptype() != &T::PTYPE { - panic!("Invalid PType") + panic!( + "Invalid PType! Expected {}, got self.ptype {}", + T::PTYPE, + self.ptype() + ); } self.buffer().typed_data() } diff --git a/vortex-array/src/ptype.rs b/vortex-array/src/ptype.rs index 65910487b..122d59b86 100644 --- a/vortex-array/src/ptype.rs +++ b/vortex-array/src/ptype.rs @@ -107,22 +107,6 @@ macro_rules! match_each_integer_ptype { } pub use match_each_integer_ptype; -#[macro_export] -macro_rules! match_each_unsigned_integer_ptype { - ($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({ - macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )} - use $crate::ptype::PType; - match $self { - PType::U8 => __with__! { u8 }, - PType::U16 => __with__! { u16 }, - PType::U32 => __with__! { u32 }, - PType::U64 => __with__! { u64 }, - _ => panic!("Unsupported ptype {:?}", $self), - } - }) -} -pub use match_each_unsigned_integer_ptype; - impl PType { pub const fn is_unsigned_int(self) -> bool { matches!(self, PType::U8 | PType::U16 | PType::U32 | PType::U64)