Skip to content

Commit

Permalink
nested loop join issue 12633 test
Browse files Browse the repository at this point in the history
Add a test that exercises the large batch size issue described in
issue apache#12633. This was a code review request.
  • Loading branch information
mhilton committed Sep 27, 2024
1 parent e0dadee commit 0fa1977
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,4 +1624,58 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_issue_12633() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
let batches = vec![
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]))],
)?,
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]))],
)?,
];
let left = MemoryExec::try_new(&[batches.clone()], schema.clone(), None)?;
let right = MemoryExec::try_new(&[batches.clone()], schema.clone(), None)?;

let (schema, column_indices) =
build_join_schema(schema.as_ref(), schema.as_ref(), &JoinType::Full);

let filter = JoinFilter::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("v", 0)),
Operator::NotEq,
Arc::new(Column::new("v", 1)),
)),
column_indices,
schema,
);

let config = SessionConfig::new().with_batch_size(5);
let task_ctx = Arc::new(TaskContext::default().with_session_config(config));

let (_, batches) = multi_partitioned_join_collect(
Arc::new(left),
Arc::new(right),
&JoinType::Full,
Some(filter),
task_ctx,
)
.await
.unwrap();

let rows = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
assert_eq!(rows, 90);
let max_rows = batches
.iter()
.map(|batch| batch.num_rows())
.max()
.unwrap_or(0);
assert!(max_rows <= 5);

Ok(())
}
}

0 comments on commit 0fa1977

Please sign in to comment.