Commit

revert
a10y committed Aug 8, 2024
1 parent 7c827cc commit 55f52f6
Showing 7 changed files with 28 additions and 28 deletions.
bench-vortex/benches/datafusion_benchmark.rs (3 additions & 3 deletions)
@@ -15,8 +15,8 @@ use datafusion::prelude::{col, DataFrame, SessionContext};
use lazy_static::lazy_static;
use vortex::compress::CompressionStrategy;
use vortex::encoding::EncodingRef;
-use vortex::{Array, Context, IntoArray, ToArrayData};
-use vortex_datafusion::{VortexMemTable, VortexMemTableOptions};
+use vortex::{Array, Context};
+use vortex_datafusion::memory::{VortexMemTable, VortexMemTableOptions};
use vortex_dict::DictEncoding;
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
@@ -81,7 +81,7 @@ fn toy_dataset_arrow() -> RecordBatch {
}

fn toy_dataset_vortex(compress: bool) -> Array {
-let uncompressed = toy_dataset_arrow().to_array_data().into_array();
+let uncompressed = toy_dataset_arrow().into();

if !compress {
return uncompressed;
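
Most of the churn in this commit is this same mechanical swap: the removed to_array_data().into_array() chain is replaced by the plain From/Into conversions into vortex::Array. A minimal sketch of the new pattern, using only identifiers that appear in this diff (the helper name is illustrative):

    // Sketch only: the RecordBatch -> vortex::Array conversion the benchmarks now use.
    use arrow_array::RecordBatch;
    use vortex::Array;

    fn record_batch_to_vortex(batch: RecordBatch) -> Array {
        // Previously: batch.to_array_data().into_array()
        Array::from(batch)
    }
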
bench-vortex/benches/random_access.rs (3 additions & 2 deletions)
@@ -8,6 +8,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use mimalloc::MiMalloc;
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
+use object_store::ObjectStore;
use tokio::runtime::Runtime;

#[global_allocator]
@@ -31,7 +32,7 @@ fn random_access_vortex(c: &mut Criterion) {
.iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
});

-let local_fs = LocalFileSystem::new();
+let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
group.bench_function("localfs", |b| {
b.to_async(Runtime::new().unwrap()).iter(|| async {
@@ -43,7 +44,7 @@
})
});

-let r2_fs = AmazonS3Builder::from_env().build().unwrap();
+let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
let r2_path =
object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
.unwrap();
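
The stores are now passed as Arc<dyn ObjectStore> trait objects instead of through a generic parameter; the matching signature change to take_vortex_object_store is in bench-vortex/src/reader.rs below. A sketch of the caller side, assuming only the object_store types imported in this bench:

    // Sketch: building the trait object that the updated
    // take_vortex_object_store(fs: &Arc<dyn ObjectStore>, ...) expects.
    use std::sync::Arc;
    use object_store::local::LocalFileSystem;
    use object_store::ObjectStore;

    fn local_store() -> Arc<dyn ObjectStore> {
        // The same unsizing cast works for the AmazonS3 store built from env vars.
        Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>
    }
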
bench-vortex/src/data_downloads.rs (3 additions & 3 deletions)
@@ -9,9 +9,9 @@ use bzip2::read::BzDecoder;
use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
-use vortex::array::chunked::ChunkedArray;
+use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
-use vortex::{IntoArray, ToArrayData};
+use vortex::{Array, IntoArray};
use vortex_dtype::DType;
use vortex_error::{VortexError, VortexResult};
use vortex_serde::io::TokioAdapter;
@@ -46,7 +46,7 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
let array = ChunkedArray::try_new(
reader
.into_iter()
-.map(|batch_result| batch_result.unwrap().to_array_data().into_array())
+.map(|batch_result| Array::from(batch_result.unwrap()))
.collect(),
dtype,
)
bench-vortex/src/lib.rs (6 additions & 6 deletions)
@@ -15,11 +15,11 @@ use log::{info, LevelFilter};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
-use vortex::array::chunked::ChunkedArray;
+use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::encoding::EncodingRef;
-use vortex::{Array, Context, IntoArray, ToArrayData};
+use vortex::{Array, Context, IntoArray};
use vortex_alp::ALPEncoding;
use vortex_datetime_parts::DateTimePartsEncoding;
use vortex_dict::DictEncoding;
@@ -188,7 +188,7 @@ pub fn compress_taxi_data() -> Array {
let chunks = reader
.into_iter()
.map(|batch_result| batch_result.unwrap())
-.map(|batch| batch.to_array_data().into_array())
+.map(Array::from)
.map(|array| {
uncompressed_size += array.nbytes();
compressor.compress(&array).unwrap()
@@ -262,7 +262,7 @@ mod test {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use vortex::arrow::FromArrowArray;
use vortex::compress::CompressionStrategy;
-use vortex::{ArrayData, IntoArray, IntoCanonical};
+use vortex::{Array, IntoCanonical};
use vortex_sampling_compressor::SamplingCompressor;

use crate::taxi_data::taxi_data_parquet;
@@ -285,7 +285,7 @@ mod test {
for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
-let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();
+let vortex_array = Array::from_arrow(arrow_array.clone(), false);
let vortex_as_arrow = vortex_array.into_canonical().unwrap().into_arrow();
assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
}
@@ -304,7 +304,7 @@ mod test {
for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
-let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();
+let vortex_array = Array::from_arrow(arrow_array.clone(), false);

let compressed = compressor.compress(&vortex_array).unwrap();
let compressed_as_arrow = compressed.into_canonical().unwrap().into_arrow();
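
These test hunks also show the renamed entry point: ArrayData::from_arrow(...).into_array() becomes Array::from_arrow(...), while the trip back to Arrow still goes through into_canonical(). A rough sketch of the round trip, reusing the aliases from the tests and assuming into_arrow() yields an Arrow ArrayRef (the function name is illustrative):

    // Sketch of the Arrow -> Vortex -> Arrow round trip exercised in the tests above.
    use std::sync::Arc;
    use arrow_array::{ArrayRef as ArrowArrayRef, RecordBatch, StructArray as ArrowStructArray};
    use vortex::arrow::FromArrowArray;
    use vortex::{Array, IntoCanonical};

    fn round_trip(record_batch: RecordBatch) -> ArrowArrayRef {
        let struct_arrow: ArrowStructArray = record_batch.into();
        let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
        // `false` mirrors the nullability flag passed in the tests above.
        let vortex_array = Array::from_arrow(arrow_array, false);
        vortex_array.into_canonical().unwrap().into_arrow()
    }
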
bench-vortex/src/reader.rs (7 additions & 8 deletions)
@@ -22,12 +22,11 @@ use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use serde::{Deserialize, Serialize};
use stream::StreamExt;
-use vortex::array::chunked::ChunkedArray;
-use vortex::array::primitive::PrimitiveArray;
+use vortex::array::{ChunkedArray, PrimitiveArray};
use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::stream::ArrayStreamExt;
-use vortex::{Array, IntoArray, IntoCanonical, ToArrayData};
+use vortex::{Array, IntoArray, IntoCanonical};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
@@ -99,7 +98,7 @@ pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedAr
let chunks = reader
.map(|batch_result| batch_result.unwrap())
.map(|record_batch| {
-let vortex_array = record_batch.to_array_data().into_array();
+let vortex_array = Array::from(record_batch);
compressor.compress(&vortex_array).unwrap()
})
.collect_vec();
@@ -159,8 +158,8 @@ pub async fn read_vortex_footer_format<R: VortexReadAt>(
)
}

-pub async fn take_vortex_object_store<O: ObjectStore>(
-fs: &O,
+pub async fn take_vortex_object_store(
+fs: &Arc<dyn ObjectStore>,
path: &object_store::path::Path,
indices: &[u64],
) -> VortexResult<Array> {
@@ -171,7 +170,7 @@ pub async fn take_vortex_object_store<O: ObjectStore>(
.take_rows(&indices_array)
.await?;
// For equivalence.... we flatten to make sure we're not cheating too much.
-Ok(taken.into_canonical()?.into_array())
+Ok(taken.into_canonical()?.into())
}

pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Array> {
@@ -182,7 +181,7 @@ pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Arr
.take_rows(&indices_array)
.await?;
// For equivalence.... we flatten to make sure we're not cheating too much.
-Ok(taken.into_canonical()?.into_array())
+Ok(taken.into_canonical()?.into())
}

pub async fn take_parquet_object_store(
bench-vortex/src/tpch/mod.rs (5 additions & 4 deletions)
@@ -7,10 +7,11 @@ use arrow_schema::Schema;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
-use vortex::array::chunked::ChunkedArray;
+use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowArray;
-use vortex::{Array, ArrayDType, ArrayData, IntoArray};
-use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};
+use vortex::{Array, ArrayDType, IntoArray};
+use vortex_datafusion::memory::VortexMemTableOptions;
+use vortex_datafusion::SessionContextExt;

use crate::idempotent_async;

@@ -192,7 +193,7 @@ async fn register_vortex(
.iter()
.cloned()
.map(StructArray::from)
-.map(|struct_array| ArrayData::from_arrow(&struct_array, false).into_array())
+.map(|struct_array| Array::from_arrow(&struct_array, false))
.collect();

let dtype = chunks[0].dtype().clone();
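
register_vortex builds its chunks with the same conversion before handing them to DataFusion. A condensed sketch of that per-chunk step, under the assumption that StructArray here is the arrow_array type and that the batches arrive as a Vec<RecordBatch>:

    // Sketch: RecordBatch -> arrow StructArray -> vortex Array, then one ChunkedArray.
    use arrow_array::{RecordBatch, StructArray};
    use vortex::array::ChunkedArray;
    use vortex::arrow::FromArrowArray;
    use vortex::{Array, ArrayDType};

    fn to_chunked(batches: Vec<RecordBatch>) -> ChunkedArray {
        let chunks: Vec<Array> = batches
            .into_iter()
            .map(StructArray::from)
            .map(|struct_array| Array::from_arrow(&struct_array, false))
            .collect();
        let dtype = chunks[0].dtype().clone();
        ChunkedArray::try_new(chunks, dtype).unwrap()
    }
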
bench-vortex/src/vortex_utils.rs (1 addition & 2 deletions)
@@ -2,8 +2,7 @@ use std::fs::File;
use std::os::unix::prelude::MetadataExt;
use std::path::PathBuf;

-use vortex::array::chunked::ChunkedArray;
-use vortex::array::struct_::StructArray;
+use vortex::array::{ChunkedArray, StructArray};
use vortex::variants::StructArrayTrait;
use vortex::ArrayDType;
use vortex_dtype::DType;
