Skip to content

Commit

Permalink
Array2: Composite (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Apr 15, 2024
1 parent 9992e94 commit 2ce2e79
Show file tree
Hide file tree
Showing 15 changed files with 491 additions and 9 deletions.
135 changes: 135 additions & 0 deletions vortex-array2/src/array/composite/array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::collections::HashMap;

use vortex::scalar::AsBytes;
use vortex_error::VortexResult;
use vortex_schema::{CompositeID, DType};

use crate::array::composite::{find_extension, CompositeExtensionRef, TypedCompositeArray};
use crate::compute::ArrayCompute;
use crate::stats::ArrayStatisticsCompute;
use crate::validity::{ArrayValidity, LogicalValidity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
use crate::{
impl_encoding, ArrayFlatten, IntoArrayData, TryDeserializeArrayMetadata,
TrySerializeArrayMetadata,
};

pub trait UnderlyingMetadata:
'static + Send + Sync + Debug + TrySerializeArrayMetadata + for<'m> TryDeserializeArrayMetadata<'m>
{
fn id(&self) -> CompositeID;
}

impl_encoding!("vortex.composite", Composite);

#[derive(Debug, Clone)]
pub struct CompositeMetadata {
ext: CompositeExtensionRef,
underlying_dtype: DType,
underlying_metadata: Arc<[u8]>,
}

impl<'a> CompositeArray<'a> {
pub fn new(id: CompositeID, metadata: Arc<[u8]>, underlying: Array<'a>) -> Self {
let dtype = DType::Composite(id, underlying.dtype().is_nullable().into());
let ext = find_extension(id.0).expect("Unrecognized composite extension");
Self::try_from_parts(
dtype,
CompositeMetadata {
ext,
underlying_dtype: underlying.dtype().clone(),
underlying_metadata: metadata,
},
vec![].into(),
vec![underlying.into_array_data()].into(),
HashMap::default(),
)
.unwrap()
}
}

impl CompositeArray<'_> {
#[inline]
pub fn id(&self) -> CompositeID {
self.metadata().ext.id()
}

#[inline]
pub fn extension(&self) -> CompositeExtensionRef {
find_extension(self.id().0).expect("Unrecognized composite extension")
}

pub fn underlying_metadata(&self) -> &Arc<[u8]> {
&self.metadata().underlying_metadata
}

pub fn underlying_dtype(&self) -> &DType {
&self.metadata().underlying_dtype
}

#[inline]
pub fn underlying(&self) -> Array {
self.array()
.child(0, self.underlying_dtype())
.expect("CompositeArray must have an underlying array")
}

pub fn with_compute<R, F>(&self, mut f: F) -> R
where
F: FnMut(&dyn ArrayCompute) -> R,
{
let mut result = None;

self.extension()
.with_compute(self, &mut |c| {
result = Some(f(c));
Ok(())
})
.unwrap();

// Now we unwrap the optional, which we know to be populated by the closure.
result.unwrap()
}

pub fn as_typed<M: UnderlyingMetadata + for<'a> TryDeserializeArrayMetadata<'a>>(
&self,
) -> VortexResult<TypedCompositeArray<M>> {
Ok(TypedCompositeArray::new(
M::try_deserialize_metadata(Some(self.underlying_metadata().as_bytes()))?,
self.underlying().clone(),
))
}
}

impl ArrayTrait for CompositeArray<'_> {
fn len(&self) -> usize {
self.underlying().len()
}
}

impl ArrayFlatten for CompositeArray<'_> {
fn flatten<'a>(self) -> VortexResult<Flattened<'a>>
where
Self: 'a,
{
Ok(Flattened::Composite(self))
}
}

impl ArrayValidity for CompositeArray<'_> {
fn is_valid(&self, index: usize) -> bool {
self.underlying().with_dyn(|a| a.is_valid(index))
}

fn logical_validity(&self) -> LogicalValidity {
self.underlying().with_dyn(|a| a.logical_validity())
}
}

impl AcceptArrayVisitor for CompositeArray<'_> {
fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("underlying", &self.underlying())
}
}

impl ArrayStatisticsCompute for CompositeArray<'_> {}
95 changes: 95 additions & 0 deletions vortex-array2/src/array/composite/compute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use arrow_array::ArrayRef as ArrowArrayRef;
use itertools::Itertools;
use vortex::scalar::Scalar;
use vortex_error::{vortex_err, VortexResult};

use crate::array::composite::array::CompositeArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::slice::{slice, SliceFn};
use crate::compute::take::{take, TakeFn};
use crate::compute::ArrayCompute;
use crate::{Array, IntoArray, OwnedArray};

impl ArrayCompute for CompositeArray<'_> {
fn as_arrow(&self) -> Option<&dyn AsArrowArray> {
Some(self)
}

fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Some(self)
}

fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}

fn slice(&self) -> Option<&dyn SliceFn> {
Some(self)
}

fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}
}

impl AsArrowArray for CompositeArray<'_> {
fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
self.with_compute(|c| {
c.as_arrow().map(|a| a.as_arrow()).unwrap_or_else(|| {
Err(vortex_err!(
NotImplemented: "as_arrow",
format!("composite extension {}", self.id())
))
})
})
}
}

impl AsContiguousFn for CompositeArray<'_> {
fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<OwnedArray> {
let composites = arrays
.iter()
.map(|array| CompositeArray::try_from(array).unwrap())
.collect_vec();
let underlyings = composites.iter().map(|c| c.underlying()).collect_vec();
Ok(CompositeArray::new(
self.id(),
self.underlying_metadata().clone(),
as_contiguous(&underlyings)?,
)
.into_array())
}
}

impl ScalarAtFn for CompositeArray<'_> {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
// TODO(ngates): this seems wrong... I don't think we just cast scalars like this.
// e.g. how do we know what a datetime is in?
let underlying = scalar_at(&self.underlying(), index)?;
underlying.cast(self.dtype())
}
}

impl TakeFn for CompositeArray<'_> {
fn take(&self, indices: &Array) -> VortexResult<OwnedArray> {
Ok(CompositeArray::new(
self.id(),
self.underlying_metadata().clone(),
take(&self.underlying(), indices)?,
)
.into_array())
}
}

impl SliceFn for CompositeArray<'_> {
fn slice(&self, start: usize, stop: usize) -> VortexResult<OwnedArray> {
Ok(CompositeArray::new(
self.id(),
self.underlying_metadata().clone(),
slice(&self.underlying(), start, stop)?,
)
.into_array())
}
}
23 changes: 23 additions & 0 deletions vortex-array2/src/array/composite/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
pub use array::*;
use linkme::distributed_slice;
pub use typed::*;
use vortex_schema::CompositeID;

mod array;
mod compute;
mod serde;
mod typed;

#[distributed_slice]
pub static VORTEX_COMPOSITE_EXTENSIONS: [&'static dyn CompositeExtension] = [..];

pub fn find_extension(id: &str) -> Option<&'static dyn CompositeExtension> {
VORTEX_COMPOSITE_EXTENSIONS
.iter()
.find(|ext| ext.id().0 == id)
.copied()
}

pub fn find_extension_id(id: &str) -> Option<CompositeID> {
find_extension(id).map(|e| e.id())
}
18 changes: 18 additions & 0 deletions vortex-array2/src/array/composite/serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::sync::Arc;

use vortex_error::VortexResult;

use crate::array::composite::CompositeMetadata;
use crate::{TryDeserializeArrayMetadata, TrySerializeArrayMetadata};

impl TrySerializeArrayMetadata for CompositeMetadata {
fn try_serialize_metadata(&self) -> VortexResult<Arc<[u8]>> {
todo!()
}
}

impl TryDeserializeArrayMetadata<'_> for CompositeMetadata {
fn try_deserialize_metadata(_metadata: Option<&[u8]>) -> VortexResult<Self> {
todo!()
}
}
123 changes: 123 additions & 0 deletions vortex-array2/src/array/composite/typed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::fmt::Debug;

use vortex_error::VortexResult;
use vortex_schema::CompositeID;
use vortex_schema::DType;

use crate::array::composite::array::CompositeArray;
use crate::array::composite::UnderlyingMetadata;
use crate::compute::ArrayCompute;
use crate::Array;

pub trait CompositeExtension: Debug + Send + Sync + 'static {
fn id(&self) -> CompositeID;

fn with_compute<'a>(
&self,
array: &'a CompositeArray<'a>,
f: &mut dyn for<'b> FnMut(&'b (dyn ArrayCompute + 'a)) -> VortexResult<()>,
) -> VortexResult<()>;
}

pub type CompositeExtensionRef = &'static dyn CompositeExtension;

#[derive(Debug, Clone)]
pub struct TypedCompositeArray<'a, M: UnderlyingMetadata> {
metadata: M,
underlying: Array<'a>,
dtype: DType,
}

impl<'a, M: UnderlyingMetadata> TypedCompositeArray<'a, M> {
pub fn new(metadata: M, underlying: Array<'a>) -> Self {
let dtype = DType::Composite(metadata.id(), underlying.dtype().is_nullable().into());
Self {
metadata,
underlying,
dtype,
}
}

#[inline]
pub fn underlying_metadata(&self) -> &M {
&self.metadata
}

#[inline]
pub fn underlying(&self) -> &Array<'a> {
&self.underlying
}

#[inline]
pub fn dtype(&self) -> &DType {
&self.dtype
}

pub fn as_composite(&self) -> VortexResult<CompositeArray<'a>> {
Ok(CompositeArray::new(
self.underlying_metadata().id(),
self.underlying_metadata().try_serialize_metadata()?,
self.underlying().clone(),
))
}
}

#[macro_export]
macro_rules! impl_composite {
($id:expr, $T:ty) => {
use linkme::distributed_slice;
use paste::paste;
use vortex_schema::{CompositeID, DType, Nullability};
use $crate::array::composite::{
CompositeArray, CompositeExtension, TypedCompositeArray, UnderlyingMetadata,
VORTEX_COMPOSITE_EXTENSIONS,
};
use $crate::compute::ArrayCompute;
use $crate::TryDeserializeArrayMetadata;

paste! {
#[derive(Debug)]
pub struct [<$T Extension>];

impl [<$T Extension>] {
pub const ID: CompositeID = CompositeID($id);

pub fn dtype(nullability: Nullability) -> DType {
DType::Composite(Self::ID, nullability)
}
}

impl CompositeExtension for [<$T Extension>] {
fn id(&self) -> CompositeID {
Self::ID
}

fn with_compute<'a>(
&self,
array: &'a CompositeArray<'a>,
f: &mut dyn for<'b> FnMut(&'b (dyn ArrayCompute + 'a)) -> VortexResult<()>,
) -> VortexResult<()> {
if array.id() != Self::ID {
panic!("Incorrect CompositeID");
}
let typed = TypedCompositeArray::new(
$T::try_deserialize_metadata(Some(array.underlying_metadata().as_ref()))?,
array.underlying().clone(),
);
f(&typed)
}
}

impl UnderlyingMetadata for $T {
fn id(&self) -> CompositeID {
[<$T Extension>]::ID
}
}

#[distributed_slice(VORTEX_COMPOSITE_EXTENSIONS)]
static ENCODINGS_COMPOSITE_EXT: &'static dyn CompositeExtension = &[<$T Extension>];
}
};
}

pub use impl_composite;
Loading

0 comments on commit 2ce2e79

Please sign in to comment.