diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 5aa702550b08..c16f3ad104b8 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1201,6 +1201,9 @@ impl DefaultPhysicalPlanner { // statement can be prepared) return not_impl_err!("Unsupported logical plan: Prepare"); } + LogicalPlan::Execute(_) => { + return not_impl_err!("Unsupported logical plan: Execute"); + } LogicalPlan::Dml(dml) => { // DataFusion is a read-only query engine, but also a library, so consumers may implement this return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op); diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index d6d5c3e2931c..c86696854ca3 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -314,6 +314,7 @@ impl NamePreserver { | LogicalPlan::Join(_) | LogicalPlan::TableScan(_) | LogicalPlan::Limit(_) + | LogicalPlan::Execute(_) ), } } diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index c0549451a776..9aea7747c414 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -20,10 +20,10 @@ use std::collections::HashMap; use std::fmt; use crate::{ - expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, - Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, - Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, - Unnest, Values, Window, + expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute, + Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, + RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias, + TableProviderFilterPushDown, TableScan, Unnest, Values, Window, }; use crate::dml::CopyTo; @@ -626,6 +626,15 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Data Types": format!("{:?}", data_types) }) } + LogicalPlan::Execute(Execute { + name, parameters, .. + }) => { + json!({ + "Node Type": "Execute", + "Name": name, + "Parameters": expr_vec_fmt!(parameters), + }) + } LogicalPlan::DescribeTable(DescribeTable { .. }) => { json!({ "Node Type": "DescribeTable" diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 80a896212442..59654a227829 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -36,7 +36,7 @@ pub use ddl::{ pub use dml::{DmlStatement, WriteOp}; pub use plan::{ projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, - DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join, + DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 8ba2a44842bc..191a42e38e3a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -266,6 +266,8 @@ pub enum LogicalPlan { /// Prepare a statement and find any bind parameters /// (e.g. `?`). This is used to implement SQL-prepared statements. Prepare(Prepare), + /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'. + Execute(Execute), /// Data Manipulation Language (DML): Insert / Update / Delete Dml(DmlStatement), /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS @@ -314,6 +316,7 @@ impl LogicalPlan { LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(), LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema, LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(), + LogicalPlan::Execute(Execute { schema, .. }) => schema, LogicalPlan::Explain(explain) => &explain.schema, LogicalPlan::Analyze(analyze) => &analyze.schema, LogicalPlan::Extension(extension) => extension.node.schema(), @@ -457,6 +460,7 @@ impl LogicalPlan { | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } + | LogicalPlan::Execute { .. } | LogicalPlan::DescribeTable(_) => vec![], } } @@ -560,6 +564,7 @@ impl LogicalPlan { LogicalPlan::Subquery(_) => Ok(None), LogicalPlan::EmptyRelation(_) | LogicalPlan::Prepare(_) + | LogicalPlan::Execute(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) | LogicalPlan::Explain(_) @@ -712,6 +717,7 @@ impl LogicalPlan { LogicalPlan::Analyze(_) => Ok(self), LogicalPlan::Explain(_) => Ok(self), LogicalPlan::Prepare(_) => Ok(self), + LogicalPlan::Execute(_) => Ok(self), LogicalPlan::TableScan(_) => Ok(self), LogicalPlan::EmptyRelation(_) => Ok(self), LogicalPlan::Statement(_) => Ok(self), @@ -1072,6 +1078,14 @@ impl LogicalPlan { input: Arc::new(input), })) } + LogicalPlan::Execute(Execute { name, schema, .. }) => { + self.assert_no_inputs(inputs)?; + Ok(LogicalPlan::Execute(Execute { + name: name.clone(), + schema: Arc::clone(schema), + parameters: expr, + })) + } LogicalPlan::TableScan(ts) => { self.assert_no_inputs(inputs)?; Ok(LogicalPlan::TableScan(TableScan { @@ -1330,6 +1344,7 @@ impl LogicalPlan { | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::Prepare(_) + | LogicalPlan::Execute(_) | LogicalPlan::Statement(_) | LogicalPlan::Extension(_) => None, } @@ -1933,6 +1948,9 @@ impl LogicalPlan { }) => { write!(f, "Prepare: {name:?} {data_types:?} ") } + LogicalPlan::Execute(Execute { name, parameters, .. }) => { + write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters)) + } LogicalPlan::DescribeTable(DescribeTable { .. }) => { write!(f, "DescribeTable") } @@ -2599,6 +2617,27 @@ pub struct Prepare { pub input: Arc, } +/// Execute a prepared statement. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Execute { + /// The name of the prepared statement to execute + pub name: String, + /// The execute parameters + pub parameters: Vec, + /// Dummy schema + pub schema: DFSchemaRef, +} + +// Comparison excludes the `schema` field. +impl PartialOrd for Execute { + fn partial_cmp(&self, other: &Self) -> Option { + match self.name.partial_cmp(&other.name) { + Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters), + cmp => cmp, + } + } +} + /// Describe the schema of table /// /// # Example output: diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 0658f7029740..ff2c1ec1d58f 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -38,10 +38,10 @@ //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions use crate::{ dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, - Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit, - LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort, - Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, - Window, + Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, + Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, + Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, + Values, Window, }; use std::ops::Deref; use std::sync::Arc; @@ -363,6 +363,7 @@ impl TreeNode for LogicalPlan { | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } + | LogicalPlan::Execute { .. } | LogicalPlan::DescribeTable(_) => Transformed::no(self), }) } @@ -505,6 +506,9 @@ impl LogicalPlan { .chain(fetch.iter()) .map(|e| e.deref()) .apply_until_stop(f), + LogicalPlan::Execute(Execute { parameters, .. }) => { + parameters.iter().apply_until_stop(f) + } // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) @@ -734,6 +738,20 @@ impl LogicalPlan { }) }) } + LogicalPlan::Execute(Execute { + parameters, + name, + schema, + }) => parameters + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|parameters| { + LogicalPlan::Execute(Execute { + parameters, + name, + schema, + }) + }), // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::Unnest(_) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index ee9ae9fb15a7..4fe22d252744 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -553,7 +553,8 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) - | LogicalPlan::Prepare(_) => { + | LogicalPlan::Prepare(_) + | LogicalPlan::Execute(_) => { // This rule handles recursion itself in a `ApplyOrder::TopDown` like // manner. plan.map_children(|c| self.rewrite(c, config))? diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 94c04d6328ed..ec2225bbc042 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -348,7 +348,8 @@ fn optimize_projections( | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) - | LogicalPlan::DescribeTable(_) => { + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Execute(_) => { // These operators have no inputs, so stop the optimization process. return Ok(Transformed::no(plan)); } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index b90ae88aa74a..1993598f5cf7 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1633,6 +1633,9 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::RecursiveQuery(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for RecursiveQuery", )), + LogicalPlan::Execute(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for Execute", + )), } } } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index abb9912b712a..00949aa13ae1 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -48,7 +48,7 @@ use datafusion_expr::{ CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody, CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation, - Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, + Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, @@ -642,6 +642,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { input: Arc::new(plan), })) } + Statement::Execute { + name, + parameters, + using, + } => { + // `USING` is a MySQL-specific syntax and currently not supported. + if !using.is_empty() { + return not_impl_err!( + "Execute statement with USING is not supported" + ); + } + + let empty_schema = DFSchema::empty(); + let parameters = parameters + .into_iter() + .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context)) + .collect::>>()?; + + Ok(LogicalPlan::Execute(Execute { + name: ident_to_string(&name), + parameters, + schema: DFSchemaRef::new(empty_schema), + })) + } Statement::ShowTables { extended, diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 6348aba49082..8167ddacffb4 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -112,6 +112,7 @@ impl Unparser<'_> { | LogicalPlan::Analyze(_) | LogicalPlan::Extension(_) | LogicalPlan::Prepare(_) + | LogicalPlan::Execute(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt index ce4b7217f990..e306ec7767c7 100644 --- a/datafusion/sqllogictest/test_files/prepare.slt +++ b/datafusion/sqllogictest/test_files/prepare.slt @@ -80,3 +80,21 @@ PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS SELECT id, SUM(age) FROM person statement error PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter); + +# test creating logical plan for EXECUTE statements +query TT +EXPLAIN EXECUTE my_plan; +---- +logical_plan Execute: my_plan params=[] + +query TT +EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo'); +---- +logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")] + +query error DataFusion error: Schema error: No field named a\. +EXPLAIN EXECUTE my_plan(a); + +# TODO: support EXECUTE queries +query error DataFusion error: This feature is not implemented: Unsupported logical plan: Execute +EXECUTE my_plan;