From 37f7ffef517f540095489db63804879a8e8ce767 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 26 Mar 2024 18:41:03 +0000 Subject: [PATCH 01/12] Arrow Random Access --- Cargo.lock | 1 + Cargo.toml | 1 + bench-vortex/Cargo.toml | 1 + bench-vortex/benches/random_access.rs | 26 +++++---- bench-vortex/src/taxi_data.rs | 82 ++++++++++++++++++++++++--- 5 files changed, 92 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c86fdf41ab..33a9ddb977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -366,6 +366,7 @@ name = "bench-vortex" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-select", "criterion", "itertools 0.12.1", "log", diff --git a/Cargo.toml b/Cargo.toml index d6dfd0681c..0e2a91c890 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ arrayref = "0.3.7" arrow = { version = "51.0.0", features = ["pyarrow"] } arrow-array = "51.0.0" arrow-buffer = "51.0.0" +arrow-select = "51.0.0" arrow-schema = "51.0.0" bindgen = "0.69.4" criterion = { version = "0.5.1", features = ["html_reports"] } diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index af72a16fe7..5f64e0316a 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -16,6 +16,7 @@ workspace = true [dependencies] arrow-array = { workspace = true } +arrow-select = { workspace = true } vortex-array = { path = "../vortex-array" } vortex-datetime = { path = "../vortex-datetime" } vortex-alp = { path = "../vortex-alp" } diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs index b36429b8b1..fcb7bb7ce8 100644 --- a/bench-vortex/benches/random_access.rs +++ b/bench-vortex/benches/random_access.rs @@ -1,18 +1,22 @@ -use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data}; +use bench_vortex::taxi_data::{ + download_taxi_data, take_taxi_data, take_taxi_data_arrow, write_taxi_data, +}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use itertools::Itertools; - -use vortex::array::ENCODINGS; fn random_access(c: &mut Criterion) { - let taxi_spiral = write_taxi_data(); + let mut group = c.benchmark_group("random access"); + group.sample_size(10); + let indices = [10, 11, 12, 13, 100_000, 3_000_000]; - println!( - "ENCODINGS {:?}", - ENCODINGS.iter().map(|e| e.id()).collect_vec() - ); - c.bench_function("random access", |b| { - b.iter(|| black_box(take_taxi_data(&taxi_spiral, &indices))) + + let taxi_vortex = write_taxi_data(); + group.bench_function("vortex", |b| { + b.iter(|| black_box(take_taxi_data(&taxi_vortex, &indices))) + }); + + let taxi_parquet = download_taxi_data(); + group.bench_function("arrow", |b| { + b.iter(|| black_box(take_taxi_data_arrow(&taxi_parquet, &indices))) }); } diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index 701dc6afd1..69378a6bcc 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -1,15 +1,22 @@ -use arrow_array::RecordBatchReader; +use arrow_array::types::Int64Type; +use arrow_array::{ + ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray, RecordBatch, + RecordBatchReader, +}; +use arrow_select::concat::concat_batches; +use arrow_select::take::take_record_batch; use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ProjectionMask; +use std::collections::HashMap; use std::fs::File; use std::path::{Path, PathBuf}; +use std::sync::Arc; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; use vortex::array::{ArrayRef, IntoArray}; use vortex::arrow::FromArrowType; use vortex::compute::take::take; -use vortex::formatter::display_tree; use vortex::ptype::PType; use vortex::serde::{ReadCtx, WriteCtx}; use vortex_schema::DType; @@ -42,17 +49,14 @@ pub fn write_taxi_data() -> PathBuf { .unwrap(); let dtype = DType::from_arrow(reader.schema()); - println!("SCHEMA {:?}\nDTYPE: {:?}", reader.schema(), dtype); - let ctx = compress_ctx(); + let _ctx = compress_ctx(); let chunks = reader .map(|batch_result| batch_result.unwrap()) .map(|record_batch| { - println!("RBSCHEMA: {:?}", record_batch.schema()); let vortex_array = record_batch.into_array(); - let compressed = ctx.compress(&vortex_array, None).unwrap(); - println!("COMPRESSED {}", display_tree(&compressed)); - compressed + // ctx.compress(&vortex_array, None).unwrap() + vortex_array }) .collect_vec(); let chunked = ChunkedArray::new(chunks, dtype.clone()); @@ -73,3 +77,65 @@ pub fn take_taxi_data(path: &Path, indices: &[u64]) -> ArrayRef { }; take(&chunked, &PrimitiveArray::from(indices.to_vec())).unwrap() } + +pub fn take_taxi_data_arrow(path: &Path, indices: &[u64]) -> RecordBatch { + let file = File::open(path).unwrap(); + + // TODO(ngates): enable read_page_index + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + + // We figure out which row groups we need to read and a selection filter for each of them. + let mut row_groups = HashMap::new(); + let mut row_group_offsets = vec![0]; + row_group_offsets.extend( + builder + .metadata() + .row_groups() + .iter() + .map(|rg| rg.num_rows()) + .scan(0i64, |acc, x| { + *acc += x; + Some(*acc) + }), + ); + + for idx in indices { + let row_group_idx = row_group_offsets + .binary_search(&(*idx as i64)) + .unwrap_or_else(|e| e - 1); + if !row_groups.contains_key(&row_group_idx) { + row_groups.insert(row_group_idx, Vec::new()); + } + row_groups + .get_mut(&row_group_idx) + .unwrap() + .push((*idx as i64) - row_group_offsets[row_group_idx]); + } + let row_group_indices = row_groups + .keys() + .sorted() + .map(|i| row_groups.get(i).unwrap().clone()) + .collect_vec(); + + let reader = builder + .with_row_groups(row_groups.keys().copied().collect_vec()) + // FIXME(ngates): our indices code assumes the batch size == the row group sizes + .with_batch_size(10_000_000) + .build() + .unwrap(); + + let schema = reader.schema(); + + let batches = reader + .into_iter() + .enumerate() + .map(|(idx, batch)| { + let batch = batch.unwrap(); + let indices = ArrowPrimitiveArray::::from(row_group_indices[idx].clone()); + let indices_array: ArrowArrayRef = Arc::new(indices); + take_record_batch(&batch, &indices_array).unwrap() + }) + .collect_vec(); + + concat_batches(&schema, &batches).unwrap() +} From 2a991d7df544d48a823976ed7fde4324b8bb7dc5 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 26 Mar 2024 20:02:40 +0000 Subject: [PATCH 02/12] nullable scalar --- vortex-array/src/array/bool/compute.rs | 11 ++-- vortex-array/src/array/constant/compute.rs | 2 +- vortex-array/src/array/constant/stats.rs | 2 +- vortex-array/src/array/varbin/compute/mod.rs | 5 +- vortex-array/src/scalar/binary.rs | 36 +++++-------- vortex-array/src/scalar/bool.rs | 32 +++-------- vortex-array/src/scalar/mod.rs | 13 +++-- vortex-array/src/scalar/serde.rs | 28 +++++----- vortex-array/src/scalar/utf8.rs | 27 ++++------ vortex-array/src/scalar/value.rs | 57 ++++++++++++++++++++ vortex-array/src/serde/mod.rs | 17 ++++++ vortex-schema/src/dtype.rs | 7 +++ 12 files changed, 147 insertions(+), 90 deletions(-) create mode 100644 vortex-array/src/scalar/value.rs diff --git a/vortex-array/src/array/bool/compute.rs b/vortex-array/src/array/bool/compute.rs index 5318ec7778..893fbb6845 100644 --- a/vortex-array/src/array/bool/compute.rs +++ b/vortex-array/src/array/bool/compute.rs @@ -64,11 +64,12 @@ impl FlattenFn for BoolArray { impl ScalarAtFn for BoolArray { fn scalar_at(&self, index: usize) -> VortexResult { - if self.is_valid(index) { - Ok(self.buffer.value(index).into()) - } else { - Ok(BoolScalar::new(None).into()) - } + Ok(BoolScalar::new( + self.is_valid(index).then(|| self.buffer.value(index)), + self.nullability(), + ) + .unwrap() + .into()) } } diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs index bfd6dee2d6..623d55f023 100644 --- a/vortex-array/src/array/constant/compute.rs +++ b/vortex-array/src/array/constant/compute.rs @@ -53,7 +53,7 @@ impl FlattenFn for ConstantArray { fn flatten(&self) -> VortexResult { Ok(match self.scalar() { Scalar::Bool(b) => { - if let Some(bv) = b.value() { + if let Some(&bv) = b.value() { FlattenedArray::Bool(BoolArray::from(vec![bv; self.len()])) } else { FlattenedArray::Bool(BoolArray::null(self.len())) diff --git a/vortex-array/src/array/constant/stats.rs b/vortex-array/src/array/constant/stats.rs index cad86eeeb9..f5a31f9b0f 100644 --- a/vortex-array/src/array/constant/stats.rs +++ b/vortex-array/src/array/constant/stats.rs @@ -15,7 +15,7 @@ impl StatsCompute for ConstantArray { return Ok(StatsSet::from( [( Stat::TrueCount, - (self.len() as u64 * b.value().map(|v| v as u64).unwrap_or(0)).into(), + (self.len() as u64 * b.value().cloned().map(|v| v as u64).unwrap_or(0)).into(), )] .into(), )); diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index 7fde8a1ebc..34b0f61059 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -160,10 +160,11 @@ impl ScalarAtFn for VarBinArray { bytes.into() } }) + // FIXME(ngates): there's something weird about this. } else if matches!(self.dtype, DType::Utf8(_)) { - Ok(Utf8Scalar::new(None).into()) + Ok(Utf8Scalar::none().into()) } else { - Ok(BinaryScalar::new(None).into()) + Ok(BinaryScalar::none().into()) } } } diff --git a/vortex-array/src/scalar/binary.rs b/vortex-array/src/scalar/binary.rs index fe899c0ce1..70f078698c 100644 --- a/vortex-array/src/scalar/binary.rs +++ b/vortex-array/src/scalar/binary.rs @@ -1,35 +1,21 @@ use std::fmt::{Display, Formatter}; use vortex_error::{VortexError, VortexResult}; -use vortex_schema::{DType, Nullability}; +use vortex_schema::DType; +use vortex_schema::Nullability::{NonNullable, Nullable}; +use crate::scalar::value::ScalarValue; use crate::scalar::Scalar; -#[derive(Debug, Clone, PartialEq, PartialOrd)] -pub struct BinaryScalar { - value: Option>, -} +pub type BinaryScalar = ScalarValue>; impl BinaryScalar { - pub fn new(value: Option>) -> Self { - Self { value } - } - - pub fn none() -> Self { - Self { value: None } - } - - pub fn some(value: Vec) -> Self { - Self { value: Some(value) } - } - - pub fn value(&self) -> Option<&[u8]> { - self.value.as_deref() - } - #[inline] pub fn dtype(&self) -> &DType { - &DType::Binary(Nullability::NonNullable) + match self.nullability() { + NonNullable => &DType::Binary(NonNullable), + Nullable => &DType::Binary(Nullable), + } } pub fn cast(&self, _dtype: &DType) -> VortexResult { @@ -43,7 +29,7 @@ impl BinaryScalar { impl From> for Scalar { fn from(value: Vec) -> Self { - BinaryScalar::new(Some(value)).into() + BinaryScalar::some(value).into() } } @@ -55,7 +41,9 @@ impl TryFrom for Vec { return Err(VortexError::InvalidDType(value.dtype().clone())); }; let dtype = b.dtype().clone(); - b.value.ok_or_else(|| VortexError::InvalidDType(dtype)) + b.value() + .cloned() + .ok_or_else(|| VortexError::InvalidDType(dtype)) } } diff --git a/vortex-array/src/scalar/bool.rs b/vortex-array/src/scalar/bool.rs index f0cc1fd0d6..5d951d816c 100644 --- a/vortex-array/src/scalar/bool.rs +++ b/vortex-array/src/scalar/bool.rs @@ -3,33 +3,18 @@ use std::fmt::{Display, Formatter}; use vortex_error::{VortexError, VortexResult}; use vortex_schema::{DType, Nullability}; +use crate::scalar::value::ScalarValue; use crate::scalar::Scalar; -#[derive(Debug, Clone, PartialEq, PartialOrd)] -pub struct BoolScalar { - value: Option, -} +pub type BoolScalar = ScalarValue; impl BoolScalar { - pub fn new(value: Option) -> Self { - Self { value } - } - - pub fn none() -> Self { - Self { value: None } - } - - pub fn some(value: bool) -> Self { - Self { value: Some(value) } - } - - pub fn value(&self) -> Option { - self.value - } - #[inline] pub fn dtype(&self) -> &DType { - &DType::Bool(Nullability::NonNullable) + match self.nullability() { + Nullability::NonNullable => &DType::Bool(Nullability::NonNullable), + Nullability::Nullable => &DType::Bool(Nullability::Nullable), + } } pub fn cast(&self, dtype: &DType) -> VortexResult { @@ -47,7 +32,7 @@ impl BoolScalar { impl From for Scalar { #[inline] fn from(value: bool) -> Self { - BoolScalar::new(Some(value)).into() + BoolScalar::some(value).into() } } @@ -58,8 +43,7 @@ impl TryFrom for bool { let Scalar::Bool(b) = value else { return Err(VortexError::InvalidDType(value.dtype().clone())); }; - - b.value() + b.into_value() .ok_or_else(|| VortexError::InvalidDType(b.dtype().clone())) } } diff --git a/vortex-array/src/scalar/mod.rs b/vortex-array/src/scalar/mod.rs index 6aef823602..3d17b13bf8 100644 --- a/vortex-array/src/scalar/mod.rs +++ b/vortex-array/src/scalar/mod.rs @@ -10,7 +10,7 @@ pub use serde::*; pub use struct_::*; pub use utf8::*; use vortex_error::VortexResult; -use vortex_schema::{DType, FloatWidth, IntWidth, Signedness}; +use vortex_schema::{DType, FloatWidth, IntWidth, Nullability, Signedness}; use crate::ptype::{NativePType, PType}; @@ -23,6 +23,7 @@ mod primitive; mod serde; mod struct_; mod utf8; +mod value; #[derive(Debug, Clone, PartialEq, PartialOrd)] pub enum Scalar { @@ -84,6 +85,10 @@ impl Scalar { match_each_scalar! { self, |$s| $s.nbytes() } } + pub fn nullability(&self) -> Nullability { + self.dtype().nullability() + } + pub fn is_null(&self) -> bool { match self { Scalar::Binary(b) => b.value().is_none(), @@ -101,7 +106,7 @@ impl Scalar { pub fn null(dtype: &DType) -> Self { match dtype { DType::Null => NullScalar::new().into(), - DType::Bool(_) => BoolScalar::new(None).into(), + DType::Bool(_) => BoolScalar::none().into(), DType::Int(w, s, _) => match (w, s) { (IntWidth::Unknown, Signedness::Unknown | Signedness::Signed) => { PrimitiveScalar::none(PType::I64).into() @@ -133,8 +138,8 @@ impl Scalar { FloatWidth::_32 => PrimitiveScalar::none(PType::F32).into(), FloatWidth::_64 => PrimitiveScalar::none(PType::F64).into(), }, - DType::Utf8(_) => Utf8Scalar::new(None).into(), - DType::Binary(_) => BinaryScalar::new(None).into(), + DType::Utf8(_) => Utf8Scalar::none().into(), + DType::Binary(_) => BinaryScalar::none().into(), DType::Struct(_, _) => StructScalar::new(dtype.clone(), vec![]).into(), DType::List(_, _) => ListScalar::new(dtype.clone(), None).into(), DType::Composite(_, _) => unimplemented!("CompositeScalar"), diff --git a/vortex-array/src/scalar/serde.rs b/vortex-array/src/scalar/serde.rs index 198b0feed4..e24cfeabfa 100644 --- a/vortex-array/src/scalar/serde.rs +++ b/vortex-array/src/scalar/serde.rs @@ -27,18 +27,17 @@ impl<'a, 'b> ScalarReader<'a, 'b> { pub fn read(&mut self) -> VortexResult { let tag = ScalarTag::try_from(self.reader.read_nbytes::<1>()?[0]) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let nullability = self.reader.nullability()?; + match tag { ScalarTag::Binary => { let slice = self.reader.read_optional_slice()?; - Ok(BinaryScalar::new(slice).into()) + Ok(BinaryScalar::new(slice, nullability)?.into()) } ScalarTag::Bool => { let is_present = self.reader.read_option_tag()?; - if is_present { - Ok(BoolScalar::some(self.reader.read_nbytes::<1>()?[0] != 0).into()) - } else { - Ok(BoolScalar::none().into()) - } + let bool = self.reader.read_nbytes::<1>()?[0] != 0; + Ok(BoolScalar::new(is_present.then_some(bool), nullability)?.into()) } ScalarTag::PrimitiveS => self.read_primitive_scalar().map(|p| p.into()), ScalarTag::List => { @@ -74,10 +73,11 @@ impl<'a, 'b> ScalarReader<'a, 'b> { } ScalarTag::Utf8 => { let value = self.reader.read_optional_slice()?; - Ok( - Utf8Scalar::new(value.map(|v| unsafe { String::from_utf8_unchecked(v) })) - .into(), - ) + Ok(Utf8Scalar::new( + value.map(|v| unsafe { String::from_utf8_unchecked(v) }), + nullability, + )? + .into()) } ScalarTag::Composite => { let dtype = self.reader.dtype()?; @@ -145,11 +145,15 @@ impl<'a, 'b> ScalarWriter<'a, 'b> { pub fn write(&mut self, scalar: &Scalar) -> VortexResult<()> { self.writer .write_fixed_slice([ScalarTag::from(scalar).into()])?; + self.writer.nullability(scalar.nullability())?; + match scalar { - Scalar::Binary(b) => self.writer.write_optional_slice(b.value()), + Scalar::Binary(b) => self + .writer + .write_optional_slice(b.value().map(|b| b.as_slice())), Scalar::Bool(b) => { self.writer.write_option_tag(b.value().is_some())?; - if let Some(v) = b.value() { + if let Some(&v) = b.value() { self.writer.write_fixed_slice([v as u8])?; } Ok(()) diff --git a/vortex-array/src/scalar/utf8.rs b/vortex-array/src/scalar/utf8.rs index f58be585e4..6a8c92ccad 100644 --- a/vortex-array/src/scalar/utf8.rs +++ b/vortex-array/src/scalar/utf8.rs @@ -1,27 +1,20 @@ use std::fmt::{Display, Formatter}; use vortex_error::{VortexError, VortexResult}; -use vortex_schema::{DType, Nullability}; +use vortex_schema::{DType, Nullability::NonNullable, Nullability::Nullable}; +use crate::scalar::value::ScalarValue; use crate::scalar::Scalar; -#[derive(Debug, Clone, PartialEq, PartialOrd)] -pub struct Utf8Scalar { - value: Option, -} +pub type Utf8Scalar = ScalarValue; impl Utf8Scalar { - pub fn new(value: Option) -> Self { - Self { value } - } - - pub fn value(&self) -> Option<&str> { - self.value.as_deref() - } - #[inline] pub fn dtype(&self) -> &DType { - &DType::Utf8(Nullability::NonNullable) + match self.nullability() { + NonNullable => &DType::Utf8(NonNullable), + Nullable => &DType::Utf8(Nullable), + } } pub fn cast(&self, _dtype: &DType) -> VortexResult { @@ -35,13 +28,13 @@ impl Utf8Scalar { impl From for Scalar { fn from(value: String) -> Self { - Utf8Scalar::new(Some(value)).into() + Utf8Scalar::some(value).into() } } impl From<&str> for Scalar { fn from(value: &str) -> Self { - Utf8Scalar::new(Some(value.to_string())).into() + Utf8Scalar::some(value.to_string()).into() } } @@ -52,7 +45,7 @@ impl TryFrom for String { let Scalar::Utf8(u) = value else { return Err(VortexError::InvalidDType(value.dtype().clone())); }; - match u.value { + match u.into_value() { None => Err(VortexError::InvalidDType(u.dtype().clone())), Some(s) => Ok(s), } diff --git a/vortex-array/src/scalar/value.rs b/vortex-array/src/scalar/value.rs new file mode 100644 index 0000000000..1b938f9165 --- /dev/null +++ b/vortex-array/src/scalar/value.rs @@ -0,0 +1,57 @@ +use vortex_error::VortexResult; +use vortex_schema::Nullability; + +#[derive(Debug, Clone, PartialEq, PartialOrd)] +pub struct ScalarValue { + nullability: Nullability, + value: Option, +} + +impl ScalarValue { + pub fn new(value: Option, nullability: Nullability) -> VortexResult { + if value.is_none() && nullability == Nullability::NonNullable { + return Err("Value cannot be None for NonNullable Scalar".into()); + } + Ok(Self { value, nullability }) + } + + pub fn non_nullable(value: T) -> Self { + Self { + value: Some(value), + nullability: Nullability::NonNullable, + } + } + + pub fn nullable(value: T) -> Self { + Self { + value: Some(value), + nullability: Nullability::Nullable, + } + } + + pub fn some(value: T) -> Self { + Self { + value: Some(value), + nullability: Nullability::default(), + } + } + + pub fn none() -> Self { + Self { + value: None, + nullability: Nullability::Nullable, + } + } + + pub fn value(&self) -> Option<&T> { + self.value.as_ref() + } + + pub fn into_value(self) -> Option { + self.value + } + + pub fn nullability(&self) -> Nullability { + self.nullability + } +} diff --git a/vortex-array/src/serde/mod.rs b/vortex-array/src/serde/mod.rs index 9e537522ac..417c42f3ec 100644 --- a/vortex-array/src/serde/mod.rs +++ b/vortex-array/src/serde/mod.rs @@ -95,6 +95,16 @@ impl<'a> ReadCtx<'a> { Ok(typetag.into()) } + pub fn nullability(&mut self) -> VortexResult { + match self.read_nbytes::<1>()? { + [0] => Ok(Nullability::NonNullable), + [1] => Ok(Nullability::Nullable), + _ => Err(VortexError::InvalidArgument( + "Invalid nullability tag".into(), + )), + } + } + #[inline] pub fn scalar(&mut self) -> VortexResult { ScalarReader::new(self).read() @@ -204,6 +214,13 @@ impl<'a> WriteCtx<'a> { self.write_fixed_slice([PTypeTag::from(ptype).into()]) } + pub fn nullability(&mut self, nullability: Nullability) -> VortexResult<()> { + match nullability { + Nullability::NonNullable => self.write_fixed_slice([0u8]), + Nullability::Nullable => self.write_fixed_slice([1u8]), + } + } + pub fn scalar(&mut self, scalar: &Scalar) -> VortexResult<()> { ScalarWriter::new(self).write(scalar) } diff --git a/vortex-schema/src/dtype.rs b/vortex-schema/src/dtype.rs index ac1a6c64f6..7a69e23585 100644 --- a/vortex-schema/src/dtype.rs +++ b/vortex-schema/src/dtype.rs @@ -14,6 +14,13 @@ pub enum Nullability { Nullable, } +impl Default for Nullability { + fn default() -> Self { + // TODO(ngates): is this a sensible default? + Nullability::NonNullable + } +} + impl From for Nullability { fn from(value: bool) -> Self { if value { From a5bc6b4b1b79cb38db54e088228b966835768920 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 26 Mar 2024 20:39:50 +0000 Subject: [PATCH 03/12] nullable scalars --- vortex-array/src/array/constant/compute.rs | 39 +++++----- .../src/array/primitive/compute/scalar_at.rs | 11 +-- vortex-array/src/scalar/bool.rs | 3 +- vortex-array/src/scalar/mod.rs | 34 ++++----- vortex-array/src/scalar/primitive.rs | 74 +++++++++---------- vortex-array/src/scalar/serde.rs | 55 +++----------- vortex-array/src/scalar/utf8.rs | 3 +- vortex-dict/src/compress.rs | 14 +++- vortex-schema/src/dtype.rs | 10 +-- 9 files changed, 107 insertions(+), 136 deletions(-) diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs index 623d55f023..7b724291d0 100644 --- a/vortex-array/src/array/constant/compute.rs +++ b/vortex-array/src/array/constant/compute.rs @@ -1,6 +1,7 @@ use itertools::Itertools; use vortex_error::VortexResult; +use vortex_schema::Nullability; use crate::array::bool::BoolArray; use crate::array::constant::ConstantArray; @@ -14,6 +15,7 @@ use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; use crate::match_each_native_ptype; use crate::scalar::Scalar; +use crate::validity::{ArrayValidity, Validity}; impl ArrayCompute for ConstantArray { fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { @@ -51,27 +53,26 @@ impl AsContiguousFn for ConstantArray { impl FlattenFn for ConstantArray { fn flatten(&self) -> VortexResult { + let validity = match self.nullability() { + Nullability::NonNullable => None, + Nullability::Nullable => Some(match self.scalar().is_null() { + true => Validity::Invalid(self.len()), + false => Validity::Valid(self.len()), + }), + }; + Ok(match self.scalar() { - Scalar::Bool(b) => { - if let Some(&bv) = b.value() { - FlattenedArray::Bool(BoolArray::from(vec![bv; self.len()])) - } else { - FlattenedArray::Bool(BoolArray::null(self.len())) - } - } + Scalar::Bool(b) => FlattenedArray::Bool(BoolArray::from_nullable( + vec![b.value().copied().unwrap_or_default(); self.len()], + validity, + )), Scalar::Primitive(p) => { - if let Some(ps) = p.value() { - match_each_native_ptype!(ps.ptype(), |$P| { - FlattenedArray::Primitive(PrimitiveArray::from_value::<$P>( - $P::try_from(self.scalar())?, - self.len(), - )) - }) - } else { - match_each_native_ptype!(p.ptype(), |$P| { - FlattenedArray::Primitive(PrimitiveArray::null::<$P>(self.len())) - }) - } + match_each_native_ptype!(p.ptype(), |$P| { + FlattenedArray::Primitive(PrimitiveArray::from_nullable::<$P>( + vec![$P::try_from(self.scalar())?; self.len()], + validity, + )) + }) } _ => panic!("Unsupported scalar type {}", self.dtype()), }) diff --git a/vortex-array/src/array/primitive/compute/scalar_at.rs b/vortex-array/src/array/primitive/compute/scalar_at.rs index 4e4c717498..97236c6122 100644 --- a/vortex-array/src/array/primitive/compute/scalar_at.rs +++ b/vortex-array/src/array/primitive/compute/scalar_at.rs @@ -8,10 +8,11 @@ use crate::validity::ArrayValidity; impl ScalarAtFn for PrimitiveArray { fn scalar_at(&self, index: usize) -> VortexResult { - if self.is_valid(index) { - Ok(match_each_native_ptype!(self.ptype, |$T| self.typed_data::<$T>()[index].into())) - } else { - Ok(PrimitiveScalar::none(self.ptype).into()) - } + match_each_native_ptype!(self.ptype, |$T| { + Ok(PrimitiveScalar::new( + self.is_valid(index).then(|| self.typed_data::<$T>()[index]), + self.nullability(), + )?.into()) + }) } } diff --git a/vortex-array/src/scalar/bool.rs b/vortex-array/src/scalar/bool.rs index 5d951d816c..6d25e4dfbb 100644 --- a/vortex-array/src/scalar/bool.rs +++ b/vortex-array/src/scalar/bool.rs @@ -43,7 +43,8 @@ impl TryFrom for bool { let Scalar::Bool(b) = value else { return Err(VortexError::InvalidDType(value.dtype().clone())); }; - b.into_value() + b.value() + .cloned() .ok_or_else(|| VortexError::InvalidDType(b.dtype().clone())) } } diff --git a/vortex-array/src/scalar/mod.rs b/vortex-array/src/scalar/mod.rs index 3d17b13bf8..16f2fc0671 100644 --- a/vortex-array/src/scalar/mod.rs +++ b/vortex-array/src/scalar/mod.rs @@ -1,3 +1,4 @@ +use half::f16; use std::fmt::{Debug, Display, Formatter}; pub use binary::*; @@ -12,7 +13,7 @@ pub use utf8::*; use vortex_error::VortexResult; use vortex_schema::{DType, FloatWidth, IntWidth, Nullability, Signedness}; -use crate::ptype::{NativePType, PType}; +use crate::ptype::NativePType; mod binary; mod bool; @@ -104,39 +105,38 @@ impl Scalar { } pub fn null(dtype: &DType) -> Self { + assert!(dtype.is_nullable()); match dtype { DType::Null => NullScalar::new().into(), DType::Bool(_) => BoolScalar::none().into(), DType::Int(w, s, _) => match (w, s) { (IntWidth::Unknown, Signedness::Unknown | Signedness::Signed) => { - PrimitiveScalar::none(PType::I64).into() + PrimitiveScalar::none::().into() } (IntWidth::_8, Signedness::Unknown | Signedness::Signed) => { - PrimitiveScalar::none(PType::I8).into() + PrimitiveScalar::none::().into() } (IntWidth::_16, Signedness::Unknown | Signedness::Signed) => { - PrimitiveScalar::none(PType::I16).into() + PrimitiveScalar::none::().into() } (IntWidth::_32, Signedness::Unknown | Signedness::Signed) => { - PrimitiveScalar::none(PType::I32).into() + PrimitiveScalar::none::().into() } (IntWidth::_64, Signedness::Unknown | Signedness::Signed) => { - PrimitiveScalar::none(PType::I64).into() + PrimitiveScalar::none::().into() } - (IntWidth::Unknown, Signedness::Unsigned) => { - PrimitiveScalar::none(PType::U64).into() - } - (IntWidth::_8, Signedness::Unsigned) => PrimitiveScalar::none(PType::U8).into(), - (IntWidth::_16, Signedness::Unsigned) => PrimitiveScalar::none(PType::U16).into(), - (IntWidth::_32, Signedness::Unsigned) => PrimitiveScalar::none(PType::U32).into(), - (IntWidth::_64, Signedness::Unsigned) => PrimitiveScalar::none(PType::U64).into(), + (IntWidth::Unknown, Signedness::Unsigned) => PrimitiveScalar::none::().into(), + (IntWidth::_8, Signedness::Unsigned) => PrimitiveScalar::none::().into(), + (IntWidth::_16, Signedness::Unsigned) => PrimitiveScalar::none::().into(), + (IntWidth::_32, Signedness::Unsigned) => PrimitiveScalar::none::().into(), + (IntWidth::_64, Signedness::Unsigned) => PrimitiveScalar::none::().into(), }, DType::Decimal(_, _, _) => unimplemented!("DecimalScalar"), DType::Float(w, _) => match w { - FloatWidth::Unknown => PrimitiveScalar::none(PType::F64).into(), - FloatWidth::_16 => PrimitiveScalar::none(PType::F16).into(), - FloatWidth::_32 => PrimitiveScalar::none(PType::F32).into(), - FloatWidth::_64 => PrimitiveScalar::none(PType::F64).into(), + FloatWidth::Unknown => PrimitiveScalar::none::().into(), + FloatWidth::_16 => PrimitiveScalar::none::().into(), + FloatWidth::_32 => PrimitiveScalar::none::().into(), + FloatWidth::_64 => PrimitiveScalar::none::().into(), }, DType::Utf8(_) => Utf8Scalar::none().into(), DType::Binary(_) => BinaryScalar::none().into(), diff --git a/vortex-array/src/scalar/primitive.rs b/vortex-array/src/scalar/primitive.rs index 4acaabb4f6..31c026fdc4 100644 --- a/vortex-array/src/scalar/primitive.rs +++ b/vortex-array/src/scalar/primitive.rs @@ -3,42 +3,49 @@ use std::mem::size_of; use half::f16; +use crate::match_each_native_ptype; use vortex_error::{VortexError, VortexResult}; -use vortex_schema::DType; +use vortex_schema::{DType, Nullability}; use crate::ptype::{NativePType, PType}; -use crate::scalar::composite::CompositeScalar; use crate::scalar::Scalar; #[derive(Debug, Clone, PartialEq, PartialOrd)] pub struct PrimitiveScalar { ptype: PType, + nullability: Nullability, value: Option, - exponent: u8, } impl PrimitiveScalar { - pub fn new(ptype: PType, value: Option) -> Self { - Self { - ptype, - value, - exponent: 0, + pub fn new(value: Option, nullability: Nullability) -> VortexResult { + if value.is_none() && nullability == Nullability::NonNullable { + return Err("Value cannot be None for NonNullable Scalar".into()); } + Ok(Self { + ptype: T::PTYPE, + nullability, + value: value.map(|v| Into::::into(v)), + }) + } + + pub fn nullable(value: Option) -> Self { + Self::new(value, Nullability::Nullable).unwrap() } - pub fn some(value: PScalar) -> Self { + pub fn some(value: T) -> Self { Self { - ptype: value.ptype(), - value: Some(value), - exponent: 0, + ptype: T::PTYPE, + nullability: Nullability::default(), + value: Some(Into::::into(value)), } } - pub fn none(ptype: PType) -> Self { + pub fn none() -> Self { Self { - ptype, + ptype: T::PTYPE, + nullability: Nullability::Nullable, value: None, - exponent: 0, } } @@ -47,11 +54,6 @@ impl PrimitiveScalar { self.value } - #[inline] - pub fn factor(&self) -> u8 { - self.exponent - } - #[inline] pub fn ptype(&self) -> PType { self.ptype @@ -63,19 +65,17 @@ impl PrimitiveScalar { } pub fn cast(&self, dtype: &DType) -> VortexResult { - let ptype: VortexResult = dtype.try_into(); - ptype - .and_then(|p| match self.value() { - None => Ok(PrimitiveScalar::none(p).into()), - Some(ps) => ps.cast_ptype(p), - }) - .or_else(|_| self.cast_dtype(dtype)) - } - - // General conversion function that handles casting primitive scalar to non-primitive. - // TODO(robert): Implement storage conversions - fn cast_dtype(&self, dtype: &DType) -> VortexResult { - Ok(CompositeScalar::new(dtype.clone(), Box::new(self.clone().into())).into()) + let ptype: PType = dtype.try_into()?; + match_each_native_ptype!(ptype, |$T| { + Ok(PrimitiveScalar::new( + self.value() + .map(|ps| ps.cast_ptype(ptype)) + .transpose()? + .map(|s| $T::try_from(s)) + .transpose()?, + self.nullability, + )?.into()) + }) } pub fn nbytes(&self) -> usize { @@ -190,7 +190,7 @@ macro_rules! pscalar { impl From<$T> for Scalar { fn from(value: $T) -> Self { - PrimitiveScalar::some(PScalar::from(value)).into() + PrimitiveScalar::some(value).into() } } @@ -245,8 +245,8 @@ pscalar!(f64, F64); impl From> for Scalar { fn from(value: Option) -> Self { match value { - Some(value) => value.into(), - None => PrimitiveScalar::new(T::PTYPE, None).into(), + Some(value) => PrimitiveScalar::some::(value).into(), + None => PrimitiveScalar::none::().into(), } } } @@ -254,7 +254,7 @@ impl From> for Scalar { impl From for Scalar { #[inline] fn from(value: usize) -> Self { - PrimitiveScalar::new(PType::U64, Some(PScalar::U64(value as u64))).into() + PrimitiveScalar::some::(value as u64).into() } } diff --git a/vortex-array/src/scalar/serde.rs b/vortex-array/src/scalar/serde.rs index e24cfeabfa..445c6e7bd1 100644 --- a/vortex-array/src/scalar/serde.rs +++ b/vortex-array/src/scalar/serde.rs @@ -1,13 +1,12 @@ use std::io; use std::sync::Arc; -use half::f16; use num_enum::{IntoPrimitive, TryFromPrimitive}; +use crate::match_each_native_ptype; use vortex_error::VortexResult; -use vortex_schema::DType; +use vortex_schema::{DType, Nullability}; -use crate::ptype::PType; use crate::scalar::composite::CompositeScalar; use crate::scalar::{ BinaryScalar, BoolScalar, ListScalar, NullScalar, PScalar, PrimitiveScalar, Scalar, @@ -39,7 +38,7 @@ impl<'a, 'b> ScalarReader<'a, 'b> { let bool = self.reader.read_nbytes::<1>()?[0] != 0; Ok(BoolScalar::new(is_present.then_some(bool), nullability)?.into()) } - ScalarTag::PrimitiveS => self.read_primitive_scalar().map(|p| p.into()), + ScalarTag::PrimitiveS => self.read_primitive_scalar(nullability).map(|p| p.into()), ScalarTag::List => { let is_present = self.reader.read_option_tag()?; if is_present { @@ -87,49 +86,17 @@ impl<'a, 'b> ScalarReader<'a, 'b> { } } - fn read_primitive_scalar(&mut self) -> VortexResult { + fn read_primitive_scalar(&mut self, nullability: Nullability) -> VortexResult { let ptype = self.reader.ptype()?; let is_present = self.reader.read_option_tag()?; - if is_present { - let pscalar = match ptype { - PType::U8 => PrimitiveScalar::some(PScalar::U8(u8::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::U16 => PrimitiveScalar::some(PScalar::U16(u16::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::U32 => PrimitiveScalar::some(PScalar::U32(u32::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::U64 => PrimitiveScalar::some(PScalar::U64(u64::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::I8 => PrimitiveScalar::some(PScalar::I8(i8::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::I16 => PrimitiveScalar::some(PScalar::I16(i16::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::I32 => PrimitiveScalar::some(PScalar::I32(i32::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::I64 => PrimitiveScalar::some(PScalar::I64(i64::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::F16 => PrimitiveScalar::some(PScalar::F16(f16::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::F32 => PrimitiveScalar::some(PScalar::F32(f32::from_le_bytes( - self.reader.read_nbytes()?, - ))), - PType::F64 => PrimitiveScalar::some(PScalar::F64(f64::from_le_bytes( - self.reader.read_nbytes()?, - ))), + match_each_native_ptype!(ptype, |$P| { + let value = if is_present { + Some($P::from_le_bytes(self.reader.read_nbytes()?)) + } else { + None }; - Ok(pscalar) - } else { - Ok(PrimitiveScalar::none(ptype)) - } + Ok(PrimitiveScalar::new::<$P>(value, nullability)?) + }) } } diff --git a/vortex-array/src/scalar/utf8.rs b/vortex-array/src/scalar/utf8.rs index 6a8c92ccad..978acfd6ad 100644 --- a/vortex-array/src/scalar/utf8.rs +++ b/vortex-array/src/scalar/utf8.rs @@ -45,8 +45,9 @@ impl TryFrom for String { let Scalar::Utf8(u) = value else { return Err(VortexError::InvalidDType(value.dtype().clone())); }; + let dt = u.dtype().clone(); match u.into_value() { - None => Err(VortexError::InvalidDType(u.dtype().clone())), + None => Err(VortexError::InvalidDType(dt)), Some(s) => Ok(s), } } diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index 89f8466570..891aa9146d 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -245,8 +245,8 @@ mod test { use vortex::array::primitive::PrimitiveArray; use vortex::array::varbin::VarBinArray; use vortex::compute::scalar_at::scalar_at; - use vortex::ptype::PType; use vortex::scalar::PrimitiveScalar; + use vortex_schema::Nullability::Nullable; use crate::compress::{dict_encode_typed_primitive, dict_encode_varbin}; @@ -277,10 +277,16 @@ mod test { ); assert_eq!( scalar_at(&values, 0), - Ok(PrimitiveScalar::none(PType::I32).into()) + Ok(PrimitiveScalar::new::(None, Nullable).unwrap().into()) + ); + assert_eq!( + scalar_at(&values, 1), + Ok(PrimitiveScalar::nullable(Some(1)).into()) + ); + assert_eq!( + scalar_at(&values, 2), + Ok(PrimitiveScalar::nullable(Some(3)).into()) ); - assert_eq!(scalar_at(&values, 1), Ok(1.into())); - assert_eq!(scalar_at(&values, 2), Ok(3.into())); } #[test] diff --git a/vortex-schema/src/dtype.rs b/vortex-schema/src/dtype.rs index 7a69e23585..44e0ac0611 100644 --- a/vortex-schema/src/dtype.rs +++ b/vortex-schema/src/dtype.rs @@ -8,19 +8,13 @@ use DType::*; use crate::CompositeID; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Ord, PartialOrd)] pub enum Nullability { + #[default] NonNullable, Nullable, } -impl Default for Nullability { - fn default() -> Self { - // TODO(ngates): is this a sensible default? - Nullability::NonNullable - } -} - impl From for Nullability { fn from(value: bool) -> Self { if value { From b598666a38da250421d3cd44d6699f49f9662e5a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 26 Mar 2024 20:45:02 +0000 Subject: [PATCH 04/12] nullable scalars --- vortex-array/src/scalar/primitive.rs | 16 +++++----------- vortex-array/src/scalar/value.rs | 20 ++++---------------- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/vortex-array/src/scalar/primitive.rs b/vortex-array/src/scalar/primitive.rs index 31c026fdc4..73eda83c4f 100644 --- a/vortex-array/src/scalar/primitive.rs +++ b/vortex-array/src/scalar/primitive.rs @@ -13,6 +13,7 @@ use crate::scalar::Scalar; #[derive(Debug, Clone, PartialEq, PartialOrd)] pub struct PrimitiveScalar { ptype: PType, + dtype: DType, nullability: Nullability, value: Option, } @@ -24,6 +25,7 @@ impl PrimitiveScalar { } Ok(Self { ptype: T::PTYPE, + dtype: DType::from(T::PTYPE).with_nullability(nullability), nullability, value: value.map(|v| Into::::into(v)), }) @@ -34,19 +36,11 @@ impl PrimitiveScalar { } pub fn some(value: T) -> Self { - Self { - ptype: T::PTYPE, - nullability: Nullability::default(), - value: Some(Into::::into(value)), - } + Self::new::(Some(value), Nullability::default()).unwrap() } pub fn none() -> Self { - Self { - ptype: T::PTYPE, - nullability: Nullability::Nullable, - value: None, - } + Self::new::(None, Nullability::Nullable).unwrap() } #[inline] @@ -61,7 +55,7 @@ impl PrimitiveScalar { #[inline] pub fn dtype(&self) -> &DType { - self.ptype.into() + &self.dtype } pub fn cast(&self, dtype: &DType) -> VortexResult { diff --git a/vortex-array/src/scalar/value.rs b/vortex-array/src/scalar/value.rs index 1b938f9165..6a014b57f5 100644 --- a/vortex-array/src/scalar/value.rs +++ b/vortex-array/src/scalar/value.rs @@ -16,31 +16,19 @@ impl ScalarValue { } pub fn non_nullable(value: T) -> Self { - Self { - value: Some(value), - nullability: Nullability::NonNullable, - } + Self::new(Some(value), Nullability::NonNullable).unwrap() } pub fn nullable(value: T) -> Self { - Self { - value: Some(value), - nullability: Nullability::Nullable, - } + Self::new(Some(value), Nullability::Nullable).unwrap() } pub fn some(value: T) -> Self { - Self { - value: Some(value), - nullability: Nullability::default(), - } + Self::new(Some(value), Nullability::default()).unwrap() } pub fn none() -> Self { - Self { - value: None, - nullability: Nullability::Nullable, - } + Self::new(None, Nullability::Nullable).unwrap() } pub fn value(&self) -> Option<&T> { From 048b162c1fdf3e1a139a0c351117e2934ce852e5 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 26 Mar 2024 21:06:52 +0000 Subject: [PATCH 05/12] Things --- bench-vortex/src/taxi_data.rs | 5 ++--- vortex-fastlanes/src/delta/compress.rs | 9 ++++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index 69378a6bcc..dfecf3db4b 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -49,14 +49,13 @@ pub fn write_taxi_data() -> PathBuf { .unwrap(); let dtype = DType::from_arrow(reader.schema()); - let _ctx = compress_ctx(); + let ctx = compress_ctx(); let chunks = reader .map(|batch_result| batch_result.unwrap()) .map(|record_batch| { let vortex_array = record_batch.into_array(); - // ctx.compress(&vortex_array, None).unwrap() - vortex_array + ctx.compress(&vortex_array, None).unwrap() }) .collect_vec(); let chunked = ChunkedArray::new(chunks, dtype.clone()); diff --git a/vortex-fastlanes/src/delta/compress.rs b/vortex-fastlanes/src/delta/compress.rs index 8299d2c87f..f3218fc53d 100644 --- a/vortex-fastlanes/src/delta/compress.rs +++ b/vortex-fastlanes/src/delta/compress.rs @@ -13,6 +13,7 @@ use vortex::compute::flatten::flatten_primitive; use vortex::match_each_integer_ptype; use vortex::ptype::NativePType; use vortex::validity::ArrayValidity; +use vortex::validity::Validity; use vortex_error::VortexResult; use crate::{DeltaArray, DeltaEncoding}; @@ -51,7 +52,13 @@ impl EncodingCompression for DeltaEncoding { // Compress the filled array let (bases, deltas) = match_each_integer_ptype!(parray.ptype(), |$T| { let (bases, deltas) = compress_primitive(filled.as_primitive().typed_data::<$T>()); - (PrimitiveArray::from(bases), PrimitiveArray::from(deltas)) + let base_validity = validity.is_some().then(|| Validity::Valid(bases.len())); + let delta_validity = validity.is_some().then(|| Validity::Valid(deltas.len())); + ( + // To preserve nullability, we include Validity + PrimitiveArray::from_nullable(bases, base_validity), + PrimitiveArray::from_nullable(deltas, delta_validity), + ) }); // Recursively compress the bases and deltas From c3438528ae092ec1dac4d14acb518d7121127071 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 26 Mar 2024 21:07:57 +0000 Subject: [PATCH 06/12] try_new --- vortex-array/src/array/bool/compute/mod.rs | 2 +- .../src/array/primitive/compute/scalar_at.rs | 2 +- vortex-array/src/scalar/primitive.rs | 13 ++++++++----- vortex-array/src/scalar/serde.rs | 8 ++++---- vortex-array/src/scalar/value.rs | 10 +++++----- vortex-dict/src/compress.rs | 4 +++- 6 files changed, 22 insertions(+), 17 deletions(-) diff --git a/vortex-array/src/array/bool/compute/mod.rs b/vortex-array/src/array/bool/compute/mod.rs index 8b9a26a77c..5b26eb2304 100644 --- a/vortex-array/src/array/bool/compute/mod.rs +++ b/vortex-array/src/array/bool/compute/mod.rs @@ -87,7 +87,7 @@ impl FlattenFn for BoolArray { impl ScalarAtFn for BoolArray { fn scalar_at(&self, index: usize) -> VortexResult { - Ok(BoolScalar::new( + Ok(BoolScalar::try_new( self.is_valid(index).then(|| self.buffer.value(index)), self.nullability(), ) diff --git a/vortex-array/src/array/primitive/compute/scalar_at.rs b/vortex-array/src/array/primitive/compute/scalar_at.rs index 97236c6122..587deb7ec9 100644 --- a/vortex-array/src/array/primitive/compute/scalar_at.rs +++ b/vortex-array/src/array/primitive/compute/scalar_at.rs @@ -9,7 +9,7 @@ use crate::validity::ArrayValidity; impl ScalarAtFn for PrimitiveArray { fn scalar_at(&self, index: usize) -> VortexResult { match_each_native_ptype!(self.ptype, |$T| { - Ok(PrimitiveScalar::new( + Ok(PrimitiveScalar::try_new( self.is_valid(index).then(|| self.typed_data::<$T>()[index]), self.nullability(), )?.into()) diff --git a/vortex-array/src/scalar/primitive.rs b/vortex-array/src/scalar/primitive.rs index 73eda83c4f..9f0a81c05b 100644 --- a/vortex-array/src/scalar/primitive.rs +++ b/vortex-array/src/scalar/primitive.rs @@ -19,7 +19,10 @@ pub struct PrimitiveScalar { } impl PrimitiveScalar { - pub fn new(value: Option, nullability: Nullability) -> VortexResult { + pub fn try_new( + value: Option, + nullability: Nullability, + ) -> VortexResult { if value.is_none() && nullability == Nullability::NonNullable { return Err("Value cannot be None for NonNullable Scalar".into()); } @@ -32,15 +35,15 @@ impl PrimitiveScalar { } pub fn nullable(value: Option) -> Self { - Self::new(value, Nullability::Nullable).unwrap() + Self::try_new(value, Nullability::Nullable).unwrap() } pub fn some(value: T) -> Self { - Self::new::(Some(value), Nullability::default()).unwrap() + Self::try_new::(Some(value), Nullability::default()).unwrap() } pub fn none() -> Self { - Self::new::(None, Nullability::Nullable).unwrap() + Self::try_new::(None, Nullability::Nullable).unwrap() } #[inline] @@ -61,7 +64,7 @@ impl PrimitiveScalar { pub fn cast(&self, dtype: &DType) -> VortexResult { let ptype: PType = dtype.try_into()?; match_each_native_ptype!(ptype, |$T| { - Ok(PrimitiveScalar::new( + Ok(PrimitiveScalar::try_new( self.value() .map(|ps| ps.cast_ptype(ptype)) .transpose()? diff --git a/vortex-array/src/scalar/serde.rs b/vortex-array/src/scalar/serde.rs index 445c6e7bd1..eeda9145e3 100644 --- a/vortex-array/src/scalar/serde.rs +++ b/vortex-array/src/scalar/serde.rs @@ -31,12 +31,12 @@ impl<'a, 'b> ScalarReader<'a, 'b> { match tag { ScalarTag::Binary => { let slice = self.reader.read_optional_slice()?; - Ok(BinaryScalar::new(slice, nullability)?.into()) + Ok(BinaryScalar::try_new(slice, nullability)?.into()) } ScalarTag::Bool => { let is_present = self.reader.read_option_tag()?; let bool = self.reader.read_nbytes::<1>()?[0] != 0; - Ok(BoolScalar::new(is_present.then_some(bool), nullability)?.into()) + Ok(BoolScalar::try_new(is_present.then_some(bool), nullability)?.into()) } ScalarTag::PrimitiveS => self.read_primitive_scalar(nullability).map(|p| p.into()), ScalarTag::List => { @@ -72,7 +72,7 @@ impl<'a, 'b> ScalarReader<'a, 'b> { } ScalarTag::Utf8 => { let value = self.reader.read_optional_slice()?; - Ok(Utf8Scalar::new( + Ok(Utf8Scalar::try_new( value.map(|v| unsafe { String::from_utf8_unchecked(v) }), nullability, )? @@ -95,7 +95,7 @@ impl<'a, 'b> ScalarReader<'a, 'b> { } else { None }; - Ok(PrimitiveScalar::new::<$P>(value, nullability)?) + Ok(PrimitiveScalar::try_new::<$P>(value, nullability)?) }) } } diff --git a/vortex-array/src/scalar/value.rs b/vortex-array/src/scalar/value.rs index 6a014b57f5..686a4b2d0a 100644 --- a/vortex-array/src/scalar/value.rs +++ b/vortex-array/src/scalar/value.rs @@ -8,7 +8,7 @@ pub struct ScalarValue { } impl ScalarValue { - pub fn new(value: Option, nullability: Nullability) -> VortexResult { + pub fn try_new(value: Option, nullability: Nullability) -> VortexResult { if value.is_none() && nullability == Nullability::NonNullable { return Err("Value cannot be None for NonNullable Scalar".into()); } @@ -16,19 +16,19 @@ impl ScalarValue { } pub fn non_nullable(value: T) -> Self { - Self::new(Some(value), Nullability::NonNullable).unwrap() + Self::try_new(Some(value), Nullability::NonNullable).unwrap() } pub fn nullable(value: T) -> Self { - Self::new(Some(value), Nullability::Nullable).unwrap() + Self::try_new(Some(value), Nullability::Nullable).unwrap() } pub fn some(value: T) -> Self { - Self::new(Some(value), Nullability::default()).unwrap() + Self::try_new(Some(value), Nullability::default()).unwrap() } pub fn none() -> Self { - Self::new(None, Nullability::Nullable).unwrap() + Self::try_new(None, Nullability::Nullable).unwrap() } pub fn value(&self) -> Option<&T> { diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index 891aa9146d..c8a9bad6ec 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -277,7 +277,9 @@ mod test { ); assert_eq!( scalar_at(&values, 0), - Ok(PrimitiveScalar::new::(None, Nullable).unwrap().into()) + Ok(PrimitiveScalar::try_new::(None, Nullable) + .unwrap() + .into()) ); assert_eq!( scalar_at(&values, 1), From ab5e8ecad1bce976104ebbe1305a6f4cd010020d Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 26 Mar 2024 21:13:51 +0000 Subject: [PATCH 07/12] try_new --- vortex-array/src/array/primitive/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 02ee95700e..812b4392c6 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -45,6 +45,9 @@ impl PrimitiveArray { pub fn try_new(ptype: PType, buffer: Buffer, validity: Option) -> VortexResult { if let Some(v) = &validity { + if v.len() != buffer.len() / ptype.byte_width() { + return Err("Validity length does not match buffer length".into()); + } assert_eq!(v.len(), buffer.len() / ptype.byte_width()); } let dtype = DType::from(ptype).with_nullability(validity.is_some().into()); From dc769433c2d35c05d359f31fa5f5426a18e4b094 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 27 Mar 2024 09:13:49 +0000 Subject: [PATCH 08/12] Fix up tests --- bench-vortex/src/bin/serde.rs | 2 +- bench-vortex/src/lib.rs | 7 ++++--- vortex-fastlanes/src/bitpacking/compress.rs | 21 ++++++++++++++------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/bench-vortex/src/bin/serde.rs b/bench-vortex/src/bin/serde.rs index 3667e1504b..ba78d1add1 100644 --- a/bench-vortex/src/bin/serde.rs +++ b/bench-vortex/src/bin/serde.rs @@ -5,6 +5,6 @@ use log::LevelFilter; pub fn main() { setup_logger(LevelFilter::Debug); let taxi_spiral = write_taxi_data(); - let rows = take_taxi_data(&taxi_spiral, &[10, 11, 12, 13]); //, 100_000, 3_000_000]); + let rows = take_taxi_data(&taxi_spiral, &[10, 11, 12, 13, 100_000, 3_000_000]); println!("TAKE TAXI DATA: {:?}", rows); } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 342d8e94b1..3336ae5f12 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -18,7 +18,8 @@ use vortex::compress::{CompressConfig, CompressCtx}; use vortex::formatter::display_tree; use vortex_alp::ALPEncoding; use vortex_datetime::DateTimeEncoding; -use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; +use vortex_dict::DictEncoding; +use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; use vortex_ree::REEEncoding; use vortex_roaring::RoaringBoolEncoding; use vortex_schema::DType; @@ -51,11 +52,11 @@ pub fn enumerate_arrays() -> Vec { println!("FOUND {:?}", ENCODINGS.iter().map(|e| e.id()).collect_vec()); vec![ &ALPEncoding, - //&DictEncoding, + &DictEncoding, &BitPackedEncoding, &FoREncoding, &DateTimeEncoding, - &DeltaEncoding, + // &DeltaEncoding, Blows up the search space too much. &REEEncoding, &RoaringBoolEncoding, // RoaringIntEncoding, diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 3e0bfb0fcf..df52cde228 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -120,13 +120,13 @@ fn bitpack_primitive(array: &[T], bit_width: usize) } // How many fastlanes vectors we will process. - let num_chunks = (array.len() + 1023) / 1024; + let num_chunks = array.len() / 1024; // Allocate a result byte array. let mut output = Vec::with_capacity(num_chunks * bit_width * 128); // Loop over all but the last chunk. - (0..num_chunks - 1).for_each(|i| { + (0..num_chunks).for_each(|i| { let start_elem = i * 1024; let chunk: &[T; 1024] = array_ref![array, start_elem, 1024]; TryBitPack::try_bitpack_into(chunk, bit_width, &mut output).unwrap(); @@ -211,14 +211,14 @@ fn bitunpack_primitive( } // How many fastlanes vectors we will process. - let num_chunks = (length + 1023) / 1024; + let num_chunks = length / 1024; // Allocate a result vector. let mut output = Vec::with_capacity(length); // Loop over all but the last chunk. let bytes_per_chunk = 128 * bit_width; - (0..num_chunks - 1).for_each(|i| { + (0..num_chunks).for_each(|i| { let chunk: &[u8] = &packed[i * bytes_per_chunk..][0..bytes_per_chunk]; TryBitPack::try_bitunpack_into(chunk, bit_width, &mut output).unwrap(); }); @@ -228,7 +228,7 @@ fn bitunpack_primitive( if last_chunk_size > 0 { let mut last_output = Vec::with_capacity(1024); TryBitPack::try_bitunpack_into( - &packed[(num_chunks - 1) * bytes_per_chunk..], + &packed[num_chunks * bytes_per_chunk..], bit_width, &mut last_output, ) @@ -309,11 +309,18 @@ mod test { } #[test] - fn test_decompress() { + fn test_compression_roundtrip() { + compression_roundtrip(125); + compression_roundtrip(1024); + compression_roundtrip(10_000); + compression_roundtrip(10_240); + } + + fn compression_roundtrip(n: usize) { let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); let ctx = CompressCtx::new(Arc::new(cfg)); - let values = PrimitiveArray::from(Vec::from_iter((0..10_000).map(|i| (i % 63) as u8))); + let values = PrimitiveArray::from(Vec::from_iter((0..n).map(|i| (i % 63) as u8))); let compressed = ctx.compress(&values, None).unwrap(); assert_eq!(compressed.encoding().id(), BitPackedEncoding.id()); From 4ea1fec536c37db675824fc6c9401cc03bbdb6b6 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 27 Mar 2024 09:23:55 +0000 Subject: [PATCH 09/12] Fix up tests --- Cargo.lock | 1 + vortex-roaring/Cargo.toml | 1 + vortex-roaring/src/boolean/compute.rs | 28 ++++++++++++++++++++++++++- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 33a9ddb977..ec56211950 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2753,6 +2753,7 @@ dependencies = [ name = "vortex-roaring" version = "0.1.0" dependencies = [ + "arrow-buffer", "croaring", "linkme", "log", diff --git a/vortex-roaring/Cargo.toml b/vortex-roaring/Cargo.toml index 49e3089bdd..68b60f7bfa 100644 --- a/vortex-roaring/Cargo.toml +++ b/vortex-roaring/Cargo.toml @@ -12,6 +12,7 @@ edition = { workspace = true } rust-version = { workspace = true } [dependencies] +arrow-buffer = { workspace = true } vortex-array = { path = "../vortex-array" } vortex-error = { path = "../vortex-error" } vortex-schema = { path = "../vortex-schema" } diff --git a/vortex-roaring/src/boolean/compute.rs b/vortex-roaring/src/boolean/compute.rs index c687391c93..f3057a6e6d 100644 --- a/vortex-roaring/src/boolean/compute.rs +++ b/vortex-roaring/src/boolean/compute.rs @@ -1,16 +1,42 @@ +use arrow_buffer::{BooleanBuffer, Buffer}; +use vortex::array::bool::BoolArray; +use vortex::compute::flatten::{FlattenFn, FlattenedArray}; use vortex::compute::scalar_at::ScalarAtFn; use vortex::compute::ArrayCompute; -use vortex::scalar::Scalar; +use vortex::scalar::{AsBytes, Scalar}; +use vortex::validity::ArrayValidity; use vortex_error::VortexResult; use crate::RoaringBoolArray; impl ArrayCompute for RoaringBoolArray { + fn flatten(&self) -> Option<&dyn FlattenFn> { + Some(self) + } + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } } +impl FlattenFn for RoaringBoolArray { + fn flatten(&self) -> VortexResult { + // TODO(ngates): benchmark the fastest conversion from BitMap. + // Via bitset requires two copies. + let bitset = self + .bitmap + .to_bitset() + .ok_or("Failed to convert RoaringBitmap to Bitset")?; + + let bytes = &bitset.as_slice().as_bytes()[0..bitset.size_in_bytes()]; + let buffer = Buffer::from_slice_ref(bytes); + Ok(FlattenedArray::Bool(BoolArray::new( + BooleanBuffer::new(buffer, 0, bitset.size_in_bits()), + self.validity(), + ))) + } +} + impl ScalarAtFn for RoaringBoolArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.bitmap.contains(index as u32) { From 66a2343ad00bb3c12710666fabc496e9ddf13705 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 27 Mar 2024 10:17:43 +0000 Subject: [PATCH 10/12] Fix up tests --- Cargo.lock | 2 + bench-vortex/Cargo.toml | 7 +- bench-vortex/src/bin/serde.rs | 7 +- bench-vortex/src/lib.rs | 10 ++- bench-vortex/src/reader.rs | 93 +++++++++++++++++++++++ bench-vortex/src/taxi_data.rs | 138 ++++++++++------------------------ vortex-error/Cargo.toml | 1 + vortex-error/src/lib.rs | 6 ++ 8 files changed, 156 insertions(+), 108 deletions(-) create mode 100644 bench-vortex/src/reader.rs diff --git a/Cargo.lock b/Cargo.lock index ec56211950..c5732b00af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -377,6 +377,7 @@ dependencies = [ "vortex-array", "vortex-datetime", "vortex-dict", + "vortex-error", "vortex-fastlanes", "vortex-ree", "vortex-roaring", @@ -2714,6 +2715,7 @@ name = "vortex-error" version = "0.1.0" dependencies = [ "arrow-schema", + "parquet", "thiserror", "vortex-schema", ] diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index 5f64e0316a..8b189e4bba 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -17,19 +17,20 @@ workspace = true [dependencies] arrow-array = { workspace = true } arrow-select = { workspace = true } +vortex-alp = { path = "../vortex-alp" } vortex-array = { path = "../vortex-array" } vortex-datetime = { path = "../vortex-datetime" } -vortex-alp = { path = "../vortex-alp" } vortex-dict = { path = "../vortex-dict" } +vortex-error = { path = "../vortex-error", features = ["parquet"] } vortex-fastlanes = { path = "../vortex-fastlanes" } vortex-ree = { path = "../vortex-ree" } vortex-roaring = { path = "../vortex-roaring" } vortex-schema = { path = "../vortex-schema" } vortex-zigzag = { path = "../vortex-zigzag" } itertools = { workspace = true } -reqwest = { workspace = true } -parquet = { workspace = true } log = { workspace = true } +parquet = { workspace = true } +reqwest = { workspace = true } simplelog = { workspace = true } [dev-dependencies] diff --git a/bench-vortex/src/bin/serde.rs b/bench-vortex/src/bin/serde.rs index ba78d1add1..7c9451011c 100644 --- a/bench-vortex/src/bin/serde.rs +++ b/bench-vortex/src/bin/serde.rs @@ -1,10 +1,11 @@ +use bench_vortex::reader::take_vortex; use bench_vortex::setup_logger; -use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data}; +use bench_vortex::taxi_data::taxi_data_vortex; use log::LevelFilter; pub fn main() { setup_logger(LevelFilter::Debug); - let taxi_spiral = write_taxi_data(); - let rows = take_taxi_data(&taxi_spiral, &[10, 11, 12, 13, 100_000, 3_000_000]); + let taxi_vortex = taxi_data_vortex(); + let rows = take_vortex(&taxi_vortex, &[10, 11, 12, 13, 100_000, 3_000_000]).unwrap(); println!("TAKE TAXI DATA: {:?}", rows); } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 3336ae5f12..615c097344 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -24,18 +24,22 @@ use vortex_ree::REEEncoding; use vortex_roaring::RoaringBoolEncoding; use vortex_schema::DType; +pub mod reader; pub mod taxi_data; -pub fn idempotent(name: &str, f: impl FnOnce(&mut File)) -> PathBuf { +pub fn idempotent( + name: &str, + f: impl FnOnce(&mut File) -> Result, +) -> Result { let path = Path::new(env!("CARGO_MANIFEST_DIR")) .join("data") .join(name); if !path.exists() { create_dir_all(path.parent().unwrap()).unwrap(); let mut file = File::create(&path).unwrap(); - f(&mut file); + f(&mut file)?; } - path.to_path_buf() + Ok(path.to_path_buf()) } pub fn setup_logger(level: LevelFilter) { diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs new file mode 100644 index 0000000000..5d0e3d20d4 --- /dev/null +++ b/bench-vortex/src/reader.rs @@ -0,0 +1,93 @@ +use arrow_array::types::Int64Type; +use arrow_array::{ + ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray, RecordBatch, + RecordBatchReader, +}; +use arrow_select::concat::concat_batches; +use arrow_select::take::take_record_batch; +use itertools::Itertools; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use std::collections::HashMap; +use std::fs::File; +use std::path::Path; +use std::sync::Arc; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::ArrayRef; +use vortex::compute::take::take; +use vortex::ptype::PType; +use vortex::serde::ReadCtx; +use vortex_error::VortexResult; +use vortex_schema::DType; + +pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult { + let chunked = { + let mut file = File::open(path)?; + let dummy_dtype: DType = PType::U8.into(); + let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file); + let dtype = read_ctx.dtype()?; + read_ctx.with_schema(&dtype).read()? + }; + take(&chunked, &PrimitiveArray::from(indices.to_vec())) +} + +pub fn take_arrow(path: &Path, indices: &[u64]) -> VortexResult { + let file = File::open(path)?; + + // TODO(ngates): enable read_page_index + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + + // We figure out which row groups we need to read and a selection filter for each of them. + let mut row_groups = HashMap::new(); + let mut row_group_offsets = vec![0]; + row_group_offsets.extend( + builder + .metadata() + .row_groups() + .iter() + .map(|rg| rg.num_rows()) + .scan(0i64, |acc, x| { + *acc += x; + Some(*acc) + }), + ); + + for idx in indices { + let row_group_idx = row_group_offsets + .binary_search(&(*idx as i64)) + .unwrap_or_else(|e| e - 1); + if !row_groups.contains_key(&row_group_idx) { + row_groups.insert(row_group_idx, Vec::new()); + } + row_groups + .get_mut(&row_group_idx) + .unwrap() + .push((*idx as i64) - row_group_offsets[row_group_idx]); + } + let row_group_indices = row_groups + .keys() + .sorted() + .map(|i| row_groups.get(i).unwrap().clone()) + .collect_vec(); + + let reader = builder + .with_row_groups(row_groups.keys().copied().collect_vec()) + // FIXME(ngates): our indices code assumes the batch size == the row group sizes + .with_batch_size(10_000_000) + .build() + .unwrap(); + + let schema = reader.schema(); + + let batches = reader + .into_iter() + .enumerate() + .map(|(idx, batch)| { + let batch = batch.unwrap(); + let indices = ArrowPrimitiveArray::::from(row_group_indices[idx].clone()); + let indices_array: ArrowArrayRef = Arc::new(indices); + take_record_batch(&batch, &indices_array).unwrap() + }) + .collect_vec(); + + Ok(concat_batches(&schema, &batches)?) +} diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index dfecf3db4b..d5fcb7962d 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -1,24 +1,13 @@ -use arrow_array::types::Int64Type; -use arrow_array::{ - ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray, RecordBatch, - RecordBatchReader, -}; -use arrow_select::concat::concat_batches; -use arrow_select::take::take_record_batch; +use arrow_array::RecordBatchReader; use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ProjectionMask; -use std::collections::HashMap; use std::fs::File; -use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::path::PathBuf; use vortex::array::chunked::ChunkedArray; -use vortex::array::primitive::PrimitiveArray; -use vortex::array::{ArrayRef, IntoArray}; +use vortex::array::IntoArray; use vortex::arrow::FromArrowType; -use vortex::compute::take::take; -use vortex::ptype::PType; -use vortex::serde::{ReadCtx, WriteCtx}; +use vortex::serde::WriteCtx; use vortex_schema::DType; use crate::{compress_ctx, idempotent}; @@ -30,111 +19,62 @@ pub fn download_taxi_data() -> PathBuf { ) .unwrap() .copy_to(file) - .unwrap(); }) + .unwrap() } -pub fn write_taxi_data() -> PathBuf { - idempotent("taxi.spiral", |write| { +pub fn taxi_data_parquet() -> PathBuf { + download_taxi_data() +} + +pub fn taxi_data_vortex() -> PathBuf { + idempotent("taxi-uncompressed.vortex", |write| { let taxi_pq = File::open(download_taxi_data()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap(); let _mask = ProjectionMask::roots(builder.parquet_schema(), (0..14).collect_vec()); - // FIXME(ngates): the compressor should handle batch size. - let reader = builder - // .with_limit(100) - // .with_projection(_mask) - .with_batch_size(65_536) - .build() - .unwrap(); + // FIXME(ngates): #157 the compressor should handle batch size. + let reader = builder.with_batch_size(65_536).build().unwrap(); let dtype = DType::from_arrow(reader.schema()); - let ctx = compress_ctx(); let chunks = reader .map(|batch_result| batch_result.unwrap()) - .map(|record_batch| { - let vortex_array = record_batch.into_array(); - ctx.compress(&vortex_array, None).unwrap() - }) + .map(|record_batch| record_batch.into_array()) .collect_vec(); let chunked = ChunkedArray::new(chunks, dtype.clone()); let mut write_ctx = WriteCtx::new(write); - write_ctx.dtype(&dtype).unwrap(); - write_ctx.write(&chunked).unwrap(); + write_ctx.dtype(&dtype)?; + write_ctx.write(&chunked) }) + .unwrap() } -pub fn take_taxi_data(path: &Path, indices: &[u64]) -> ArrayRef { - let chunked = { - let mut file = File::open(path).unwrap(); - let dummy_dtype: DType = PType::U8.into(); - let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file); - let dtype = read_ctx.dtype().unwrap(); - read_ctx.with_schema(&dtype).read().unwrap() - }; - take(&chunked, &PrimitiveArray::from(indices.to_vec())).unwrap() -} - -pub fn take_taxi_data_arrow(path: &Path, indices: &[u64]) -> RecordBatch { - let file = File::open(path).unwrap(); - - // TODO(ngates): enable read_page_index - let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); - - // We figure out which row groups we need to read and a selection filter for each of them. - let mut row_groups = HashMap::new(); - let mut row_group_offsets = vec![0]; - row_group_offsets.extend( - builder - .metadata() - .row_groups() - .iter() - .map(|rg| rg.num_rows()) - .scan(0i64, |acc, x| { - *acc += x; - Some(*acc) - }), - ); - - for idx in indices { - let row_group_idx = row_group_offsets - .binary_search(&(*idx as i64)) - .unwrap_or_else(|e| e - 1); - if !row_groups.contains_key(&row_group_idx) { - row_groups.insert(row_group_idx, Vec::new()); - } - row_groups - .get_mut(&row_group_idx) - .unwrap() - .push((*idx as i64) - row_group_offsets[row_group_idx]); - } - let row_group_indices = row_groups - .keys() - .sorted() - .map(|i| row_groups.get(i).unwrap().clone()) - .collect_vec(); +pub fn taxi_data_vortex_compressed() -> PathBuf { + idempotent("taxi.vortex", |write| { + let taxi_pq = File::open(download_taxi_data())?; + let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?; + let _mask = ProjectionMask::roots(builder.parquet_schema(), (0..14).collect_vec()); - let reader = builder - .with_row_groups(row_groups.keys().copied().collect_vec()) - // FIXME(ngates): our indices code assumes the batch size == the row group sizes - .with_batch_size(10_000_000) - .build() - .unwrap(); + // FIXME(ngates): #157 the compressor should handle batch size. + let reader = builder.with_batch_size(65_536).build()?; - let schema = reader.schema(); + let dtype = DType::from_arrow(reader.schema()); + let ctx = compress_ctx(); - let batches = reader - .into_iter() - .enumerate() - .map(|(idx, batch)| { - let batch = batch.unwrap(); - let indices = ArrowPrimitiveArray::::from(row_group_indices[idx].clone()); - let indices_array: ArrowArrayRef = Arc::new(indices); - take_record_batch(&batch, &indices_array).unwrap() - }) - .collect_vec(); + let chunks = reader + .map(|batch_result| batch_result.unwrap()) + .map(|record_batch| { + let vortex_array = record_batch.into_array(); + ctx.compress(&vortex_array, None).unwrap() + }) + .collect_vec(); + let chunked = ChunkedArray::new(chunks, dtype.clone()); - concat_batches(&schema, &batches).unwrap() + let mut write_ctx = WriteCtx::new(write); + write_ctx.dtype(&dtype).unwrap(); + write_ctx.write(&chunked) + }) + .unwrap() } diff --git a/vortex-error/Cargo.toml b/vortex-error/Cargo.toml index 5c5f4bab3b..4f882fa3bb 100644 --- a/vortex-error/Cargo.toml +++ b/vortex-error/Cargo.toml @@ -17,6 +17,7 @@ path = "src/lib.rs" [dependencies] arrow-schema = { workspace = true } +parquet = { workspace = true, optional = true } vortex-schema = { path = "../vortex-schema" } thiserror = { workspace = true } diff --git a/vortex-error/src/lib.rs b/vortex-error/src/lib.rs index 5b2c2c2323..f68675929c 100644 --- a/vortex-error/src/lib.rs +++ b/vortex-error/src/lib.rs @@ -33,6 +33,9 @@ pub enum VortexError { ArrowError(ArrowError), #[error(transparent)] IOError(IOError), + #[cfg(feature = "parquet")] + #[error(transparent)] + ParquetError(ParquetError), } pub type VortexResult = Result; @@ -72,3 +75,6 @@ macro_rules! wrapped_error { wrapped_error!(arrow_schema::ArrowError, ArrowError); wrapped_error!(io::Error, IOError); + +#[cfg(feature = "parquet")] +wrapped_error!(parquet::errors::ParquetError, ParquetError); From 2ecfcd3cf46f05fc964a48ee1b80bc32e5fcdddb Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 27 Mar 2024 10:47:36 +0000 Subject: [PATCH 11/12] Take --- bench-vortex/benches/random_access.rs | 21 ++++++++++----------- bench-vortex/src/reader.rs | 8 ++++++-- bench-vortex/src/taxi_data.rs | 3 --- vortex-alp/src/compute.rs | 19 ++++++++++++++++++- vortex-dict/src/compute.rs | 20 ++++++++++++++++---- vortex-fastlanes/src/for/compute.rs | 17 +++++++++++++++++ 6 files changed, 67 insertions(+), 21 deletions(-) diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs index fcb7bb7ce8..b85179e8a5 100644 --- a/bench-vortex/benches/random_access.rs +++ b/bench-vortex/benches/random_access.rs @@ -1,23 +1,22 @@ -use bench_vortex::taxi_data::{ - download_taxi_data, take_taxi_data, take_taxi_data_arrow, write_taxi_data, -}; +use bench_vortex::reader::take_vortex; +use bench_vortex::taxi_data::taxi_data_vortex_compressed; use criterion::{black_box, criterion_group, criterion_main, Criterion}; fn random_access(c: &mut Criterion) { let mut group = c.benchmark_group("random access"); - group.sample_size(10); + // group.sample_size(10); let indices = [10, 11, 12, 13, 100_000, 3_000_000]; - let taxi_vortex = write_taxi_data(); + let taxi_vortex = taxi_data_vortex_compressed(); group.bench_function("vortex", |b| { - b.iter(|| black_box(take_taxi_data(&taxi_vortex, &indices))) - }); - - let taxi_parquet = download_taxi_data(); - group.bench_function("arrow", |b| { - b.iter(|| black_box(take_taxi_data_arrow(&taxi_parquet, &indices))) + b.iter(|| black_box(take_vortex(&taxi_vortex, &indices).unwrap())) }); + // + // let taxi_parquet = taxi_data_parquet(); + // group.bench_function("arrow", |b| { + // b.iter(|| black_box(take_parquet(&taxi_parquet, &indices)?)) + // }); } criterion_group!(benches, random_access); diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 5d0e3d20d4..2f9d1dbc98 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -13,6 +13,7 @@ use std::path::Path; use std::sync::Arc; use vortex::array::primitive::PrimitiveArray; use vortex::array::ArrayRef; +use vortex::compute::flatten::flatten; use vortex::compute::take::take; use vortex::ptype::PType; use vortex::serde::ReadCtx; @@ -27,10 +28,13 @@ pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult { let dtype = read_ctx.dtype()?; read_ctx.with_schema(&dtype).read()? }; - take(&chunked, &PrimitiveArray::from(indices.to_vec())) + let taken = take(&chunked, &PrimitiveArray::from(indices.to_vec()))?; + + // For equivalence.... we flatten to make sure we're not cheating too much. + flatten(&taken).map(|x| x.into_array()) } -pub fn take_arrow(path: &Path, indices: &[u64]) -> VortexResult { +pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult { let file = File::open(path)?; // TODO(ngates): enable read_page_index diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index d5fcb7962d..ce17e9a30b 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -1,7 +1,6 @@ use arrow_array::RecordBatchReader; use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; -use parquet::arrow::ProjectionMask; use std::fs::File; use std::path::PathBuf; use vortex::array::chunked::ChunkedArray; @@ -31,7 +30,6 @@ pub fn taxi_data_vortex() -> PathBuf { idempotent("taxi-uncompressed.vortex", |write| { let taxi_pq = File::open(download_taxi_data()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap(); - let _mask = ProjectionMask::roots(builder.parquet_schema(), (0..14).collect_vec()); // FIXME(ngates): #157 the compressor should handle batch size. let reader = builder.with_batch_size(65_536).build().unwrap(); @@ -55,7 +53,6 @@ pub fn taxi_data_vortex_compressed() -> PathBuf { idempotent("taxi.vortex", |write| { let taxi_pq = File::open(download_taxi_data())?; let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?; - let _mask = ProjectionMask::roots(builder.parquet_schema(), (0..14).collect_vec()); // FIXME(ngates): #157 the compressor should handle batch size. let reader = builder.with_batch_size(65_536).build()?; diff --git a/vortex-alp/src/compute.rs b/vortex-alp/src/compute.rs index f3ad29ffd6..b52a12208b 100644 --- a/vortex-alp/src/compute.rs +++ b/vortex-alp/src/compute.rs @@ -1,6 +1,7 @@ -use vortex::array::Array; +use vortex::array::{Array, ArrayRef}; use vortex::compute::flatten::{FlattenFn, FlattenedArray}; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; +use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; use vortex::scalar::Scalar; use vortex_error::VortexResult; @@ -16,6 +17,10 @@ impl ArrayCompute for ALPArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl FlattenFn for ALPArray { @@ -40,3 +45,15 @@ impl ScalarAtFn for ALPArray { }) } } + +impl TakeFn for ALPArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + // TODO(ngates): wrap up indices in an array that caches decompression? + Ok(ALPArray::new( + take(self.encoded(), indices)?, + self.exponents().clone(), + self.patches().map(|p| take(p, indices)).transpose()?, + ) + .into_array()) + } +} diff --git a/vortex-dict/src/compute.rs b/vortex-dict/src/compute.rs index 7bfb0a16a5..8b4cde4aab 100644 --- a/vortex-dict/src/compute.rs +++ b/vortex-dict/src/compute.rs @@ -1,6 +1,7 @@ +use vortex::array::{Array, ArrayRef}; use vortex::compute::flatten::{flatten, FlattenFn, FlattenedArray}; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; -use vortex::compute::take::take; +use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; use vortex::scalar::Scalar; use vortex_error::VortexResult; @@ -15,6 +16,16 @@ impl ArrayCompute for DictArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } +} + +impl FlattenFn for DictArray { + fn flatten(&self) -> VortexResult { + flatten(&take(self.values(), self.codes())?) + } } impl ScalarAtFn for DictArray { @@ -24,9 +35,10 @@ impl ScalarAtFn for DictArray { } } -impl FlattenFn for DictArray { - fn flatten(&self) -> VortexResult { - flatten(&take(self.values(), self.codes())?) +impl TakeFn for DictArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + let codes = take(self.codes(), indices)?; + take(self.values(), &codes) } } diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index e46c546384..7ef2fd3733 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ b/vortex-fastlanes/src/for/compute.rs @@ -1,6 +1,8 @@ use crate::r#for::compress::decompress; use crate::FoRArray; +use vortex::array::{Array, ArrayRef}; use vortex::compute::flatten::{FlattenFn, FlattenedArray}; +use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; use vortex_error::VortexResult; @@ -8,6 +10,10 @@ impl ArrayCompute for FoRArray { fn flatten(&self) -> Option<&dyn FlattenFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl FlattenFn for FoRArray { @@ -15,3 +21,14 @@ impl FlattenFn for FoRArray { decompress(self).map(FlattenedArray::Primitive) } } + +impl TakeFn for FoRArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + Ok(FoRArray::try_new( + take(self.encoded(), indices)?, + self.reference.clone(), + self.shift, + )? + .into_array()) + } +} From 5e74e70f738849cc2e92dfb730ac159de38b72ad Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 27 Mar 2024 11:17:05 +0000 Subject: [PATCH 12/12] Take --- bench-vortex/benches/compress_benchmark.rs | 4 +- bench-vortex/src/bin/compress.rs | 28 ++++++++++ bench-vortex/src/lib.rs | 26 ++++++---- bench-vortex/src/reader.rs | 59 ++++++++++++++++------ bench-vortex/src/taxi_data.rs | 27 ++-------- vortex-dict/src/compute.rs | 3 ++ 6 files changed, 97 insertions(+), 50 deletions(-) create mode 100644 bench-vortex/src/bin/compress.rs diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index 1e0513c3d6..a434538daa 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -1,10 +1,10 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use bench_vortex::compress_taxi_data; -use bench_vortex::taxi_data::download_taxi_data; +use bench_vortex::taxi_data::taxi_data_parquet; fn vortex_compress(c: &mut Criterion) { - download_taxi_data(); + taxi_data_parquet(); let mut group = c.benchmark_group("end to end"); group.sample_size(10); group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data()))); diff --git a/bench-vortex/src/bin/compress.rs b/bench-vortex/src/bin/compress.rs new file mode 100644 index 0000000000..68f157572a --- /dev/null +++ b/bench-vortex/src/bin/compress.rs @@ -0,0 +1,28 @@ +use bench_vortex::reader::{compress_vortex, open_vortex}; +use bench_vortex::setup_logger; +use bench_vortex::taxi_data::taxi_data_parquet; +use log::LevelFilter; +use std::fs::File; +use std::os::unix::prelude::MetadataExt; +use std::path::PathBuf; +use vortex::array::Array; +use vortex::formatter::display_tree; + +pub fn main() { + setup_logger(LevelFilter::Debug); + + let path: PathBuf = "taxi_data.vortex".into(); + { + let mut write = File::create(&path).unwrap(); + compress_vortex(&taxi_data_parquet(), &mut write).unwrap(); + } + + let taxi_vortex = open_vortex(&path).unwrap(); + + let pq_size = taxi_data_parquet().metadata().unwrap().size(); + let vx_size = taxi_vortex.nbytes(); + + println!("{}\n\n", display_tree(taxi_vortex.as_ref())); + println!("Parquet size: {}, Vortex size: {}", pq_size, vx_size); + println!("Compression ratio: {}", vx_size as f32 / pq_size as f32); +} diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 615c097344..29f4ba616c 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -9,6 +9,7 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ProjectionMask; use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; +use crate::taxi_data::taxi_data_parquet; use vortex::array::chunked::ChunkedArray; use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::IntoArray; @@ -31,17 +32,24 @@ pub fn idempotent( name: &str, f: impl FnOnce(&mut File) -> Result, ) -> Result { - let path = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("data") - .join(name); + let path = data_path(name); if !path.exists() { - create_dir_all(path.parent().unwrap()).unwrap(); let mut file = File::create(&path).unwrap(); f(&mut file)?; } Ok(path.to_path_buf()) } +pub fn data_path(name: &str) -> PathBuf { + let path = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join(name); + if !path.parent().unwrap().exists() { + create_dir_all(path.parent().unwrap()).unwrap(); + } + path +} + pub fn setup_logger(level: LevelFilter) { TermLogger::init( level, @@ -76,7 +84,7 @@ pub fn compress_ctx() -> CompressCtx { } pub fn compress_taxi_data() -> ArrayRef { - let file = File::open(taxi_data::download_taxi_data()).unwrap(); + let file = File::open(taxi_data_parquet()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let _mask = ProjectionMask::roots(builder.parquet_schema(), [1]); let _no_datetime_mask = ProjectionMask::roots( @@ -146,7 +154,7 @@ mod test { use vortex::encode::FromArrowArray; use vortex::serde::{ReadCtx, WriteCtx}; - use crate::taxi_data::download_taxi_data; + use crate::taxi_data::taxi_data_parquet; use crate::{compress_ctx, compress_taxi_data, setup_logger}; #[ignore] @@ -159,7 +167,7 @@ mod test { #[ignore] #[test] fn round_trip_serde() { - let file = File::open(download_taxi_data()).unwrap(); + let file = File::open(taxi_data_parquet()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let reader = builder.with_limit(1).build().unwrap(); @@ -181,7 +189,7 @@ mod test { #[ignore] #[test] fn round_trip_arrow() { - let file = File::open(download_taxi_data()).unwrap(); + let file = File::open(taxi_data_parquet()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let reader = builder.with_limit(1).build().unwrap(); @@ -199,7 +207,7 @@ mod test { #[ignore] #[test] fn round_trip_arrow_compressed() { - let file = File::open(download_taxi_data()).unwrap(); + let file = File::open(taxi_data_parquet()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let reader = builder.with_limit(1).build().unwrap(); diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 2f9d1dbc98..986717f55c 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -1,3 +1,4 @@ +use crate::compress_ctx; use arrow_array::types::Int64Type; use arrow_array::{ ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray, RecordBatch, @@ -9,27 +10,56 @@ use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use std::collections::HashMap; use std::fs::File; +use std::io::Write; use std::path::Path; use std::sync::Arc; +use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; -use vortex::array::ArrayRef; +use vortex::array::{ArrayRef, IntoArray}; +use vortex::arrow::FromArrowType; use vortex::compute::flatten::flatten; use vortex::compute::take::take; use vortex::ptype::PType; -use vortex::serde::ReadCtx; +use vortex::serde::{ReadCtx, WriteCtx}; use vortex_error::VortexResult; use vortex_schema::DType; -pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult { - let chunked = { - let mut file = File::open(path)?; - let dummy_dtype: DType = PType::U8.into(); - let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file); - let dtype = read_ctx.dtype()?; - read_ctx.with_schema(&dtype).read()? - }; - let taken = take(&chunked, &PrimitiveArray::from(indices.to_vec()))?; +pub fn open_vortex(path: &Path) -> VortexResult { + let mut file = File::open(path)?; + let dummy_dtype: DType = PType::U8.into(); + let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file); + let dtype = read_ctx.dtype()?; + read_ctx.with_schema(&dtype).read() +} + +pub fn compress_vortex(parquet_path: &Path, write: &mut W) -> VortexResult<()> { + let taxi_pq = File::open(parquet_path)?; + let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?; + // FIXME(ngates): #157 the compressor should handle batch size. + let reader = builder.with_batch_size(65_536).build()?; + + let dtype = DType::from_arrow(reader.schema()); + let ctx = compress_ctx(); + + let chunks = reader + .map(|batch_result| batch_result.unwrap()) + .map(|record_batch| { + let vortex_array = record_batch.into_array(); + ctx.compress(&vortex_array, None).unwrap() + }) + .collect_vec(); + let chunked = ChunkedArray::new(chunks, dtype.clone()); + + let mut write_ctx = WriteCtx::new(write); + write_ctx.dtype(&dtype).unwrap(); + write_ctx.write(&chunked).unwrap(); + Ok(()) +} + +pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult { + let array = open_vortex(path)?; + let taken = take(&array, &PrimitiveArray::from(indices.to_vec()))?; // For equivalence.... we flatten to make sure we're not cheating too much. flatten(&taken).map(|x| x.into_array()) } @@ -59,12 +89,9 @@ pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult { let row_group_idx = row_group_offsets .binary_search(&(*idx as i64)) .unwrap_or_else(|e| e - 1); - if !row_groups.contains_key(&row_group_idx) { - row_groups.insert(row_group_idx, Vec::new()); - } row_groups - .get_mut(&row_group_idx) - .unwrap() + .entry(row_group_idx) + .or_insert_with(Vec::new) .push((*idx as i64) - row_group_offsets[row_group_idx]); } let row_group_indices = row_groups diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index ce17e9a30b..ad9346ce05 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -9,9 +9,10 @@ use vortex::arrow::FromArrowType; use vortex::serde::WriteCtx; use vortex_schema::DType; -use crate::{compress_ctx, idempotent}; +use crate::idempotent; +use crate::reader::compress_vortex; -pub fn download_taxi_data() -> PathBuf { +fn download_taxi_data() -> PathBuf { idempotent("yellow-tripdata-2023-11.parquet", |file| { reqwest::blocking::get( "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet", @@ -51,27 +52,7 @@ pub fn taxi_data_vortex() -> PathBuf { pub fn taxi_data_vortex_compressed() -> PathBuf { idempotent("taxi.vortex", |write| { - let taxi_pq = File::open(download_taxi_data())?; - let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?; - - // FIXME(ngates): #157 the compressor should handle batch size. - let reader = builder.with_batch_size(65_536).build()?; - - let dtype = DType::from_arrow(reader.schema()); - let ctx = compress_ctx(); - - let chunks = reader - .map(|batch_result| batch_result.unwrap()) - .map(|record_batch| { - let vortex_array = record_batch.into_array(); - ctx.compress(&vortex_array, None).unwrap() - }) - .collect_vec(); - let chunked = ChunkedArray::new(chunks, dtype.clone()); - - let mut write_ctx = WriteCtx::new(write); - write_ctx.dtype(&dtype).unwrap(); - write_ctx.write(&chunked) + compress_vortex(&taxi_data_parquet(), write) }) .unwrap() } diff --git a/vortex-dict/src/compute.rs b/vortex-dict/src/compute.rs index 8b4cde4aab..bcaa9341f5 100644 --- a/vortex-dict/src/compute.rs +++ b/vortex-dict/src/compute.rs @@ -38,6 +38,9 @@ impl ScalarAtFn for DictArray { impl TakeFn for DictArray { fn take(&self, indices: &dyn Array) -> VortexResult { let codes = take(self.codes(), indices)?; + // TODO(ngates): we could wrap this back up as a DictArray with the same dictionary. + // But we may later want to run some compaction function to ensure all values in the + // dictionary are actually used. take(self.values(), &codes) } }