Skip to content

Commit

Permalink
Composite
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Mar 20, 2024
1 parent 6d6ef06 commit 563d31e
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 82 deletions.
9 changes: 9 additions & 0 deletions vortex-array/src/array/composite/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::array::composite::array::CompositeArray;
use crate::array::composite::{CompositeID, CompositeMetadata};
use crate::array::{Array, ArrayRef};
use crate::compute::ArrayCompute;
use crate::dtype::DType;

pub trait CompositeExtension: Debug + Send + Sync + 'static {
fn id(&self) -> CompositeID;
Expand All @@ -18,13 +19,16 @@ pub type CompositeExtensionRef = &'static dyn CompositeExtension;
pub struct TypedCompositeArray<M: CompositeMetadata> {
metadata: M,
underlying: ArrayRef,
dtype: DType,
}

impl<M: CompositeMetadata> TypedCompositeArray<M> {
/// Builds a typed composite array from its metadata and the underlying array.
///
/// The array's `DType` is computed once here as `DType::Composite`, using the
/// metadata's composite ID and a nullability derived from whether the
/// underlying array's dtype is nullable, and is then stored on the struct.
pub fn new(metadata: M, underlying: ArrayRef) -> Self {
// Computed before `metadata` and `underlying` are moved into the struct.
let dtype = DType::Composite(metadata.id(), underlying.dtype().is_nullable().into());
Self {
metadata,
underlying,
dtype,
}
}

Expand All @@ -38,6 +42,11 @@ impl<M: CompositeMetadata> TypedCompositeArray<M> {
self.underlying.as_ref()
}

/// Returns a reference to the `DType` stored on this array.
#[inline]
pub fn dtype(&self) -> &DType {
&self.dtype
}

pub fn as_composite(&self) -> CompositeArray {
CompositeArray::new(
self.metadata().id(),
Expand Down
5 changes: 5 additions & 0 deletions vortex-array/src/datetime/localdatetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ impl LocalDateTime {
/// Creates a `LocalDateTime` carrying the given time unit.
pub fn new(time_unit: TimeUnit) -> Self {
Self { time_unit }
}

/// Returns the time unit this `LocalDateTime` was created with.
#[inline]
pub fn time_unit(&self) -> TimeUnit {
self.time_unit
}
}

impl Display for LocalDateTime {
Expand Down
3 changes: 3 additions & 0 deletions vortex-datetime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ name = "vortex-datetime"
version = "0.1.0"
edition = "2021"

[lints]
workspace = true

[dependencies]
vortex-array = { "path" = "../vortex-array" }
linkme = "0.3.22"
Expand Down
113 changes: 60 additions & 53 deletions vortex-datetime/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
use crate::{DateTimeArray, DateTimeEncoding};
use vortex::array::composite::CompositeEncoding;
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding};
use vortex::array::typed::{TypedArray, TypedEncoding};
use vortex::array::{Array, ArrayRef, Encoding};
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef, CloneOptionalArray};
use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::dtype::{DType, TimeUnit};
use vortex::error::{VortexError, VortexResult};
use vortex::compute::cast::cast;
use vortex::compute::flatten::flatten_primitive;
use vortex::datetime::{LocalDateTime, LocalDateTimeArray, LocalDateTimeExtension, TimeUnit};
use vortex::error::VortexResult;
use vortex::ptype::PType;

use crate::{DateTimeArray, DateTimeEncoding};

impl EncodingCompression for DateTimeEncoding {
fn can_compress(
&self,
array: &dyn Array,
_config: &CompressConfig,
) -> Option<&dyn EncodingCompression> {
if array.encoding().id() != TypedEncoding.id() {
return None;
}

if array.as_typed().untyped_array().encoding().id() != PrimitiveEncoding.id() {
if array.encoding().id() != &CompositeEncoding::ID {
return None;
}

if !matches!(array.dtype(), DType::ZonedDateTime(_, _)) {
let composite = array.as_composite();
if !matches!(composite.id(), LocalDateTimeExtension::ID) {
return None;
}

Expand All @@ -34,49 +35,55 @@ impl EncodingCompression for DateTimeEncoding {
like: Option<&dyn Array>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
match array.dtype() {
DType::ZonedDateTime(unit, nullability) => {
let tarray = array.as_any().downcast_ref::<TypedArray>().unwrap();
let parray = tarray
.untyped_array()
.as_any()
.downcast_ref::<PrimitiveArray>()
.unwrap();
// Eh, it's fine for now.
let ts = parray.typed_data::<i64>();
let array = array.as_composite();
match array.id() {
LocalDateTimeExtension::ID => compress_localdatetime(
array.as_typed::<LocalDateTime>(),
like.map(|l| l.as_any().downcast_ref::<DateTimeArray>().unwrap()),
ctx,
),
_ => panic!("Unsupported composite ID {}", array.id()),
}
}
}

let ld = like.map(|l| l.as_any().downcast_ref::<DateTimeArray>().unwrap());
fn compress_localdatetime(
array: LocalDateTimeArray,
like: Option<&DateTimeArray>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
let underlying = flatten_primitive(cast(array.underlying(), &PType::I64.into())?.as_ref())?;

match unit {
TimeUnit::Us => {
let mut days = Vec::with_capacity(ts.len());
let mut seconds = Vec::with_capacity(ts.len());
let mut subsecond = Vec::with_capacity(ts.len());
for &t in ts.iter() {
days.push(t / 86_400_000_000);
seconds.push((t % 86_400_000_000) / 1_000_000);
subsecond.push((t % 86_400_000_000) % 1_000_000);
}
let divisor = match array.metadata().time_unit() {
TimeUnit::Ns => 1_000_000_000,
TimeUnit::Us => 1_000_000,
TimeUnit::Ms => 1_000,
TimeUnit::S => 1,
};

Ok(DateTimeArray::new(
ctx.named("days")
.compress(&PrimitiveArray::from(days), ld.map(|l| l.days()))?,
ctx.named("seconds").compress(
&PrimitiveArray::from(seconds).as_ref(),
ld.map(|l| l.seconds()),
)?,
ctx.named("subsecond").compress(
&PrimitiveArray::from(subsecond).as_ref(),
ld.map(|l| l.subsecond()),
)?,
array.dtype().clone(),
)
.boxed())
}
_ => todo!("Unit {:?}", unit),
}
}
_ => Err(VortexError::InvalidDType(array.dtype().clone())),
}
let mut days = Vec::with_capacity(underlying.len());
let mut seconds = Vec::with_capacity(underlying.len());
let mut subsecond = Vec::with_capacity(underlying.len());

for &t in underlying.typed_data::<i64>().iter() {
days.push(t / (86_400 * divisor));
seconds.push((t % (86_400 * divisor)) / divisor);
subsecond.push((t % (86_400 * divisor)) % divisor);
}

Ok(DateTimeArray::new(
ctx.named("days")
.compress(&PrimitiveArray::from(days), like.map(|l| l.days()))?,
ctx.named("seconds").compress(
PrimitiveArray::from(seconds).as_ref(),
like.map(|l| l.seconds()),
)?,
ctx.named("subsecond").compress(
PrimitiveArray::from(subsecond).as_ref(),
like.map(|l| l.subsecond()),
)?,
underlying.validity().clone_optional(),
LocalDateTimeExtension::dtype(underlying.validity().is_some().into()),
)
.boxed())
}
58 changes: 29 additions & 29 deletions vortex-datetime/src/datetime.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

use vortex::array::{check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId};
use vortex::array::{Array, ArrayRef, Encoding, EncodingId};
use vortex::compress::EncodingCompression;
use vortex::compute::ArrayCompute;
use vortex::dtype::Nullability::NonNullable;
use vortex::dtype::{DType, Nullability};
use vortex::dtype::DType;
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::ptype::PType;
use vortex::scalar::Scalar;
use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsCompute, StatsSet};

/// An array that decomposes a datetime into days, seconds, and subsecond components.
Expand All @@ -19,19 +16,27 @@ pub struct DateTimeArray {
days: ArrayRef,
seconds: ArrayRef,
subsecond: ArrayRef,
validity: Option<ArrayRef>,
dtype: DType,
stats: Arc<RwLock<StatsSet>>,
}

impl DateTimeArray {
pub fn new(days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, dtype: DType) -> Self {
Self::try_new(days, seconds, subsecond, dtype).unwrap()
pub fn new(
days: ArrayRef,
seconds: ArrayRef,
subsecond: ArrayRef,
validity: Option<ArrayRef>,
dtype: DType,
) -> Self {
Self::try_new(days, seconds, subsecond, validity, dtype).unwrap()
}

pub fn try_new(
days: ArrayRef,
seconds: ArrayRef,
subsecond: ArrayRef,
validity: Option<ArrayRef>,
dtype: DType,
) -> VortexResult<Self> {
if !matches!(days.dtype(), DType::Int(_, _, _)) {
Expand All @@ -48,6 +53,7 @@ impl DateTimeArray {
days,
seconds,
subsecond,
validity,
dtype,
stats: Arc::new(RwLock::new(StatsSet::new())),
})
Expand Down Expand Up @@ -91,19 +97,25 @@ impl Array for DateTimeArray {
}

fn dtype(&self) -> &DType {
&DType::LocalDate(NonNullable)
&self.dtype
}

/// Returns a `Stats` handle constructed from this array's stats set and the array itself.
fn stats(&self) -> Stats {
Stats::new(&self.stats, self)
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
todo!()
Ok(Self::new(
self.days.slice(start, stop)?,
self.seconds.slice(start, stop)?,
self.subsecond.slice(start, stop)?,
self.validity
.as_ref()
.map(|v| v.slice(start, stop))
.transpose()?,
self.dtype.clone(),
)
.boxed())
}

fn encoding(&self) -> &'static dyn Encoding {
Expand All @@ -114,27 +126,15 @@ impl Array for DateTimeArray {
self.days().nbytes() + self.seconds().nbytes() + self.subsecond().nbytes()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
None
}
}

// No overrides: `DateTimeArray` takes whatever `StatsCompute` provides by default.
impl StatsCompute for DateTimeArray {}

// No overrides: `DateTimeArray` takes whatever `ArrayCompute` provides by default.
impl ArrayCompute for DateTimeArray {}

impl ArraySerde for DateTimeArray {
fn write(&self, ctx: &mut WriteCtx) -> std::io::Result<()> {
todo!()
}
}

impl EncodingSerde for DateTimeEncoding {
fn read(&self, ctx: &mut ReadCtx) -> std::io::Result<ArrayRef> {
todo!()
}
}

impl<'arr> AsRef<(dyn Array + 'arr)> for DateTimeArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
Expand Down Expand Up @@ -164,6 +164,6 @@ impl Encoding for DateTimeEncoding {
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
None
}
}

0 comments on commit 563d31e

Please sign in to comment.