diff --git a/.github/workflows/bench-pr.yml b/.github/workflows/bench-pr.yml index 46846cd20b..0bdccd103e 100644 --- a/.github/workflows/bench-pr.yml +++ b/.github/workflows/bench-pr.yml @@ -10,6 +10,7 @@ permissions: actions: write contents: read pull-requests: write + id-token: write jobs: label_trigger: @@ -58,9 +59,9 @@ jobs: RUSTFLAGS: '-C target-cpu=native' run: | cargo install cargo-criterion - + cargo criterion --bench ${{ matrix.benchmark.id }} --message-format=json 2>&1 | tee out.json - + cat out.json sudo apt-get update && sudo apt-get install -y jq @@ -145,3 +146,52 @@ jobs: AWS_ENDPOINT: ${{ secrets.AWS_ENDPOINT }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + clickbench: + needs: label_trigger + runs-on: self-hosted + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/cleanup + - uses: ./.github/actions/setup-rust + - uses: spiraldb/actions/.github/actions/setup-uv@0.2.0 + + # The compression benchmarks rely on DuckDB being installed to convert CSV to Parquet + - name: Install DuckDB + uses: opt-nc/setup-duckdb-action@v1.0.9 + if: runner.environment != 'self-hosted' + with: + version: v1.0.0 + + - name: Set tempdir + if: runner.environment == 'self-hosted' + run: | + echo "TMPDIR=/work" >> $GITHUB_ENV + + - name: Run ClickBench benchmark + shell: bash + env: + BENCH_VORTEX_RATIOS: '.*' + RUSTFLAGS: '-C target-cpu=native' + HOME: /home/ci-runner + run: | + cargo run --bin clickbench --release -- -d gh-json | tee clickbench.json + - name: Store benchmark result + if: '!cancelled()' + uses: benchmark-action/github-action-benchmark@v1 + with: + name: 'Clickbench' + tool: 'customSmallerIsBetter' + gh-pages-branch: gh-pages-bench + github-token: ${{ secrets.GITHUB_TOKEN }} + output-file-path: clickbench.json + summary-always: true + comment-always: true + auto-push: false + save-data-file: false + fail-on-alert: false + env: + # AWS Credentials for R2 storage tests + AWS_BUCKET: vortex-test + AWS_ENDPOINT: ${{ secrets.AWS_ENDPOINT }} + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} \ No newline at end of file diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index b65c118ce8..4eea7566d3 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -128,4 +128,51 @@ jobs: AWS_ENDPOINT: ${{ secrets.AWS_ENDPOINT }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + clickbench: + runs-on: self-hosted + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/cleanup + - uses: ./.github/actions/setup-rust + - uses: spiraldb/actions/.github/actions/setup-uv@0.2.0 + + # The compression benchmarks rely on DuckDB being installed to convert CSV to Parquet + - name: Install DuckDB + uses: opt-nc/setup-duckdb-action@v1.0.9 + if: runner.environment != 'self-hosted' + with: + version: v1.0.0 + + - name: Set tempdir + if: runner.environment == 'self-hosted' + run: | + echo "TMPDIR=/work" >> $GITHUB_ENV + + - name: Run Clickbench benchmark + shell: bash + env: + BENCH_VORTEX_RATIOS: '.*' + RUSTFLAGS: '-C target-cpu=native' + HOME: /home/ci-runner + run: | + cargo run --bin clickbench --release -- -d gh-json | tee clickbench.json + - name: Store benchmark result + if: '!cancelled()' + uses: benchmark-action/github-action-benchmark@v1 + with: + name: 'Clickbench' + tool: 'customSmallerIsBetter' + gh-pages-branch: gh-pages-bench + github-token: ${{ 
secrets.GITHUB_TOKEN }} + output-file-path: clickbench.json + summary-always: true + auto-push: true + fail-on-alert: false + env: + # AWS Credentials for R2 storage tests + AWS_BUCKET: vortex-test + AWS_ENDPOINT: ${{ secrets.AWS_ENDPOINT }} + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3df78b52e1..6e75fe4e72 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -174,7 +174,7 @@ jobs: with: version: v1.0.0 - name: Rust Bench as test - run: cargo bench --bench '*[!noci]' --profile benchtest -- --test + run: cargo bench --bench '*[!noci|clickbench]' --profile benchtest -- --test generated-files: name: "Check generated proto/fbs files are up to date" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index 104c840f2a..57f6f1d268 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -98,3 +98,8 @@ harness = false name = "compressor_throughput" test = false harness = false + +[[bench]] +name = "clickbench" +test = false +harness = false diff --git a/bench-vortex/benches/clickbench.rs b/bench-vortex/benches/clickbench.rs new file mode 100644 index 0000000000..34293e3867 --- /dev/null +++ b/bench-vortex/benches/clickbench.rs @@ -0,0 +1,69 @@ +#![feature(exit_status_error)] + +use std::path::PathBuf; +use std::process::Command; + +use bench_vortex::clickbench::{clickbench_queries, HITS_SCHEMA}; +use bench_vortex::{clickbench, execute_query, idempotent, IdempotentPath}; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::prelude::SessionContext; +use tokio::runtime::Builder; + +fn benchmark(c: &mut Criterion) { + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let basepath = "clickbench".to_data_path(); + + // The clickbench-provided file is missing some higher-level type info, so we reprocess it + // to add that info, see https://github.com/ClickHouse/ClickBench/issues/7. + for idx in 0..100 { + let output_path = basepath.join(format!("hits_{idx}.parquet")); + idempotent(&output_path, |output_path| { + eprintln!("Fixing parquet file {idx}"); + let command = format!( + " + SET home_directory='/home/ci-runner/'; + INSTALL HTTPFS; + COPY (SELECT * REPLACE + (epoch_ms(EventTime * 1000) AS EventTime, \ + epoch_ms(ClientEventTime * 1000) AS ClientEventTime, \ + epoch_ms(LocalEventTime * 1000) AS LocalEventTime, \ + DATE '1970-01-01' + INTERVAL (EventDate) DAYS AS EventDate) \ + FROM read_parquet('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{idx}.parquet', binary_as_string=True)) TO '{}' (FORMAT 'parquet');", + output_path.to_str().unwrap() + ); + Command::new("duckdb") + .arg("-c") + .arg(command) + .status()? 
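Editor's note: the `.status()?.exit_ok()?` check on the duckdb child process (used here and in `bench-vortex/src/bin/clickbench.rs`) is why both new files open with `#![feature(exit_status_error)]` and therefore require a nightly toolchain. As a rough sketch only (not part of this diff), the same guard can be expressed on stable Rust:

```rust
// Stable-Rust sketch of the nightly `ExitStatus::exit_ok` check:
// run DuckDB with a SQL string and fail if it exits non-zero.
use std::process::Command;

fn run_duckdb(sql: &str) -> anyhow::Result<()> {
    let status = Command::new("duckdb").arg("-c").arg(sql).status()?;
    if !status.success() {
        anyhow::bail!("duckdb exited with {status}");
    }
    Ok(())
}
```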
+ .exit_ok()?; + + anyhow::Ok(PathBuf::from(output_path)) + }) + .unwrap(); + } + + let session_context = SessionContext::new(); + let context = session_context.clone(); + runtime.block_on(async move { + clickbench::register_vortex_file(&context, "hits", basepath.as_path(), &HITS_SCHEMA) + .await + .unwrap(); + }); + + let mut group = c.benchmark_group("clickbench"); + + for (idx, query) in clickbench_queries().into_iter() { + let context = session_context.clone(); + group.bench_function(format!("q-{:02}", idx), |b| { + b.to_async(&runtime) + .iter(|| async { execute_query(&context, &query).await.unwrap() }); + }); + } +} + +criterion_group!( + name = benches; + config = Criterion::default().sample_size(10); + targets = benchmark +); +criterion_main!(benches); diff --git a/bench-vortex/benches/tpch.rs b/bench-vortex/benches/tpch.rs index 0590a83fbe..0a68a0a025 100644 --- a/bench-vortex/benches/tpch.rs +++ b/bench-vortex/benches/tpch.rs @@ -1,5 +1,6 @@ use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions}; -use bench_vortex::tpch::{load_datasets, run_tpch_query, tpch_queries, Format}; +use bench_vortex::tpch::{load_datasets, run_tpch_query, tpch_queries}; +use bench_vortex::Format; use criterion::{criterion_group, criterion_main, Criterion}; use tokio::runtime::Builder; diff --git a/bench-vortex/clickbench_queries.sql b/bench-vortex/clickbench_queries.sql new file mode 100644 index 0000000000..a5f4eccb25 --- /dev/null +++ b/bench-vortex/clickbench_queries.sql @@ -0,0 +1,43 @@ +SELECT COUNT(*) FROM hits; +SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0; +SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits; +SELECT AVG(UserID) FROM hits; +SELECT COUNT(DISTINCT UserID) FROM hits; +SELECT COUNT(DISTINCT SearchPhrase) FROM hits; +SELECT MIN(EventDate), MAX(EventDate) FROM hits; +SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC; +SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10; +SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10; +SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10; +SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10; +SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID FROM hits WHERE UserID = 435090932899640449; +SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%'; +SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' 
GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10; +SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits; +SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP 
BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10; +SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10; +SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10; +SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100; +SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; +SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000; \ No newline at end of file diff --git a/bench-vortex/src/bin/clickbench.rs b/bench-vortex/src/bin/clickbench.rs new file mode 100644 index 0000000000..3e8176da44 --- /dev/null +++ b/bench-vortex/src/bin/clickbench.rs @@ -0,0 +1,126 @@ +#![feature(exit_status_error)] + +use std::path::PathBuf; +use std::process::Command; +use std::time::{Duration, Instant}; + +use bench_vortex::clickbench::{self, clickbench_queries, HITS_SCHEMA}; +use bench_vortex::display::{print_measurements_json, render_table, DisplayFormat}; +use bench_vortex::{execute_query, idempotent, Format, IdempotentPath as _, Measurement}; +use clap::Parser; +use datafusion::prelude::SessionContext; +use indicatif::ProgressBar; +use itertools::Itertools; +use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; +use tokio::runtime::Builder; + +#[derive(Parser, 
Debug)] +#[command(version, about, long_about = None)] +struct Args { + #[arg(short, long)] + threads: Option, + #[arg(short, long, default_value_t, value_enum)] + display_format: DisplayFormat, + #[arg(short, long, value_delimiter = ',')] + queries: Option>, +} + +fn main() { + let args = Args::parse(); + + let runtime = match args.threads { + Some(0) => panic!("Can't use 0 threads for runtime"), + Some(1) => Builder::new_current_thread().enable_all().build(), + Some(n) => Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build(), + None => Builder::new_multi_thread().enable_all().build(), + } + .expect("Failed building the Runtime"); + let basepath = "clickbench".to_data_path(); + + // The clickbench-provided file is missing some higher-level type info, so we reprocess it + // to add that info, see https://github.com/ClickHouse/ClickBench/issues/7. + (0_u32..100).into_par_iter().for_each(|idx| { + let output_path = basepath.join(format!("hits_{idx}.parquet")); + idempotent(&output_path, |output_path| { + eprintln!("Fixing parquet file {idx}"); + let command = format!( + " + SET home_directory='/home/ci-runner/'; + INSTALL HTTPFS; + COPY (SELECT * REPLACE + (epoch_ms(EventTime * 1000) AS EventTime, \ + epoch_ms(ClientEventTime * 1000) AS ClientEventTime, \ + epoch_ms(LocalEventTime * 1000) AS LocalEventTime, \ + DATE '1970-01-01' + INTERVAL (EventDate) DAYS AS EventDate) \ + FROM read_parquet('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{idx}.parquet', binary_as_string=True)) TO '{}' (FORMAT 'parquet');", + output_path.to_str().unwrap() + ); + Command::new("duckdb") + .arg("-c") + .arg(command) + .status()? + .exit_ok()?; + + anyhow::Ok(PathBuf::from(output_path)) + }) + .unwrap(); + }); + + let session_context = SessionContext::new(); + let context = session_context.clone(); + runtime.block_on(async { + clickbench::register_vortex_file(&context, "hits", basepath.as_path(), &HITS_SCHEMA) + .await + .unwrap(); + }); + + let mut all_measurements = Vec::default(); + + let queries = match args.queries { + None => clickbench_queries(), + Some(queries) => clickbench_queries() + .into_iter() + .filter(|(q_idx, _)| queries.iter().contains(q_idx)) + .collect(), + }; + + let progress_bar = ProgressBar::new(queries.len() as u64); + + for (query_idx, query) in queries.into_iter() { + let mut fastest_result = Duration::from_millis(u64::MAX); + for _ in 0..10 { + let exec_duration = runtime.block_on(async { + let start = Instant::now(); + execute_query(&context, &query).await.unwrap(); + start.elapsed() + }); + + fastest_result = fastest_result.min(exec_duration); + } + + progress_bar.inc(1); + + all_measurements.push(Measurement { + query_idx, + time: fastest_result, + format: Format::OnDiskVortex { + enable_compression: true, + }, + dataset: "clickbench".to_string(), + }); + } + + match args.display_format { + DisplayFormat::Table => render_table( + all_measurements, + &[Format::OnDiskVortex { + enable_compression: true, + }], + ) + .unwrap(), + DisplayFormat::GhJson => print_measurements_json(all_measurements).unwrap(), + } +} diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index 0cc753dfe8..1c7e0866fb 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -1,22 +1,17 @@ use std::process::ExitCode; use std::sync; -use std::sync::mpsc::Receiver; -use std::time::{Duration, Instant}; +use std::time::Instant; -use bench_vortex::setup_logger; +use 
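Editor's note: for local runs, the binary above accepts `-t/--threads`, `-d/--display-format` (CI passes `cargo run --bin clickbench --release -- -d gh-json`), and a comma-separated `-q/--queries` list of zero-based query indices. The generic parameters of `threads` and `queries` appear to have been stripped in this diff view; the following is a minimal sketch assuming they are `Option<usize>` and `Option<Vec<usize>>`:

```rust
// Sketch only: a stripped-down Args showing how the comma-separated query list
// parses; the real struct also carries a `display_format` ValueEnum flag.
use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    #[arg(short, long)]
    threads: Option<usize>,
    #[arg(short, long, value_delimiter = ',')]
    queries: Option<Vec<usize>>,
}

fn main() {
    let args = Args::parse_from(["clickbench", "-t", "8", "-q", "0,5,12"]);
    assert_eq!(args.threads, Some(8));
    assert_eq!(args.queries, Some(vec![0, 5, 12]));
}
```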
bench_vortex::display::{print_measurements_json, render_table, DisplayFormat}; use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions}; -use bench_vortex::tpch::{ - load_datasets, run_tpch_query, tpch_queries, Format, EXPECTED_ROW_COUNTS, -}; -use clap::{ArgAction, Parser, ValueEnum}; +use bench_vortex::tpch::{load_datasets, run_tpch_query, tpch_queries, EXPECTED_ROW_COUNTS}; +use bench_vortex::{setup_logger, Format, Measurement}; +use clap::{ArgAction, Parser}; use futures::future::try_join_all; use indicatif::ProgressBar; use itertools::Itertools; use log::LevelFilter; -use serde::Serialize; -use tabled::builder::Builder; -use tabled::settings::themes::Colorization; -use tabled::settings::{Color, Style}; +use tokio::runtime::Builder; use vortex::aliases::hash_map::HashMap; #[derive(Parser, Debug)] @@ -40,43 +35,6 @@ struct Args { display_format: DisplayFormat, } -#[derive(ValueEnum, Default, Clone, Debug)] -enum DisplayFormat { - #[default] - Table, - GhJson, -} - -#[derive(Clone)] -struct Measurement { - query_idx: usize, - time: Duration, - format: Format, -} - -#[derive(Serialize)] -struct JsonValue { - name: String, - unit: String, - value: u128, -} - -impl Measurement { - fn to_json(&self) -> JsonValue { - let name = format!( - "tpch_q{query_idx}/{format}", - format = self.format.name(), - query_idx = self.query_idx - ); - - JsonValue { - name, - unit: "ns".to_string(), - value: self.time.as_nanos(), - } - } -} - fn main() -> ExitCode { let args = Args::parse(); @@ -88,16 +46,12 @@ fn main() -> ExitCode { let runtime = match args.threads { Some(0) => panic!("Can't use 0 threads for runtime"), - Some(1) => tokio::runtime::Builder::new_current_thread() - .enable_all() - .build(), - Some(n) => tokio::runtime::Builder::new_multi_thread() + Some(1) => Builder::new_current_thread().enable_all().build(), + Some(n) => Builder::new_multi_thread() .worker_threads(n) .enable_all() .build(), - None => tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build(), + None => Builder::new_multi_thread().enable_all().build(), } .expect("Failed building the Runtime"); @@ -111,85 +65,6 @@ fn main() -> ExitCode { )) } -fn render_table(receiver: Receiver, formats: &[Format]) -> anyhow::Result<()> { - let mut measurements: HashMap> = HashMap::default(); - - while let Ok(m) = receiver.recv() { - measurements.entry(m.format).or_default().push(m); - } - - measurements.values_mut().for_each(|v| { - v.sort_by_key(|m| m.query_idx); - }); - - // The first format serves as the baseline - let baseline_format = &formats[0]; - let baseline = measurements[baseline_format].clone(); - - let mut table_builder = Builder::default(); - let mut colors = vec![]; - - let mut header = vec!["Query".to_string()]; - header.extend(formats.iter().map(|f| format!("{:?}", f))); - table_builder.push_record(header); - - for (query_idx, baseline_measure) in baseline.iter().enumerate().take(22) { - let query_baseline = baseline_measure.time.as_micros(); - let mut row = vec![(query_idx + 1).to_string()]; - for (col_idx, format) in formats.iter().enumerate() { - let time_us = measurements[format][query_idx].time.as_micros(); - - if format != baseline_format { - let color = color(query_baseline, time_us); - - colors.push(Colorization::exact( - vec![color], - (query_idx + 1, col_idx + 1), - )) - } - - let ratio = time_us as f64 / query_baseline as f64; - row.push(format!("{time_us} us ({ratio:.2})")); - } - table_builder.push_record(row); - } - - let mut table = table_builder.build(); - table.with(Style::modern()); - - for color in 
colors.into_iter() { - table.with(color); - } - - println!("{table}"); - - Ok(()) -} - -fn color(baseline_time: u128, test_time: u128) -> Color { - if test_time > (baseline_time + baseline_time / 2) { - Color::BG_RED - } else if test_time > (baseline_time + baseline_time / 10) { - Color::BG_YELLOW - } else { - Color::BG_BRIGHT_GREEN - } -} - -fn print_measurements_json(receiver: Receiver) -> anyhow::Result<()> { - let mut measurements = Vec::new(); - - while let Ok(m) = receiver.recv() { - measurements.push(m.to_json()); - } - - let output = serde_json::to_string(&measurements)?; - - print!("{output}"); - - Ok(()) -} - async fn bench_main( queries: Option>, exclude_queries: Option>, @@ -283,6 +158,7 @@ async fn bench_main( query_idx, time: fastest, format: *format, + dataset: "tpch".to_string(), }) .unwrap(); @@ -318,12 +194,14 @@ async fn bench_main( }) } + let all_measurements = measurements_rx.into_iter().collect::>(); + match display_format { DisplayFormat::Table => { - render_table(measurements_rx, &formats).unwrap(); + render_table(all_measurements, &formats).unwrap(); } DisplayFormat::GhJson => { - print_measurements_json(measurements_rx).unwrap(); + print_measurements_json(all_measurements).unwrap(); } } diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs new file mode 100644 index 0000000000..91a0d13e32 --- /dev/null +++ b/bench-vortex/src/clickbench.rs @@ -0,0 +1,251 @@ +use std::path::Path; +use std::sync::{Arc, LazyLock}; + +use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, +}; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use tokio::fs::{create_dir_all, OpenOptions}; +use vortex::aliases::hash_map::HashMap; +use vortex::array::{ChunkedArray, StructArray}; +use vortex::dtype::DType; +use vortex::error::vortex_err; +use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION}; +use vortex::sampling_compressor::SamplingCompressor; +use vortex::variants::StructArrayTrait; +use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; +use vortex_datafusion::persistent::format::VortexFormat; + +use crate::{idempotent_async, CTX}; + +pub static HITS_SCHEMA: LazyLock = LazyLock::new(|| { + use DataType::*; + Schema::new(vec![ + Field::new("watchid", Int64, false), + Field::new("javaenable", Int16, false), + Field::new("title", Utf8View, false), + Field::new("goodevent", Int16, false), + Field::new("eventtime", Timestamp(TimeUnit::Microsecond, None), false), + Field::new("eventdate", Timestamp(TimeUnit::Microsecond, None), false), + Field::new("counterid", Int32, false), + Field::new("clientip", Int32, false), + Field::new("regionid", Int32, false), + Field::new("userid", Int64, false), + Field::new("counterclass", Int16, false), + Field::new("os", Int16, false), + Field::new("useragent", Int16, false), + Field::new("url", Utf8View, false), + Field::new("referer", Utf8View, false), + Field::new("isrefresh", Int16, false), + Field::new("referercategoryid", Int16, false), + Field::new("refererregionid", Int32, false), + Field::new("urlcategoryid", Int16, false), + Field::new("urlregionid", Int32, false), + Field::new("resolutionwidth", Int16, false), + Field::new("resolutionheight", Int16, false), + Field::new("resolutiondepth", Int16, false), + Field::new("flashmajor", Int16, false), + Field::new("flashminor", Int16, false), + Field::new("flashminor2", Utf8View, false), + Field::new("netmajor", Int16, false), + 
Field::new("netminor", Int16, false), + Field::new("useragentmajor", Int16, false), + Field::new("useragentminor", Utf8View, false), + Field::new("cookieenable", Int16, false), + Field::new("javascriptenable", Int16, false), + Field::new("ismobile", Int16, false), + Field::new("mobilephone", Int16, false), + Field::new("mobilephonemodel", Utf8View, false), + Field::new("params", Utf8View, false), + Field::new("ipnetworkid", Int32, false), + Field::new("traficsourceid", Int16, false), + Field::new("searchengineid", Int16, false), + Field::new("searchphrase", Utf8View, false), + Field::new("advengineid", Int16, false), + Field::new("isartifical", Int16, false), + Field::new("windowclientwidth", Int16, false), + Field::new("windowclientheight", Int16, false), + Field::new("clienttimezone", Int16, false), + Field::new( + "clienteventtime", + Timestamp(TimeUnit::Microsecond, None), + false, + ), + Field::new("silverlightversion1", Int16, false), + Field::new("silverlightversion2", Int16, false), + Field::new("silverlightversion3", Int32, false), + Field::new("silverlightversion4", Int16, false), + Field::new("pagecharset", Utf8View, false), + Field::new("codeversion", Int32, false), + Field::new("islink", Int16, false), + Field::new("isdownload", Int16, false), + Field::new("isnotbounce", Int16, false), + Field::new("funiqid", Int64, false), + Field::new("originalurl", Utf8View, false), + Field::new("hid", Int32, false), + Field::new("isoldcounter", Int16, false), + Field::new("isevent", Int16, false), + Field::new("isparameter", Int16, false), + Field::new("dontcounthits", Int16, false), + Field::new("withhash", Int16, false), + Field::new("hitcolor", Utf8View, false), + Field::new( + "localeventtime", + Timestamp(TimeUnit::Microsecond, None), + false, + ), + Field::new("age", Int16, false), + Field::new("sex", Int16, false), + Field::new("income", Int16, false), + Field::new("interests", Int16, false), + Field::new("robotness", Int16, false), + Field::new("remoteip", Int32, false), + Field::new("windowname", Int32, false), + Field::new("openername", Int32, false), + Field::new("historylength", Int16, false), + Field::new("browserlanguage", Utf8View, false), + Field::new("browsercountry", Utf8View, false), + Field::new("socialnetwork", Utf8View, false), + Field::new("socialaction", Utf8View, false), + Field::new("httperror", Int16, false), + Field::new("sendtiming", Int32, false), + Field::new("dnstiming", Int32, false), + Field::new("connecttiming", Int32, false), + Field::new("responsestarttiming", Int32, false), + Field::new("responseendtiming", Int32, false), + Field::new("fetchtiming", Int32, false), + Field::new("socialsourcenetworkid", Int16, false), + Field::new("socialsourcepage", Utf8View, false), + Field::new("paramprice", Int64, false), + Field::new("paramorderid", Utf8View, false), + Field::new("paramcurrency", Utf8View, false), + Field::new("paramcurrencyid", Int16, false), + Field::new("openstatservicename", Utf8View, false), + Field::new("openstatcampaignid", Utf8View, false), + Field::new("openstatadid", Utf8View, false), + Field::new("openstatsourceid", Utf8View, false), + Field::new("utmsource", Utf8View, false), + Field::new("utmmedium", Utf8View, false), + Field::new("utmcampaign", Utf8View, false), + Field::new("utmcontent", Utf8View, false), + Field::new("utmterm", Utf8View, false), + Field::new("fromtag", Utf8View, false), + Field::new("hasgclid", Int16, false), + Field::new("refererhash", Int64, false), + Field::new("urlhash", Int64, false), + Field::new("clid", Int32, 
false), + ]) +}); + +pub async fn register_vortex_file( + session: &SessionContext, + table_name: &str, + input_path: &Path, + schema: &Schema, +) -> anyhow::Result<()> { + let vortex_dir = input_path.parent().unwrap().join("vortex_compressed"); + create_dir_all(&vortex_dir).await?; + + for idx in 0..100 { + let parquet_file_path = input_path.join(format!("hits_{idx}.parquet")); + let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}")); + idempotent_async(&output_path, |vtx_file| async move { + eprintln!("Processing file {idx}"); + let record_batches = session + .read_parquet( + parquet_file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await? + .collect() + .await?; + + // Create a ChunkedArray from the set of chunks. + let sts = record_batches + .into_iter() + .map(ArrayData::try_from) + .map(|a| a.unwrap().into_struct().unwrap()) + .collect::>(); + + let mut arrays_map: HashMap, Vec> = HashMap::default(); + let mut types_map: HashMap, DType> = HashMap::default(); + + for st in sts.into_iter() { + let struct_dtype = st.dtype().as_struct().unwrap(); + let names = struct_dtype.names().iter(); + let types = struct_dtype.dtypes().iter(); + + for (field_name, field_type) in names.zip(types) { + let lower_case: Arc = field_name.to_lowercase().into(); + let val = arrays_map.entry(lower_case.clone()).or_default(); + val.push(st.field_by_name(field_name.as_ref()).unwrap()); + + types_map.insert(lower_case, field_type.clone()); + } + } + + let fields = schema + .fields() + .iter() + .map(|field| { + let name: Arc = field.name().to_ascii_lowercase().as_str().into(); + let dtype = types_map[&name].clone(); + let chunks = arrays_map.remove(&name).unwrap(); + let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap(); + + (name, chunked_child.into_array()) + }) + .collect::>(); + + let data = StructArray::from_fields(&fields)?.into_array(); + + let compressor = SamplingCompressor::default(); + let data = compressor.compress(&data, None)?.into_array(); + + let f = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(&vtx_file) + .await?; + + let mut writer = VortexFileWriter::new(f); + writer = writer.write_array_columns(data).await?; + writer.finalize().await?; + + anyhow::Ok(()) + }) + .await?; + } + + let format = Arc::new(VortexFormat::new(&CTX)); + let table_path = vortex_dir + .to_str() + .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?; + let table_path = format!("file://{table_path}/"); + let table_url = ListingTableUrl::parse(table_path)?; + + let config = ListingTableConfig::new(table_url) + .with_listing_options(ListingOptions::new(format as _)) + .with_schema(schema.clone().into()); + + let listing_table = Arc::new(ListingTable::try_new(config)?); + + session.register_table(table_name, listing_table as _)?; + + Ok(()) +} + +pub fn clickbench_queries() -> Vec<(usize, String)> { + let queries_file = Path::new(env!("CARGO_MANIFEST_DIR")).join("clickbench_queries.sql"); + + std::fs::read_to_string(queries_file) + .unwrap() + .split(';') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + .enumerate() + .collect() +} diff --git a/bench-vortex/src/display.rs b/bench-vortex/src/display.rs new file mode 100644 index 0000000000..3dbf1cc958 --- /dev/null +++ b/bench-vortex/src/display.rs @@ -0,0 +1,92 @@ +use clap::ValueEnum; +use tabled::builder::Builder; +use tabled::settings::themes::Colorization; +use tabled::settings::{Color, Style}; +use vortex::aliases::hash_map::HashMap; + +use 
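Editor's note: because `clickbench_queries()` simply splits `clickbench_queries.sql` on `;`, trims, and enumerates, query numbering is zero-based; the Criterion IDs `q-00`..`q-42` and the binary's `--queries` filter both refer to positions in that file. A hypothetical sanity check (not part of the diff, assumes `bench_vortex` is available as a dependency):

```rust
use bench_vortex::clickbench::clickbench_queries;

fn main() {
    let queries = clickbench_queries();
    // clickbench_queries.sql ships the standard 43 ClickBench queries.
    assert_eq!(queries.len(), 43);
    let (idx, first) = &queries[0];
    assert_eq!(*idx, 0);
    assert!(first.starts_with("SELECT COUNT(*) FROM hits"));
}
```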
crate::{Format, Measurement}; + +#[derive(ValueEnum, Default, Clone, Debug)] +pub enum DisplayFormat { + #[default] + Table, + GhJson, +} + +pub fn render_table(all_measurements: Vec, formats: &[Format]) -> anyhow::Result<()> { + let mut measurements: HashMap> = HashMap::default(); + + for m in all_measurements.into_iter() { + measurements.entry(m.format).or_default().push(m); + } + + measurements.values_mut().for_each(|v| { + v.sort_by_key(|m| m.query_idx); + }); + + // The first format serves as the baseline + let baseline_format = &formats[0]; + let baseline = measurements[baseline_format].clone(); + + let mut table_builder = Builder::default(); + let mut colors = vec![]; + + let mut header = vec!["Query".to_string()]; + header.extend(formats.iter().map(|f| format!("{:?}", f))); + table_builder.push_record(header); + + for (query_idx, baseline_measure) in baseline.iter().enumerate() { + let query_baseline = baseline_measure.time.as_micros(); + let mut row = vec![(baseline_measure.query_idx).to_string()]; + for (col_idx, format) in formats.iter().enumerate() { + let time_us = measurements[format][query_idx].time.as_micros(); + + if format != baseline_format { + let color = color(query_baseline, time_us); + + colors.push(Colorization::exact( + vec![color], + (query_idx + 1, col_idx + 1), + )) + } + + let ratio = time_us as f64 / query_baseline as f64; + row.push(format!("{time_us} us ({ratio:.2})")); + } + table_builder.push_record(row); + } + + let mut table = table_builder.build(); + table.with(Style::modern()); + + for color in colors.into_iter() { + table.with(color); + } + + println!("{table}"); + + Ok(()) +} + +pub fn print_measurements_json(all_measurements: Vec) -> anyhow::Result<()> { + let measurements = all_measurements + .into_iter() + .map(|v| v.to_json()) + .collect::>(); + + let output = serde_json::to_string(&measurements)?; + + print!("{output}"); + + Ok(()) +} + +fn color(baseline_time: u128, test_time: u128) -> Color { + if test_time > (baseline_time + baseline_time / 2) { + Color::BG_RED + } else if test_time > (baseline_time + baseline_time / 10) { + Color::BG_YELLOW + } else { + Color::BG_BRIGHT_GREEN + } +} diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 9d3cb882e0..242d2af66a 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -5,11 +5,15 @@ use std::fs::{create_dir_all, File}; use std::future::Future; use std::path::{Path, PathBuf}; use std::sync::{Arc, LazyLock}; +use std::time::Duration; -use arrow_array::RecordBatchReader; +use arrow_array::{RecordBatch, RecordBatchReader}; +use datafusion::prelude::SessionContext; +use datafusion_physical_plan::collect; use itertools::Itertools; use log::LevelFilter; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use serde::Serialize; use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowType; @@ -23,7 +27,9 @@ use crate::data_downloads::FileType; use crate::reader::BATCH_SIZE; use crate::taxi_data::taxi_data_parquet; +pub mod clickbench; pub mod data_downloads; +pub mod display; pub mod parquet_utils; pub mod public_bi_data; pub mod reader; @@ -31,6 +37,10 @@ pub mod taxi_data; pub mod tpch; pub mod vortex_utils; +// Sizes match default compressor configuration +const TARGET_BLOCK_BYTESIZE: usize = 16 * (1 << 20); +const TARGET_BLOCK_SIZE: usize = 64 * (1 << 10); + pub static CTX: LazyLock> = LazyLock::new(|| { Arc::new( Context::default() @@ -39,6 +49,53 @@ pub static CTX: LazyLock> = 
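Editor's note: `render_table` prints each cell as the absolute time plus its ratio to the first format in `formats` (the baseline), and `color` highlights non-baseline cells red above 1.5x the baseline, yellow above 1.1x, green otherwise. A small illustration of the cell formatting (not in the diff):

```rust
// Reproduces the cell format string from `render_table` for one hypothetical cell.
fn main() {
    let query_baseline: u128 = 1_000; // baseline format, in microseconds
    let time_us: u128 = 1_230;        // format under comparison
    let ratio = time_us as f64 / query_baseline as f64;
    // 1230 us is >1.1x but <1.5x the baseline, so this cell would be yellow.
    println!("{time_us} us ({ratio:.2})");
}
```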
LazyLock::new(|| { ) }); +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +pub enum Format { + Csv, + Arrow, + Parquet, + InMemoryVortex { enable_pushdown: bool }, + OnDiskVortex { enable_compression: bool }, +} + +impl std::fmt::Display for Format { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Format::Csv => write!(f, "csv"), + Format::Arrow => write!(f, "arrow"), + Format::Parquet => write!(f, "parquet"), + Format::InMemoryVortex { enable_pushdown } => { + write!(f, "in_memory_vortex(pushdown={enable_pushdown})") + } + Format::OnDiskVortex { enable_compression } => { + write!(f, "on_disk_vortex(compressed={enable_compression})") + } + } + } +} + +impl Format { + pub fn name(&self) -> String { + match self { + Format::Csv => "csv".to_string(), + Format::Arrow => "arrow".to_string(), + Format::Parquet => "parquet".to_string(), + Format::InMemoryVortex { enable_pushdown } => if *enable_pushdown { + "vortex-in-memory-pushdown" + } else { + "vortex-in-memory" + } + .to_string(), + Format::OnDiskVortex { enable_compression } => if *enable_compression { + "vortex-file-compressed" + } else { + "vortex-file-uncompressed" + } + .to_string(), + } + } +} + /// Creates a file if it doesn't already exist. /// NB: Does NOT modify the given path to ensure that it resides in the data directory. pub fn idempotent( @@ -189,6 +246,47 @@ pub struct CompressionRunResults { pub total_compressed_size: Option, } +pub async fn execute_query(ctx: &SessionContext, query: &str) -> anyhow::Result> { + let plan = ctx.sql(query).await?; + let (state, plan) = plan.into_parts(); + let optimized = state.optimize(&plan)?; + let physical_plan = state.create_physical_plan(&optimized).await?; + let result = collect(physical_plan.clone(), state.task_ctx()).await?; + Ok(result) +} + +#[derive(Clone)] +pub struct Measurement { + pub query_idx: usize, + pub time: Duration, + pub format: Format, + pub dataset: String, +} + +#[derive(Serialize)] +pub struct JsonValue { + pub name: String, + pub unit: String, + pub value: u128, +} + +impl Measurement { + pub fn to_json(&self) -> JsonValue { + let name = format!( + "{dataset}_q{query_idx}/{format}", + dataset = self.dataset, + format = self.format.name(), + query_idx = self.query_idx + ); + + JsonValue { + name, + unit: "ns".to_string(), + value: self.time.as_nanos(), + } + } +} + #[cfg(test)] mod test { use std::fs::File; diff --git a/bench-vortex/src/tpch/execute.rs b/bench-vortex/src/tpch/execute.rs index ac44be0353..4c828e84d3 100644 --- a/bench-vortex/src/tpch/execute.rs +++ b/bench-vortex/src/tpch/execute.rs @@ -1,9 +1,6 @@ -use arrow_array::RecordBatch; use datafusion::prelude::SessionContext; -use datafusion_common::Result; -use datafusion_physical_plan::collect; -use crate::tpch::Format; +use crate::{execute_query, Format}; pub async fn run_tpch_query( ctx: &SessionContext, @@ -41,12 +38,3 @@ pub async fn run_tpch_query( .sum() } } - -pub async fn execute_query(ctx: &SessionContext, query: &str) -> Result> { - let plan = ctx.sql(query).await?; - let (state, plan) = plan.into_parts(); - let optimized = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&optimized).await?; - let result = collect(physical_plan.clone(), state.task_ctx()).await?; - Ok(result) -} diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 863de4a01c..34578641e8 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -1,4 +1,3 @@ -use std::fmt::{Display, Formatter}; use std::fs; use 
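Editor's note: `print_measurements_json` is what the `clickbench` binary emits under `-d gh-json`, and its shape is what the `benchmark-action/github-action-benchmark` step consumes via `tool: 'customSmallerIsBetter'`: an array of `{name, unit, value}` objects with values in nanoseconds. A hypothetical example of a single entry (not part of the diff):

```rust
use std::time::Duration;

use bench_vortex::{Format, Measurement};

fn main() -> anyhow::Result<()> {
    let m = Measurement {
        query_idx: 3,
        time: Duration::from_millis(42),
        format: Format::OnDiskVortex { enable_compression: true },
        dataset: "clickbench".to_string(),
    };
    // Serializes to:
    // {"name":"clickbench_q3/vortex-file-compressed","unit":"ns","value":42000000}
    println!("{}", serde_json::to_string(&m.to_json())?);
    Ok(())
}
```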
std::fs::create_dir_all; use std::path::Path; @@ -26,7 +25,7 @@ use vortex_datafusion::memory::VortexMemTableOptions; use vortex_datafusion::persistent::format::VortexFormat; use vortex_datafusion::SessionContextExt; -use crate::{idempotent_async, CTX}; +use crate::{idempotent_async, Format, CTX, TARGET_BLOCK_BYTESIZE, TARGET_BLOCK_SIZE}; pub mod dbgen; mod execute; @@ -38,57 +37,6 @@ pub const EXPECTED_ROW_COUNTS: [usize; 23] = [ 0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7, ]; -// Sizes match default compressor configuration -const TARGET_BLOCK_BYTESIZE: usize = 16 * (1 << 20); -const TARGET_BLOCK_SIZE: usize = 64 * (1 << 10); - -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] -pub enum Format { - Csv, - Arrow, - Parquet, - InMemoryVortex { enable_pushdown: bool }, - OnDiskVortex { enable_compression: bool }, -} - -impl Display for Format { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Format::Csv => write!(f, "csv"), - Format::Arrow => write!(f, "arrow"), - Format::Parquet => write!(f, "parquet"), - Format::InMemoryVortex { enable_pushdown } => { - write!(f, "in_memory_vortex(pushdown={enable_pushdown})") - } - Format::OnDiskVortex { enable_compression } => { - write!(f, "on_disk_vortex(compressed={enable_compression})") - } - } - } -} - -impl Format { - pub fn name(&self) -> String { - match self { - Format::Csv => "csv".to_string(), - Format::Arrow => "arrow".to_string(), - Format::Parquet => "parquet".to_string(), - Format::InMemoryVortex { enable_pushdown } => if *enable_pushdown { - "vortex-in-memory-pushdown" - } else { - "vortex-in-memory" - } - .to_string(), - Format::OnDiskVortex { enable_compression } => if *enable_compression { - "vortex-file-compressed" - } else { - "vortex-file-uncompressed" - } - .to_string(), - } - } -} - // Generate table dataset. pub async fn load_datasets>( base_dir: P, diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 6b418016a8..876e4594de 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -80,7 +80,9 @@ impl FileFormat for VortexFormat { file_schemas.push(s); } - Ok(Arc::new(Schema::try_merge(file_schemas)?)) + let schema = Arc::new(Schema::try_merge(file_schemas)?); + + Ok(schema) } async fn infer_stats(