diff --git a/Cargo.lock b/Cargo.lock
index 5113341d82..cdb953449e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -636,6 +636,7 @@ dependencies = [
  "iana-time-zone",
  "js-sys",
  "num-traits",
+ "serde",
  "wasm-bindgen",
  "windows-targets 0.52.5",
 ]
@@ -1698,6 +1699,7 @@ dependencies = [
  "hyper",
  "hyper-util",
  "rustls",
+ "rustls-native-certs",
  "rustls-pki-types",
  "tokio",
  "tokio-rustls",
@@ -2319,13 +2321,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6"
 dependencies = [
  "async-trait",
+ "base64",
  "bytes",
  "chrono",
  "futures",
  "humantime",
+ "hyper",
  "itertools 0.12.1",
+ "md-5",
  "parking_lot",
  "percent-encoding",
+ "quick-xml",
+ "rand",
+ "reqwest",
+ "ring",
+ "serde",
+ "serde_json",
  "snafu",
  "tokio",
  "tracing",
@@ -2801,6 +2812,63 @@ dependencies = [
  "vortex-zigzag",
 ]
 
+[[package]]
+name = "quick-xml"
+version = "0.31.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quinn"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad"
+dependencies = [
+ "bytes",
+ "pin-project-lite",
+ "quinn-proto",
+ "quinn-udp",
+ "rustc-hash",
+ "rustls",
+ "thiserror",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "quinn-proto"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe"
+dependencies = [
+ "bytes",
+ "rand",
+ "ring",
+ "rustc-hash",
+ "rustls",
+ "slab",
+ "thiserror",
+ "tinyvec",
+ "tracing",
+]
+
+[[package]]
+name = "quinn-udp"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46"
+dependencies = [
+ "libc",
+ "once_cell",
+ "socket2",
+ "tracing",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "quote"
 version = "1.0.36"
@@ -2932,7 +3000,11 @@ dependencies = [
  "once_cell",
  "percent-encoding",
  "pin-project-lite",
+ "quinn",
+ "rustls",
+ "rustls-native-certs",
  "rustls-pemfile",
+ "rustls-pki-types",
  "serde",
  "serde_json",
  "serde_urlencoded",
@@ -2940,10 +3012,13 @@
  "system-configuration",
  "tokio",
  "tokio-native-tls",
+ "tokio-rustls",
+ "tokio-util",
  "tower-service",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams",
  "web-sys",
  "winreg",
 ]
@@ -2969,6 +3044,12 @@ version = "0.1.24"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
 
+[[package]]
+name = "rustc-hash"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
+
 [[package]]
 name = "rustc_version"
 version = "0.4.0"
@@ -2998,12 +3079,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402"
 dependencies = [
  "once_cell",
+ "ring",
  "rustls-pki-types",
  "rustls-webpki",
  "subtle",
  "zeroize",
 ]
 
+[[package]]
+name = "rustls-native-certs"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
+dependencies = [
+ "openssl-probe",
+ "rustls-pemfile",
+ "rustls-pki-types",
+ "schannel",
+ "security-framework",
+]
+
 [[package]]
 name = "rustls-pemfile"
 version = "2.1.2"
diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml
index e3eb4d2afc..e0c60a9ed4 100644
--- a/bench-vortex/Cargo.toml
+++ b/bench-vortex/Cargo.toml
@@ -30,7 +30,7 @@ itertools = { workspace = true }
 lazy_static = { workspace = true }
 log = { workspace = true }
 mimalloc = { workspace = true }
-object_store = { workspace = true }
+object_store = { workspace = true, features = ["aws"] }
 parquet = { workspace = true, features = [] }
 rand = { workspace = true }
 reqwest = { workspace = true }
diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs
index 8f4d196043..7d856ecc26 100644
--- a/bench-vortex/benches/random_access.rs
+++ b/bench-vortex/benches/random_access.rs
@@ -1,39 +1,88 @@
-use bench_vortex::reader::{take_parquet, take_vortex_object_store, take_vortex_tokio};
+use std::sync::Arc;
+
+use bench_vortex::reader::{
+    take_parquet, take_parquet_object_store, take_vortex_object_store, take_vortex_tokio,
+};
 use bench_vortex::taxi_data::{taxi_data_parquet, taxi_data_vortex};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use mimalloc::MiMalloc;
+use object_store::aws::AmazonS3Builder;
+use object_store::local::LocalFileSystem;
 use tokio::runtime::Runtime;
 
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
-fn random_access(c: &mut Criterion) {
-    let mut group = c.benchmark_group("random access");
+const INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000];
 
-    let indices = [10, 11, 12, 13, 100_000, 3_000_000];
+/// Benchmarks against object stores require setting
+/// * AWS_ACCESS_KEY_ID
+/// * AWS_SECRET_ACCESS_KEY
+/// * AWS_BUCKET
+/// * AWS_ENDPOINT
+///
+/// environment variables and assume files to read are already present
+fn random_access_vortex(c: &mut Criterion) {
+    let mut group = c.benchmark_group("vortex");
 
     let taxi_vortex = taxi_data_vortex();
-    group.bench_function("vortex tokio", |b| {
+    group.bench_function("tokio local disk", |b| {
         b.to_async(Runtime::new().unwrap())
-            .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &indices).await.unwrap()) })
+            .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
+    });
+
+    let local_fs = LocalFileSystem::new();
+    let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
+    group.bench_function("localfs", |b| {
+        b.to_async(Runtime::new().unwrap()).iter(|| async {
+            black_box(
+                take_vortex_object_store(&local_fs, &local_fs_path, &INDICES)
+                    .await
+                    .unwrap(),
+            )
+        })
     });
 
-    group.bench_function("vortex object_store", |b| {
+    let r2_fs = AmazonS3Builder::from_env().build().unwrap();
+    let r2_path =
+        object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
+            .unwrap();
+    group.sample_size(10).bench_function("R2", |b| {
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&taxi_vortex, &indices)
+                take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
                     .await
                     .unwrap(),
             )
         })
     });
+}
 
+fn random_access_parquet(c: &mut Criterion) {
+    let mut group = c.benchmark_group("parquet");
+    group.sample_size(10);
+
+    let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
     let taxi_parquet = taxi_data_parquet();
-    group.sample_size(10).bench_function("parquet", |b| {
+    group.bench_function("tokio local disk", |b| {
         b.to_async(Runtime::new().unwrap())
-            .iter(|| async { black_box(take_parquet(&taxi_parquet, &indices).await.unwrap()) })
+            .iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) })
+    });
+
+    let r2_parquet_path = object_store::path::Path::from_url_path(
+        taxi_parquet.file_name().unwrap().to_str().unwrap(),
+    )
+    .unwrap();
+    group.bench_function("R2", |b| {
+        b.to_async(Runtime::new().unwrap()).iter(|| async {
+            black_box(
+                take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES)
+                    .await
+                    .unwrap(),
+            )
+        })
     });
 }
 
-criterion_group!(benches, random_access);
+criterion_group!(benches, random_access_vortex, random_access_parquet);
 criterion_main!(benches);
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index 4716b02577..234de63fd5 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -16,9 +16,9 @@ use bytes::{Bytes, BytesMut};
 use futures::stream;
 use itertools::Itertools;
 use log::info;
-use object_store::local::LocalFileSystem;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use serde::{Deserialize, Serialize};
 use stream::StreamExt;
@@ -167,12 +167,14 @@ pub async fn read_vortex_footer_format(
     )
 }
 
-pub async fn take_vortex_object_store(path: &Path, indices: &[u64]) -> VortexResult<Array> {
-    let fs = LocalFileSystem::new();
-    let ob_path = object_store::path::Path::from_filesystem_path(path).unwrap();
-    let head = fs.head(&ob_path).await?;
+pub async fn take_vortex_object_store<O: ObjectStore>(
+    fs: &O,
+    path: &object_store::path::Path,
+    indices: &[u64],
+) -> VortexResult<Array> {
+    let head = fs.head(path).await?;
     let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(fs.vortex_reader(&ob_path), head.size as u64)
+    let taken = read_vortex_footer_format(fs.vortex_reader(path), head.size as u64)
         .await?
         .take_rows(&indices_array)
         .await?;
@@ -191,14 +193,31 @@ pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult
     Ok(taken)
 }
 
+pub async fn take_parquet_object_store(
+    fs: Arc<dyn ObjectStore>,
+    path: &object_store::path::Path,
+    indices: &[u64],
+) -> VortexResult<RecordBatch> {
+    let meta = fs.head(path).await?;
+    let reader = ParquetObjectReader::new(fs, meta);
+    parquet_take_from_stream(reader, indices).await
+}
+
 pub async fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
     let file = tokio::fs::File::open(path).await?;
+    parquet_take_from_stream(file, indices).await
+}
 
+async fn parquet_take_from_stream<T: AsyncFileReader + Unpin + Send + 'static>(
+    async_reader: T,
+    indices: &[u64],
+) -> VortexResult<RecordBatch> {
     let builder = ParquetRecordBatchStreamBuilder::new_with_options(
-        file,
+        async_reader,
         ArrowReaderOptions::new().with_page_index(true),
     )
     .await?;
+
     // We figure out which row groups we need to read and a selection filter for each of them.
     let mut row_groups = HashMap::new();
     let mut row_group_offsets = vec![0];