Benchmark random access against R2 (#429)
robert3005 authored Jul 10, 2024
1 parent 7fee1c0 commit e46e868
Showing 4 changed files with 182 additions and 19 deletions.
95 changes: 95 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion bench-vortex/Cargo.toml
@@ -30,7 +30,7 @@ itertools = { workspace = true }
 lazy_static = { workspace = true }
 log = { workspace = true }
 mimalloc = { workspace = true }
-object_store = { workspace = true }
+object_store = { workspace = true, features = ["aws"] }
 parquet = { workspace = true, features = [] }
 rand = { workspace = true }
 reqwest = { workspace = true }
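
The single Cargo.toml change above is what the new benchmarks hang off: enabling object_store's "aws" feature compiles in AmazonS3Builder, an S3-protocol client that can be pointed at any S3-compatible endpoint, Cloudflare R2 included. Below is a minimal sketch of that idea, not part of this commit; the bucket name, endpoint, and object key are placeholders:

use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point the S3 client at an R2 endpoint (placeholder account id).
    // R2 accepts "auto" as the region.
    let store = AmazonS3Builder::new()
        .with_bucket_name("bench-data") // placeholder
        .with_endpoint("https://ACCOUNT_ID.r2.cloudflarestorage.com") // placeholder
        .with_region("auto")
        .with_access_key_id(std::env::var("AWS_ACCESS_KEY_ID")?)
        .with_secret_access_key(std::env::var("AWS_SECRET_ACCESS_KEY")?)
        .build()?;

    // HEAD request to confirm the object is reachable before benchmarking.
    let path = object_store::path::Path::from("taxi.vortex"); // placeholder
    let meta = store.head(&path).await?;
    println!("object size: {} bytes", meta.size);
    Ok(())
}
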
71 changes: 60 additions & 11 deletions bench-vortex/benches/random_access.rs
@@ -1,39 +1,88 @@
-use bench_vortex::reader::{take_parquet, take_vortex_object_store, take_vortex_tokio};
+use std::sync::Arc;
+
+use bench_vortex::reader::{
+    take_parquet, take_parquet_object_store, take_vortex_object_store, take_vortex_tokio,
+};
 use bench_vortex::taxi_data::{taxi_data_parquet, taxi_data_vortex};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use mimalloc::MiMalloc;
+use object_store::aws::AmazonS3Builder;
+use object_store::local::LocalFileSystem;
 use tokio::runtime::Runtime;
 
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
-fn random_access(c: &mut Criterion) {
-    let mut group = c.benchmark_group("random access");
+const INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000];
 
-    let indices = [10, 11, 12, 13, 100_000, 3_000_000];
+/// Benchmarks against object stores require setting
+/// * AWS_ACCESS_KEY_ID
+/// * AWS_SECRET_ACCESS_KEY
+/// * AWS_BUCKET
+/// * AWS_ENDPOINT
+///
+/// environment variables and assume files to read are already present
+fn random_access_vortex(c: &mut Criterion) {
+    let mut group = c.benchmark_group("vortex");
 
     let taxi_vortex = taxi_data_vortex();
-    group.bench_function("vortex tokio", |b| {
+    group.bench_function("tokio local disk", |b| {
         b.to_async(Runtime::new().unwrap())
-            .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &indices).await.unwrap()) })
+            .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
     });
 
-    group.bench_function("vortex object_store", |b| {
+    let local_fs = LocalFileSystem::new();
+    let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
+    group.bench_function("localfs", |b| {
+        b.to_async(Runtime::new().unwrap()).iter(|| async {
+            black_box(
+                take_vortex_object_store(&local_fs, &local_fs_path, &INDICES)
+                    .await
+                    .unwrap(),
+            )
+        })
+    });
+
+    let r2_fs = AmazonS3Builder::from_env().build().unwrap();
+    let r2_path =
+        object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
+            .unwrap();
+    group.sample_size(10).bench_function("R2", |b| {
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&taxi_vortex, &indices)
+                take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
                     .await
                     .unwrap(),
             )
         })
     });
+}
 
+fn random_access_parquet(c: &mut Criterion) {
+    let mut group = c.benchmark_group("parquet");
+    group.sample_size(10);
+
+    let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
     let taxi_parquet = taxi_data_parquet();
-    group.sample_size(10).bench_function("parquet", |b| {
+    group.bench_function("tokio local disk", |b| {
         b.to_async(Runtime::new().unwrap())
-            .iter(|| async { black_box(take_parquet(&taxi_parquet, &indices).await.unwrap()) })
+            .iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) })
     });
+
+    let r2_parquet_path = object_store::path::Path::from_url_path(
+        taxi_parquet.file_name().unwrap().to_str().unwrap(),
+    )
+    .unwrap();
+    group.bench_function("R2", |b| {
+        b.to_async(Runtime::new().unwrap()).iter(|| async {
+            black_box(
+                take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES)
+                    .await
+                    .unwrap(),
+            )
+        })
+    });
 }
 
-criterion_group!(benches, random_access);
+criterion_group!(benches, random_access_vortex, random_access_parquet);
 criterion_main!(benches);
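
Since AmazonS3Builder::from_env() reads the four variables named in the doc comment above, a cheap way to validate the environment before a long criterion run is a standalone smoke test along these lines. This is a hypothetical helper, not part of the commit; it assumes the taxi file has already been uploaded under its file name, as the benchmark does:

use bench_vortex::reader::take_vortex_object_store;
use bench_vortex::taxi_data::taxi_data_vortex;
use object_store::aws::AmazonS3Builder;

#[tokio::main]
async fn main() {
    // Fail fast if the benchmark's required environment is missing.
    for var in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_BUCKET", "AWS_ENDPOINT"] {
        assert!(std::env::var(var).is_ok(), "{var} must be set");
    }
    let r2_fs = AmazonS3Builder::from_env().build().unwrap();
    let taxi_vortex = taxi_data_vortex();
    // Same path derivation the benchmark uses: the object is keyed by file name.
    let r2_path = object_store::path::Path::from_url_path(
        taxi_vortex.file_name().unwrap().to_str().unwrap(),
    )
    .unwrap();
    // One small take to prove credentials, endpoint, and object all line up.
    let taken = take_vortex_object_store(&r2_fs, &r2_path, &[10, 11, 12])
        .await
        .unwrap();
    println!("smoke test fetched {} rows", taken.len());
}
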
33 changes: 26 additions & 7 deletions bench-vortex/src/reader.rs
@@ -16,9 +16,9 @@ use bytes::{Bytes, BytesMut};
 use futures::stream;
 use itertools::Itertools;
 use log::info;
-use object_store::local::LocalFileSystem;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use serde::{Deserialize, Serialize};
 use stream::StreamExt;
@@ -167,12 +167,14 @@ pub async fn read_vortex_footer_format<R: VortexReadAt>(
     )
 }
 
-pub async fn take_vortex_object_store(path: &Path, indices: &[u64]) -> VortexResult<Array> {
-    let fs = LocalFileSystem::new();
-    let ob_path = object_store::path::Path::from_filesystem_path(path).unwrap();
-    let head = fs.head(&ob_path).await?;
+pub async fn take_vortex_object_store<O: ObjectStore>(
+    fs: &O,
+    path: &object_store::path::Path,
+    indices: &[u64],
+) -> VortexResult<Array> {
+    let head = fs.head(path).await?;
     let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(fs.vortex_reader(&ob_path), head.size as u64)
+    let taken = read_vortex_footer_format(fs.vortex_reader(path), head.size as u64)
         .await?
         .take_rows(&indices_array)
         .await?;
@@ -191,14 +193,31 @@ pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Arr
     Ok(taken.into_canonical()?.into_array())
 }
 
+pub async fn take_parquet_object_store(
+    fs: Arc<dyn ObjectStore>,
+    path: &object_store::path::Path,
+    indices: &[u64],
+) -> VortexResult<RecordBatch> {
+    let meta = fs.head(path).await?;
+    let reader = ParquetObjectReader::new(fs, meta);
+    parquet_take_from_stream(reader, indices).await
+}
+
 pub async fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
     let file = tokio::fs::File::open(path).await?;
+    parquet_take_from_stream(file, indices).await
+}
+
+async fn parquet_take_from_stream<T: AsyncFileReader + Unpin + Send + 'static>(
+    async_reader: T,
+    indices: &[u64],
+) -> VortexResult<RecordBatch> {
     let builder = ParquetRecordBatchStreamBuilder::new_with_options(
-        file,
+        async_reader,
         ArrowReaderOptions::new().with_page_index(true),
     )
     .await?;
 
     // We figure out which row groups we need to read and a selection filter for each of them.
     let mut row_groups = HashMap::new();
     let mut row_group_offsets = vec![0];
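
One design point in this refactor: the parquet row-take logic now lives in parquet_take_from_stream, which is generic over parquet's AsyncFileReader trait, and ParquetObjectReader implements that trait, so the same code path serves a local tokio::fs::File and any ObjectStore. A minimal sketch of the object-store front end, using LocalFileSystem as a stand-in for R2 (the object key is a placeholder, not from the commit):

use std::sync::Arc;

use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetObjectReader;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any ObjectStore works here; LocalFileSystem stands in for R2.
    let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let path = object_store::path::Path::from("taxi.parquet"); // placeholder

    // head() supplies the ObjectMeta that ParquetObjectReader needs to plan
    // its range requests; the resulting reader satisfies the same
    // AsyncFileReader bound that a local tokio::fs::File does.
    let meta = fs.head(&path).await?;
    let _reader = ParquetObjectReader::new(fs, meta);
    Ok(())
}
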
