Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix vortex compressed benchmarks #577

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fs;
use std::fs::create_dir_all;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -12,8 +13,9 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use tokio::fs::OpenOptions;
use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::compress::CompressionStrategy;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant};
use vortex_datafusion::memory::VortexMemTableOptions;
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::SessionContextExt;
Expand Down Expand Up @@ -194,12 +196,16 @@ async fn register_vortex_file(
schema: &Schema,
enable_compression: bool,
) -> anyhow::Result<()> {
let path = if enable_compression {
file.with_extension("").with_extension("vtxcmp")
let vortex_dir = file.parent().unwrap().join(if enable_compression {
"vortex_compressed"
} else {
file.with_extension("").with_extension("vtxucmp")
};
let vtx_file = idempotent_async(&path, |vtx_file| async move {
"vortex_uncompressed"
});
create_dir_all(&vortex_dir)?;
let output_file = &vortex_dir
.join(file.file_name().unwrap())
.with_extension("vxf");
let vtx_file = idempotent_async(output_file, |vtx_file| async move {
let record_batches = session
.read_csv(
file.to_str().unwrap(),
Expand Down Expand Up @@ -276,6 +282,12 @@ async fn register_vortex_file(
})
.await?;

let ctx = if enable_compression {
Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings()))
} else {
Arc::new(Context::default())
};

let f = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -294,6 +306,7 @@ async fn register_vortex_file(
vtx_file.to_str().unwrap().to_string(),
file_size,
)],
ctx,
),
)?;

Expand Down
8 changes: 6 additions & 2 deletions encodings/zigzag/src/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use vortex::validity::{ArrayValidity, LogicalValidity};
use vortex::variants::{ArrayVariants, PrimitiveArrayTrait};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::{
impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoArray, IntoCanonical,
impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoArray, IntoArrayVariant,
IntoCanonical,
};
use vortex_dtype::{DType, PType};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::compress::zigzag_encode;
use crate::zigzag_decode;

impl_encoding!("vortex.zigzag", 21u16, ZigZag);

Expand Down Expand Up @@ -83,6 +85,8 @@ impl ArrayStatisticsCompute for ZigZagArray {}

impl IntoCanonical for ZigZagArray {
fn into_canonical(self) -> VortexResult<Canonical> {
todo!("ZigZagArray::flatten")
Ok(Canonical::Primitive(zigzag_decode(
&self.encoded().into_primitive()?,
)))
}
}
5 changes: 5 additions & 0 deletions vortex-array/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use std::collections::HashSet;

use vortex_error::VortexResult;

use crate::encoding::EncodingRef;
use crate::Array;

pub trait CompressionStrategy {
fn compress(&self, array: &Array) -> VortexResult<Array>;

fn used_encodings(&self) -> HashSet<EncodingRef>;
}

/// Check that compression did not alter the length of the validity array.
Expand Down
3 changes: 2 additions & 1 deletion vortex-datafusion/examples/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::fs::OpenOptions;
use url::Url;
use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray};
use vortex::validity::Validity;
use vortex::IntoArray;
use vortex::{Context, IntoArray};
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::persistent::provider::VortexFileTableProvider;
use vortex_serde::layouts::writer::LayoutWriter;
Expand Down Expand Up @@ -66,6 +66,7 @@ async fn main() -> anyhow::Result<()> {
Field::new("numbers", DataType::UInt32, false),
])),
vec![VortexFile::new(p, file_size)],
Arc::new(Context::default()),
);

let provider = Arc::new(VortexFileTableProvider::try_new(url, config)?);
Expand Down
7 changes: 6 additions & 1 deletion vortex-datafusion/src/persistent/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::sync::Arc;

use arrow_schema::SchemaRef;
use chrono::TimeZone as _;
use datafusion::datasource::listing::PartitionedFile;
use object_store::path::Path;
use object_store::ObjectMeta;
use vortex::Context;

#[derive(Clone)]
pub struct VortexFile {
Expand Down Expand Up @@ -33,13 +36,15 @@ impl VortexFile {
pub struct VortexTableOptions {
pub(crate) data_files: Vec<VortexFile>,
pub(crate) schema: Option<SchemaRef>,
pub(crate) ctx: Arc<Context>,
}

impl VortexTableOptions {
pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>) -> Self {
pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>, ctx: Arc<Context>) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

99% of the time (100% of the time right now) you're gonna just be using the default codecs.

maybe it'd make more sense to have an extra_encodings config rather than requiring clients to build a whole context just to pass defaults

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree with you. I am not happy with the flow here. I think there's probably just a default that has to have everything for now.

Self {
data_files,
schema: Some(schema),
ctx,
}
}
}
5 changes: 5 additions & 0 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
};
use vortex::Context;

use crate::persistent::opener::VortexFileOpener;

Expand All @@ -18,6 +19,7 @@ pub struct VortexExec {
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
plan_properties: PlanProperties,
ctx: Arc<Context>,
}

impl VortexExec {
Expand All @@ -26,6 +28,7 @@ impl VortexExec {
metrics: ExecutionPlanMetricsSet,
projection: Option<&Vec<usize>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
ctx: Arc<Context>,
) -> DFResult<Self> {
let projected_schema = project_schema(&file_scan_config.file_schema, projection)?;
let plan_properties = PlanProperties::new(
Expand All @@ -39,6 +42,7 @@ impl VortexExec {
metrics,
predicate,
plan_properties,
ctx,
})
}
pub(crate) fn into_arc(self) -> Arc<dyn ExecutionPlan> {
Expand Down Expand Up @@ -88,6 +92,7 @@ impl ExecutionPlan for VortexExec {
.runtime_env()
.object_store(&self.file_scan_config.object_store_url)?;
let opener = VortexFileOpener {
ctx: self.ctx.clone(),
object_store,
projection: self.file_scan_config.projection.clone(),
batch_size: None,
Expand Down
9 changes: 7 additions & 2 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use datafusion_common::Result as DFResult;
use datafusion_physical_expr::PhysicalExpr;
use futures::{FutureExt as _, TryStreamExt};
use object_store::ObjectStore;
use vortex::Context;
use vortex_serde::io::ObjectStoreReadAt;
use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder;
use vortex_serde::layouts::reader::context::LayoutDeserializer;
use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer};
use vortex_serde::layouts::reader::projections::Projection;

pub struct VortexFileOpener {
pub ctx: Arc<Context>,
pub object_store: Arc<dyn ObjectStore>,
pub batch_size: Option<usize>,
pub projection: Option<Vec<usize>>,
Expand All @@ -23,7 +25,10 @@ impl FileOpener for VortexFileOpener {
let read_at =
ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone());

let mut builder = VortexLayoutReaderBuilder::new(read_at, LayoutDeserializer::default());
let mut builder = VortexLayoutReaderBuilder::new(
read_at,
LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())),
);

if let Some(batch_size) = self.batch_size {
builder = builder.with_batch_size(batch_size);
Expand Down
10 changes: 8 additions & 2 deletions vortex-datafusion/src/persistent/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,14 @@ impl TableProvider for VortexFileTableProvider {
)
.with_projection(projection.cloned());

let exec =
VortexExec::try_new(file_scan_config, metrics, projection, predicate)?.into_arc();
let exec = VortexExec::try_new(
file_scan_config,
metrics,
projection,
predicate,
self.config.ctx.clone(),
)?
.into_arc();

Ok(exec)
}
Expand Down
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/alp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_alp::{alp_encode_components, match_each_alp_float_ptype, ALPArray, ALP};
use vortex_alp::{alp_encode_components, match_each_alp_float_ptype, ALPArray, ALPEncoding, ALP};
use vortex_dtype::PType;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -70,4 +73,8 @@ impl EncodingCompressor for ALPCompressor {
)),
))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&ALPEncoding as EncodingRef])
}
}
8 changes: 8 additions & 0 deletions vortex-sampling-compressor/src/compressors/bitpacked.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::stats::ArrayStatistics;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::{vortex_err, VortexResult};
use vortex_fastlanes::{
bitpack, bitpack_patches, count_exceptions, find_best_bit_width, BitPacked, BitPackedArray,
BitPackedEncoding,
};

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
Expand Down Expand Up @@ -83,4 +87,8 @@ impl EncodingCompressor for BitPackedCompressor {
)),
))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&BitPackedEncoding as EncodingRef])
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/constant.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use vortex::array::{Constant, ConstantArray};
use std::collections::HashSet;

use vortex::array::{Constant, ConstantArray, ConstantEncoding};
use vortex::compute::unary::scalar_at;
use vortex::encoding::EncodingRef;
use vortex::stats::ArrayStatistics;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;
Expand Down Expand Up @@ -31,4 +34,8 @@ impl EncodingCompressor for ConstantCompressor {
Some(CompressionTree::flat(self)),
))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&ConstantEncoding as EncodingRef])
}
}
11 changes: 10 additions & 1 deletion vortex-sampling-compressor/src/compressors/date_time_parts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::collections::HashSet;

use vortex::array::temporal::TemporalMetadata;
use vortex::array::TemporalArray;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDType, ArrayDef, IntoArray};
use vortex_datetime_parts::{compress_temporal, DateTimeParts, DateTimePartsArray};
use vortex_datetime_parts::{
compress_temporal, DateTimeParts, DateTimePartsArray, DateTimePartsEncoding,
};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
Expand Down Expand Up @@ -58,4 +63,8 @@ impl EncodingCompressor for DateTimePartsCompressor {
)),
))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&DateTimePartsEncoding as EncodingRef])
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;
use vortex_fastlanes::{delta_compress, Delta, DeltaArray};
use vortex_fastlanes::{delta_compress, Delta, DeltaArray, DeltaEncoding};

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;
Expand Down Expand Up @@ -51,4 +54,8 @@ impl EncodingCompressor for DeltaCompressor {
Some(CompressionTree::new(self, vec![bases.path, deltas.path])),
))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&DeltaEncoding as EncodingRef])
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/dict.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashSet;

use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray};
use vortex::encoding::EncodingRef;
use vortex::stats::ArrayStatistics;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray};
use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
Expand Down Expand Up @@ -69,4 +72,8 @@ impl EncodingCompressor for DictCompressor {
Some(CompressionTree::new(self, vec![codes.path, values.path])),
))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&DictEncoding as EncodingRef])
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::stats::{trailing_zeros, ArrayStatistics};
use vortex::validity::ArrayValidity;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
use vortex_fastlanes::{for_compress, FoR, FoRArray};
use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding};

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;
Expand Down Expand Up @@ -60,4 +63,8 @@ impl EncodingCompressor for FoRCompressor {
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&FoREncoding as EncodingRef])
}
}
Loading
Loading