Skip to content

Commit

Permalink
feat: support logical plan for EXECUTE statement (#13194)
Browse files Browse the repository at this point in the history
* Add Execute LogicalPlan

* Fix compile

* Add tests
  • Loading branch information
jonahgao authored Nov 1, 2024
1 parent f2bebcd commit d2a15b3
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 12 deletions.
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,9 @@ impl DefaultPhysicalPlanner {
// statement can be prepared)
return not_impl_err!("Unsupported logical plan: Prepare");
}
LogicalPlan::Execute(_) => {
return not_impl_err!("Unsupported logical plan: Execute");
}
LogicalPlan::Dml(dml) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ impl NamePreserver {
| LogicalPlan::Join(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::Limit(_)
| LogicalPlan::Execute(_)
),
}
}
Expand Down
17 changes: 13 additions & 4 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::collections::HashMap;
use std::fmt;

use crate::{
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
Unnest, Values, Window,
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute,
Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
TableProviderFilterPushDown, TableScan, Unnest, Values, Window,
};

use crate::dml::CopyTo;
Expand Down Expand Up @@ -626,6 +626,15 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Data Types": format!("{:?}", data_types)
})
}
LogicalPlan::Execute(Execute {
name, parameters, ..
}) => {
json!({
"Node Type": "Execute",
"Name": name,
"Parameters": expr_vec_fmt!(parameters),
})
}
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
json!({
"Node Type": "DescribeTable"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use ddl::{
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Expand Down
39 changes: 39 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ pub enum LogicalPlan {
/// Prepare a statement and find any bind parameters
/// (e.g. `?`). This is used to implement SQL-prepared statements.
Prepare(Prepare),
/// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
Execute(Execute),
/// Data Manipulation Language (DML): Insert / Update / Delete
Dml(DmlStatement),
/// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
Expand Down Expand Up @@ -314,6 +316,7 @@ impl LogicalPlan {
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
LogicalPlan::Execute(Execute { schema, .. }) => schema,
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
Expand Down Expand Up @@ -457,6 +460,7 @@ impl LogicalPlan {
| LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Execute { .. }
| LogicalPlan::DescribeTable(_) => vec![],
}
}
Expand Down Expand Up @@ -560,6 +564,7 @@ impl LogicalPlan {
LogicalPlan::Subquery(_) => Ok(None),
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::Explain(_)
Expand Down Expand Up @@ -712,6 +717,7 @@ impl LogicalPlan {
LogicalPlan::Analyze(_) => Ok(self),
LogicalPlan::Explain(_) => Ok(self),
LogicalPlan::Prepare(_) => Ok(self),
LogicalPlan::Execute(_) => Ok(self),
LogicalPlan::TableScan(_) => Ok(self),
LogicalPlan::EmptyRelation(_) => Ok(self),
LogicalPlan::Statement(_) => Ok(self),
Expand Down Expand Up @@ -1072,6 +1078,14 @@ impl LogicalPlan {
input: Arc::new(input),
}))
}
LogicalPlan::Execute(Execute { name, schema, .. }) => {
self.assert_no_inputs(inputs)?;
Ok(LogicalPlan::Execute(Execute {
name: name.clone(),
schema: Arc::clone(schema),
parameters: expr,
}))
}
LogicalPlan::TableScan(ts) => {
self.assert_no_inputs(inputs)?;
Ok(LogicalPlan::TableScan(TableScan {
Expand Down Expand Up @@ -1330,6 +1344,7 @@ impl LogicalPlan {
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Extension(_) => None,
}
Expand Down Expand Up @@ -1933,6 +1948,9 @@ impl LogicalPlan {
}) => {
write!(f, "Prepare: {name:?} {data_types:?} ")
}
LogicalPlan::Execute(Execute { name, parameters, .. }) => {
write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters))
}
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
Expand Down Expand Up @@ -2599,6 +2617,27 @@ pub struct Prepare {
pub input: Arc<LogicalPlan>,
}

/// Execute a prepared statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Execute {
/// The name of the prepared statement to execute
pub name: String,
/// The execute parameters
pub parameters: Vec<Expr>,
/// Dummy schema
pub schema: DFSchemaRef,
}

// Comparison excludes the `schema` field.
impl PartialOrd for Execute {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.name.partial_cmp(&other.name) {
Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters),
cmp => cmp,
}
}
}

/// Describe the schema of table
///
/// # Example output:
Expand Down
26 changes: 22 additions & 4 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
use crate::{
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit,
LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values,
Window,
Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
Values, Window,
};
use std::ops::Deref;
use std::sync::Arc;
Expand Down Expand Up @@ -363,6 +363,7 @@ impl TreeNode for LogicalPlan {
| LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Execute { .. }
| LogicalPlan::DescribeTable(_) => Transformed::no(self),
})
}
Expand Down Expand Up @@ -505,6 +506,9 @@ impl LogicalPlan {
.chain(fetch.iter())
.map(|e| e.deref())
.apply_until_stop(f),
LogicalPlan::Execute(Execute { parameters, .. }) => {
parameters.iter().apply_until_stop(f)
}
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::RecursiveQuery(_)
Expand Down Expand Up @@ -734,6 +738,20 @@ impl LogicalPlan {
})
})
}
LogicalPlan::Execute(Execute {
parameters,
name,
schema,
}) => parameters
.into_iter()
.map_until_stop_and_collect(f)?
.update_data(|parameters| {
LogicalPlan::Execute(Execute {
parameters,
name,
schema,
})
}),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Unnest(_)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,8 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Prepare(_) => {
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_) => {
// This rule handles recursion itself in a `ApplyOrder::TopDown` like
// manner.
plan.map_children(|c| self.rewrite(c, config))?
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ fn optimize_projections(
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::DescribeTable(_) => {
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Execute(_) => {
// These operators have no inputs, so stop the optimization process.
return Ok(Transformed::no(plan));
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1633,6 +1633,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::RecursiveQuery(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for RecursiveQuery",
)),
LogicalPlan::Execute(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Execute",
)),
}
}
}
26 changes: 25 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datafusion_expr::{
CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable,
DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation,
Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr,
Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
Expand Down Expand Up @@ -642,6 +642,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: Arc::new(plan),
}))
}
Statement::Execute {
name,
parameters,
using,
} => {
// `USING` is a MySQL-specific syntax and currently not supported.
if !using.is_empty() {
return not_impl_err!(
"Execute statement with USING is not supported"
);
}

let empty_schema = DFSchema::empty();
let parameters = parameters
.into_iter()
.map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
.collect::<Result<Vec<Expr>>>()?;

Ok(LogicalPlan::Execute(Execute {
name: ident_to_string(&name),
parameters,
schema: DFSchemaRef::new(empty_schema),
}))
}

Statement::ShowTables {
extended,
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl Unparser<'_> {
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
Expand Down
18 changes: 18 additions & 0 deletions datafusion/sqllogictest/test_files/prepare.slt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,21 @@ PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS SELECT id, SUM(age) FROM person

statement error
PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);

# test creating logical plan for EXECUTE statements
query TT
EXPLAIN EXECUTE my_plan;
----
logical_plan Execute: my_plan params=[]

query TT
EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
----
logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]

query error DataFusion error: Schema error: No field named a\.
EXPLAIN EXECUTE my_plan(a);

# TODO: support EXECUTE queries
query error DataFusion error: This feature is not implemented: Unsupported logical plan: Execute
EXECUTE my_plan;

0 comments on commit d2a15b3

Please sign in to comment.