diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5f2943b311..4d45cd115f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ permissions: jobs: build: name: 'build' - runs-on: ubuntu-latest-medium + runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 with: diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs index b56a0ea867..b36429b8b1 100644 --- a/bench-vortex/benches/random_access.rs +++ b/bench-vortex/benches/random_access.rs @@ -1,7 +1,7 @@ +use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use itertools::Itertools; -use bench_vortex::serde::{take_taxi_data, write_taxi_data}; use vortex::array::ENCODINGS; fn random_access(c: &mut Criterion) { diff --git a/bench-vortex/src/bin/serde.rs b/bench-vortex/src/bin/serde.rs new file mode 100644 index 0000000000..3667e1504b --- /dev/null +++ b/bench-vortex/src/bin/serde.rs @@ -0,0 +1,10 @@ +use bench_vortex::setup_logger; +use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data}; +use log::LevelFilter; + +pub fn main() { + setup_logger(LevelFilter::Debug); + let taxi_spiral = write_taxi_data(); + let rows = take_taxi_data(&taxi_spiral, &[10, 11, 12, 13]); //, 100_000, 3_000_000]); + println!("TAKE TAXI DATA: {:?}", rows); +} diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index e64ef9ead4..342d8e94b1 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -18,13 +18,11 @@ use vortex::compress::{CompressConfig, CompressCtx}; use vortex::formatter::display_tree; use vortex_alp::ALPEncoding; use vortex_datetime::DateTimeEncoding; -use vortex_dict::DictEncoding; use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; use vortex_ree::REEEncoding; use vortex_roaring::RoaringBoolEncoding; use vortex_schema::DType; -pub mod serde; pub mod taxi_data; pub fn idempotent(name: &str, f: impl FnOnce(&mut File)) -> PathBuf { @@ -39,8 +37,7 @@ pub fn idempotent(name: &str, f: impl FnOnce(&mut File)) -> PathBuf { path.to_path_buf() } -#[allow(dead_code)] -fn setup_logger(level: LevelFilter) { +pub fn setup_logger(level: LevelFilter) { TermLogger::init( level, Config::default(), @@ -54,7 +51,7 @@ pub fn enumerate_arrays() -> Vec { println!("FOUND {:?}", ENCODINGS.iter().map(|e| e.id()).collect_vec()); vec![ &ALPEncoding, - &DictEncoding, + //&DictEncoding, &BitPackedEncoding, &FoREncoding, &DateTimeEncoding, diff --git a/bench-vortex/src/serde.rs b/bench-vortex/src/serde.rs deleted file mode 100644 index cd30abeebd..0000000000 --- a/bench-vortex/src/serde.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::fs::File; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use arrow_array::{ArrayRef as ArrowArrayRef, RecordBatchReader, StructArray as ArrowStructArray}; -use itertools::Itertools; -use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; - -use vortex::array::chunked::ChunkedArray; -use vortex::array::primitive::PrimitiveArray; -use vortex::array::ArrayRef; -use vortex::arrow::FromArrowType; -use vortex::compute::take::take; -use vortex::encode::FromArrowArray; -use vortex::ptype::PType; -use vortex::serde::{ReadCtx, WriteCtx}; -use vortex_schema::DType; - -use crate::taxi_data::download_taxi_data; -use crate::{compress_ctx, idempotent}; - -pub fn write_taxi_data() -> PathBuf { - idempotent("taxi.spiral", |write| { - let taxi_pq = File::open(download_taxi_data()).unwrap(); - let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap(); - - // FIXME(ngates): the compressor should handle batch size. - let reader = builder.with_batch_size(65_536).build().unwrap(); - - let dtype = DType::from_arrow(reader.schema()); - let ctx = compress_ctx(); - - let chunks = reader - .map(|batch_result| batch_result.unwrap()) - .map(|record_batch| { - let struct_arrow: ArrowStructArray = record_batch.into(); - let arrow_array: ArrowArrayRef = Arc::new(struct_arrow); - let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false); - ctx.compress(&vortex_array, None).unwrap() - }) - .collect_vec(); - let chunked = ChunkedArray::new(chunks, dtype.clone()); - - let mut write_ctx = WriteCtx::new(write); - write_ctx.dtype(&dtype).unwrap(); - write_ctx.write(&chunked).unwrap(); - }) -} - -pub fn take_taxi_data(path: &Path, indices: &[u64]) -> ArrayRef { - let chunked = { - let mut file = File::open(path).unwrap(); - let dummy_dtype: DType = PType::U8.into(); - let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file); - let dtype = read_ctx.dtype().unwrap(); - read_ctx.with_schema(&dtype).read().unwrap() - }; - take(&chunked, &PrimitiveArray::from(indices.to_vec())).unwrap() -} - -#[cfg(test)] -mod test { - - use log::LevelFilter; - use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; - - use crate::serde::{take_taxi_data, write_taxi_data}; - - #[allow(dead_code)] - fn setup_logger(level: LevelFilter) { - TermLogger::init( - level, - Config::default(), - TerminalMode::Mixed, - ColorChoice::Auto, - ) - .unwrap(); - } - - #[ignore] - #[test] - fn round_trip_serde() { - let taxi_spiral = write_taxi_data(); - let rows = take_taxi_data(&taxi_spiral, &[10, 11, 12, 13, 100_000, 3_000_000]); - println!("TAKE TAXI DATA: {:?}", rows); - } -} diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index 4e5b065d3a..701dc6afd1 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -1,6 +1,20 @@ -use std::path::PathBuf; +use arrow_array::RecordBatchReader; +use itertools::Itertools; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use parquet::arrow::ProjectionMask; +use std::fs::File; +use std::path::{Path, PathBuf}; +use vortex::array::chunked::ChunkedArray; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::{ArrayRef, IntoArray}; +use vortex::arrow::FromArrowType; +use vortex::compute::take::take; +use vortex::formatter::display_tree; +use vortex::ptype::PType; +use vortex::serde::{ReadCtx, WriteCtx}; +use vortex_schema::DType; -use crate::idempotent; +use crate::{compress_ctx, idempotent}; pub fn download_taxi_data() -> PathBuf { idempotent("yellow-tripdata-2023-11.parquet", |file| { @@ -12,3 +26,50 @@ pub fn download_taxi_data() -> PathBuf { .unwrap(); }) } + +pub fn write_taxi_data() -> PathBuf { + idempotent("taxi.spiral", |write| { + let taxi_pq = File::open(download_taxi_data()).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap(); + let _mask = ProjectionMask::roots(builder.parquet_schema(), (0..14).collect_vec()); + + // FIXME(ngates): the compressor should handle batch size. + let reader = builder + // .with_limit(100) + // .with_projection(_mask) + .with_batch_size(65_536) + .build() + .unwrap(); + + let dtype = DType::from_arrow(reader.schema()); + println!("SCHEMA {:?}\nDTYPE: {:?}", reader.schema(), dtype); + let ctx = compress_ctx(); + + let chunks = reader + .map(|batch_result| batch_result.unwrap()) + .map(|record_batch| { + println!("RBSCHEMA: {:?}", record_batch.schema()); + let vortex_array = record_batch.into_array(); + let compressed = ctx.compress(&vortex_array, None).unwrap(); + println!("COMPRESSED {}", display_tree(&compressed)); + compressed + }) + .collect_vec(); + let chunked = ChunkedArray::new(chunks, dtype.clone()); + + let mut write_ctx = WriteCtx::new(write); + write_ctx.dtype(&dtype).unwrap(); + write_ctx.write(&chunked).unwrap(); + }) +} + +pub fn take_taxi_data(path: &Path, indices: &[u64]) -> ArrayRef { + let chunked = { + let mut file = File::open(path).unwrap(); + let dummy_dtype: DType = PType::U8.into(); + let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file); + let dtype = read_ctx.dtype().unwrap(); + read_ctx.with_schema(&dtype).read().unwrap() + }; + take(&chunked, &PrimitiveArray::from(indices.to_vec())).unwrap() +} diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs index ab172931f0..e9ab7fe353 100644 --- a/pyvortex/src/array.rs +++ b/pyvortex/src/array.rs @@ -24,6 +24,7 @@ use crate::dtype::PyDType; use crate::error::PyVortexError; use crate::vortex_arrow; use std::sync::Arc; +use vortex::compute::take::take; #[pyclass(name = "Array", module = "vortex", sequence, subclass)] pub struct PyArray { @@ -196,6 +197,12 @@ impl PyArray { fn dtype(self_: PyRef) -> PyResult> { PyDType::wrap(self_.py(), self_.inner.dtype().clone()) } + + fn take(&self, indices: PyRef<'_, PyArray>) -> PyResult> { + take(&self.inner, indices.unwrap()) + .map_err(PyVortexError::map_err) + .and_then(|arr| PyArray::wrap(indices.py(), arr)) + } } #[pymethods] diff --git a/pyvortex/test/test_array.py b/pyvortex/test/test_array.py index 5d383c9402..119beb914e 100644 --- a/pyvortex/test/test_array.py +++ b/pyvortex/test/test_array.py @@ -16,6 +16,15 @@ def test_varbin_array_round_trip(): assert arr.to_pyarrow().combine_chunks() == a +def test_varbin_array_take(): + a = vortex.encode(pa.array(["a", "b", "c", "d"])) + # TODO(ngates): ensure we correctly round-trip to a string and not large_string + assert a.take(vortex.encode(pa.array([0, 2]))).to_pyarrow().combine_chunks() == pa.array( + ["a", "c"], + type=pa.large_utf8(), + ) + + def test_empty_array(): a = pa.array([], type=pa.uint8()) primitive = vortex.encode(a) diff --git a/vortex-alp/src/compress.rs b/vortex-alp/src/compress.rs index 0f17310660..95a99c6973 100644 --- a/vortex-alp/src/compress.rs +++ b/vortex-alp/src/compress.rs @@ -60,7 +60,7 @@ impl EncodingCompression for ALPEncoding { let (exponents, encoded, patches) = match_each_alp_float_ptype!( parray.ptype(), |$T| { - encode_to_array(parray.typed_data::<$T>(), like_alp.map(|l| l.exponents())) + encode_to_array::<$T>(parray, like_alp.map(|l| l.exponents())) })?; let compressed_encoded = ctx @@ -81,18 +81,20 @@ impl EncodingCompression for ALPEncoding { } fn encode_to_array( - values: &[T], + values: &PrimitiveArray, exponents: Option<&Exponents>, ) -> (Exponents, ArrayRef, Option) where T: ALPFloat + NativePType, T::ALPInt: NativePType, { - let (exponents, values, exc_pos, exc) = T::encode(values, exponents); - let len = values.len(); + let (exponents, encoded, exc_pos, exc) = T::encode(values.typed_data::(), exponents); + let len = encoded.len(); ( exponents, - PrimitiveArray::from(values).into_array(), + PrimitiveArray::from(encoded) + .into_nullable(values.validity().is_some().into()) + .into_array(), (!exc.is_empty()).then(|| { SparseArray::new( PrimitiveArray::from(exc_pos).into_array(), @@ -106,8 +108,8 @@ where pub(crate) fn alp_encode(parray: &PrimitiveArray) -> VortexResult { let (exponents, encoded, patches) = match parray.ptype() { - PType::F32 => encode_to_array(parray.typed_data::(), None), - PType::F64 => encode_to_array(parray.typed_data::(), None), + PType::F32 => encode_to_array::(parray, None), + PType::F64 => encode_to_array::(parray, None), _ => return Err("ALP can only encode f32 and f64".into()), }; Ok(ALPArray::new(encoded, exponents, patches)) diff --git a/vortex-alp/src/serde.rs b/vortex-alp/src/serde.rs index ddb0b095e8..f559ccc174 100644 --- a/vortex-alp/src/serde.rs +++ b/vortex-alp/src/serde.rs @@ -9,10 +9,7 @@ use crate::ALPEncoding; impl ArraySerde for ALPArray { fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> { - ctx.write_option_tag(self.patches().is_some())?; - if let Some(p) = self.patches() { - ctx.write(p)?; - } + ctx.write_optional_array(self.patches())?; ctx.write_fixed_slice([self.exponents().e, self.exponents().f])?; ctx.write(self.encoded()) } @@ -20,12 +17,7 @@ impl ArraySerde for ALPArray { impl EncodingSerde for ALPEncoding { fn read(&self, ctx: &mut ReadCtx) -> VortexResult { - let patches_tag = ctx.read_nbytes::<1>()?[0]; - let patches = if patches_tag == 0x01 { - Some(ctx.read()?) - } else { - None - }; + let patches = ctx.read_optional_array()?; let exponents = ctx.read_nbytes::<2>()?; let encoded_dtype = match ctx.schema() { DType::Float(width, nullability) => match width { diff --git a/vortex-array/src/array/bool/compute.rs b/vortex-array/src/array/bool/compute.rs index fff642379d..5318ec7778 100644 --- a/vortex-array/src/array/bool/compute.rs +++ b/vortex-array/src/array/bool/compute.rs @@ -34,12 +34,11 @@ impl ArrayCompute for BoolArray { } impl AsContiguousFn for BoolArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { - // TODO(ngates): implement a HasValidity trait to avoid this duplicate code. + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { let validity: Option = if self.dtype().is_nullable() { - Some(Validity::from_iter(arrays.iter().map(|a| a.as_bool()).map( - |a| a.validity().unwrap_or_else(|| Validity::valid(a.len())), - ))) + Some(Validity::from_iter(arrays.iter().map(|a| { + a.validity().unwrap_or_else(|| Validity::Valid(a.len())) + }))) } else { None }; diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index bf7a35af31..876a0d9a79 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -47,7 +47,7 @@ impl BoolArray { pub fn null(n: usize) -> Self { BoolArray::new( BooleanBuffer::from(vec![false; n]), - Some(Validity::invalid(n)), + Some(Validity::Invalid(n)), ) } @@ -170,13 +170,8 @@ impl FromIterator> for BoolArray { let mut validity: Vec = Vec::with_capacity(lower); let values: Vec = iter .map(|i| { - if let Some(v) = i { - validity.push(true); - v - } else { - validity.push(false); - false - } + validity.push(i.is_some()); + i.unwrap_or_default() }) .collect::>(); diff --git a/vortex-array/src/array/chunked/compute.rs b/vortex-array/src/array/chunked/compute/mod.rs similarity index 81% rename from vortex-array/src/array/chunked/compute.rs rename to vortex-array/src/array/chunked/compute/mod.rs index 10694863d0..eaf1cfbd8f 100644 --- a/vortex-array/src/array/chunked/compute.rs +++ b/vortex-array/src/array/chunked/compute/mod.rs @@ -8,9 +8,12 @@ use crate::array::ArrayRef; use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::flatten::{FlattenFn, FlattenedArray}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; +use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; use crate::scalar::Scalar; +mod take; + impl ArrayCompute for ChunkedArray { fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { Some(self) @@ -23,17 +26,21 @@ impl ArrayCompute for ChunkedArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl AsContiguousFn for ChunkedArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { // Combine all the chunks into one, then call as_contiguous again. let chunks = arrays .iter() .flat_map(|a| a.as_chunked().chunks().iter()) .cloned() .collect_vec(); - as_contiguous(chunks) + as_contiguous(&chunks) } } @@ -45,7 +52,7 @@ impl FlattenFn for ChunkedArray { impl ScalarAtFn for ChunkedArray { fn scalar_at(&self, index: usize) -> VortexResult { - let (chunk_index, chunk_offset) = self.find_physical_location(index); + let (chunk_index, chunk_offset) = self.find_chunk_idx(index); scalar_at(self.chunks[chunk_index].as_ref(), chunk_offset) } } diff --git a/vortex-array/src/array/chunked/compute/take.rs b/vortex-array/src/array/chunked/compute/take.rs new file mode 100644 index 0000000000..9685098ac8 --- /dev/null +++ b/vortex-array/src/array/chunked/compute/take.rs @@ -0,0 +1,69 @@ +use crate::array::chunked::ChunkedArray; +use crate::array::{Array, ArrayRef, IntoArray}; +use crate::compute::cast::cast; +use crate::compute::flatten::flatten_primitive; +use crate::compute::take::{take, TakeFn}; +use crate::ptype::PType; +use vortex_error::VortexResult; + +impl TakeFn for ChunkedArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + if self.len() == indices.len() { + return Ok(self.to_array()); + } + + let indices = flatten_primitive(cast(indices, PType::U64.into())?.as_ref())?; + + // While the chunk idx remains the same, accumulate a list of chunk indices. + let mut chunks = Vec::new(); + let mut indices_in_chunk = Vec::new(); + let mut prev_chunk_idx = self + .find_chunk_idx(indices.typed_data::()[0] as usize) + .0; + for idx in indices.typed_data::() { + let (chunk_idx, idx_in_chunk) = self.find_chunk_idx(*idx as usize); + + if chunk_idx != prev_chunk_idx { + // Start a new chunk + let indices_in_chunk_array = indices_in_chunk.clone().into_array(); + chunks.push(take( + &self.chunks()[prev_chunk_idx], + &indices_in_chunk_array, + )?); + indices_in_chunk = Vec::new(); + } + + indices_in_chunk.push(idx_in_chunk as u64); + prev_chunk_idx = chunk_idx; + } + + if !indices_in_chunk.is_empty() { + let indices_in_chunk_array = indices_in_chunk.into_array(); + chunks.push(take( + &self.chunks()[prev_chunk_idx], + &indices_in_chunk_array, + )?); + } + + Ok(ChunkedArray::new(chunks, self.dtype().clone()).into_array()) + } +} + +#[cfg(test)] +mod test { + use crate::array::chunked::ChunkedArray; + use crate::array::downcast::DowncastArrayBuiltin; + use crate::array::IntoArray; + use crate::compute::as_contiguous::as_contiguous; + use crate::compute::take::take; + + #[test] + fn test_take() { + let a = vec![1i32, 2, 3].into_array(); + let arr = ChunkedArray::new(vec![a.clone(), a.clone(), a.clone()], a.dtype().clone()); + let indices = vec![0, 0, 6, 4].into_array(); + + let result = as_contiguous(take(&arr, &indices).unwrap().as_chunked().chunks()).unwrap(); + assert_eq!(result.as_primitive().typed_data::(), &[1, 1, 1, 2]); + } +} diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 2115b6a591..8c8a0c2c99 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -22,7 +22,7 @@ mod stats; #[derive(Debug, Clone)] pub struct ChunkedArray { chunks: Vec, - chunk_ends: Vec, + chunk_ends: Vec, dtype: DType, stats: Arc>, } @@ -33,30 +33,21 @@ impl ChunkedArray { } pub fn try_new(chunks: Vec, dtype: DType) -> VortexResult { - chunks - .iter() - .map(|c| c.dtype().as_nullable()) - .all_equal_value() - .map(|_| ()) - .or_else(|mismatched| match mismatched { - None => Ok(()), - Some((fst, snd)) => Err(VortexError::MismatchedTypes(fst, snd)), - })?; - + for chunk in &chunks { + if chunk.dtype() != &dtype { + return Err(VortexError::MismatchedTypes( + dtype.clone(), + chunk.dtype().clone(), + )); + } + } let chunk_ends = chunks .iter() - .scan(0usize, |acc, c| { - *acc += c.len(); + .scan(0u64, |acc, c| { + *acc += c.len() as u64; Some(*acc) }) - .collect::>(); - - let dtype = if chunks.iter().any(|c| c.dtype().is_nullable()) && !dtype.is_nullable() { - dtype.as_nullable() - } else { - dtype - }; - + .collect_vec(); Ok(Self { chunks, chunk_ends, @@ -70,11 +61,16 @@ impl ChunkedArray { &self.chunks } - fn find_physical_location(&self, index: usize) -> (usize, usize) { + #[inline] + pub fn chunk_ends(&self) -> &[u64] { + &self.chunk_ends + } + + pub fn find_chunk_idx(&self, index: usize) -> (usize, usize) { assert!(index <= self.len(), "Index out of bounds of the array"); let index_chunk = self .chunk_ends - .binary_search(&index) + .binary_search(&(index as u64)) // If the result of binary_search is Ok it means we have exact match, since these are chunk ends EXCLUSIVE we have to add one to move to the next one .map(|o| o + 1) .unwrap_or_else(|o| o); @@ -83,7 +79,7 @@ impl ChunkedArray { 0 } else { self.chunk_ends[index_chunk - 1] - }; + } as usize; (index_chunk, index_in_chunk) } } @@ -92,7 +88,7 @@ impl Array for ChunkedArray { impl_array!(); fn len(&self) -> usize { - *self.chunk_ends.last().unwrap_or(&0usize) + self.chunk_ends.last().map(|&i| i as usize).unwrap_or(0) } #[inline] @@ -113,8 +109,8 @@ impl Array for ChunkedArray { fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; - let (offset_chunk, offset_in_first_chunk) = self.find_physical_location(start); - let (length_chunk, length_in_last_chunk) = self.find_physical_location(stop); + let (offset_chunk, offset_in_first_chunk) = self.find_chunk_idx(start); + let (length_chunk, length_in_last_chunk) = self.find_chunk_idx(stop); if length_chunk == offset_chunk { if let Some(chunk) = self.chunks.get(offset_chunk) { @@ -163,7 +159,7 @@ impl ArrayValidity for ChunkedArray { Some(Validity::from_iter(self.chunks.iter().map(|chunk| { chunk .validity() - .unwrap_or_else(|| Validity::valid(chunk.len())) + .unwrap_or_else(|| Validity::Valid(chunk.len())) }))) } } diff --git a/vortex-array/src/array/composite/compute.rs b/vortex-array/src/array/composite/compute.rs index 4fe1ae8195..e4eca7c399 100644 --- a/vortex-array/src/array/composite/compute.rs +++ b/vortex-array/src/array/composite/compute.rs @@ -10,6 +10,7 @@ use crate::compute::as_arrow::AsArrowArray; use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::flatten::{FlattenFn, FlattenedArray}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; +use crate::compute::take::{take, TakeFn}; use crate::compute::ArrayCompute; use crate::scalar::Scalar; @@ -29,6 +30,10 @@ impl ArrayCompute for CompositeArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl AsArrowArray for CompositeArray { @@ -50,7 +55,7 @@ impl AsArrowArray for CompositeArray { } impl AsContiguousFn for CompositeArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { let composites = arrays .iter() .map(|array| array.as_composite().underlying()) @@ -59,7 +64,7 @@ impl AsContiguousFn for CompositeArray { Ok(CompositeArray::new( self.id(), self.metadata().clone(), - as_contiguous(composites)?, + as_contiguous(&composites)?, ) .into_array()) } @@ -79,3 +84,14 @@ impl ScalarAtFn for CompositeArray { underlying.cast(self.dtype()) } } + +impl TakeFn for CompositeArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + Ok(CompositeArray::new( + self.id(), + self.metadata().clone(), + take(self.underlying(), indices)?, + ) + .into_array()) + } +} diff --git a/vortex-array/src/array/composite/serde.rs b/vortex-array/src/array/composite/serde.rs index e3a940e861..74e6125df2 100644 --- a/vortex-array/src/array/composite/serde.rs +++ b/vortex-array/src/array/composite/serde.rs @@ -19,7 +19,7 @@ impl ArraySerde for CompositeArray { impl EncodingSerde for CompositeEncoding { fn read(&self, ctx: &mut ReadCtx) -> VortexResult { let DType::Composite(id, _) = *ctx.schema() else { - panic!("Expected composite schema") + panic!("Expected composite schema, found {}", ctx.schema()) }; let metadata = ctx.read_slice()?; let underling_dtype = ctx.dtype()?; diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs index 8fece38c64..bfd6dee2d6 100644 --- a/vortex-array/src/array/constant/compute.rs +++ b/vortex-array/src/array/constant/compute.rs @@ -34,7 +34,7 @@ impl ArrayCompute for ConstantArray { } impl AsContiguousFn for ConstantArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { let chunks = arrays.iter().map(|a| a.as_constant().clone()).collect_vec(); if chunks.iter().map(|c| c.scalar()).all_equal() { Ok(ConstantArray::new( diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index 37b9c47c67..0a3d0facb7 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -102,8 +102,8 @@ impl ArrayValidity for ConstantArray { fn validity(&self) -> Option { match self.scalar.dtype().is_nullable() { true => match self.scalar().is_null() { - true => Some(Validity::invalid(self.len())), - false => Some(Validity::valid(self.len())), + true => Some(Validity::Invalid(self.len())), + false => Some(Validity::Valid(self.len())), }, false => None, } diff --git a/vortex-array/src/array/primitive/compute/as_contiguous.rs b/vortex-array/src/array/primitive/compute/as_contiguous.rs index d2c3ddfaef..16fbfe8140 100644 --- a/vortex-array/src/array/primitive/compute/as_contiguous.rs +++ b/vortex-array/src/array/primitive/compute/as_contiguous.rs @@ -12,7 +12,7 @@ use crate::ptype::NativePType; use crate::validity::{ArrayValidity, Validity}; impl AsContiguousFn for PrimitiveArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { if !arrays .iter() .map(|chunk| chunk.as_primitive().ptype()) @@ -26,7 +26,7 @@ impl AsContiguousFn for PrimitiveArray { let validity = if self.dtype().is_nullable() { Some(Validity::from_iter(arrays.iter().map(|v| { - v.validity().unwrap_or_else(|| Validity::valid(v.len())) + v.validity().unwrap_or_else(|| Validity::Valid(v.len())) }))) } else { None diff --git a/vortex-array/src/array/primitive/compute/mod.rs b/vortex-array/src/array/primitive/compute/mod.rs index 17f4936c17..fe66426490 100644 --- a/vortex-array/src/array/primitive/compute/mod.rs +++ b/vortex-array/src/array/primitive/compute/mod.rs @@ -7,6 +7,7 @@ use crate::compute::flatten::FlattenFn; use crate::compute::patch::PatchFn; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::search_sorted::SearchSortedFn; +use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; mod as_arrow; @@ -17,6 +18,7 @@ mod flatten; mod patch; mod scalar_at; mod search_sorted; +mod take; impl ArrayCompute for PrimitiveArray { fn as_arrow(&self) -> Option<&dyn AsArrowArray> { @@ -50,4 +52,8 @@ impl ArrayCompute for PrimitiveArray { fn search_sorted(&self) -> Option<&dyn SearchSortedFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } diff --git a/vortex-array/src/array/primitive/compute/search_sorted.rs b/vortex-array/src/array/primitive/compute/search_sorted.rs index 4934ea35fd..0355c1c795 100644 --- a/vortex-array/src/array/primitive/compute/search_sorted.rs +++ b/vortex-array/src/array/primitive/compute/search_sorted.rs @@ -1,58 +1,33 @@ use vortex_error::VortexResult; use crate::array::primitive::PrimitiveArray; +use crate::compute::search_sorted::SearchSorted; use crate::compute::search_sorted::{SearchSortedFn, SearchSortedSide}; use crate::match_each_native_ptype; -use crate::ptype::NativePType; use crate::scalar::Scalar; impl SearchSortedFn for PrimitiveArray { fn search_sorted(&self, value: &Scalar, side: SearchSortedSide) -> VortexResult { match_each_native_ptype!(self.ptype(), |$T| { let pvalue: $T = value.try_into()?; - Ok(search_sorted(self.typed_data::<$T>(), pvalue, side)) + Ok(self.typed_data::<$T>().search_sorted(&pvalue, side)) }) } } -fn search_sorted(arr: &[T], target: T, side: SearchSortedSide) -> usize { - match side { - SearchSortedSide::Left => search_sorted_cmp(arr, target, |a, b| a < b), - SearchSortedSide::Right => search_sorted_cmp(arr, target, |a, b| a <= b), - } -} - -fn search_sorted_cmp(arr: &[T], target: T, cmp: Cmp) -> usize -where - Cmp: Fn(T, T) -> bool + 'static, -{ - let mut low = 0; - let mut high = arr.len(); - - while low < high { - let mid = low + (high - low) / 2; - - if cmp(arr[mid], target) { - low = mid + 1; - } else { - high = mid; - } - } - - low -} - #[cfg(test)] mod test { use super::*; + use crate::array::IntoArray; + use crate::compute::search_sorted::search_sorted; #[test] fn test_searchsorted_primitive() { - let values = vec![1u16, 2, 3]; + let values = vec![1u16, 2, 3].into_array(); - assert_eq!(search_sorted(&values, 0, SearchSortedSide::Left), 0); - assert_eq!(search_sorted(&values, 1, SearchSortedSide::Left), 0); - assert_eq!(search_sorted(&values, 1, SearchSortedSide::Right), 1); - assert_eq!(search_sorted(&values, 4, SearchSortedSide::Left), 3); + assert_eq!(search_sorted(&values, 0, SearchSortedSide::Left), Ok(0)); + assert_eq!(search_sorted(&values, 1, SearchSortedSide::Left), Ok(0)); + assert_eq!(search_sorted(&values, 1, SearchSortedSide::Right), Ok(1)); + assert_eq!(search_sorted(&values, 4, SearchSortedSide::Left), Ok(3)); } } diff --git a/vortex-array/src/array/primitive/compute/take.rs b/vortex-array/src/array/primitive/compute/take.rs new file mode 100644 index 0000000000..1d6ce71b4b --- /dev/null +++ b/vortex-array/src/array/primitive/compute/take.rs @@ -0,0 +1,43 @@ +use crate::array::primitive::PrimitiveArray; +use crate::array::{Array, ArrayRef}; +use crate::compute::flatten::flatten_primitive; +use crate::compute::take::TakeFn; +use crate::ptype::NativePType; +use crate::validity::ArrayValidity; +use crate::{match_each_integer_ptype, match_each_native_ptype}; +use num_traits::PrimInt; +use vortex_error::VortexResult; + +impl TakeFn for PrimitiveArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + let validity = self.validity().map(|v| v.take(indices)).transpose()?; + let indices = flatten_primitive(indices)?; + match_each_native_ptype!(self.ptype(), |$T| { + match_each_integer_ptype!(indices.ptype(), |$I| { + Ok(PrimitiveArray::from_nullable( + take_primitive(self.typed_data::<$T>(), indices.typed_data::<$I>()), + validity, + ).into_array()) + }) + }) + } +} + +fn take_primitive(array: &[T], indices: &[I]) -> Vec { + indices + .iter() + .map(|&idx| array[idx.to_usize().unwrap()]) + .collect() +} + +#[cfg(test)] +mod test { + use crate::array::primitive::compute::take::take_primitive; + + #[test] + fn test_take() { + let a = vec![1i32, 2, 3, 4, 5]; + let result = take_primitive(&a, &[0, 0, 4, 2]); + assert_eq!(result, vec![1i32, 1, 5, 3]); + } +} diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 2ff7b8196b..02ee95700e 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -10,7 +10,7 @@ use arrow_buffer::buffer::{Buffer, ScalarBuffer}; use linkme::distributed_slice; use vortex_error::VortexResult; -use vortex_schema::DType; +use vortex_schema::{DType, Nullability}; use crate::accessor::ArrayAccessor; use crate::array::IntoArray; @@ -95,10 +95,30 @@ impl PrimitiveArray { pub fn null(n: usize) -> Self { PrimitiveArray::from_nullable( iter::repeat(T::zero()).take(n).collect::>(), - Some(Validity::invalid(n)), + Some(Validity::Invalid(n)), ) } + pub fn into_nullable(self, nullability: Nullability) -> Self { + let dtype = self.dtype().with_nullability(nullability); + if self.validity().is_some() && nullability == Nullability::NonNullable { + panic!("Cannot convert nullable array to non-nullable array") + } + let len = self.len(); + let validity = if nullability == Nullability::Nullable { + Some(self.validity().unwrap_or_else(|| Validity::Valid(len))) + } else { + None + }; + Self { + buffer: self.buffer, + ptype: self.ptype, + dtype, + validity, + stats: self.stats, + } + } + #[inline] pub fn ptype(&self) -> PType { self.ptype @@ -241,13 +261,8 @@ impl FromIterator> for PrimitiveArray { let mut validity: Vec = Vec::with_capacity(lower); let values: Vec = iter .map(|i| { - if let Some(v) = i { - validity.push(true); - v - } else { - validity.push(false); - T::default() - } + validity.push(i.is_some()); + i.unwrap_or_default() }) .collect::>(); @@ -275,10 +290,11 @@ impl ArrayDisplay for PrimitiveArray { #[cfg(test)] mod test { + use crate::array::primitive::PrimitiveArray; + use crate::array::Array; use crate::compute::scalar_at::scalar_at; - use vortex_schema::{IntWidth, Nullability, Signedness}; - - use super::*; + use crate::ptype::PType; + use vortex_schema::{DType, IntWidth, Nullability, Signedness}; #[test] fn from_arrow() { diff --git a/vortex-array/src/array/sparse/compute.rs b/vortex-array/src/array/sparse/compute.rs index c980acff1c..1047dc5f95 100644 --- a/vortex-array/src/array/sparse/compute.rs +++ b/vortex-array/src/array/sparse/compute.rs @@ -30,17 +30,17 @@ impl ArrayCompute for SparseArray { } impl AsContiguousFn for SparseArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { Ok(SparseArray::new( as_contiguous( - arrays + &arrays .iter() .map(|a| a.as_sparse().indices()) .cloned() .collect_vec(), )?, as_contiguous( - arrays + &arrays .iter() .map(|a| a.as_sparse().values()) .cloned() diff --git a/vortex-array/src/array/struct_/compute.rs b/vortex-array/src/array/struct_/compute.rs index 8ec1142f81..2338289ac7 100644 --- a/vortex-array/src/array/struct_/compute.rs +++ b/vortex-array/src/array/struct_/compute.rs @@ -15,6 +15,7 @@ use crate::compute::as_arrow::{as_arrow, AsArrowArray}; use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::flatten::{flatten, FlattenFn, FlattenedArray}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; +use crate::compute::take::{take, TakeFn}; use crate::compute::ArrayCompute; use crate::scalar::{Scalar, StructScalar}; @@ -34,6 +35,10 @@ impl ArrayCompute for StructArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl AsArrowArray for StructArray { @@ -68,7 +73,7 @@ impl AsArrowArray for StructArray { } impl AsContiguousFn for StructArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { let mut fields = vec![Vec::new(); self.fields().len()]; for array in arrays { for f in 0..self.fields().len() { @@ -80,7 +85,7 @@ impl AsContiguousFn for StructArray { self.names().clone(), fields .iter() - .map(|field_arrays| as_contiguous(field_arrays.clone())) + .map(|field_arrays| as_contiguous(field_arrays)) .try_collect()?, ) .into_array()) @@ -111,3 +116,16 @@ impl ScalarAtFn for StructArray { .into()) } } + +impl TakeFn for StructArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + Ok(StructArray::new( + self.names().clone(), + self.fields() + .iter() + .map(|field| take(field, indices)) + .try_collect()?, + ) + .into_array()) + } +} diff --git a/vortex-array/src/array/varbin/builder.rs b/vortex-array/src/array/varbin/builder.rs new file mode 100644 index 0000000000..4c5c2a0358 --- /dev/null +++ b/vortex-array/src/array/varbin/builder.rs @@ -0,0 +1,87 @@ +use crate::array::primitive::PrimitiveArray; +use crate::array::varbin::VarBinArray; +use crate::array::Array; +use crate::ptype::NativePType; +use crate::validity::Validity; +use arrow_buffer::NullBufferBuilder; +use num_traits::PrimInt; +use vortex_schema::DType; + +pub struct VarBinBuilder { + offsets: Vec, + data: Vec, + validity: NullBufferBuilder, +} + +impl VarBinBuilder { + pub fn with_capacity(len: usize) -> Self { + let mut offsets = Vec::with_capacity(len + 1); + offsets.push(O::zero()); + Self { + offsets, + data: Vec::new(), + validity: NullBufferBuilder::new(len), + } + } + + pub fn push(&mut self, value: Option<&[u8]>) { + match value { + Some(v) => { + self.offsets + .push(O::from(self.data.len() + v.len()).unwrap()); + self.data.extend_from_slice(v); + self.validity.append_non_null(); + } + None => { + self.offsets.push(self.offsets[self.offsets.len() - 1]); + self.validity.append_null(); + } + } + } + + pub fn finish(self, dtype: DType) -> VarBinArray { + let offsets = PrimitiveArray::from(self.offsets); + let data = PrimitiveArray::from(self.data); + + // TODO(ngates): create our own ValidityBuilder that doesn't need mut or clone on finish. + let nulls = self.validity.finish_cloned(); + + let validity = if dtype.is_nullable() { + Some( + nulls + .map(Validity::from) + .unwrap_or_else(|| Validity::Valid(offsets.len() - 1)), + ) + } else { + assert!(nulls.is_none(), "dtype and validity mismatch"); + None + }; + + VarBinArray::new(offsets.into_array(), data.into_array(), dtype, validity) + } +} + +#[cfg(test)] +mod test { + use crate::array::varbin::builder::VarBinBuilder; + use crate::array::Array; + use crate::compute::scalar_at::scalar_at; + use crate::scalar::Scalar; + use crate::validity::ArrayValidity; + use vortex_schema::DType; + use vortex_schema::Nullability::Nullable; + + #[test] + fn test_builder() { + let mut builder = VarBinBuilder::::with_capacity(0); + builder.push(Some(b"hello")); + builder.push(None); + builder.push(Some(b"world")); + let array = builder.finish(DType::Utf8(Nullable)); + + assert_eq!(array.len(), 3); + assert_eq!(array.nullability(), Nullable); + assert_eq!(scalar_at(&array, 0).unwrap(), Scalar::from("hello")); + assert!(scalar_at(&array, 1).unwrap().is_null()); + } +} diff --git a/vortex-array/src/array/varbin/compute.rs b/vortex-array/src/array/varbin/compute/mod.rs similarity index 92% rename from vortex-array/src/array/varbin/compute.rs rename to vortex-array/src/array/varbin/compute/mod.rs index 1726abf485..7fde8a1ebc 100644 --- a/vortex-array/src/array/varbin/compute.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -1,3 +1,5 @@ +mod take; + use std::sync::Arc; use arrow_array::{ @@ -18,6 +20,7 @@ use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::cast::cast; use crate::compute::flatten::{flatten, flatten_primitive, FlattenFn, FlattenedArray}; use crate::compute::scalar_at::ScalarAtFn; +use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; use crate::ptype::PType; use crate::scalar::{BinaryScalar, Scalar, Utf8Scalar}; @@ -39,20 +42,23 @@ impl ArrayCompute for VarBinArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl AsContiguousFn for VarBinArray { - fn as_contiguous(&self, arrays: Vec) -> VortexResult { - let bytes = as_contiguous( - arrays - .iter() - .map(|a| a.as_varbin().sliced_bytes()) - .try_collect()?, - )?; + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { + let bytes_chunks: Vec = arrays + .iter() + .map(|a| a.as_varbin().sliced_bytes()) + .try_collect()?; + let bytes = as_contiguous(&bytes_chunks)?; let validity = if self.dtype().is_nullable() { Some(Validity::from_iter(arrays.iter().map(|a| { - a.validity().unwrap_or_else(|| Validity::valid(a.len())) + a.validity().unwrap_or_else(|| Validity::Valid(a.len())) }))) } else { None diff --git a/vortex-array/src/array/varbin/compute/take.rs b/vortex-array/src/array/varbin/compute/take.rs new file mode 100644 index 0000000000..0aa58dc5db --- /dev/null +++ b/vortex-array/src/array/varbin/compute/take.rs @@ -0,0 +1,78 @@ +use crate::array::varbin::builder::VarBinBuilder; +use crate::array::varbin::VarBinArray; +use crate::array::{Array, ArrayRef}; +use crate::compute::flatten::flatten_primitive; +use crate::compute::take::TakeFn; +use crate::match_each_integer_ptype; +use crate::ptype::NativePType; +use crate::validity::{ArrayValidity, Validity}; +use num_traits::PrimInt; +use vortex_error::VortexResult; +use vortex_schema::DType; + +impl TakeFn for VarBinArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + // TODO(ngates): support i64 indices. + assert!( + indices.len() < i32::MAX as usize, + "indices.len() must be less than i32::MAX" + ); + + let offsets = flatten_primitive(self.offsets())?; + let data = flatten_primitive(self.bytes())?; + let indices = flatten_primitive(indices)?; + match_each_integer_ptype!(offsets.ptype(), |$O| { + match_each_integer_ptype!(indices.ptype(), |$I| { + Ok(take( + self.dtype().clone(), + offsets.typed_data::<$O>(), + data.typed_data::(), + indices.typed_data::<$I>(), + self.validity(), + ).into_array()) + }) + }) + } +} + +fn take( + dtype: DType, + offsets: &[O], + data: &[u8], + indices: &[I], + validity: Option, +) -> VarBinArray { + if let Some(v) = validity { + return take_nullable(dtype, offsets, data, indices, v); + } + + let mut builder = VarBinBuilder::::with_capacity(indices.len()); + for &idx in indices { + let idx = idx.to_usize().unwrap(); + let start = offsets[idx].to_usize().unwrap(); + let stop = offsets[idx + 1].to_usize().unwrap(); + builder.push(Some(&data[start..stop])); + } + builder.finish(dtype) +} + +fn take_nullable( + dtype: DType, + offsets: &[O], + data: &[u8], + indices: &[I], + validity: Validity, +) -> VarBinArray { + let mut builder = VarBinBuilder::::with_capacity(indices.len()); + for &idx in indices { + let idx = idx.to_usize().unwrap(); + if validity.is_valid(idx) { + let start = offsets[idx].to_usize().unwrap(); + let stop = offsets[idx + 1].to_usize().unwrap(); + builder.push(Some(&data[start..stop])); + } else { + builder.push(None); + } + } + builder.finish(dtype) +} diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index 6ee729b85b..af7c2e0b32 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -23,6 +23,7 @@ use crate::stats::{Stats, StatsSet}; use crate::validity::{ArrayValidity, Validity}; mod accessor; +mod builder; mod compress; mod compute; mod serde; diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index a37dcfcc32..e5a4382cad 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -120,6 +120,7 @@ impl VarBinViewArray { if !matches!(dtype, DType::Binary(_) | DType::Utf8(_)) { return Err(VortexError::InvalidDType(dtype)); } + let dtype = if validity.is_some() && !dtype.is_nullable() { dtype.as_nullable() } else { diff --git a/vortex-array/src/arrow/recordbatch.rs b/vortex-array/src/arrow/recordbatch.rs index 8d253d3ec7..3cc307edf0 100644 --- a/vortex-array/src/arrow/recordbatch.rs +++ b/vortex-array/src/arrow/recordbatch.rs @@ -1,14 +1,8 @@ -use std::sync::Arc; - -use arrow_array::RecordBatch; - -use vortex_schema::DType; - use crate::array::struct_::StructArray; use crate::array::{Array, ArrayRef, IntoArray}; -use crate::arrow::FromArrowType; -use crate::compute::cast::cast; use crate::encode::FromArrowArray; +use arrow_array::RecordBatch; +use std::sync::Arc; impl IntoArray for &RecordBatch { fn into_array(self) -> ArrayRef { @@ -23,12 +17,7 @@ impl IntoArray for &RecordBatch { self.columns() .iter() .zip(self.schema().fields()) - .map(|(array, field)| { - // The dtype of the child arrays infer their nullability from the array itself. - // In case the schema says something different, we cast into the schema's dtype. - let vortex_array = ArrayRef::from_arrow(array.clone(), field.is_nullable()); - cast(&vortex_array, &DType::from_arrow(field.as_ref())).unwrap() - }) + .map(|(array, field)| ArrayRef::from_arrow(array.clone(), field.is_nullable())) .collect(), ) .into_array() diff --git a/vortex-array/src/arrow/wrappers.rs b/vortex-array/src/arrow/wrappers.rs index 78fc1d8613..a3639813de 100644 --- a/vortex-array/src/arrow/wrappers.rs +++ b/vortex-array/src/arrow/wrappers.rs @@ -1,4 +1,4 @@ -use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; use vortex_error::VortexResult; @@ -6,16 +6,12 @@ use crate::array::primitive::PrimitiveArray; use crate::ptype::NativePType; use crate::validity::Validity; -pub fn as_scalar_buffer( - array: PrimitiveArray, -) -> ScalarBuffer { +pub fn as_scalar_buffer(array: PrimitiveArray) -> ScalarBuffer { assert_eq!(array.ptype(), T::PTYPE); ScalarBuffer::from(array.buffer().clone()) } -pub fn as_offset_buffer( - array: PrimitiveArray, -) -> OffsetBuffer { +pub fn as_offset_buffer(array: PrimitiveArray) -> OffsetBuffer { OffsetBuffer::new(as_scalar_buffer(array)) } diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index ffd3c09d00..1999b7810c 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -15,6 +15,7 @@ use crate::array::varbin::VarBinEncoding; use crate::array::{Array, ArrayKind, ArrayRef, Encoding, EncodingRef, ENCODINGS}; use crate::compute; use crate::compute::scalar_at::scalar_at; +use crate::formatter::display_tree; use crate::sampling::stratified_slices; use crate::stats::Stat; use crate::validity::Validity; @@ -189,7 +190,14 @@ impl CompressCtx { // Otherwise, attempt to compress the array let compressed = self.compress_array(arr)?; - assert_eq!(compressed.dtype(), arr.dtype()); + if compressed.dtype() != arr.dtype() { + panic!( + "Compression changed dtype: {:?} -> {:?} for {}", + arr.dtype(), + compressed.dtype(), + display_tree(&compressed), + ); + } Ok(compressed) } @@ -311,14 +319,14 @@ pub fn sampled_compression(array: &dyn Array, ctx: &CompressCtx) -> VortexResult // Take a sample of the array, then ask codecs for their best compression estimate. let sample = compute::as_contiguous::as_contiguous( - stratified_slices( + &stratified_slices( array.len(), ctx.options.sample_size, ctx.options.sample_count, ) .into_iter() .map(|(start, stop)| array.slice(start, stop).unwrap()) - .collect(), + .collect::>(), )?; find_best_compression(candidates, &sample, ctx)? diff --git a/vortex-array/src/compute/as_contiguous.rs b/vortex-array/src/compute/as_contiguous.rs index 03f134a86c..d89ccbb894 100644 --- a/vortex-array/src/compute/as_contiguous.rs +++ b/vortex-array/src/compute/as_contiguous.rs @@ -5,10 +5,10 @@ use vortex_error::{VortexError, VortexResult}; use crate::array::ArrayRef; pub trait AsContiguousFn { - fn as_contiguous(&self, arrays: Vec) -> VortexResult; + fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult; } -pub fn as_contiguous(arrays: Vec) -> VortexResult { +pub fn as_contiguous(arrays: &[ArrayRef]) -> VortexResult { if arrays.is_empty() { return Err(VortexError::ComputeError("No arrays to concatenate".into())); } @@ -21,7 +21,7 @@ pub fn as_contiguous(arrays: Vec) -> VortexResult { let first = arrays.first().unwrap(); first .as_contiguous() - .map(|f| f.as_contiguous(arrays.clone())) + .map(|f| f.as_contiguous(arrays)) .unwrap_or_else(|| { Err(VortexError::NotImplemented( "as_contiguous", diff --git a/vortex-array/src/compute/patch.rs b/vortex-array/src/compute/patch.rs index b74e5f4f38..fdfeec8a5c 100644 --- a/vortex-array/src/compute/patch.rs +++ b/vortex-array/src/compute/patch.rs @@ -10,7 +10,11 @@ pub trait PatchFn { pub fn patch(array: &dyn Array, patch: &dyn Array) -> VortexResult { if array.len() != patch.len() { return Err(VortexError::InvalidArgument( - "patch array must have the same length as the original array".into(), + format!( + "patch array {} must have the same length as the original array {}", + patch, array + ) + .into(), )); } diff --git a/vortex-array/src/compute/search_sorted.rs b/vortex-array/src/compute/search_sorted.rs index 7734a6aa07..f4ee59c21f 100644 --- a/vortex-array/src/compute/search_sorted.rs +++ b/vortex-array/src/compute/search_sorted.rs @@ -2,6 +2,7 @@ use vortex_error::{VortexError, VortexResult}; use crate::array::Array; use crate::scalar::Scalar; +use std::cmp::Ordering; pub enum SearchSortedSide { Left, @@ -28,3 +29,32 @@ pub fn search_sorted>( )) }) } + +pub trait SearchSorted { + fn search_sorted(&self, value: &T, side: SearchSortedSide) -> usize; +} + +impl SearchSorted for &[T] { + fn search_sorted(&self, value: &T, side: SearchSortedSide) -> usize { + match side { + SearchSortedSide::Left => self + .binary_search_by(|x| { + if x < value { + Ordering::Less + } else { + Ordering::Greater + } + }) + .unwrap_or_else(|x| x), + SearchSortedSide::Right => self + .binary_search_by(|x| { + if x <= value { + Ordering::Less + } else { + Ordering::Greater + } + }) + .unwrap_or_else(|x| x), + } + } +} diff --git a/vortex-array/src/compute/take.rs b/vortex-array/src/compute/take.rs index 3626a768aa..d7b48763a6 100644 --- a/vortex-array/src/compute/take.rs +++ b/vortex-array/src/compute/take.rs @@ -1,16 +1,28 @@ +use log::info; use vortex_error::{VortexError, VortexResult}; use crate::array::{Array, ArrayRef}; +use crate::compute::flatten::flatten; pub trait TakeFn { fn take(&self, indices: &dyn Array) -> VortexResult; } pub fn take(array: &dyn Array, indices: &dyn Array) -> VortexResult { - array.take().map(|t| t.take(indices)).unwrap_or_else(|| { - Err(VortexError::NotImplemented( - "take", - array.encoding().id().name(), - )) - }) + if let Some(take) = array.take() { + return take.take(indices); + } + + // Otherwise, flatten and try again. + info!("TakeFn not implemented for {}, flattening", array); + flatten(array)? + .into_array() + .take() + .map(|t| t.take(indices)) + .unwrap_or_else(|| { + Err(VortexError::NotImplemented( + "take", + array.encoding().id().name(), + )) + }) } diff --git a/vortex-array/src/encode.rs b/vortex-array/src/encode.rs index f244d891d6..666c0f1eb2 100644 --- a/vortex-array/src/encode.rs +++ b/vortex-array/src/encode.rs @@ -188,12 +188,12 @@ fn nulls(nulls: Option<&NullBuffer>, nullable: bool, len: usize) -> Option { r: &'a mut dyn Read, } +pub trait Serde: Sized { + fn read(ctx: &mut ReadCtx) -> VortexResult; + fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()>; +} + impl<'a> ReadCtx<'a> { pub fn new(schema: &'a DType, r: &'a mut dyn Read) -> Self { let encodings = ENCODINGS.iter().map(|e| e.id()).collect::>(); @@ -147,8 +152,8 @@ impl<'a> ReadCtx<'a> { pub fn read_validity(&mut self) -> VortexResult> { if self.read_option_tag()? { match self.read_nbytes::<1>()? { - [0u8] => Ok(Some(Validity::valid(self.read_usize()?))), - [1u8] => Ok(Some(Validity::invalid(self.read_usize()?))), + [0u8] => Ok(Some(Validity::Valid(self.read_usize()?))), + [1u8] => Ok(Some(Validity::Invalid(self.read_usize()?))), [2u8] => Ok(Some(Validity::array( self.with_schema(&Validity::DTYPE).read()?, ))), diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 9d972a1a03..947f1c4b77 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -4,8 +4,11 @@ use crate::array::{Array, ArrayRef}; use crate::compute::as_contiguous::as_contiguous; use crate::compute::flatten::flatten_bool; use crate::compute::scalar_at::scalar_at; +use crate::compute::take::take; use crate::stats::Stat; -use arrow_buffer::BooleanBuffer; +use arrow_buffer::{BooleanBuffer, NullBuffer}; +use itertools::Itertools; +use vortex_error::VortexResult; use vortex_schema::{DType, Nullability}; #[derive(Debug, Clone)] @@ -25,14 +28,6 @@ impl Validity { Self::Array(array) } - pub fn invalid(len: usize) -> Self { - Self::Invalid(len) - } - - pub fn valid(len: usize) -> Self { - Self::Valid(len) - } - pub fn len(&self) -> usize { match self { Self::Valid(len) | Self::Invalid(len) => *len, @@ -94,15 +89,31 @@ impl Validity { } } + pub fn is_valid(&self, idx: usize) -> bool { + match self { + Validity::Valid(_) => true, + Validity::Invalid(_) => false, + Validity::Array(a) => scalar_at(&a, idx).unwrap().try_into().unwrap(), + } + } + // TODO(ngates): maybe we want to impl Array for Validity? pub fn slice(&self, start: usize, stop: usize) -> Self { match self { - Self::Valid(_) => Self::valid(stop - start), - Self::Invalid(_) => Self::invalid(stop - start), + Self::Valid(_) => Self::Valid(stop - start), + Self::Invalid(_) => Self::Invalid(stop - start), Self::Array(a) => Self::Array(a.slice(start, stop).unwrap()), } } + pub fn take(&self, indices: &dyn Array) -> VortexResult { + match self { + Self::Valid(_) => Ok(Self::Valid(indices.len())), + Self::Invalid(_) => Ok(Self::Invalid(indices.len())), + Self::Array(a) => Ok(Self::Array(take(a, indices)?)), + } + } + pub fn nbytes(&self) -> usize { match self { Self::Valid(_) | Self::Invalid(_) => 4, @@ -111,6 +122,18 @@ impl Validity { } } +impl From for Validity { + fn from(value: NullBuffer) -> Self { + if value.null_count() == 0 { + Self::Valid(value.len()) + } else if value.null_count() == value.len() { + Self::Invalid(value.len()) + } else { + Self::Array(BoolArray::new(value.into_inner(), None).into_array()) + } + } +} + impl From for Validity { fn from(value: BooleanBuffer) -> Self { if value.iter().all(|v| v) { @@ -169,15 +192,11 @@ impl FromIterator for Validity { } // Otherwise, map each to a bool array and concatenate them. - Self::Array( - as_contiguous( - validities - .iter() - .map(|v| v.to_bool_array().into_array()) - .collect(), - ) - .unwrap(), - ) + let arrays = validities + .iter() + .map(|v| v.to_bool_array().into_array()) + .collect_vec(); + Self::Array(as_contiguous(&arrays).unwrap()) } } @@ -193,14 +212,6 @@ pub trait ArrayValidity { } fn is_valid(&self, index: usize) -> bool { - if let Some(v) = self.validity() { - match v { - Validity::Valid(_) => true, - Validity::Invalid(_) => false, - Validity::Array(a) => scalar_at(&a, index).unwrap().try_into().unwrap(), - } - } else { - true - } + self.validity().map(|v| v.is_valid(index)).unwrap_or(true) } } diff --git a/vortex-datetime/src/compute.rs b/vortex-datetime/src/compute.rs new file mode 100644 index 0000000000..7e75a81380 --- /dev/null +++ b/vortex-datetime/src/compute.rs @@ -0,0 +1,25 @@ +use crate::DateTimeArray; +use vortex::array::{Array, ArrayRef}; +use vortex::compute::take::{take, TakeFn}; +use vortex::compute::ArrayCompute; +use vortex::validity::ArrayValidity; +use vortex_error::VortexResult; + +impl ArrayCompute for DateTimeArray { + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } +} + +impl TakeFn for DateTimeArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + Ok(DateTimeArray::new( + take(self.days(), indices)?, + take(self.seconds(), indices)?, + take(self.subsecond(), indices)?, + self.validity(), + self.dtype().clone(), + ) + .into_array()) + } +} diff --git a/vortex-datetime/src/datetime.rs b/vortex-datetime/src/datetime.rs index 04db6fd80a..26483425d4 100644 --- a/vortex-datetime/src/datetime.rs +++ b/vortex-datetime/src/datetime.rs @@ -2,7 +2,6 @@ use std::sync::{Arc, RwLock}; use vortex::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef}; use vortex::compress::EncodingCompression; -use vortex::compute::ArrayCompute; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; use vortex::impl_array; use vortex::serde::{ArraySerde, EncodingSerde}; @@ -121,8 +120,6 @@ impl Array for DateTimeArray { impl StatsCompute for DateTimeArray {} -impl ArrayCompute for DateTimeArray {} - impl ArrayDisplay for DateTimeArray { fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { f.child("days", self.days())?; diff --git a/vortex-datetime/src/lib.rs b/vortex-datetime/src/lib.rs index 3be33e6ffd..bd4625e172 100644 --- a/vortex-datetime/src/lib.rs +++ b/vortex-datetime/src/lib.rs @@ -4,6 +4,7 @@ pub use datetime::*; use vortex::array::{EncodingRef, ENCODINGS}; mod compress; +mod compute; mod datetime; mod serde; diff --git a/vortex-datetime/src/serde.rs b/vortex-datetime/src/serde.rs index bd6bd3758a..70f237f355 100644 --- a/vortex-datetime/src/serde.rs +++ b/vortex-datetime/src/serde.rs @@ -6,8 +6,11 @@ use vortex_error::VortexResult; impl ArraySerde for DateTimeArray { fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> { + ctx.dtype(self.days().dtype())?; ctx.write(self.days())?; + ctx.dtype(self.seconds().dtype())?; ctx.write(self.seconds())?; + ctx.dtype(self.subsecond().dtype())?; ctx.write(self.subsecond())?; ctx.write_validity(self.validity()) } @@ -15,10 +18,16 @@ impl ArraySerde for DateTimeArray { impl EncodingSerde for DateTimeEncoding { fn read(&self, ctx: &mut ReadCtx) -> VortexResult { + let days_dtype = ctx.dtype()?; + let days = ctx.with_schema(&days_dtype).read()?; + let seconds_dtype = ctx.dtype()?; + let seconds = ctx.with_schema(&seconds_dtype).read()?; + let subseconds_dtype = ctx.dtype()?; + let subsecs = ctx.with_schema(&subseconds_dtype).read()?; Ok(DateTimeArray::new( - ctx.read()?, - ctx.read()?, - ctx.read()?, + days, + seconds, + subsecs, ctx.read_validity()?, ctx.schema().clone(), ) diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 1ca5c35bc0..3e0bfb0fcf 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -157,8 +157,7 @@ fn bitpack_patches( values.push(*v); } } - let len = indices.len(); - SparseArray::new(indices.into_array(), values.into_array(), len).into_array() + SparseArray::new(indices.into_array(), values.into_array(), parray.len()).into_array() }) } diff --git a/vortex-fastlanes/src/bitpacking/compute.rs b/vortex-fastlanes/src/bitpacking/compute.rs index 0680beae98..1a0e44d17c 100644 --- a/vortex-fastlanes/src/bitpacking/compute.rs +++ b/vortex-fastlanes/src/bitpacking/compute.rs @@ -1,6 +1,8 @@ use crate::bitpacking::compress::bitunpack; use crate::BitPackedArray; -use vortex::compute::flatten::{FlattenFn, FlattenedArray}; +use vortex::array::{Array, ArrayRef}; +use vortex::compute::flatten::{flatten, FlattenFn, FlattenedArray}; +use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; use vortex_error::VortexResult; @@ -8,6 +10,10 @@ impl ArrayCompute for BitPackedArray { fn flatten(&self) -> Option<&dyn FlattenFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl FlattenFn for BitPackedArray { @@ -15,3 +21,9 @@ impl FlattenFn for BitPackedArray { bitunpack(self).map(FlattenedArray::Primitive) } } + +impl TakeFn for BitPackedArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + take(&flatten(self)?.into_array(), indices) + } +} diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index 1c07b7d250..2b7175ea9d 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -59,9 +59,6 @@ impl EncodingCompression for FoREncoding { } }); - // TODO(ngates): remove FoR as a potential encoding from the ctx - // NOTE(ngates): we don't invoke next_level here since we know bit-packing is always - // worth trying. let compressed_child = ctx.named("for").excluding(&FoREncoding).compress( &child, like.map(|l| l.as_any().downcast_ref::().unwrap().encoded()), @@ -97,7 +94,7 @@ fn compress_primitive( .collect_vec() }; - PrimitiveArray::from(values) + PrimitiveArray::from_nullable(values, parray.validity()) } pub fn decompress(array: &FoRArray) -> VortexResult { diff --git a/vortex-ree/src/compress.rs b/vortex-ree/src/compress.rs index 7396cbea53..9dc9d2fd18 100644 --- a/vortex-ree/src/compress.rs +++ b/vortex-ree/src/compress.rs @@ -67,7 +67,7 @@ pub fn ree_encode(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) { match_each_native_ptype!(array.ptype(), |$P| { let (ends, values) = ree_encode_primitive(array.typed_data::<$P>()); - let compressed_values = PrimitiveArray::from(values); + let mut compressed_values = PrimitiveArray::from(values).into_nullable(array.dtype().nullability()); compressed_values.stats().set(Stat::IsConstant, false.into()); compressed_values.stats().set(Stat::RunCount, compressed_values.len().into()); compressed_values.stats().set_many(&array.stats(), vec![ diff --git a/vortex-roaring/src/boolean/mod.rs b/vortex-roaring/src/boolean/mod.rs index 5205e6950d..e9da758929 100644 --- a/vortex-roaring/src/boolean/mod.rs +++ b/vortex-roaring/src/boolean/mod.rs @@ -110,7 +110,7 @@ impl ArrayDisplay for RoaringBoolArray { impl ArrayValidity for RoaringBoolArray { fn validity(&self) -> Option { match self.dtype().is_nullable() { - true => Some(Validity::valid(self.length)), + true => Some(Validity::Valid(self.length)), false => None, } } diff --git a/vortex-roaring/src/integer/mod.rs b/vortex-roaring/src/integer/mod.rs index 632a08f12a..6caa1592fa 100644 --- a/vortex-roaring/src/integer/mod.rs +++ b/vortex-roaring/src/integer/mod.rs @@ -112,7 +112,7 @@ impl ArrayDisplay for RoaringIntArray { impl ArrayValidity for RoaringIntArray { fn validity(&self) -> Option { match self.dtype().is_nullable() { - true => Some(Validity::valid(self.len())), + true => Some(Validity::Valid(self.len())), false => None, } } diff --git a/vortex-schema/src/dtype.rs b/vortex-schema/src/dtype.rs index 43e65a7ccd..ac1a6c64f6 100644 --- a/vortex-schema/src/dtype.rs +++ b/vortex-schema/src/dtype.rs @@ -149,6 +149,10 @@ impl DType { Nullability::NonNullable, ); + pub fn nullability(&self) -> Nullability { + self.is_nullable().into() + } + pub fn is_nullable(&self) -> bool { use Nullability::*; @@ -205,8 +209,8 @@ impl Display for DType { Bool(n) => write!(f, "bool{}", n), Int(w, s, n) => match s { Unknown => write!(f, "int({}){}", w, n), - Unsigned => write!(f, "unsigned_int({})", w), - Signed => write!(f, "signed_int({})", w), + Unsigned => write!(f, "unsigned_int({}){}", w, n), + Signed => write!(f, "signed_int({}){}", w, n), }, Decimal(p, s, n) => write!(f, "decimal({}, {}){}", p, s, n), Float(w, n) => write!(f, "float({}){}", w, n), diff --git a/vortex-zigzag/src/serde.rs b/vortex-zigzag/src/serde.rs index c9c294eb7d..e4d340b4a1 100644 --- a/vortex-zigzag/src/serde.rs +++ b/vortex-zigzag/src/serde.rs @@ -1,6 +1,6 @@ use vortex::array::{Array, ArrayRef}; use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; -use vortex_error::{VortexError, VortexResult}; +use vortex_error::VortexResult; use vortex_schema::{DType, Signedness}; use crate::{ZigZagArray, ZigZagEncoding}; @@ -13,15 +13,18 @@ impl ArraySerde for ZigZagArray { impl EncodingSerde for ZigZagEncoding { fn read(&self, ctx: &mut ReadCtx) -> VortexResult { - let encoded_dtype = match ctx.schema() { - DType::Int(w, Signedness::Signed, n) => DType::Int(*w, Signedness::Unsigned, *n), - _ => return Err(VortexError::InvalidDType(ctx.schema().clone())), - }; - let encoded = ctx.with_schema(&encoded_dtype).read()?; + let encoded = ctx.with_schema(&encoded_dtype(ctx.schema())).read()?; Ok(ZigZagArray::new(encoded).into_array()) } } +fn encoded_dtype(schema: &DType) -> DType { + match schema { + DType::Int(w, Signedness::Signed, n) => DType::Int(*w, Signedness::Unsigned, *n), + _ => panic!("Invalid schema"), + } +} + #[cfg(test)] mod test { use vortex::array::downcast::DowncastArrayBuiltin;