diff --git a/Cargo.lock b/Cargo.lock index 7b9de194..2af55809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -958,15 +958,15 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", "serde", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -2892,6 +2892,7 @@ dependencies = [ "optd-datafusion-bridge", "optd-datafusion-repr", "optd-gungnir", + "parquet", "prettytable-rs", "rayon", "regex", diff --git a/optd-perftest/Cargo.toml b/optd-perftest/Cargo.toml index 0e85ba07..fe08820e 100644 --- a/optd-perftest/Cargo.toml +++ b/optd-perftest/Cargo.toml @@ -46,6 +46,7 @@ serde_json = "1.0" itertools = "0.12.1" test-case = "3.3" rayon = "1.10" +parquet = "47.0.0" [dev_dependencies] assert_cmd = "2.0" diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 3e819cd0..5cbb922b 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -18,9 +18,7 @@ use crate::{ use async_trait::async_trait; use datafusion::{ arrow::{ - array::{RecordBatch, RecordBatchIterator}, - csv::ReaderBuilder, - datatypes::SchemaRef, + array::RecordBatch, error::ArrowError, util::display::{ArrayFormatter, FormatOptions}, }, @@ -31,6 +29,7 @@ use datafusion::{ }, sql::{parser::DFParser, sqlparser::dialect::GenericDialect}, }; + use datafusion_optd_cli::helper::unescape_input; use futures::executor::block_on; use lazy_static::lazy_static; @@ -39,6 +38,7 @@ use optd_datafusion_repr::{ cost::{DataFusionBaseTableStats, DataFusionPerTableStats}, DatafusionOptimizer, }; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use rayon::prelude::*; use regex::Regex; @@ -359,7 +359,6 @@ impl DatafusionDBMS { fn gen_base_stats( tbl_paths: Vec, ctx: SessionContext, - delim: u8, ) -> anyhow::Result { let base_table_stats = Mutex::new(DataFusionBaseTableStats::default()); let now = Instant::now(); @@ -383,8 +382,8 @@ impl DatafusionDBMS { let single_cols = (0..nb_cols).map(|v| vec![v]).collect::>(); let stats_result = DataFusionPerTableStats::from_record_batches( - Self::create_batch_channel(tbl_fpath.clone(), schema.clone(), delim), - Self::create_batch_channel(tbl_fpath.clone(), schema.clone(), delim), + Self::create_batch_channel(tbl_fpath.clone()), + Self::create_batch_channel(tbl_fpath.clone()), single_cols, schema, ); @@ -408,26 +407,27 @@ impl DatafusionDBMS { fn create_batch_channel( tbl_fpath: PathBuf, - schema: SchemaRef, - delim: u8, ) -> impl FnOnce() -> (JoinHandle<()>, Receiver>) { move || { let (sender, receiver) = mpsc::channel(); let handle = thread::spawn(move || { + // Get the number of row groups. let tbl_file = File::open(tbl_fpath).expect("Failed to open file"); - let csv_reader = ReaderBuilder::new(schema.clone()) - .has_header(false) - .with_delimiter(delim) - .with_escape(b'\\') - .with_batch_size(1024) - .build(tbl_file) - .expect("Failed to build CSV reader"); - - let batch_iter = RecordBatchIterator::new(csv_reader, schema); - for batch in batch_iter { - sender.send(batch).expect("Failed to send batch"); - } + let builder = + ParquetRecordBatchReaderBuilder::try_new(tbl_file.try_clone().unwrap()) + .unwrap(); + let num_row_groups = builder.metadata().num_row_groups(); + + // Read row groups in parallel. + (0..num_row_groups).into_par_iter().for_each(|i| { + ParquetRecordBatchReaderBuilder::try_new(tbl_file.try_clone().unwrap()) + .unwrap() + .with_row_groups(vec![i]) + .build() + .unwrap() + .for_each(|batch| sender.send(batch).expect("Failed to send batch")) + }); }); (handle, receiver) @@ -456,7 +456,7 @@ impl DatafusionDBMS { // Compute base statistics. let tbl_paths = tpch_kit.get_tbl_fpath_vec(tpch_kit_config)?; - Self::gen_base_stats(tbl_paths, ctx, b'|') + Self::gen_base_stats(tbl_paths, ctx) } async fn get_job_stats( @@ -480,8 +480,8 @@ impl DatafusionDBMS { } // Compute base statistics. - let tbl_paths = job_kit.get_tbl_fpath_vec().unwrap(); - Self::gen_base_stats(tbl_paths, ctx, b',') + let tbl_paths = job_kit.get_tbl_fpath_vec("parquet").unwrap(); + Self::gen_base_stats(tbl_paths, ctx) } } diff --git a/optd-perftest/src/job.rs b/optd-perftest/src/job.rs index 61804ac2..b6eb4f14 100644 --- a/optd-perftest/src/job.rs +++ b/optd-perftest/src/job.rs @@ -138,7 +138,7 @@ impl JobKit { } /// Get a vector of all generated .csv files in a given directory path - pub fn get_tbl_fpath_vec(&self) -> io::Result> { + pub fn get_tbl_fpath_vec(&self, target_ext: &str) -> io::Result> { let dirent_iter = fs::read_dir(&self.downloaded_tables_dpath)?; let entries: Vec<_> = dirent_iter.collect::, io::Error>>()?; @@ -150,7 +150,7 @@ impl JobKit { if path .extension() .and_then(|ext| ext.to_str()) - .map(|ext| ext == "csv") + .map(|ext| ext == target_ext) .unwrap_or(false) { Some(path) diff --git a/optd-perftest/src/postgres_dbms.rs b/optd-perftest/src/postgres_dbms.rs index f870ae9d..bc5af388 100644 --- a/optd-perftest/src/postgres_dbms.rs +++ b/optd-perftest/src/postgres_dbms.rs @@ -193,7 +193,7 @@ impl PostgresDBMS { // load the tables job_kit.download_tables(job_kit_config)?; - for tbl_fpath in job_kit.get_tbl_fpath_vec()? { + for tbl_fpath in job_kit.get_tbl_fpath_vec("csv")? { Self::copy_from_stdin(client, tbl_fpath, ",", "\\").await?; }