Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Mar 4, 2024
1 parent f655068 commit bc2c5dc
Show file tree
Hide file tree
Showing 49 changed files with 451 additions and 344 deletions.
3 changes: 3 additions & 0 deletions vortex-alp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ linkme = "0.3.22"
itertools = "0.12.1"
codecz = { version = "0.1.0", path = "../codecz" }
log = { version = "0.4.20", features = [] }

[lints]
workspace = true
35 changes: 1 addition & 34 deletions vortex-alp/src/alp.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

use codecz::alp;
pub use codecz::alp::ALPExponents;
use vortex::array::{Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef};
use vortex::compress::EncodingCompression;
use vortex::dtype::{DType, FloatWidth, IntWidth, Signedness};
use vortex::dtype::{DType, IntWidth, Signedness};
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::scalar::{NullableScalar, Scalar};
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

Expand Down Expand Up @@ -106,37 +104,6 @@ impl Array for ALPArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if let Some(patch) = self
.patches()
.and_then(|p| p.scalar_at(index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

let Some(encoded_val) = self.encoded.scalar_at(index)?.into_nonnull() else {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
};
match self.dtype {
DType::Float(FloatWidth::_32, _) => {
let encoded_val: i32 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f32>(encoded_val, self.exponents)
.unwrap()
.into())
}

DType::Float(FloatWidth::_64, _) => {
let encoded_val: i64 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f64>(encoded_val, self.exponents)
.unwrap()
.into())
}

_ => unreachable!(),
}
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand Down
47 changes: 47 additions & 0 deletions vortex-alp/src/compute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::ALPArray;
use codecz::alp;
use vortex::array::Array;
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::ArrayCompute;
use vortex::dtype::{DType, FloatWidth};
use vortex::error::VortexResult;
use vortex::scalar::{NullableScalar, Scalar};

impl ArrayCompute for ALPArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}
}

impl ScalarAtFn for ALPArray {
fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if let Some(patch) = self
.patches()
.and_then(|p| scalar_at(p, index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

let Some(encoded_val) = scalar_at(self.encoded(), index)?.into_nonnull() else {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
};
match self.dtype() {
DType::Float(FloatWidth::_32, _) => {
let encoded_val: i32 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f32>(encoded_val, self.exponents())
.unwrap()
.into())
}

DType::Float(FloatWidth::_64, _) => {
let encoded_val: i64 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f64>(encoded_val, self.exponents())
.unwrap()
.into())
}

_ => unreachable!(),
}
}
}
1 change: 1 addition & 0 deletions vortex-alp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use vortex::array::{EncodingRef, ENCODINGS};

mod alp;
mod compress;
mod compute;
mod downcast;
mod serde;
mod stats;
Expand Down
3 changes: 3 additions & 0 deletions vortex-dict/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ hashbrown = "0.14.3"
linkme = "0.3.22"
log = "0.4.20"
num-traits = "0.2.17"

[lints]
workspace = true
5 changes: 3 additions & 2 deletions vortex-dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vortex::array::primitive::PrimitiveArray;
use vortex::array::varbin::VarBinArray;
use vortex::array::{Array, ArrayKind, ArrayRef, CloneOptionalArray};
use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use vortex::compute::scalar_at::scalar_at;
use vortex::dtype::DType;
use vortex::match_each_native_ptype;
use vortex::ptype::NativePType;
Expand Down Expand Up @@ -207,8 +208,8 @@ fn bytes_at_primitive<'a, T: NativePType + AsPrimitive<usize>>(
}

fn bytes_at<'a>(offsets: &'a dyn Array, bytes: &'a [u8], idx: usize) -> &'a [u8] {
let start: usize = offsets.scalar_at(idx).unwrap().try_into().unwrap();
let stop: usize = offsets.scalar_at(idx + 1).unwrap().try_into().unwrap();
let start: usize = scalar_at(offsets, idx).unwrap().try_into().unwrap();
let stop: usize = scalar_at(offsets, idx + 1).unwrap().try_into().unwrap();
&bytes[start..stop]
}

Expand Down
18 changes: 18 additions & 0 deletions vortex-dict/src/compute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::DictArray;
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::ArrayCompute;
use vortex::error::VortexResult;
use vortex::scalar::Scalar;

impl ArrayCompute for DictArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}
}

impl ScalarAtFn for DictArray {
fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
let dict_index: usize = scalar_at(self.codes(), index)?.try_into()?;
scalar_at(self.dict(), dict_index)
}
}
11 changes: 1 addition & 10 deletions vortex-dict/src/dict.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

use vortex::array::{
check_index_bounds, check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId,
};
use vortex::array::{check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId};
use vortex::compress::EncodingCompression;
use vortex::dtype::{DType, Signedness};
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::scalar::Scalar;
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

Expand Down Expand Up @@ -75,12 +72,6 @@ impl Array for DictArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
check_index_bounds(self, index)?;
let dict_index: usize = self.codes().scalar_at(index)?.try_into()?;
self.dict().scalar_at(dict_index)
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand Down
1 change: 1 addition & 0 deletions vortex-dict/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use compress::*;
pub use dict::*;

mod compress;
mod compute;
mod dict;
mod downcast;
mod serde;
Expand Down
23 changes: 5 additions & 18 deletions vortex-fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use vortex::array::{
check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef,
};
use vortex::compress::EncodingCompression;
use vortex::compute::scalar_at::scalar_at;
use vortex::compute::ArrayCompute;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::scalar::{NullableScalar, Scalar};
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stat, Stats, StatsCompute, StatsSet};

Expand Down Expand Up @@ -74,7 +75,7 @@ impl BitPackedArray {

pub fn is_valid(&self, index: usize) -> bool {
self.validity()
.map(|v| v.scalar_at(index).and_then(|v| v.try_into()).unwrap())
.map(|v| scalar_at(v, index).and_then(|v| v.try_into()).unwrap())
.unwrap_or(true)
}
}
Expand Down Expand Up @@ -115,22 +116,6 @@ impl Array for BitPackedArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if !self.is_valid(index) {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
}

if let Some(patch) = self
.patches()
.and_then(|p| p.scalar_at(index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

todo!("Decode single element from BitPacked array");
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand All @@ -156,6 +141,8 @@ impl Array for BitPackedArray {
}
}

impl ArrayCompute for BitPackedArray {}

impl<'arr> AsRef<(dyn Array + 'arr)> for BitPackedArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
Expand Down
7 changes: 3 additions & 4 deletions vortex-fastlanes/src/for/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock};

use vortex::array::{Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef};
use vortex::compress::EncodingCompression;
use vortex::compute::ArrayCompute;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
Expand Down Expand Up @@ -78,10 +79,6 @@ impl Array for FoRArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, _index: usize) -> VortexResult<Box<dyn Scalar>> {
todo!()
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand Down Expand Up @@ -110,6 +107,8 @@ impl Array for FoRArray {
}
}

impl ArrayCompute for FoRArray {}

impl<'arr> AsRef<(dyn Array + 'arr)> for FoRArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
Expand Down
46 changes: 6 additions & 40 deletions vortex-ffor/src/ffor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::{
check_validity_buffer, Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId,
EncodingRef,
};
use vortex::compress::EncodingCompression;
use vortex::compute::scalar_at::scalar_at;
use vortex::compute::ArrayCompute;
use vortex::dtype::DType;
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::match_each_integer_ptype;
use vortex::scalar::{NullableScalar, Scalar};
use vortex::scalar::Scalar;
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

Expand Down Expand Up @@ -104,7 +104,7 @@ impl FFORArray {

pub fn is_valid(&self, index: usize) -> bool {
self.validity()
.map(|v| v.scalar_at(index).and_then(|v| v.try_into()).unwrap())
.map(|v| scalar_at(v, index).and_then(|v| v.try_into()).unwrap())
.unwrap_or(true)
}
}
Expand Down Expand Up @@ -145,42 +145,6 @@ impl Array for FFORArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if !self.is_valid(index) {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
}

if let Some(patch) = self
.patches()
.and_then(|p| p.scalar_at(index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

let Some(parray) = self.encoded().maybe_primitive() else {
return Err(VortexError::InvalidEncoding(
self.encoded().encoding().id().clone(),
));
};

if let Ok(ptype) = self.dtype().try_into() {
match_each_integer_ptype!(ptype, |$T| {
return Ok(codecz::ffor::decode_single::<$T>(
parray.buffer().as_slice(),
self.len,
self.num_bits,
self.min_val().try_into().unwrap(),
index,
)
.unwrap()
.into());
})
} else {
return Err(VortexError::InvalidDType(self.dtype().clone()));
}
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand All @@ -204,6 +168,8 @@ impl Array for FFORArray {
}
}

impl ArrayCompute for FFORArray {}

impl<'arr> AsRef<(dyn Array + 'arr)> for FFORArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
Expand Down
3 changes: 3 additions & 0 deletions vortex-ree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ linkme = "0.3.22"
half = "2.3.1"
num-traits = "0.2.17"
itertools = "0.10.5"

[lints]
workspace = true
17 changes: 17 additions & 0 deletions vortex-ree/src/compute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::REEArray;
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::ArrayCompute;
use vortex::error::VortexResult;
use vortex::scalar::Scalar;

impl ArrayCompute for REEArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}
}

impl ScalarAtFn for REEArray {
fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
scalar_at(self.values(), self.find_physical_index(index)?)
}
}
1 change: 1 addition & 0 deletions vortex-ree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use vortex::array::{EncodingRef, ENCODINGS};
pub use ree::*;

mod compress;
mod compute;
mod downcast;
mod ree;
mod serde;
Expand Down
Loading

0 comments on commit bc2c5dc

Please sign in to comment.