Infer data type from schema for Values and add struct coercion to coalesce #12864

Conversation
13)------CoalesceBatchesExec: target_batch_size=2
14)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
15)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
16)------------MemoryExec: partitions=1, partition_sizes=[1]
I'm not entirely sure about this kind of change 🤔
On the main branch, when you create a table with VALUES, RoundRobinBatch is applied because the projection contains a cast expression. Literal values are i64 by default, so when we have an int column, we need a cast to i32.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
    let all_simple_exprs = self
        .expr
        .iter()
        .all(|(e, _)| e.as_any().is::<Column>() || e.as_any().is::<Literal>());
    // If the expressions are all Columns or Literals, every computation in this
    // projection is a reorder or a rename, so the projection does not benefit
    // from input partitioning and we return false.
    vec![!all_simple_exprs]
}
But with this change, the values are already cast inside Values, so there is no cast expression in the Projection. The plan is therefore MemoryExec: partitions=1, partition_sizes=[1] instead of MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]. So I think the new plan makes sense.
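For illustration, a minimal sketch of the coercion described above, using `datafusion_expr`'s expression helpers (simplified; not the code from this PR):

```rust
use arrow_schema::DataType;
use datafusion_expr::{cast, lit};

fn main() {
    // A bare integer literal in VALUES defaults to Int64 on main, so an
    // `int` (Int32) column forces a cast expression into the projection:
    let value = lit(1i64);
    let coerced = cast(value, DataType::Int32);
    println!("{coerced}"); // prints something like CAST(Int64(1) AS Int32)
}
```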
@@ -3579,8 +3580,7 @@ physical_plan
08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
09)----------------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@6 as sum_amount]
10)------------------BoundedWindowAggExec: wdw=[sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted]
11)--------------------CoalescePartitionsExec
12)----------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
11)--------------------MemoryExec: partitions=1, partition_sizes=[1]
This looks like an improvement.
@@ -3360,7 +3360,8 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=4
06)----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8
07)------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
08)--------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
This seems equivalent
(for other readers the explanation of why this changed is below at https://github.com/apache/datafusion/pull/12864/files#r1797538174)
If this change is too large, I can try to split it into several smaller PRs.
Hi @jayzhan211, what is the current status of this PR? I have time to review it if it is ready.
This is ready for review; I forgot to mark it as ready.
Conflict again.
Thank you @jayzhan211 and @findepi
It seems to me that this PR is an improvement over what is on main, and thus makes sense to merge in.
I had some API / comment suggestions, but I also think they are not required.
@@ -177,7 +179,7 @@ impl LogicalPlanBuilder {
/// so it's usually better to override the default names with a table alias list.
///
/// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
pub fn values(mut values: Vec<Vec<Expr>>) -> Result<Self> {
pub fn values(values: Vec<Vec<Expr>>, schema: Option<&DFSchemaRef>) -> Result<Self> {
👍 It makes a lot of sense to me to pass in the schema too, if known.
I think it might be nicer to users if we didn't make an API change and instead added a new API like
pub fn values_with_schema(values: Vec<Vec<Expr>>, schema: Option<&DFSchemaRef>) -> Result<Self> {
...
}
Even if we also deprecated `values`, it would help users prepare for the upgrade.
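For example, a minimal sketch of how such a shim could look (the `#[deprecated]` attribute and the delegation here are assumptions about one possible shape, not code from this PR):

```rust
impl LogicalPlanBuilder {
    /// Kept for backwards compatibility; forwards to the schema-aware API.
    #[deprecated(note = "use `values_with_schema` instead")]
    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
        // No schema is known here, so types are inferred from the expressions alone
        Self::values_with_schema(values, None)
    }
}
```

Existing callers would keep compiling (with a deprecation warning) while new code opts into the schema-aware path.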
    }
}

fn infer_from_schema(values: Vec<Vec<Expr>>, schema: &DFSchema) -> Result<Self> {
Maybe we can give this a name to make it clear it is related to VALUES processing:
fn infer_from_schema(values: Vec<Vec<Expr>>, schema: &DFSchema) -> Result<Self> {
fn infer_values_from_schema(values: Vec<Vec<Expr>>, schema: &DFSchema) -> Result<Self> {
datafusion/sql/src/planner.rs (outdated)
@@ -154,6 +156,7 @@ impl PlannerContext {
ctes: HashMap::new(),
outer_query_schema: None,
outer_from_schema: None,
table_schema: None,
Maybe we can name it create_table_schema to reflect that it is used for CREATE TABLE processing?
@@ -392,12 +392,12 @@ create table t(a struct<r varchar, c int>, b struct<r varchar, c float>) as values
query T
select arrow_typeof([a, b]) from t;
----
List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
This seems much more correct to me -- "c" is float, not Int32 👍
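As a rough sketch of the field-wise struct coercion involved here, with `unify_types` standing in for DataFusion's actual (much richer) coercion rules:

```rust
use arrow_schema::{DataType, Field, Fields};

// Stand-in for DataFusion's numeric coercion: Int32 + Float32 -> Float32.
fn unify_types(l: &DataType, r: &DataType) -> Option<DataType> {
    use DataType::*;
    match (l, r) {
        (a, b) if a == b => Some(a.clone()),
        (Int32, Float32) | (Float32, Int32) => Some(Float32),
        _ => None,
    }
}

// Unify two struct types field by field, so Struct{r: Utf8, c: Int32} and
// Struct{r: Utf8, c: Float32} coerce to Struct{r: Utf8, c: Float32}.
fn coerce_structs(lhs: &Fields, rhs: &Fields) -> Option<DataType> {
    if lhs.len() != rhs.len() {
        return None;
    }
    let fields = lhs
        .iter()
        .zip(rhs.iter())
        .map(|(l, r)| {
            unify_types(l.data_type(), r.data_type())
                .map(|dt| Field::new(l.name(), dt, l.is_nullable() || r.is_nullable()))
        })
        .collect::<Option<Vec<_>>>()?;
    Some(DataType::Struct(fields.into()))
}
```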
select * from t;
----
{c0: a, c1: 1.0}
{c0: b, c1: 2.3}
Nice
I merged up from main and resolved a clippy failure.
datafusion/sql/src/planner.rs (outdated)

> Maybe we can name it create_table_schema to reflect that it is used for CREATE TABLE processing?
that would work too
datafusion/sql/src/values.rs (outdated)

let schema = planner_context
    .table_schema()
    .unwrap_or(Arc::new(DFSchema::empty()));
That's a good point. I think `sql_to_expr` must still get an empty schema -- the VALUES being constructed cannot refer to columns of the table being created.
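A minimal sketch of that flow, with assumed local names (`rows`, `planner_context`) and a simplified `sql_to_expr` call; the actual signatures may differ:

```rust
// The VALUES rows are planned against an *empty* schema -- they must not be
// able to reference columns of the table being created -- and the table's
// schema is consulted only afterwards, to infer the types of the literals.
let empty_schema = DFSchema::empty();
let values: Vec<Vec<Expr>> = rows
    .into_iter()
    .map(|row| {
        row.into_iter()
            .map(|v| self.sql_to_expr(v, &empty_schema, planner_context))
            .collect::<Result<Vec<_>>>()
    })
    .collect::<Result<Vec<_>>>()?;
```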
datafusion/sql/src/values.rs (outdated)

if schema.fields().is_empty() {
    LogicalPlanBuilder::values(values, None)?.build()
} else {
    LogicalPlanBuilder::values(values, Some(&schema))?.build()
}
How do we know that the VALUES being processed here should actually obey the schema of the "main table" of the query?
My point is that what we're doing here is legal only for certain query shapes involving CREATE TABLE + VALUES (e.g. the example above), but is not applicable to other query shapes involving CREATE TABLE and VALUES (e.g. CREATE TABLE + SELECT ... FROM VALUES).
Where is the logic guaranteeing that the table-inferred schema gets applied in the former case, but not in the latter?
@@ -417,7 +417,7 @@ mod tests {
    Box::new(lit(1)),
));
let values = vec![vec![expr1, expr2]];
let plan = LogicalPlanBuilder::values(values, None)?.build()?;
let plan = LogicalPlanBuilder::values_with_schema(values, None)?.build()?;
Just `values(values)`, since schema is None?
@@ -282,7 +282,7 @@ impl AsLogicalPlan for LogicalPlanNode {
    .map_err(|e| e.into())
}?;

LogicalPlanBuilder::values(values, None)?.build()
LogicalPlanBuilder::values_with_schema(values, None)?.build()
Just `values(values)`, since schema is None?
This should fail to build the initial plan. Instead it fails at some later stage. It seems it's a regression.
This is an invalid query, but it succeeds. It seems it's a regression.
@findepi what is your suggested path forward? From my perspective this PR improves several cases (queries that should run, but currently do not on main). So my personal suggestion is to merge this improvement and track the remaining issues in follow-on tickets.
datafusion/sql/src/values.rs (outdated)

if schema.fields().is_empty() {
    LogicalPlanBuilder::values_with_schema(values, None)?.build()
} else {
    LogicalPlanBuilder::values_with_schema(values, Some(&schema))?.build()
}
This remains correct only for VALUES that are direct input to CREATE TABLE.
I am not convinced that the code we should eventually have will be a natural evolution of the code here, so it's hard for me to judge whether this is a step in the right direction. How should this be solved? I think this is a question for you @alamb more than for me, but I can try to guess. For example, when planning CREATE TABLE we could check whether its input is VALUES and bypass the generic handling. What are your thoughts?
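A hypothetical sketch of that shape (the helper names `plan_create_table_input`, `coerce_values_to_schema`, and `cast_output_to_schema` are illustrative, not existing DataFusion APIs):

```rust
fn plan_create_table_input(
    input: LogicalPlan,
    declared: &DFSchemaRef,
) -> Result<LogicalPlan> {
    match input {
        // Direct `CREATE TABLE t(...) AS VALUES ...`: the declared schema may
        // be pushed into the literal rows.
        LogicalPlan::Values(values) => coerce_values_to_schema(values, declared),
        // Any other shape, e.g. `CREATE TABLE ... AS SELECT ... FROM (VALUES ...)`:
        // plan the input on its own and cast only the final output columns.
        other => cast_output_to_schema(other, declared),
    }
}
```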
In my mind, the additional test cases that pass with this code but not on main represent the step forward. The internal implementation (aka the code in this PR) may well change over time / need a different structure than what is in this PR, but the end interface (aka what SQL can run / what the plans are) should be the same.
I think as long as there is no regression we should move on and file a ticket for the remaining issue. I guess what @findepi mentioned is something like …
It would be nice if there were an example for this. I think it is something like …
Sorry for not following up earlier, I was at a full-day event yesterday. There are regressions. Before the change: … On current main: …
It seems the query was accidentally correct before this change, because we don't know the result type of the function when we build up the Values plan. After this change, the incorrect value ('abcd') is cast to column a's type instead of to the result type of the function. The ideal solution is to find the result type of the function and check whether it matches the column type. By the way, I wonder whether this query is valid in postgres or elsewhere? Valid query in postgres: … Invalid query in postgres: …
It seems there is no way to supply values together with the table creation if the column types are defined 🤔
The Values plan is built for … What was accidental about this?
I think it mistakenly output the values on the wrong branch. Let's find such a valid query in postgres to make sure we need to support this kind of query in datafusion, and add a test to ensure coverage.
I have filed #13124 to track the issues raised above explicitly.
Thank you @alamb.
Which issue does this PR close?
Closes #5046.
Follow-up from #12839.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?