diff --git a/vortex-array2/src/array/composite/array.rs b/vortex-array2/src/array/composite/array.rs new file mode 100644 index 0000000000..11b5e8bb2d --- /dev/null +++ b/vortex-array2/src/array/composite/array.rs @@ -0,0 +1,135 @@ +use std::collections::HashMap; + +use vortex::scalar::AsBytes; +use vortex_error::VortexResult; +use vortex_schema::{CompositeID, DType}; + +use crate::array::composite::{find_extension, CompositeExtensionRef, TypedCompositeArray}; +use crate::compute::ArrayCompute; +use crate::stats::ArrayStatisticsCompute; +use crate::validity::{ArrayValidity, LogicalValidity}; +use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; +use crate::{ + impl_encoding, ArrayFlatten, IntoArrayData, TryDeserializeArrayMetadata, + TrySerializeArrayMetadata, +}; + +pub trait UnderlyingMetadata: + 'static + Send + Sync + Debug + TrySerializeArrayMetadata + for<'m> TryDeserializeArrayMetadata<'m> +{ + fn id(&self) -> CompositeID; +} + +impl_encoding!("vortex.composite", Composite); + +#[derive(Debug, Clone)] +pub struct CompositeMetadata { + ext: CompositeExtensionRef, + underlying_dtype: DType, + underlying_metadata: Arc<[u8]>, +} + +impl<'a> CompositeArray<'a> { + pub fn new(id: CompositeID, metadata: Arc<[u8]>, underlying: Array<'a>) -> Self { + let dtype = DType::Composite(id, underlying.dtype().is_nullable().into()); + let ext = find_extension(id.0).expect("Unrecognized composite extension"); + Self::try_from_parts( + dtype, + CompositeMetadata { + ext, + underlying_dtype: underlying.dtype().clone(), + underlying_metadata: metadata, + }, + vec![].into(), + vec![underlying.into_array_data()].into(), + HashMap::default(), + ) + .unwrap() + } +} + +impl CompositeArray<'_> { + #[inline] + pub fn id(&self) -> CompositeID { + self.metadata().ext.id() + } + + #[inline] + pub fn extension(&self) -> CompositeExtensionRef { + find_extension(self.id().0).expect("Unrecognized composite extension") + } + + pub fn underlying_metadata(&self) -> &Arc<[u8]> { + &self.metadata().underlying_metadata + } + + pub fn underlying_dtype(&self) -> &DType { + &self.metadata().underlying_dtype + } + + #[inline] + pub fn underlying(&self) -> Array { + self.array() + .child(0, self.underlying_dtype()) + .expect("CompositeArray must have an underlying array") + } + + pub fn with_compute(&self, mut f: F) -> R + where + F: FnMut(&dyn ArrayCompute) -> R, + { + let mut result = None; + + self.extension() + .with_compute(self, &mut |c| { + result = Some(f(c)); + Ok(()) + }) + .unwrap(); + + // Now we unwrap the optional, which we know to be populated by the closure. + result.unwrap() + } + + pub fn as_typed TryDeserializeArrayMetadata<'a>>( + &self, + ) -> VortexResult> { + Ok(TypedCompositeArray::new( + M::try_deserialize_metadata(Some(self.underlying_metadata().as_bytes()))?, + self.underlying().clone(), + )) + } +} + +impl ArrayTrait for CompositeArray<'_> { + fn len(&self) -> usize { + self.underlying().len() + } +} + +impl ArrayFlatten for CompositeArray<'_> { + fn flatten<'a>(self) -> VortexResult> + where + Self: 'a, + { + Ok(Flattened::Composite(self)) + } +} + +impl ArrayValidity for CompositeArray<'_> { + fn is_valid(&self, index: usize) -> bool { + self.underlying().with_dyn(|a| a.is_valid(index)) + } + + fn logical_validity(&self) -> LogicalValidity { + self.underlying().with_dyn(|a| a.logical_validity()) + } +} + +impl AcceptArrayVisitor for CompositeArray<'_> { + fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { + visitor.visit_child("underlying", &self.underlying()) + } +} + +impl ArrayStatisticsCompute for CompositeArray<'_> {} diff --git a/vortex-array2/src/array/composite/compute.rs b/vortex-array2/src/array/composite/compute.rs new file mode 100644 index 0000000000..ac6b3b3d21 --- /dev/null +++ b/vortex-array2/src/array/composite/compute.rs @@ -0,0 +1,95 @@ +use arrow_array::ArrayRef as ArrowArrayRef; +use itertools::Itertools; +use vortex::scalar::Scalar; +use vortex_error::{vortex_err, VortexResult}; + +use crate::array::composite::array::CompositeArray; +use crate::compute::as_arrow::AsArrowArray; +use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; +use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; +use crate::compute::slice::{slice, SliceFn}; +use crate::compute::take::{take, TakeFn}; +use crate::compute::ArrayCompute; +use crate::{Array, IntoArray, OwnedArray}; + +impl ArrayCompute for CompositeArray<'_> { + fn as_arrow(&self) -> Option<&dyn AsArrowArray> { + Some(self) + } + + fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + Some(self) + } + + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + Some(self) + } + + fn slice(&self) -> Option<&dyn SliceFn> { + Some(self) + } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } +} + +impl AsArrowArray for CompositeArray<'_> { + fn as_arrow(&self) -> VortexResult { + self.with_compute(|c| { + c.as_arrow().map(|a| a.as_arrow()).unwrap_or_else(|| { + Err(vortex_err!( + NotImplemented: "as_arrow", + format!("composite extension {}", self.id()) + )) + }) + }) + } +} + +impl AsContiguousFn for CompositeArray<'_> { + fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { + let composites = arrays + .iter() + .map(|array| CompositeArray::try_from(array).unwrap()) + .collect_vec(); + let underlyings = composites.iter().map(|c| c.underlying()).collect_vec(); + Ok(CompositeArray::new( + self.id(), + self.underlying_metadata().clone(), + as_contiguous(&underlyings)?, + ) + .into_array()) + } +} + +impl ScalarAtFn for CompositeArray<'_> { + fn scalar_at(&self, index: usize) -> VortexResult { + // TODO(ngates): this seems wrong... I don't think we just cast scalars like this. + // e.g. how do we know what a datetime is in? + let underlying = scalar_at(&self.underlying(), index)?; + underlying.cast(self.dtype()) + } +} + +impl TakeFn for CompositeArray<'_> { + fn take(&self, indices: &Array) -> VortexResult { + Ok(CompositeArray::new( + self.id(), + self.underlying_metadata().clone(), + take(&self.underlying(), indices)?, + ) + .into_array()) + } +} + +impl SliceFn for CompositeArray<'_> { + fn slice(&self, start: usize, stop: usize) -> VortexResult { + Ok(CompositeArray::new( + self.id(), + self.underlying_metadata().clone(), + slice(&self.underlying(), start, stop)?, + ) + .into_array()) + } +} diff --git a/vortex-array2/src/array/composite/mod.rs b/vortex-array2/src/array/composite/mod.rs new file mode 100644 index 0000000000..a72e9665b5 --- /dev/null +++ b/vortex-array2/src/array/composite/mod.rs @@ -0,0 +1,23 @@ +pub use array::*; +use linkme::distributed_slice; +pub use typed::*; +use vortex_schema::CompositeID; + +mod array; +mod compute; +mod serde; +mod typed; + +#[distributed_slice] +pub static VORTEX_COMPOSITE_EXTENSIONS: [&'static dyn CompositeExtension] = [..]; + +pub fn find_extension(id: &str) -> Option<&'static dyn CompositeExtension> { + VORTEX_COMPOSITE_EXTENSIONS + .iter() + .find(|ext| ext.id().0 == id) + .copied() +} + +pub fn find_extension_id(id: &str) -> Option { + find_extension(id).map(|e| e.id()) +} diff --git a/vortex-array2/src/array/composite/serde.rs b/vortex-array2/src/array/composite/serde.rs new file mode 100644 index 0000000000..99e021edf3 --- /dev/null +++ b/vortex-array2/src/array/composite/serde.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; + +use vortex_error::VortexResult; + +use crate::array::composite::CompositeMetadata; +use crate::{TryDeserializeArrayMetadata, TrySerializeArrayMetadata}; + +impl TrySerializeArrayMetadata for CompositeMetadata { + fn try_serialize_metadata(&self) -> VortexResult> { + todo!() + } +} + +impl TryDeserializeArrayMetadata<'_> for CompositeMetadata { + fn try_deserialize_metadata(_metadata: Option<&[u8]>) -> VortexResult { + todo!() + } +} diff --git a/vortex-array2/src/array/composite/typed.rs b/vortex-array2/src/array/composite/typed.rs new file mode 100644 index 0000000000..e9593d2606 --- /dev/null +++ b/vortex-array2/src/array/composite/typed.rs @@ -0,0 +1,123 @@ +use std::fmt::Debug; + +use vortex_error::VortexResult; +use vortex_schema::CompositeID; +use vortex_schema::DType; + +use crate::array::composite::array::CompositeArray; +use crate::array::composite::UnderlyingMetadata; +use crate::compute::ArrayCompute; +use crate::Array; + +pub trait CompositeExtension: Debug + Send + Sync + 'static { + fn id(&self) -> CompositeID; + + fn with_compute<'a>( + &self, + array: &'a CompositeArray<'a>, + f: &mut dyn for<'b> FnMut(&'b (dyn ArrayCompute + 'a)) -> VortexResult<()>, + ) -> VortexResult<()>; +} + +pub type CompositeExtensionRef = &'static dyn CompositeExtension; + +#[derive(Debug, Clone)] +pub struct TypedCompositeArray<'a, M: UnderlyingMetadata> { + metadata: M, + underlying: Array<'a>, + dtype: DType, +} + +impl<'a, M: UnderlyingMetadata> TypedCompositeArray<'a, M> { + pub fn new(metadata: M, underlying: Array<'a>) -> Self { + let dtype = DType::Composite(metadata.id(), underlying.dtype().is_nullable().into()); + Self { + metadata, + underlying, + dtype, + } + } + + #[inline] + pub fn underlying_metadata(&self) -> &M { + &self.metadata + } + + #[inline] + pub fn underlying(&self) -> &Array<'a> { + &self.underlying + } + + #[inline] + pub fn dtype(&self) -> &DType { + &self.dtype + } + + pub fn as_composite(&self) -> VortexResult> { + Ok(CompositeArray::new( + self.underlying_metadata().id(), + self.underlying_metadata().try_serialize_metadata()?, + self.underlying().clone(), + )) + } +} + +#[macro_export] +macro_rules! impl_composite { + ($id:expr, $T:ty) => { + use linkme::distributed_slice; + use paste::paste; + use vortex_schema::{CompositeID, DType, Nullability}; + use $crate::array::composite::{ + CompositeArray, CompositeExtension, TypedCompositeArray, UnderlyingMetadata, + VORTEX_COMPOSITE_EXTENSIONS, + }; + use $crate::compute::ArrayCompute; + use $crate::TryDeserializeArrayMetadata; + + paste! { + #[derive(Debug)] + pub struct [<$T Extension>]; + + impl [<$T Extension>] { + pub const ID: CompositeID = CompositeID($id); + + pub fn dtype(nullability: Nullability) -> DType { + DType::Composite(Self::ID, nullability) + } + } + + impl CompositeExtension for [<$T Extension>] { + fn id(&self) -> CompositeID { + Self::ID + } + + fn with_compute<'a>( + &self, + array: &'a CompositeArray<'a>, + f: &mut dyn for<'b> FnMut(&'b (dyn ArrayCompute + 'a)) -> VortexResult<()>, + ) -> VortexResult<()> { + if array.id() != Self::ID { + panic!("Incorrect CompositeID"); + } + let typed = TypedCompositeArray::new( + $T::try_deserialize_metadata(Some(array.underlying_metadata().as_ref()))?, + array.underlying().clone(), + ); + f(&typed) + } + } + + impl UnderlyingMetadata for $T { + fn id(&self) -> CompositeID { + [<$T Extension>]::ID + } + } + + #[distributed_slice(VORTEX_COMPOSITE_EXTENSIONS)] + static ENCODINGS_COMPOSITE_EXT: &'static dyn CompositeExtension = &[<$T Extension>]; + } + }; +} + +pub use impl_composite; diff --git a/vortex-array2/src/array/datetime/README.md b/vortex-array2/src/array/datetime/README.md new file mode 100644 index 0000000000..8e6bb6bb36 --- /dev/null +++ b/vortex-array2/src/array/datetime/README.md @@ -0,0 +1,13 @@ +# Vortex Datetime Composite Extensions + +This module provides implementations of datetime types using composite arrays. + +## Arrow Conversion + +| Arrow Type | Vortex Type | | +|-----------------------|-----------------|----------------------------------| +| `time32/64` | `LocalTime` | Time since midnight | +| `date32/64` | `LocalDate` | Julian day | +| `timestamp(tz=None)` | `LocalDateTime` | Julian day + time since midnight | +| `timestamp(tz=UTC)` | `Instant` | Time since Unix epoch | +| `timestamp(tz=Other)` | `ZonedDateTime` | TZ aware time since Unix epoch | diff --git a/vortex-array2/src/array/datetime/localdatetime.rs b/vortex-array2/src/array/datetime/localdatetime.rs new file mode 100644 index 0000000000..7146c709ed --- /dev/null +++ b/vortex-array2/src/array/datetime/localdatetime.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use arrow_array::{ + ArrayRef as ArrowArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, +}; +use serde::{Deserialize, Serialize}; +use vortex::ptype::PType; +use vortex_error::VortexResult; + +use crate::array::datetime::TimeUnit; +use crate::compute::as_arrow::AsArrowArray; +use crate::compute::cast::cast; +use crate::impl_composite; +use crate::validity::ArrayValidity; + +impl_composite!("vortex.localdatetime", LocalDateTime); + +pub type LocalDateTimeArray<'a> = TypedCompositeArray<'a, LocalDateTime>; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalDateTime { + time_unit: TimeUnit, +} + +impl ArrayCompute for LocalDateTimeArray<'_> { + fn as_arrow(&self) -> Option<&dyn AsArrowArray> { + Some(self) + } +} + +impl AsArrowArray for LocalDateTimeArray<'_> { + fn as_arrow(&self) -> VortexResult { + // A LocalDateTime maps to an Arrow Timestamp array with no timezone. + let timestamps = cast(self.underlying(), PType::I64.into())?.flatten_primitive()?; + let validity = timestamps.logical_validity().to_null_buffer()?; + let buffer = timestamps.scalar_buffer::(); + + Ok(match self.underlying_metadata().time_unit { + TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)), + TimeUnit::Us => Arc::new(TimestampMicrosecondArray::new(buffer, validity)), + TimeUnit::Ms => Arc::new(TimestampMillisecondArray::new(buffer, validity)), + TimeUnit::S => Arc::new(TimestampSecondArray::new(buffer, validity)), + }) + } +} diff --git a/vortex-array2/src/array/datetime/mod.rs b/vortex-array2/src/array/datetime/mod.rs new file mode 100644 index 0000000000..71b621779f --- /dev/null +++ b/vortex-array2/src/array/datetime/mod.rs @@ -0,0 +1,25 @@ +use std::fmt::{Display, Formatter}; + +pub use localdatetime::*; +use serde::{Deserialize, Serialize}; + +mod localdatetime; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)] +pub enum TimeUnit { + Ns, + Us, + Ms, + S, +} + +impl Display for TimeUnit { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TimeUnit::Ns => write!(f, "ns"), + TimeUnit::Us => write!(f, "us"), + TimeUnit::Ms => write!(f, "ms"), + TimeUnit::S => write!(f, "s"), + } + } +} diff --git a/vortex-array2/src/array/mod.rs b/vortex-array2/src/array/mod.rs index 7b57e9f484..320027199b 100644 --- a/vortex-array2/src/array/mod.rs +++ b/vortex-array2/src/array/mod.rs @@ -1,6 +1,8 @@ pub mod bool; pub mod chunked; +pub mod composite; pub mod constant; +pub mod datetime; pub mod primitive; pub mod r#struct; pub mod varbin; diff --git a/vortex-array2/src/flatten.rs b/vortex-array2/src/flatten.rs index 5e1241d874..35a2b1e00c 100644 --- a/vortex-array2/src/flatten.rs +++ b/vortex-array2/src/flatten.rs @@ -2,6 +2,7 @@ use vortex_error::VortexResult; use crate::array::bool::BoolArray; use crate::array::chunked::ChunkedArray; +use crate::array::composite::CompositeArray; use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; use crate::array::varbin::VarBinArray; @@ -12,6 +13,7 @@ use crate::{Array, IntoArray}; pub enum Flattened<'a> { Bool(BoolArray<'a>), Chunked(ChunkedArray<'a>), + Composite(CompositeArray<'a>), Primitive(PrimitiveArray<'a>), Struct(StructArray<'a>), VarBin(VarBinArray<'a>), @@ -45,6 +47,7 @@ impl<'a> IntoArray<'a> for Flattened<'a> { Flattened::Struct(a) => a.into_array(), Flattened::Chunked(a) => a.into_array(), Flattened::VarBin(a) => a.into_array(), + Flattened::Composite(a) => a.into_array(), } } } diff --git a/vortex-array2/src/implementation.rs b/vortex-array2/src/implementation.rs index a8716f5c9f..68f1362643 100644 --- a/vortex-array2/src/implementation.rs +++ b/vortex-array2/src/implementation.rs @@ -3,7 +3,7 @@ use vortex_error::VortexError; use crate::encoding::{ArrayEncoding, EncodingRef}; use crate::encoding::{ArrayEncodingExt, EncodingId}; use crate::{Array, ArrayMetadata}; -use crate::{ArrayTrait, TryDeserializeArrayMetadata, TrySerializeArrayMetadata}; +use crate::{ArrayTrait, TryDeserializeArrayMetadata}; /// Trait the defines the set of types relating to an array. /// Because it has associated types it can't be used as a trait object. @@ -12,10 +12,7 @@ pub trait ArrayDef { const ENCODING: EncodingRef; type Array<'a>: ArrayTrait + TryFrom, Error = VortexError> + 'a; - type Metadata: ArrayMetadata - + Clone - + TrySerializeArrayMetadata - + for<'a> TryDeserializeArrayMetadata<'a>; + type Metadata: ArrayMetadata + Clone + for<'m> TryDeserializeArrayMetadata<'m>; type Encoding: ArrayEncoding + ArrayEncodingExt; } diff --git a/vortex-array2/src/lib.rs b/vortex-array2/src/lib.rs index 70f6615495..c18907a8a3 100644 --- a/vortex-array2/src/lib.rs +++ b/vortex-array2/src/lib.rs @@ -1,5 +1,3 @@ -extern crate core; - mod accessor; pub mod array; mod arrow; diff --git a/vortex-array2/src/metadata.rs b/vortex-array2/src/metadata.rs index bfc5935b56..de9a6177b0 100644 --- a/vortex-array2/src/metadata.rs +++ b/vortex-array2/src/metadata.rs @@ -19,13 +19,15 @@ pub trait TrySerializeArrayMetadata { fn try_serialize_metadata(&self) -> VortexResult>; } +// TODO(ngates): move 'm lifetime into the function body since the result isn't tied to it. +// Although maybe we should make the result tied to ti? pub trait TryDeserializeArrayMetadata<'m>: Sized { // FIXME(ngates): we could push buffer/child validation into here. fn try_deserialize_metadata(metadata: Option<&'m [u8]>) -> VortexResult; } /// Provide default implementation for metadata serialization based on flexbuffers serde. -impl TrySerializeArrayMetadata for M { +impl TrySerializeArrayMetadata for M { fn try_serialize_metadata(&self) -> VortexResult> { let mut ser = FlexbufferSerializer::new(); self.serialize(&mut ser)?; @@ -33,7 +35,7 @@ impl TrySerializeArrayMetadata for M { } } -impl<'de, M: ArrayMetadata + Deserialize<'de>> TryDeserializeArrayMetadata<'de> for M { +impl<'de, M: Deserialize<'de>> TryDeserializeArrayMetadata<'de> for M { fn try_deserialize_metadata(metadata: Option<&'de [u8]>) -> VortexResult { let bytes = metadata.ok_or_else(|| vortex_err!("Array requires metadata bytes"))?; Ok(M::deserialize(Reader::get_root(bytes)?)?) diff --git a/vortex-schema/src/dtype.rs b/vortex-schema/src/dtype.rs index b3773150e8..3ee8914916 100644 --- a/vortex-schema/src/dtype.rs +++ b/vortex-schema/src/dtype.rs @@ -8,6 +8,7 @@ use DType::*; use crate::CompositeID; #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Ord, PartialOrd)] +#[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))] pub enum Nullability { #[default] NonNullable, diff --git a/vortex-schema/src/lib.rs b/vortex-schema/src/lib.rs index 3b3bc329f9..1b2698ea83 100644 --- a/vortex-schema/src/lib.rs +++ b/vortex-schema/src/lib.rs @@ -10,6 +10,7 @@ mod serialize; pub use deserialize::*; #[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] +#[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))] pub struct CompositeID(pub &'static str); impl Display for CompositeID {