Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ScalarAt compute function #49

Merged
merged 4 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions vortex-alp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ linkme = "0.3.22"
itertools = "0.12.1"
codecz = { version = "0.1.0", path = "../codecz" }
log = { version = "0.4.20", features = [] }

[lints]
workspace = true
35 changes: 1 addition & 34 deletions vortex-alp/src/alp.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

use codecz::alp;
pub use codecz::alp::ALPExponents;
use vortex::array::{Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef};
use vortex::compress::EncodingCompression;
use vortex::dtype::{DType, FloatWidth, IntWidth, Signedness};
use vortex::dtype::{DType, IntWidth, Signedness};
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::scalar::{NullableScalar, Scalar};
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

Expand Down Expand Up @@ -106,37 +104,6 @@ impl Array for ALPArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if let Some(patch) = self
.patches()
.and_then(|p| p.scalar_at(index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

let Some(encoded_val) = self.encoded.scalar_at(index)?.into_nonnull() else {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
};
match self.dtype {
DType::Float(FloatWidth::_32, _) => {
let encoded_val: i32 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f32>(encoded_val, self.exponents)
.unwrap()
.into())
}

DType::Float(FloatWidth::_64, _) => {
let encoded_val: i64 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f64>(encoded_val, self.exponents)
.unwrap()
.into())
}

_ => unreachable!(),
}
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand Down
47 changes: 47 additions & 0 deletions vortex-alp/src/compute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::ALPArray;
use codecz::alp;
use vortex::array::Array;
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::ArrayCompute;
use vortex::dtype::{DType, FloatWidth};
use vortex::error::VortexResult;
use vortex::scalar::{NullableScalar, Scalar};

impl ArrayCompute for ALPArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}
}

impl ScalarAtFn for ALPArray {
fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if let Some(patch) = self
.patches()
.and_then(|p| scalar_at(p, index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

let Some(encoded_val) = scalar_at(self.encoded(), index)?.into_nonnull() else {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
};
match self.dtype() {
DType::Float(FloatWidth::_32, _) => {
let encoded_val: i32 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f32>(encoded_val, self.exponents())
.unwrap()
.into())
}

DType::Float(FloatWidth::_64, _) => {
let encoded_val: i64 = encoded_val.try_into().unwrap();
Ok(alp::decode_single::<f64>(encoded_val, self.exponents())
.unwrap()
.into())
}

_ => unreachable!(),
}
}
}
1 change: 1 addition & 0 deletions vortex-alp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use vortex::array::{EncodingRef, ENCODINGS};

mod alp;
mod compress;
mod compute;
mod downcast;
mod serde;
mod stats;
Expand Down
3 changes: 3 additions & 0 deletions vortex-dict/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ hashbrown = "0.14.3"
linkme = "0.3.22"
log = "0.4.20"
num-traits = "0.2.17"

[lints]
workspace = true
11 changes: 6 additions & 5 deletions vortex-dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vortex::array::primitive::PrimitiveArray;
use vortex::array::varbin::VarBinArray;
use vortex::array::{Array, ArrayKind, ArrayRef, CloneOptionalArray};
use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use vortex::compute::scalar_at::scalar_at;
use vortex::dtype::DType;
use vortex::match_each_native_ptype;
use vortex::ptype::NativePType;
Expand Down Expand Up @@ -207,8 +208,8 @@ fn bytes_at_primitive<'a, T: NativePType + AsPrimitive<usize>>(
}

fn bytes_at<'a>(offsets: &'a dyn Array, bytes: &'a [u8], idx: usize) -> &'a [u8] {
let start: usize = offsets.scalar_at(idx).unwrap().try_into().unwrap();
let stop: usize = offsets.scalar_at(idx + 1).unwrap().try_into().unwrap();
let start: usize = scalar_at(offsets, idx).unwrap().try_into().unwrap();
let stop: usize = scalar_at(offsets, idx + 1).unwrap().try_into().unwrap();
&bytes[start..stop]
}

Expand Down Expand Up @@ -268,7 +269,7 @@ where
mod test {
use vortex::array::primitive::PrimitiveArray;
use vortex::array::varbin::VarBinArray;
use vortex::array::Array;
use vortex::compute::scalar_at::scalar_at;

use crate::compress::{dict_encode_typed_primitive, dict_encode_varbin};

Expand Down Expand Up @@ -297,8 +298,8 @@ mod test {
assert!(!codes.is_valid(2));
assert!(!codes.is_valid(5));
assert!(!codes.is_valid(7));
assert_eq!(values.scalar_at(0), Ok(1.into()));
assert_eq!(values.scalar_at(2), Ok(3.into()));
assert_eq!(scalar_at(values.as_ref(), 0), Ok(1.into()));
assert_eq!(scalar_at(values.as_ref(), 2), Ok(3.into()));
}

#[test]
Expand Down
18 changes: 18 additions & 0 deletions vortex-dict/src/compute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::DictArray;
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::ArrayCompute;
use vortex::error::VortexResult;
use vortex::scalar::Scalar;

impl ArrayCompute for DictArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}
}

impl ScalarAtFn for DictArray {
fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
let dict_index: usize = scalar_at(self.codes(), index)?.try_into()?;
scalar_at(self.dict(), dict_index)
}
}
11 changes: 1 addition & 10 deletions vortex-dict/src/dict.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

use vortex::array::{
check_index_bounds, check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId,
};
use vortex::array::{check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId};
use vortex::compress::EncodingCompression;
use vortex::dtype::{DType, Signedness};
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::scalar::Scalar;
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

Expand Down Expand Up @@ -75,12 +72,6 @@ impl Array for DictArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
check_index_bounds(self, index)?;
let dict_index: usize = self.codes().scalar_at(index)?.try_into()?;
self.dict().scalar_at(dict_index)
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand Down
1 change: 1 addition & 0 deletions vortex-dict/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use compress::*;
pub use dict::*;

mod compress;
mod compute;
mod dict;
mod downcast;
mod serde;
Expand Down
23 changes: 5 additions & 18 deletions vortex-fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use vortex::array::{
check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef,
};
use vortex::compress::EncodingCompression;
use vortex::compute::scalar_at::scalar_at;
use vortex::compute::ArrayCompute;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::scalar::{NullableScalar, Scalar};
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stat, Stats, StatsCompute, StatsSet};

Expand Down Expand Up @@ -74,7 +75,7 @@ impl BitPackedArray {

pub fn is_valid(&self, index: usize) -> bool {
self.validity()
.map(|v| v.scalar_at(index).and_then(|v| v.try_into()).unwrap())
.map(|v| scalar_at(v, index).and_then(|v| v.try_into()).unwrap())
.unwrap_or(true)
}
}
Expand Down Expand Up @@ -115,22 +116,6 @@ impl Array for BitPackedArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if !self.is_valid(index) {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
}

if let Some(patch) = self
.patches()
.and_then(|p| p.scalar_at(index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

todo!("Decode single element from BitPacked array");
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand All @@ -156,6 +141,8 @@ impl Array for BitPackedArray {
}
}

impl ArrayCompute for BitPackedArray {}

impl<'arr> AsRef<(dyn Array + 'arr)> for BitPackedArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
Expand Down
7 changes: 3 additions & 4 deletions vortex-fastlanes/src/for/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock};

use vortex::array::{Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef};
use vortex::compress::EncodingCompression;
use vortex::compute::ArrayCompute;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
Expand Down Expand Up @@ -78,10 +79,6 @@ impl Array for FoRArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, _index: usize) -> VortexResult<Box<dyn Scalar>> {
todo!()
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand Down Expand Up @@ -110,6 +107,8 @@ impl Array for FoRArray {
}
}

impl ArrayCompute for FoRArray {}

impl<'arr> AsRef<(dyn Array + 'arr)> for FoRArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
Expand Down
46 changes: 6 additions & 40 deletions vortex-ffor/src/ffor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::{
check_validity_buffer, Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId,
EncodingRef,
};
use vortex::compress::EncodingCompression;
use vortex::compute::scalar_at::scalar_at;
use vortex::compute::ArrayCompute;
use vortex::dtype::DType;
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::match_each_integer_ptype;
use vortex::scalar::{NullableScalar, Scalar};
use vortex::scalar::Scalar;
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

Expand Down Expand Up @@ -104,7 +104,7 @@ impl FFORArray {

pub fn is_valid(&self, index: usize) -> bool {
self.validity()
.map(|v| v.scalar_at(index).and_then(|v| v.try_into()).unwrap())
.map(|v| scalar_at(v, index).and_then(|v| v.try_into()).unwrap())
.unwrap_or(true)
}
}
Expand Down Expand Up @@ -145,42 +145,6 @@ impl Array for FFORArray {
Stats::new(&self.stats, self)
}

fn scalar_at(&self, index: usize) -> VortexResult<Box<dyn Scalar>> {
if !self.is_valid(index) {
return Ok(NullableScalar::none(self.dtype().clone()).boxed());
}

if let Some(patch) = self
.patches()
.and_then(|p| p.scalar_at(index).ok())
.and_then(|p| p.into_nonnull())
{
return Ok(patch);
}

let Some(parray) = self.encoded().maybe_primitive() else {
return Err(VortexError::InvalidEncoding(
self.encoded().encoding().id().clone(),
));
};

if let Ok(ptype) = self.dtype().try_into() {
match_each_integer_ptype!(ptype, |$T| {
return Ok(codecz::ffor::decode_single::<$T>(
parray.buffer().as_slice(),
self.len,
self.num_bits,
self.min_val().try_into().unwrap(),
index,
)
.unwrap()
.into());
})
} else {
return Err(VortexError::InvalidDType(self.dtype().clone()));
}
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
Expand All @@ -204,6 +168,8 @@ impl Array for FFORArray {
}
}

impl ArrayCompute for FFORArray {}

impl<'arr> AsRef<(dyn Array + 'arr)> for FFORArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
Expand Down
3 changes: 3 additions & 0 deletions vortex-ree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ linkme = "0.3.22"
half = "2.3.1"
num-traits = "0.2.17"
itertools = "0.10.5"

[lints]
workspace = true
Loading