diff --git a/vortex-array/src/array/composite/typed.rs b/vortex-array/src/array/composite/typed.rs index 084c5c064a..41a99dff2b 100644 --- a/vortex-array/src/array/composite/typed.rs +++ b/vortex-array/src/array/composite/typed.rs @@ -5,6 +5,7 @@ use crate::array::composite::array::CompositeArray; use crate::array::composite::{CompositeID, CompositeMetadata}; use crate::array::{Array, ArrayRef}; use crate::compute::ArrayCompute; +use crate::dtype::DType; pub trait CompositeExtension: Debug + Send + Sync + 'static { fn id(&self) -> CompositeID; @@ -18,13 +19,16 @@ pub type CompositeExtensionRef = &'static dyn CompositeExtension; pub struct TypedCompositeArray { metadata: M, underlying: ArrayRef, + dtype: DType, } impl TypedCompositeArray { pub fn new(metadata: M, underlying: ArrayRef) -> Self { + let dtype = DType::Composite(metadata.id(), underlying.dtype().is_nullable().into()); Self { metadata, underlying, + dtype, } } @@ -38,6 +42,11 @@ impl TypedCompositeArray { self.underlying.as_ref() } + #[inline] + pub fn dtype(&self) -> &DType { + &self.dtype + } + pub fn as_composite(&self) -> CompositeArray { CompositeArray::new( self.metadata().id(), diff --git a/vortex-array/src/datetime/localdatetime.rs b/vortex-array/src/datetime/localdatetime.rs index d297100c90..c11d21a2db 100644 --- a/vortex-array/src/datetime/localdatetime.rs +++ b/vortex-array/src/datetime/localdatetime.rs @@ -29,6 +29,11 @@ impl LocalDateTime { pub fn new(time_unit: TimeUnit) -> Self { Self { time_unit } } + + #[inline] + pub fn time_unit(&self) -> TimeUnit { + self.time_unit + } } impl Display for LocalDateTime { diff --git a/vortex-datetime/Cargo.toml b/vortex-datetime/Cargo.toml index 8559799afc..77cd5f5288 100644 --- a/vortex-datetime/Cargo.toml +++ b/vortex-datetime/Cargo.toml @@ -3,6 +3,9 @@ name = "vortex-datetime" version = "0.1.0" edition = "2021" +[lints] +workspace = true + [dependencies] vortex-array = { "path" = "../vortex-array" } linkme = "0.3.22" diff --git a/vortex-datetime/src/compress.rs b/vortex-datetime/src/compress.rs index b38b8c27f3..879f3980da 100644 --- a/vortex-datetime/src/compress.rs +++ b/vortex-datetime/src/compress.rs @@ -1,11 +1,15 @@ -use crate::{DateTimeArray, DateTimeEncoding}; +use vortex::array::composite::CompositeEncoding; use vortex::array::downcast::DowncastArrayBuiltin; -use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; -use vortex::array::typed::{TypedArray, TypedEncoding}; -use vortex::array::{Array, ArrayRef, Encoding}; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::{Array, ArrayRef, CloneOptionalArray}; use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; -use vortex::dtype::{DType, TimeUnit}; -use vortex::error::{VortexError, VortexResult}; +use vortex::compute::cast::cast; +use vortex::compute::flatten::flatten_primitive; +use vortex::datetime::{LocalDateTime, LocalDateTimeArray, LocalDateTimeExtension, TimeUnit}; +use vortex::error::VortexResult; +use vortex::ptype::PType; + +use crate::{DateTimeArray, DateTimeEncoding}; impl EncodingCompression for DateTimeEncoding { fn can_compress( @@ -13,15 +17,12 @@ impl EncodingCompression for DateTimeEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&dyn EncodingCompression> { - if array.encoding().id() != TypedEncoding.id() { - return None; - } - - if array.as_typed().untyped_array().encoding().id() != PrimitiveEncoding.id() { + if array.encoding().id() != &CompositeEncoding::ID { return None; } - if !matches!(array.dtype(), DType::ZonedDateTime(_, _)) { + let composite = array.as_composite(); + if !matches!(composite.id(), LocalDateTimeExtension::ID) { return None; } @@ -34,49 +35,55 @@ impl EncodingCompression for DateTimeEncoding { like: Option<&dyn Array>, ctx: CompressCtx, ) -> VortexResult { - match array.dtype() { - DType::ZonedDateTime(unit, nullability) => { - let tarray = array.as_any().downcast_ref::().unwrap(); - let parray = tarray - .untyped_array() - .as_any() - .downcast_ref::() - .unwrap(); - // Eh, it's fine for now. - let ts = parray.typed_data::(); + let array = array.as_composite(); + match array.id() { + LocalDateTimeExtension::ID => compress_localdatetime( + array.as_typed::(), + like.map(|l| l.as_any().downcast_ref::().unwrap()), + ctx, + ), + _ => panic!("Unsupported composite ID {}", array.id()), + } + } +} - let ld = like.map(|l| l.as_any().downcast_ref::().unwrap()); +fn compress_localdatetime( + array: LocalDateTimeArray, + like: Option<&DateTimeArray>, + ctx: CompressCtx, +) -> VortexResult { + let underlying = flatten_primitive(cast(array.underlying(), &PType::I64.into())?.as_ref())?; - match unit { - TimeUnit::Us => { - let mut days = Vec::with_capacity(ts.len()); - let mut seconds = Vec::with_capacity(ts.len()); - let mut subsecond = Vec::with_capacity(ts.len()); - for &t in ts.iter() { - days.push(t / 86_400_000_000); - seconds.push((t % 86_400_000_000) / 1_000_000); - subsecond.push((t % 86_400_000_000) % 1_000_000); - } + let divisor = match array.metadata().time_unit() { + TimeUnit::Ns => 1_000_000_000, + TimeUnit::Us => 1_000_000, + TimeUnit::Ms => 1_000, + TimeUnit::S => 1, + }; - Ok(DateTimeArray::new( - ctx.named("days") - .compress(&PrimitiveArray::from(days), ld.map(|l| l.days()))?, - ctx.named("seconds").compress( - &PrimitiveArray::from(seconds).as_ref(), - ld.map(|l| l.seconds()), - )?, - ctx.named("subsecond").compress( - &PrimitiveArray::from(subsecond).as_ref(), - ld.map(|l| l.subsecond()), - )?, - array.dtype().clone(), - ) - .boxed()) - } - _ => todo!("Unit {:?}", unit), - } - } - _ => Err(VortexError::InvalidDType(array.dtype().clone())), - } + let mut days = Vec::with_capacity(underlying.len()); + let mut seconds = Vec::with_capacity(underlying.len()); + let mut subsecond = Vec::with_capacity(underlying.len()); + + for &t in underlying.typed_data::().iter() { + days.push(t / (86_400 * divisor)); + seconds.push((t % (86_400 * divisor)) / divisor); + subsecond.push((t % (86_400 * divisor)) % divisor); } + + Ok(DateTimeArray::new( + ctx.named("days") + .compress(&PrimitiveArray::from(days), like.map(|l| l.days()))?, + ctx.named("seconds").compress( + PrimitiveArray::from(seconds).as_ref(), + like.map(|l| l.seconds()), + )?, + ctx.named("subsecond").compress( + PrimitiveArray::from(subsecond).as_ref(), + like.map(|l| l.subsecond()), + )?, + underlying.validity().clone_optional(), + LocalDateTimeExtension::dtype(underlying.validity().is_some().into()), + ) + .boxed()) } diff --git a/vortex-datetime/src/datetime.rs b/vortex-datetime/src/datetime.rs index 027ff07b8f..62e3a9da9f 100644 --- a/vortex-datetime/src/datetime.rs +++ b/vortex-datetime/src/datetime.rs @@ -1,16 +1,13 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use vortex::array::{check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId}; +use vortex::array::{Array, ArrayRef, Encoding, EncodingId}; use vortex::compress::EncodingCompression; use vortex::compute::ArrayCompute; -use vortex::dtype::Nullability::NonNullable; -use vortex::dtype::{DType, Nullability}; +use vortex::dtype::DType; use vortex::error::{VortexError, VortexResult}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; -use vortex::ptype::PType; -use vortex::scalar::Scalar; -use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; +use vortex::serde::{ArraySerde, EncodingSerde}; use vortex::stats::{Stats, StatsCompute, StatsSet}; /// An array that decomposes a datetime into days, seconds, and nanoseconds. @@ -19,19 +16,27 @@ pub struct DateTimeArray { days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, + validity: Option, dtype: DType, stats: Arc>, } impl DateTimeArray { - pub fn new(days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, dtype: DType) -> Self { - Self::try_new(days, seconds, subsecond, dtype).unwrap() + pub fn new( + days: ArrayRef, + seconds: ArrayRef, + subsecond: ArrayRef, + validity: Option, + dtype: DType, + ) -> Self { + Self::try_new(days, seconds, subsecond, validity, dtype).unwrap() } pub fn try_new( days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, + validity: Option, dtype: DType, ) -> VortexResult { if !matches!(days.dtype(), DType::Int(_, _, _)) { @@ -48,6 +53,7 @@ impl DateTimeArray { days, seconds, subsecond, + validity, dtype, stats: Arc::new(RwLock::new(StatsSet::new())), }) @@ -91,19 +97,25 @@ impl Array for DateTimeArray { } fn dtype(&self) -> &DType { - &DType::LocalDate(NonNullable) + &self.dtype } fn stats(&self) -> Stats { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { - todo!() + Ok(Self::new( + self.days.slice(start, stop)?, + self.seconds.slice(start, stop)?, + self.subsecond.slice(start, stop)?, + self.validity + .as_ref() + .map(|v| v.slice(start, stop)) + .transpose()?, + self.dtype.clone(), + ) + .boxed()) } fn encoding(&self) -> &'static dyn Encoding { @@ -114,8 +126,8 @@ impl Array for DateTimeArray { self.days().nbytes() + self.seconds().nbytes() + self.subsecond().nbytes() } - fn serde(&self) -> &dyn ArraySerde { - self + fn serde(&self) -> Option<&dyn ArraySerde> { + None } } @@ -123,18 +135,6 @@ impl StatsCompute for DateTimeArray {} impl ArrayCompute for DateTimeArray {} -impl ArraySerde for DateTimeArray { - fn write(&self, ctx: &mut WriteCtx) -> std::io::Result<()> { - todo!() - } -} - -impl EncodingSerde for DateTimeEncoding { - fn read(&self, ctx: &mut ReadCtx) -> std::io::Result { - todo!() - } -} - impl<'arr> AsRef<(dyn Array + 'arr)> for DateTimeArray { fn as_ref(&self) -> &(dyn Array + 'arr) { self @@ -164,6 +164,6 @@ impl Encoding for DateTimeEncoding { } fn serde(&self) -> Option<&dyn EncodingSerde> { - Some(self) + None } }