Skip to content

Commit

Permalink
Fix:fix HashJoin projection swap (#12967)
Browse files Browse the repository at this point in the history
* swap_hash_join works with joins with projections

* use non swapped hash join's projection

* clean up

* fix hashjoin projection swap.

* assert hashjoinexec.

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Eduard Karacharov <[email protected]>

* fix clippy.

---------

Co-authored-by: Onur Satici <[email protected]>
Co-authored-by: Eduard Karacharov <[email protected]>
  • Loading branch information
3 people authored Oct 18, 2024
1 parent 87e931c commit e9435a9
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,15 @@ pub fn swap_hash_join(
partition_mode,
hash_join.null_equals_null(),
)?;
// In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
hash_join.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
) {
) || hash_join.projection.is_some()
{
Ok(Arc::new(new_join))
} else {
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
Expand Down Expand Up @@ -1287,6 +1289,33 @@ mod tests_statistical {
);
}

#[tokio::test]
async fn test_hash_join_swap_on_joins_with_projections() -> Result<()> {
let (big, small) = create_big_and_small();
let join = Arc::new(HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
vec![(
Arc::new(Column::new_with_schema("big_col", &big.schema())?),
Arc::new(Column::new_with_schema("small_col", &small.schema())?),
)],
None,
&JoinType::Inner,
Some(vec![1]),
PartitionMode::Partitioned,
false,
)?);
let swapped = swap_hash_join(&join.clone(), PartitionMode::Partitioned)
.expect("swap_hash_join must support joins with projections");
let swapped_join = swapped.as_any().downcast_ref::<HashJoinExec>().expect(
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
);
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
assert_eq!(swapped.schema().fields.len(), 1);
assert_eq!(swapped.schema().fields[0].name(), "small_col");
Ok(())
}

#[tokio::test]
async fn test_swap_reverting_projection() {
let left_schema = Schema::new(vec![
Expand Down

0 comments on commit e9435a9

Please sign in to comment.