Skip to content

Commit

Permalink
refactor(sqlplannertest): split directories, allow subdirectory selec…
Browse files Browse the repository at this point in the history
…tion, rename with_logical (#189)

* with_logical is now with_df_logical (which means that with datafusion
logical optimizer)
* apply planner test now supports subdirectories, so that we don't run
all tests, that's super slow
* apply planner test now shows time spent on a test case

Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh authored Oct 26, 2024
1 parent 5065c42 commit 9633315
Show file tree
Hide file tree
Showing 29 changed files with 113 additions and 78 deletions.
16 changes: 12 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions optd-perfbench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ tokio = { version = "1.24", features = [
shlex = "1.3"
tokio-postgres = "0.7"
regex = "1.10"
clap = { version = "4.5.4", features = [
"derive",
] }
clap = { version = "4.5.4", features = ["derive"] }
log = "0.4"
env_logger = "0.11"
lazy_static = "1.4.0"
Expand Down
8 changes: 4 additions & 4 deletions optd-perfbench/src/datafusion_dbms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,22 @@ impl DatafusionDBMS {
stats: Option<DataFusionBaseTableStats>,
benchmark: &Benchmark,
) -> anyhow::Result<()> {
let with_logical = match benchmark {
let use_df_logical = match benchmark {
Benchmark::Tpch(_) => WITH_LOGICAL_FOR_TPCH,
Benchmark::Job(_) | Benchmark::Joblight(_) => WITH_LOGICAL_FOR_JOB,
};
self.ctx = Some(Self::new_session_ctx(stats, self.adaptive, with_logical).await?);
self.ctx = Some(Self::new_session_ctx(stats, self.adaptive, use_df_logical).await?);
Ok(())
}

async fn new_session_ctx(
stats: Option<DataFusionBaseTableStats>,
adaptive: bool,
with_logical: bool,
use_df_logical: bool,
) -> anyhow::Result<SessionContext> {
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);

if !with_logical {
if !use_df_logical {
session_config.options_mut().optimizer.max_passes = 0;
}

Expand Down
3 changes: 2 additions & 1 deletion optd-sqlplannertest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
anyhow = { version = "1", features = ["backtrace"] }
sqlplannertest = { git = "https://github.com/risinglightdb/sqlplannertest-rs" }
sqlplannertest = { git = "https://github.com/risinglightdb/sqlplannertest-rs", branch = "main" }
async-trait = "0.1"
datafusion-optd-cli = { path = "../datafusion-optd-cli" }
rand = "0.8"
Expand Down
17 changes: 10 additions & 7 deletions optd-sqlplannertest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

```shell
cargo run -p optd-sqlplannertest --bin planner_test_apply
# or, supply a list of directories to scan from
cargo run -p optd-sqlplannertest --bin planner_test_apply -- subqueries
```

## Verify the test cases
Expand All @@ -22,23 +24,24 @@ The `explain` and `execute` task will be run with datafusion's logical optimizer

#### Flags

| Name | Description |
| -- | -- |
| with_logical | Enable Datafusion's logical optimizer |
| Name | Description |
| -------------- | ------------------------------------- |
| use_df_logical | Enable Datafusion's logical optimizer |

### Explain Task

#### Flags

| Name | Description |
| -- | -- |
| with_logical | Enable Datafusion's logical optimizer |
| verbose | Display estimated cost in physical plan |
| Name | Description |
| -------------- | --------------------------------------- |
| use_df_logical | Enable Datafusion's logical optimizer |
| verbose | Display estimated cost in physical plan |

Currently we have the following options for the explain task:

- `logical_datafusion`: datafusion's logical plan.
- `logical_optd`: optd's logical plan before optimization.
- `optimized_logical_optd`: optd's logical plan after heuristics optimization and before cascades optimization.
- `physical_optd`: optd's physical plan after optimization.
- `physical_datafusion`: datafusion's physical plan.
- `join_orders`: physical join orders.
Expand Down
35 changes: 30 additions & 5 deletions optd-sqlplannertest/src/bin/planner_test_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,37 @@ use std::path::Path;

use anyhow::Result;

use clap::Parser;

#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
/// Optional list of directories to apply the test; if empty, apply all tests
directories: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<()> {
sqlplannertest::planner_test_apply(
Path::new(env!("CARGO_MANIFEST_DIR")).join("tests"),
|| async { optd_sqlplannertest::DatafusionDBMS::new().await },
)
.await?;
let cli = Cli::parse();

if cli.directories.is_empty() {
println!("Running all tests");
sqlplannertest::planner_test_apply(
Path::new(env!("CARGO_MANIFEST_DIR")).join("tests"),
|| async { optd_sqlplannertest::DatafusionDBMS::new().await },
)
.await?;
} else {
for directory in cli.directories {
println!("Running tests in {}", directory);
sqlplannertest::planner_test_apply(
Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join(directory),
|| async { optd_sqlplannertest::DatafusionDBMS::new().await },
)
.await?;
}
}
Ok(())
}
32 changes: 16 additions & 16 deletions optd-sqlplannertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ use async_trait::async_trait;
pub struct DatafusionDBMS {
ctx: SessionContext,
/// Context enabling datafusion's logical optimizer.
with_logical_ctx: SessionContext,
use_df_logical_ctx: SessionContext,
}

impl DatafusionDBMS {
pub async fn new() -> Result<Self> {
let ctx = DatafusionDBMS::new_session_ctx(false, None).await?;
let with_logical_ctx =
let use_df_logical_ctx =
DatafusionDBMS::new_session_ctx(true, Some(ctx.state().catalog_list().clone())).await?;
Ok(Self {
ctx,
with_logical_ctx,
use_df_logical_ctx,
})
}

/// Creates a new session context. If the `with_logical` flag is set, datafusion's logical optimizer will be used.
/// Creates a new session context. If the `use_df_logical` flag is set, datafusion's logical optimizer will be used.
async fn new_session_ctx(
with_logical: bool,
use_df_logical: bool,
catalog: Option<Arc<dyn CatalogList>>,
) -> Result<SessionContext> {
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
if !with_logical {
if !use_df_logical {
session_config.options_mut().optimizer.max_passes = 0;
}

Expand All @@ -67,7 +67,7 @@ impl DatafusionDBMS {
BaseTableStats::default(),
false,
);
if !with_logical {
if !use_df_logical {
// clean up optimizer rules so that we can plug in our own optimizer
state = state.with_optimizer_rules(vec![]);
}
Expand All @@ -80,19 +80,19 @@ impl DatafusionDBMS {
Ok(ctx)
}

pub async fn execute(&self, sql: &str, with_logical: bool) -> Result<Vec<Vec<String>>> {
pub async fn execute(&self, sql: &str, use_df_logical: bool) -> Result<Vec<Vec<String>>> {
let sql = unescape_input(sql)?;
let dialect = Box::new(GenericDialect);
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
let mut result = Vec::new();
for statement in statements {
let df = if with_logical {
let df = if use_df_logical {
let plan = self
.with_logical_ctx
.use_df_logical_ctx
.state()
.statement_to_plan(statement)
.await?;
self.with_logical_ctx.execute_logical_plan(plan).await?
self.use_df_logical_ctx.execute_logical_plan(plan).await?
} else {
let plan = self.ctx.state().statement_to_plan(statement).await?;
self.ctx.execute_logical_plan(plan).await?
Expand Down Expand Up @@ -125,8 +125,8 @@ impl DatafusionDBMS {
/// Executes the `execute` task.
async fn task_execute(&mut self, r: &mut String, sql: &str, flags: &[String]) -> Result<()> {
use std::fmt::Write;
let with_logical = flags.contains(&"with_logical".to_string());
let result = self.execute(sql, with_logical).await?;
let use_df_logical = flags.contains(&"use_df_logical".to_string());
let result = self.execute(sql, use_df_logical).await?;
writeln!(r, "{}", result.into_iter().map(|x| x.join(" ")).join("\n"))?;
writeln!(r)?;
Ok(())
Expand All @@ -142,14 +142,14 @@ impl DatafusionDBMS {
) -> Result<()> {
use std::fmt::Write;

let with_logical = flags.contains(&"with_logical".to_string());
let use_df_logical = flags.contains(&"use_df_logical".to_string());
let verbose = flags.contains(&"verbose".to_string());
let explain_sql = if verbose {
format!("explain verbose {}", &sql)
} else {
format!("explain {}", &sql)
};
let result = self.execute(&explain_sql, with_logical).await?;
let result = self.execute(&explain_sql, use_df_logical).await?;
let subtask_start_pos = task.find(':').unwrap() + 1;
for subtask in task[subtask_start_pos..].split(',') {
let subtask = subtask.trim();
Expand Down Expand Up @@ -260,7 +260,7 @@ lazy_static! {
}

/// Extract the flags from a task. The flags are specified in square brackets.
/// For example, the flags for the task `explain[with_logical, verbose]` are `["with_logical", "verbose"]`.
/// For example, the flags for the task `explain[use_df_logical, verbose]` are `["use_df_logical", "verbose"]`.
fn extract_flags(task: &str) -> Result<Vec<String>> {
if let Some(captures) = FLAGS_REGEX.captures(task) {
Ok(captures
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
create table t1(t1v1 int, t1v2 int);
insert into t1 values (0, 0), (1, 1), (2, 2);
tasks:
- execute[with_logical]
- execute[use_df_logical]
- sql: |
select * from t1 where t1v1 = 0;
desc: Test whether the optimizer handles integer equality predicates correctly.
tasks:
- execute[with_logical]
- execute[use_df_logical]
- sql: |
select * from t1 where t1v1 = 0 and t1v2 = 1;
desc: Test whether the optimizer handles multiple integer equality predicates correctly.
tasks:
- execute[with_logical]
- execute[use_df_logical]
- sql: |
select * from t1 where t1v1 = 0 and t1v2 != 1;
desc: Test whether the optimizer handles multiple integer inequality predicates correctly.
tasks:
- execute[with_logical]
- execute[use_df_logical]
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
insert into t2 values (0, 200), (1, 201), (2, 202);
insert into t3 values (0, 300), (1, 301), (2, 302);
tasks:
- execute[with_logical]
- execute[use_df_logical]
- sql: |
select * from t2, t1, t3 where t1v1 = t2v1 and t1v2 = t3v2;
desc: Test whether the optimizer enumerates all join orders.
tasks:
- explain[with_logical]:logical_join_orders
- execute[with_logical]
- explain[use_df_logical]:logical_join_orders
- execute[use_df_logical]
- sql: |
select * from t1, t2, t3 where t1v1 = t2v1 and t1v2 = t3v2;
desc: Test whether the optimizer enumerates all join orders.
tasks:
- explain[with_logical]:logical_join_orders
- execute[with_logical]
- explain[use_df_logical]:logical_join_orders
- execute[use_df_logical]
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
create table t2(t2v1 int, t2v3 int);
create table t3(t3v2 int, t3v4 int);
tasks:
- execute[with_logical]
- execute[use_df_logical]
# - sql: |
# select * from t1 where t1v1 in (select t2v1 from t2);
# desc: Test whether the optimizer can unnest "in" subqueries. -- failing with unsupported expression
Expand All @@ -13,4 +13,4 @@
select * from t1 where (select sum(t2v3) from t2 where t2v1 = t1v1) > 100;
desc: Test whether the optimizer can unnest correlated subqueries.
tasks:
- explain:logical_optd,physical_optd
- explain:logical_optd,optimized_logical_optd,physical_optd
File renamed without changes.
Loading

0 comments on commit 9633315

Please sign in to comment.