feat: automatically converting csv to parquet #181

Merged May 28, 2024 (10 commits)
58 changes: 44 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions dev_scripts/which_queries_work.sh
@@ -24,8 +24,7 @@ fi
 successful_ids=()
 IFS=','
 for id in $all_ids; do
-    # make sure to execute with --adaptive so that we actually run the query in datafusion
-    cargo run --bin optd-perftest cardtest $benchmark_name --query-ids $id --adaptive &>/dev/null
+    cargo run --release --bin optd-perftest cardtest $benchmark_name --query-ids $id &>/dev/null
 
     if [ $? -eq 0 ]; then
         echo >&2 $id succeeded
5 changes: 4 additions & 1 deletion optd-perftest/Cargo.toml
@@ -31,7 +31,7 @@ tokio = { version = "1.24", features = [
 shlex = "1.3"
 tokio-postgres = "0.7"
 regex = "1.10"
-clap = { version = "4.5", features = [
+clap = { version = "4.5.4", features = [
     "derive",
 ] }
 log = "0.4"
@@ -47,6 +47,9 @@ itertools = "0.12.1"
 test-case = "3.3"
 rayon = "1.10"
 parquet = "47.0.0"
+csv2parquet = { git = "https://github.com/wangpatrick57/arrow-tools.git", branch = "main" }
+arrow-schema = { version = "47.0.0", features = ["serde"] }
+
 
 [dev_dependencies]
 assert_cmd = "2.0"
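
The new csv2parquet dependency (pinned to a fork of the arrow-tools repo) is what performs the CSV-to-Parquet conversion. As a rough sketch of what that conversion boils down to — this uses the arrow-csv and parquet crates directly rather than csv2parquet's actual API, and the function name, paths, '|' delimiter, and headerless input are all assumptions modeled on TPC-H .tbl files:

use std::{fs::File, sync::Arc};

use arrow_csv::ReaderBuilder;
use arrow_schema::Schema;
use parquet::arrow::ArrowWriter;

// Hypothetical sketch, not csv2parquet's real API: convert one headerless,
// '|'-delimited .tbl file to a Parquet file using an explicit Arrow schema.
fn tbl_to_parquet(tbl_path: &str, parquet_path: &str, schema: Arc<Schema>) -> anyhow::Result<()> {
    let input = File::open(tbl_path)?;
    let reader = ReaderBuilder::new(schema.clone())
        .with_header(false)
        .with_delimiter(b'|')
        .build(input)?;

    let output = File::create(parquet_path)?;
    let mut writer = ArrowWriter::try_new(output, schema, None)?;

    // Stream record batches from the CSV reader into the Parquet writer.
    for batch in reader {
        writer.write(&batch?)?;
    }
    writer.close()?; // finalizes the Parquet footer
    Ok(())
}

The "serde" feature enabled on arrow-schema above is what allows a schema to be serialized with serde_json, as the DEBUG println in datafusion_dbms.rs below does.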
82 changes: 60 additions & 22 deletions optd-perftest/src/datafusion_dbms.rs
@@ -322,7 +322,7 @@ impl DatafusionDBMS {
         match benchmark {
             Benchmark::Tpch(_) => {
                 let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
-                self.create_tpch_tables(&tpch_kit).await?;
+                Self::create_tpch_tables(self.get_ctx(), &tpch_kit).await?;
             }
             Benchmark::Job(_) | Benchmark::Joblight(_) => {
                 let job_kit = JobKit::build(&self.workspace_dpath)?;
@@ -332,15 +332,15 @@ impl DatafusionDBMS {
         Ok(())
     }
 
-    async fn create_tpch_tables(&mut self, tpch_kit: &TpchKit) -> anyhow::Result<()> {
+    async fn create_tpch_tables(ctx: &SessionContext, tpch_kit: &TpchKit) -> anyhow::Result<()> {
         let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
         let ddls = ddls
             .split(';')
             .map(|s| s.trim())
             .filter(|s| !s.is_empty())
             .collect::<Vec<_>>();
         for ddl in ddls {
-            Self::execute(self.get_ctx(), ddl).await?;
+            Self::execute(ctx, ddl).await?;
         }
         Ok(())
     }
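
Making create_tpch_tables an associated function that takes the SessionContext explicitly, instead of borrowing all of &mut self, lets the same DDL runner target either the engine's long-lived context or a short-lived one. A hypothetical illustration of the two call sites (not code from the PR):

use datafusion::prelude::SessionContext;

// Hypothetical illustration: the same associated function now serves both
// the DBMS's own context and a throwaway one used only for schema discovery.
async fn demo(dbms: &mut DatafusionDBMS, tpch_kit: &TpchKit) -> anyhow::Result<()> {
    // Call site 1: the engine's own context, as in the benchmark match above.
    DatafusionDBMS::create_tpch_tables(dbms.get_ctx(), tpch_kit).await?;

    // Call site 2: a temporary context, as in get_tpch_stats further down.
    let tmp_ctx = SessionContext::new();
    DatafusionDBMS::create_tpch_tables(&tmp_ctx, tpch_kit).await?;
    Ok(())
}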
@@ -362,12 +362,12 @@ impl DatafusionDBMS {
         &mut self,
         tpch_kit_config: &TpchKitConfig,
     ) -> anyhow::Result<()> {
-        // Generate the tables.
+        // Generate the tables and convert them to Parquet.
         let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
         tpch_kit.gen_tables(tpch_kit_config)?;
 
         // Create the tables.
-        self.create_tpch_tables(&tpch_kit).await?;
+        Self::create_tpch_tables(self.get_ctx(), &tpch_kit).await?;
 
         // Load the data by creating an external table first and copying the data to real tables.
         let tbl_fpath_iter = tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "tbl").unwrap();
@@ -394,6 +394,10 @@ impl DatafusionDBMS {
             .await
             .unwrap()
             .schema();
+
+        // DEBUG(phw2)
+        println!("schema={}", serde_json::to_string_pretty(&schema).unwrap());
+
         let projection_list = (1..=schema.fields().len())
             .map(|i| format!("column_{}", i))
             .collect::<Vec<_>>()
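
The projection_list built here reflects DataFusion's convention of naming the columns of a headerless CSV column_1 through column_n. A hedged sketch of the "external table, then copy" pattern this code drives — the table name, file path, and column count are invented, and the exact CREATE EXTERNAL TABLE syntax varies across DataFusion versions:

use datafusion::prelude::SessionContext;

// Hypothetical sketch of loading one TPC-H table via an external table.
async fn load_region(ctx: &SessionContext) -> anyhow::Result<()> {
    // Register the raw .tbl file; with no header row, DataFusion names the
    // columns column_1..column_n.
    ctx.sql("CREATE EXTERNAL TABLE region_tbl STORED AS CSV DELIMITER '|' LOCATION 'region.tbl'")
        .await?
        .collect()
        .await?;

    // Copy into the real, typed table with an explicit projection.
    ctx.sql("INSERT INTO region SELECT column_1, column_2, column_3 FROM region_tbl")
        .await?
        .collect()
        .await?;
    Ok(())
}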
@@ -477,7 +481,35 @@ impl DatafusionDBMS {
 
         println!("Total execution time {:?}...", now.elapsed());
 
-        Ok(base_table_stats.into_inner()?)
+        let stats = base_table_stats.into_inner();
+        let l = stats.unwrap();
+        // Useful for debugging stats so I kept it
+        // l.iter().for_each(|(table_name, stats)| {
+        //     println!("Table: {} (num_rows: {})", table_name, stats.row_cnt);
+        //     stats
+        //         .column_comb_stats
+        //         .iter()
+        //         .sorted_by_key(|x| x.0[0])
+        //         .for_each(|x| {
+        //             let sum_freq: f64 = x.1.mcvs.frequencies().values().copied().sum();
+        //             println!(
+        //                 "Col: {} (n_distinct: {}) (n_frac: {}) (mcvs: {} {}) (tdigests: {:?} {:?} {:?} {:?} {:?})",
+        //                 x.0[0],
+        //                 x.1.ndistinct,
+        //                 x.1.null_frac,
+        //                 x.1.mcvs.frequencies().len(),
+        //                 sum_freq,
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.01)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.25)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.50)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.75)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.99)),
+        //             );
+        //         });
+        // });
+        // println!("{:#?}", stats);
+
+        Ok(l)
     }
 
     // Load job data from a .csv file.
@@ -487,14 +487,14 @@ impl DatafusionDBMS {
     ) -> anyhow::Result<()> {
         let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
 
-        // Download the tables.
+        // Download the tables and convert them to Parquet.
         let job_kit = JobKit::build(&self.workspace_dpath)?;
         job_kit.download_tables(job_kit_config)?;
 
         // Create the tables.
         Self::create_job_tables(&ctx, &job_kit).await?;
 
-        // Load each table using register_csv()
+        // Load each table using register_csv().
         let tbl_fpath_iter = job_kit.get_tbl_fpath_vec("csv").unwrap();
         for tbl_fpath in tbl_fpath_iter {
             let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap();
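
For JOB, each CSV is handed straight to DataFusion rather than copied through an external table. A minimal sketch of the register_csv call — the table name, path, and option values are assumptions (JOB's CSVs are headerless and comma-delimited):

use datafusion::prelude::{CsvReadOptions, SessionContext};

// Minimal sketch of registering one headerless CSV as a queryable table.
async fn register_one_csv(ctx: &SessionContext) -> anyhow::Result<()> {
    ctx.register_csv(
        "title",              // hypothetical table name (the file stem)
        "job_data/title.csv", // hypothetical path
        CsvReadOptions::new().has_header(false).delimiter(b','),
    )
    .await?;
    Ok(())
}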
@@ -525,34 +557,39 @@ impl DatafusionDBMS {
         &mut self,
         tpch_kit_config: &TpchKitConfig,
     ) -> anyhow::Result<DataFusionBaseTableStats> {
-        // Generate the tables
-        let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
-        tpch_kit.gen_tables(tpch_kit_config)?;
-
-        // To get the schema of each table.
+        // Create tables in a temporary context to get the schema provider.
         let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_TPCH).await?;
-        let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
-        let ddls = ddls
-            .split(';')
-            .map(|s| s.trim())
-            .filter(|s| !s.is_empty())
-            .collect::<Vec<_>>();
-        for ddl in ddls {
-            Self::execute(&ctx, ddl).await?;
-        }
+        let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
+        Self::create_tpch_tables(&ctx, &tpch_kit).await?;
         let schema_provider = ctx.catalog("datafusion").unwrap().schema("public").unwrap();
 
+        // Generate the tables
+        tpch_kit.gen_tables(tpch_kit_config)?;
         tpch_kit
             .make_parquet_files(tpch_kit_config, schema_provider)
             .await?;
+        // Compute base statistics on Parquet.
         let tbl_paths = tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "parquet")?;
+        assert!(tbl_paths.len() == tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "tbl")?.len());
         Self::gen_base_stats(tbl_paths)
     }
 
     async fn get_job_stats(
         &mut self,
         job_kit_config: &JobKitConfig,
     ) -> anyhow::Result<DataFusionBaseTableStats> {
+        // Create tables in a temporary context to get the schema provider.
+        let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
+        let job_kit = JobKit::build(&self.workspace_dpath)?;
+        Self::create_job_tables(&ctx, &job_kit).await?;
+        let schema_provider = ctx.catalog("datafusion").unwrap().schema("public").unwrap();
 
         // Generate the tables.
-        let job_kit = JobKit::build(&self.workspace_dpath)?;
         job_kit.download_tables(job_kit_config)?;
-
-        // To get the schema of each table.
-        let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
+        job_kit
+            .make_parquet_files(job_kit_config, schema_provider)
+            .await?;
@@ -568,6 +605,7 @@ impl DatafusionDBMS {
 
         // Compute base statistics on Parquet.
         let tbl_paths = job_kit.get_tbl_fpath_vec("parquet").unwrap();
+        assert!(tbl_paths.len() == job_kit.get_tbl_fpath_vec("csv")?.len());
         Self::gen_base_stats(tbl_paths)
     }
 }