Skip to content

Commit

Permalink
Remove ViewContext and assign stable ids to encodings
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed Jul 10, 2024
1 parent e46e868 commit 26e0166
Show file tree
Hide file tree
Showing 39 changed files with 136 additions and 369 deletions.
9 changes: 3 additions & 6 deletions bench-vortex/src/data_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::{IntoArray, ToArrayData, ViewContext};
use vortex::{IntoArray, ToArrayData};
use vortex_dtype::DType;
use vortex_error::{VortexError, VortexResult};
use vortex_ipc::io::TokioAdapter;
use vortex_ipc::writer::ArrayWriter;

use crate::idempotent;
use crate::reader::BATCH_SIZE;
use crate::{idempotent, CTX};

pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf {
idempotent(&fname, |path| {
Expand Down Expand Up @@ -57,10 +57,7 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
.unwrap()
.block_on(async move {
let write = tokio::fs::File::create(path).await.unwrap();
ArrayWriter::new(TokioAdapter(write), ViewContext::from(&CTX.clone()))
.write_context()
.await
.unwrap()
ArrayWriter::new(TokioAdapter(write))
.write_array(array)
.await
.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::collections::HashSet;
use std::env::temp_dir;
use std::fs::{create_dir_all, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow_array::RecordBatchReader;
use humansize::DECIMAL;
Expand Down Expand Up @@ -48,7 +49,7 @@ pub mod taxi_data;
pub mod vortex_utils;

lazy_static! {
pub static ref CTX: Context = Context::default().with_encodings([
pub static ref CTX: Arc<Context> = Arc::new(Context::default().with_encodings([
&ALPEncoding as EncodingRef,
&DictEncoding,
&BitPackedEncoding,
Expand All @@ -60,7 +61,7 @@ lazy_static! {
// &RoaringIntEncoding,
// Doesn't offer anything more than FoR really
// &ZigZagEncoding,
]);
]));
}

lazy_static! {
Expand Down
30 changes: 12 additions & 18 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;

use arrow_array::types::Int64Type;
use arrow_array::{
ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray, RecordBatch,
RecordBatchReader,
};
use arrow_array::types::Int64Type;
use arrow_select::concat::concat_batches;
use arrow_select::take::take_record_batch;
use bytes::{Bytes, BytesMut};
Expand All @@ -22,20 +22,21 @@ use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use serde::{Deserialize, Serialize};
use stream::StreamExt;

use vortex::{Array, IntoArray, IntoCanonical, ToArrayData};
use vortex::array::chunked::ChunkedArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::stream::ArrayStreamExt;
use vortex::{Array, IntoArray, IntoCanonical, ToArrayData, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_ipc::chunked_reader::ChunkedArrayReader;
use vortex_ipc::io::ObjectStoreExt;
use vortex_ipc::io::{TokioAdapter, VortexReadAt, VortexWrite};
use vortex_ipc::writer::ArrayWriter;
use vortex_ipc::io::ObjectStoreExt;
use vortex_ipc::MessageReader;
use vortex_ipc::writer::ArrayWriter;
use vortex_sampling_compressor::SamplingCompressor;

use crate::{COMPRESSORS, CTX};
Expand All @@ -46,13 +47,13 @@ pub const BATCH_SIZE: usize = 65_536;
pub struct VortexFooter {
pub byte_offsets: Vec<u64>,
pub row_offsets: Vec<u64>,
pub view_context_dtype_range: Range<u64>,
pub dtype_range: Range<u64>,
}

pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
let file = tokio::fs::File::open(path).await.unwrap();
let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap();
msgs.array_stream_from_messages(&CTX)
msgs.array_stream_from_messages(CTX.clone())
.await
.unwrap()
.collect_chunked()
Expand All @@ -66,20 +67,17 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
) -> VortexResult<()> {
let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;

let written = ArrayWriter::new(write, ViewContext::from(&CTX.clone()))
.write_context()
.await?
let written = ArrayWriter::new(write)
.write_array_stream(chunked.array_stream())
.await?;

let view_ctx_range = written.view_context_range().unwrap();
let layout = written.array_layouts()[0].clone();
let mut w = written.into_inner();
let mut s = flexbuffers::FlexbufferSerializer::new();
VortexFooter {
byte_offsets: layout.chunks.byte_offsets,
row_offsets: layout.chunks.row_offsets,
view_context_dtype_range: view_ctx_range.begin..layout.dtype.end,
dtype_range: layout.dtype.begin..layout.dtype.end,
}
.serialize(&mut s)?;
let footer_bytes = Buffer::Bytes(Bytes::from(s.take_buffer()));
Expand Down Expand Up @@ -147,20 +145,16 @@ pub async fn read_vortex_footer_format<R: VortexReadAt>(
flexbuffers::Reader::get_root(buf.as_ref()).map_err(|e| vortex_err!("{}", e))?,
)?;

let header_len =
(footer.view_context_dtype_range.end - footer.view_context_dtype_range.start) as usize;
let header_len = (footer.dtype_range.end - footer.dtype_range.start) as usize;
buf.reserve(header_len - buf.len());
unsafe { buf.set_len(header_len) }
buf = reader
.read_at_into(footer.view_context_dtype_range.start, buf)
.await?;
buf = reader.read_at_into(footer.dtype_range.start, buf).await?;
let mut header_reader = MessageReader::try_new(buf).await?;
let view_ctx = header_reader.read_view_context(&CTX).await?;
let dtype = header_reader.read_dtype().await?;

ChunkedArrayReader::try_new(
reader,
view_ctx,
CTX.clone(),
dtype,
PrimitiveArray::from(footer.byte_offsets).into_array(),
PrimitiveArray::from(footer.row_offsets).into_array(),
Expand Down
2 changes: 1 addition & 1 deletion encodings/alp/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vortex_error::vortex_bail;
use crate::alp::Exponents;
use crate::compress::{alp_encode, decompress};

impl_encoding!("vortex.alp", ALP);
impl_encoding!("vortex.alp", 13u16, ALP);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ALPMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/byte_bool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vortex_buffer::Buffer;
mod compute;
mod stats;

impl_encoding!("vortex.byte_bool", ByteBool);
impl_encoding!("vortex.byte_bool", 12u16, ByteBool);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ByteBoolMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/datetime-parts/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use vortex_error::vortex_bail;

use crate::compute::decode_to_localdatetime;

impl_encoding!("vortex.datetimeparts", DateTimeParts);
impl_encoding!("vortex.datetimeparts", 20u16, DateTimeParts);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DateTimePartsMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/dict/src/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vortex::{impl_encoding, ArrayDType, Canonical, IntoCanonical};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::vortex_bail;

impl_encoding!("vortex.dict", Dict);
impl_encoding!("vortex.dict", 20u16, Dict);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DictMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use vortex_error::{vortex_bail, vortex_err};
mod compress;
mod compute;

impl_encoding!("fastlanes.bitpacked", BitPacked);
impl_encoding!("fastlanes.bitpacked", 14u16, BitPacked);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BitPackedMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/fastlanes/src/delta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use vortex_error::vortex_bail;
mod compress;
mod compute;

impl_encoding!("fastlanes.delta", Delta);
impl_encoding!("fastlanes.delta", 16u16, Delta);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/fastlanes/src/for/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use vortex_scalar::Scalar;
mod compress;
mod compute;

impl_encoding!("fastlanes.for", FoR);
impl_encoding!("fastlanes.for", 15u16, FoR);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FoRMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/roaring/src/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use vortex_error::{vortex_bail, vortex_err};
mod compress;
mod compute;

impl_encoding!("vortex.roaring_bool", RoaringBool);
impl_encoding!("vortex.roaring_bool", 17u16, RoaringBool);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoaringBoolMetadata {
Expand Down
2 changes: 1 addition & 1 deletion encodings/roaring/src/integer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vortex_error::{vortex_bail, vortex_err};
mod compress;
mod compute;

impl_encoding!("vortex.roaring_int", RoaringInt);
impl_encoding!("vortex.roaring_int", 18u16, RoaringInt);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoaringIntMetadata {
Expand Down
7 changes: 4 additions & 3 deletions encodings/runend/src/runend.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use serde::{Deserialize, Serialize};

use vortex::{ArrayDType, Canonical, impl_encoding, IntoArrayVariant, IntoCanonical};
use vortex::array::primitive::{Primitive, PrimitiveArray};
use vortex::compute::search_sorted::{search_sorted, SearchSortedSide};
use vortex::compute::unary::scalar_at::scalar_at;
use vortex::stats::{ArrayStatistics, ArrayStatisticsCompute};
use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::{impl_encoding, ArrayDType, Canonical, IntoArrayVariant, IntoCanonical};
use vortex_error::vortex_bail;

use crate::compress::{runend_decode, runend_encode};

impl_encoding!("vortex.runend", RunEnd);
impl_encoding!("vortex.runend", 19u16, RunEnd);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunEndMetadata {
Expand Down Expand Up @@ -138,10 +139,10 @@ impl ArrayTrait for RunEndArray {

#[cfg(test)]
mod test {
use vortex::{ArrayDType, ArrayTrait, IntoArray, IntoCanonical};
use vortex::compute::slice::slice;
use vortex::compute::unary::scalar_at::scalar_at;
use vortex::validity::Validity;
use vortex::{ArrayDType, ArrayTrait, IntoArray, IntoCanonical};
use vortex_dtype::{DType, Nullability, PType};

use crate::RunEndArray;
Expand Down
2 changes: 1 addition & 1 deletion encodings/zigzag/src/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vortex_error::{vortex_bail, vortex_err};

use crate::compress::zigzag_encode;

impl_encoding!("vortex.zigzag", ZigZag);
impl_encoding!("vortex.zigzag", 21u16, ZigZag);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZigZagMetadata;
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod accessors;
mod compute;
mod stats;

impl_encoding!("vortex.bool", Bool);
impl_encoding!("vortex.bool", 2u16, Bool);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BoolMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod canonical;
mod compute;
mod stats;

impl_encoding!("vortex.chunked", Chunked);
impl_encoding!("vortex.chunked", 11u16, Chunked);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChunkedMetadata;
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/array/constant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use crate::impl_encoding;
use crate::stats::Stat;
use crate::validity::{ArrayValidity, LogicalValidity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};

mod canonical;
mod compute;
mod stats;

impl_encoding!("vortex.constant", Constant);
impl_encoding!("vortex.constant", 10u16, Constant);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConstantMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{impl_encoding, ArrayDType, Canonical, IntoCanonical};

mod compute;

impl_encoding!("vortex.ext", Extension);
impl_encoding!("vortex.ext", 16u16, Extension);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtensionMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/null/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{impl_encoding, Canonical, IntoCanonical};

mod compute;

impl_encoding!("vortex.null", Null);
impl_encoding!("vortex.null", 1u16, Null);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NullMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod accessor;
mod compute;
mod stats;

impl_encoding!("vortex.primitive", Primitive);
impl_encoding!("vortex.primitive", 3u16, Primitive);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PrimitiveMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{impl_encoding, ArrayDType, IntoCanonical};
mod compute;
mod flatten;

impl_encoding!("vortex.sparse", Sparse);
impl_encoding!("vortex.sparse", 9u16, Sparse);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SparseMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{Canonical, IntoCanonical};

mod compute;

impl_encoding!("vortex.struct", Struct);
impl_encoding!("vortex.struct", 8u16, Struct);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StructMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod compute;
mod flatten;
mod stats;

impl_encoding!("vortex.varbin", VarBin);
impl_encoding!("vortex.varbin", 4u16, VarBin);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VarBinMetadata {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbinview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Debug for BinaryView {

pub const VIEW_SIZE: usize = mem::size_of::<BinaryView>();

impl_encoding!("vortex.varbinview", VarBinView);
impl_encoding!("vortex.varbinview", 5u16, VarBinView);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VarBinViewMetadata {
Expand Down
Loading

0 comments on commit 26e0166

Please sign in to comment.