
JoinOptimization: Add build side pushdown to probe side #13054

Open · wants to merge 19 commits into main
Conversation

Lordworms
Contributor

Which issue does this PR close?

Closes #7955

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate common Related to common crate functions labels Oct 22, 2024
@Lordworms
Contributor Author

Lordworms commented Oct 22, 2024

For a query like

select count(*) from part join lineitem on l_partkey=p_partkey where p_partkey > 10;

with

set datafusion.execution.parquet.pushdown_filters = true;

and with the data scale shown below:

[screenshot: test data sizes]

the execution time is 5 times faster.

Before:
[screenshot: query timing before this change]

After:
[screenshot: query timing after this change]

The script I used to generate the test parquet files:
[screenshot: data generation script]

Since DataFusion, unlike DuckDB, executes joins across partitions by default, I added an extra phase to aggregate the build-side information.

I tested on my M3 Pro Mac, with CPU info as shown:
[screenshot: CPU info]

@Lordworms
Contributor Author

@Dandandan Sorry for the late response; I have revisited it and formatted the code. Really appreciate your suggestions. Thanks a lot.

@Dandandan
Contributor

Really nice @Lordworms , will have a good look later today.

return Ok(Transformed::yes(Arc::new(new_hash_join)));
}
Ok(Transformed::no(plan))
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
Contributor

I wonder if we could allow this cool optimization on any ExecutionPlan by adding the "with_dynamic_filter" to the ExecutionPlan?
Maybe also a "supports_dynamic_filter" to know when to call "with_dynamic_filter".
So same principle as the existing static filter pushdown setup.
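A rough sketch of what that trait extension could look like. The method names (supports_dynamic_filter, with_dynamic_filter) come from the comment above, but the traits here are simplified stand-ins for illustration, not DataFusion's real ExecutionPlan/PhysicalExpr:

```rust
use std::sync::Arc;

// Simplified stand-in for DataFusion's PhysicalExpr trait (hypothetical).
pub trait PhysicalExpr: std::fmt::Debug + Send + Sync {}

#[derive(Debug)]
pub struct DummyExpr; // placeholder predicate for the demo
impl PhysicalExpr for DummyExpr {}

// Simplified stand-in for ExecutionPlan, extended with the two suggested
// methods. Defaults mean existing operators opt out with no code changes,
// mirroring the static filter pushdown setup.
pub trait ExecutionPlan {
    fn supports_dynamic_filter(&self) -> bool {
        false
    }
    fn with_dynamic_filter(
        &self,
        _filter: Arc<dyn PhysicalExpr>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
}

// A ParquetExec-like scan that accepts dynamic filters.
pub struct FilterableScan {
    pub filters: Vec<Arc<dyn PhysicalExpr>>,
}

impl ExecutionPlan for FilterableScan {
    fn supports_dynamic_filter(&self) -> bool {
        true
    }
    fn with_dynamic_filter(
        &self,
        filter: Arc<dyn PhysicalExpr>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        // Return a new plan node with the filter attached.
        let mut filters: Vec<Arc<dyn PhysicalExpr>> = self.filters.clone();
        filters.push(filter);
        Some(Arc::new(FilterableScan { filters }))
    }
}

// An operator that keeps the default "no" answers.
pub struct OpaqueExec;
impl ExecutionPlan for OpaqueExec {}
```

The optimizer rule would then call supports_dynamic_filter before with_dynamic_filter, instead of downcasting to ParquetExec.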

Contributor

That makes a lot of sense

Contributor Author
Lordworms commented Oct 22, 2024

Sure, I'll refactor this, but could we support ParquetExec first? For other scan execs, I think the way to add a dynamic filter is to add a FilterExec above them, while for Parquet we can utilize the predicate to add filters dynamically.

@@ -711,10 +724,15 @@ impl DisplayAs for ParquetExec {
)
})
.unwrap_or_default();

let dynamic_filter =
format!("dynamic_filter: {:?}", self.dynamic_filters);
Contributor

I think it's best to only write this if a filter is available. It would also be good to implement Display for DynamicFilterInfo.
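A minimal sketch of what that could look like. DynamicFilterInfo's actual fields differ; here it is modeled as hypothetical per-column (min, max) bounds, and the display fragment is only emitted when a filter is present:

```rust
use std::fmt;

/// Hypothetical, simplified version of the PR's DynamicFilterInfo:
/// per-column (min, max) bounds collected from the build side.
pub struct DynamicFilterInfo {
    pub bounds: Vec<(String, i64, i64)>, // (column, min, max)
}

impl fmt::Display for DynamicFilterInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let parts: Vec<String> = self
            .bounds
            .iter()
            .map(|(col, lo, hi)| format!("{col} IN [{lo}, {hi}]"))
            .collect();
        write!(f, "{}", parts.join(" AND "))
    }
}

/// Only append the fragment when there is something to show, as
/// suggested for ParquetExec's DisplayAs impl.
pub fn display_fragment(info: &DynamicFilterInfo) -> String {
    if info.bounds.is_empty() {
        String::new()
    } else {
        format!(", dynamic_filter=[{info}]")
    }
}
```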

Contributor Author

Sorry about that, this was for debugging; I'll refactor it.

}
}

impl PhysicalOptimizerRule for JoinFilterPushdown {
Contributor

This is executed at plan time; wouldn't it be reasonable to execute it during execution of the HashJoin, after the build side is loaded?

data_types: Vec<&DataType>,
total_batches: usize,
) -> Result<Self, DataFusionError> {
let (max_accumulators, min_accumulators) = data_types
Contributor
Dandandan commented Oct 22, 2024

I wonder if we can use a logical plan / physical plan instead of manually invoking min/max accumulators? Not sure if that makes sense :)

Contributor Author

I'm not sure either. At first I wanted to try using the min/max ScalarFunction, but the ScalarFunction translation from logical_expr to physical_expr is not implemented yet... I guess using accumulators is more intuitive?

}

inner.batch_count = inner.batch_count.saturating_sub(1);
if records.num_rows() == 0 {
Contributor

Couldn't the input of a join partition be empty, in which case it will never be finalized?

Contributor Author

Haven't tried that; I'll add a test for it.

Contributor Author
Lordworms commented Oct 23, 2024

Yes, it happens sometimes; I haven't found a way to solve it yet.

Contributor Author

I added a new check: if a partition id has already been recorded and it returns an empty batch again, we treat it as an empty partition and return true.
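That check might look roughly like the following, with the bookkeeping reduced to two hypothetical sets of partition ids (the PR's real state lives inside DynamicFilterInfoInner):

```rust
use std::collections::HashSet;

/// Hypothetical sketch: a partition that reports an empty batch after
/// having already been seen is treated as finished, so an empty build
/// partition cannot block finalization of the dynamic filter forever.
#[derive(Default)]
pub struct PartitionTracker {
    seen: HashSet<usize>,
    finished: HashSet<usize>,
}

impl PartitionTracker {
    /// Returns true when the partition should be considered done.
    pub fn report(&mut self, partition: usize, num_rows: usize) -> bool {
        if num_rows == 0 && self.seen.contains(&partition) {
            // Second empty batch from a known partition: empty partition.
            self.finished.insert(partition);
            return true;
        }
        self.seen.insert(partition);
        false
    }
}
```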

let max_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(max_scalar));
let min_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(min_scalar));

let range_condition: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Contributor
Dandandan commented Oct 22, 2024

Maybe it makes sense not to create a dynamic filter under certain conditions, e.g. when min is the minimum value of the data type and max is the maximum (the expression will not filter out any rows)?
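That guard could be as simple as comparing the collected bounds against the type's domain. A sketch for i64 only (the real code would match on each supported DataType):

```rust
/// If the build-side bounds span the entire domain of the type, a
/// `min <= col AND col <= max` predicate cannot filter anything, so
/// building and pushing it down would be pure overhead.
pub fn range_filter_is_useful(min: i64, max: i64) -> bool {
    !(min == i64::MIN && max == i64::MAX)
}
```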

let max_scalar = max_value.clone();
let min_scalar = min_value.clone();

let max_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(max_scalar));
Contributor
Dandandan commented Oct 22, 2024

For readability, wouldn't it be better to use a LogicalPlan here and convert it to physical? This might also give some optimizations for free (e.g. expression simplification)...

Contributor Author

got it.


struct DynamicFilterInfoInner {
max_accumulators: Vec<MaxAccumulator>,
min_accumulators: Vec<MinAccumulator>,
Contributor

I wonder if we can just use batches: Vec<Vec<Arc<dyn Array>>> here and compute any filters from that?

Contributor

This makes it pretty easy to support additional filters. It's pretty easy to add InListExpr filter based on this (I have some code locally).

Contributor Author

I agree, I'll refactor it; after that maybe I can incorporate your commit.

Contributor

Cool, after changes have cooled down a bit I can update and share my branch.
Adding something like that on top is actually quite easy after making the make_set helper public:

+                let col =
+                    Arc::<datafusion_physical_expr::expressions::Column>::clone(column);
+                let batches: Vec<&dyn Array> = inner.batches[i]
+                    .iter()
+                    .map(|x| x as &dyn Array)
+                    .collect();
+
+                // use inlist rather than min/max, use some threshold to avoid big inputs
+                // TODO: big inputs could be using bloom filter
+                if batches.iter().map(|x|x.len()).sum::<usize>() < 100 {
+                    let batches = concat(batches.as_slice())?;
+                    let set = make_set(&batches)?;
+    
+                    let unique_condition = InListExpr::new(col, vec![], false, Some(set));
+                    condition = Arc::new(BinaryExpr::new(
+                        Arc::new(unique_condition),
                         Operator::And,
-                        range_condition,
-                    ))
-                        as Arc<dyn PhysicalExpr>)),
-                    None => Ok(Some(range_condition)),
+                        condition,
+                    ));
                 }
+                Ok(Some(condition))

Contributor

Something that would be fun to try is creating a PhysicalExpr from the JoinHashMap - that way we can build a good filter without any overhead.

Contributor Author

I can do that


let data_type = arrays[0].data_type();
match data_type {
DataType::Int8 => process_min_max!(arrays, Int8Array, Int8, i8),
Contributor Author

Currently this supports numeric columns (which adds a range filter); string types will be supported in the next PR (adding an IN (xx, xx) filter).

Contributor

String types could perform range as well, no?
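Right: strings are totally ordered, so lexicographic min/max bounds prune just as well as numeric ones. An illustrative sketch (not the PR's code) of folding build-side keys into such bounds:

```rust
/// Fold a batch of string keys into lexicographic (min, max) bounds,
/// which can back the same `min <= col AND col <= max` range filter
/// that numeric columns use. Returns None for an empty batch.
pub fn string_bounds<'a>(values: &[&'a str]) -> Option<(&'a str, &'a str)> {
    let mut iter = values.iter();
    let first = *iter.next()?;
    Some(iter.fold((first, first), |(lo, hi), &v| {
        (if v < lo { v } else { lo }, if v > hi { v } else { hi })
    }))
}
```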

Contributor Author

I'll add it.

@berkaysynnada
Contributor

Could you update the display of sources to include these pushed-down filters? I believe it would be more clear if we could see them clearly in the output (I'm also curious to see how this impacts the test plans). Additionally, are there any benchmark results you could share with us?

@Dandandan
Contributor

> Could you update the display of sources to include these pushed-down filters? I believe it would be more clear if we could see them clearly in the output (I'm also curious to see how this impacts the test plans). Additionally, are there any benchmark results you could share with us?

How would you display them in sources? The dynamic filter will only be added during execution, so it will only be available through e.g. ParquetExec after loading the build side.

@berkaysynnada
Contributor

> How would you display them in sources? The dynamic filter will only be added during execution, so it will only be available through e.g. ParquetExec after loading the build side.

Doesn’t JoinFilterPushdown: PhysicalOptimizerRule modify the dynamic_filters in ParquetExec during optimizations? My intention was to ensure the rule is functioning correctly and to prevent this feature from being silently broken in the future. Perhaps we could add some unit tests to verify this.

@Dandandan
Contributor

Dandandan commented Oct 24, 2024

> How would you display them in sources? The dynamic filter will only be added during execution, so it will only be available through e.g. ParquetExec after loading the build side.
>
> Doesn’t JoinFilterPushdown: PhysicalOptimizerRule modify the dynamic_filters in ParquetExec during optimizations? My intention was to ensure the rule is functioning correctly and to prevent this feature from being silently broken in the future. Perhaps we could add some unit tests to verify this.

Yes that's correct, just wanted to stress the actual filter isn't added during the optimization phase, only a "placeholder" that might be filled with some filter expressions during execution for particular columns. So it only shows ParquetExec might be filtered dynamically from the join values on certain columns, but the range or set of values will not be added at that point.

@Lordworms
Contributor Author

Lordworms commented Oct 25, 2024

> How would you display them in sources? The dynamic filter will only be added during execution, so it will only be available through e.g. ParquetExec after loading the build side.
>
> Doesn’t JoinFilterPushdown: PhysicalOptimizerRule modify the dynamic_filters in ParquetExec during optimizations? My intention was to ensure the rule is functioning correctly and to prevent this feature from being silently broken in the future. Perhaps we could add some unit tests to verify this.

Maybe I can show them in explain analyze.

Something like:
[screenshot: explain analyze output showing the dynamic filter]

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Oct 25, 2024
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Oct 25, 2024

Successfully merging this pull request may close these issues.

Push Dynamic Join Predicates into Scan ("Sideways Information Passing", etc)