
Split output batches of joins that do not respect batch size #12969

Merged
merged 11 commits on Oct 18, 2024

Conversation

alihan-synnada (Contributor)

Which issue does this PR close?

Closes #12633

Rationale for this change

A join operation chain can create a RecordBatch whose size is thousands or even millions of rows.

What changes are included in this PR?

Adds a new config called enforce_batch_size_in_joins that is disabled by default. Enabling the config restricts the maximum output batch size of join operators to batch_size. #12634 is similar but it splits the join indices and then builds the output batches, which causes performance issues. This PR splits the output batches after the join is processed.

Improves adjust_indices_by_join_type performance by optimizing PrimitiveArray concatenation using MutableArrayData
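The splitting approach described above can be sketched in plain Rust. This is a std-only analogy, not the actual DataFusion implementation: `split_batch` is a hypothetical name, and a `Vec` of rows stands in for Arrow's `RecordBatch` (whose `slice` method is zero-copy, so the real splitter is cheaper than this copy-based sketch suggests).

```rust
// Std-only sketch: cap each output chunk at `batch_size` rows, so a join that
// produced one huge batch instead yields several batches of bounded size.
fn split_batch<T: Clone>(batch: &[T], batch_size: usize) -> Vec<Vec<T>> {
    batch
        .chunks(batch_size)      // at most `batch_size` rows per chunk
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let rows: Vec<u32> = (0..10).collect();
    let parts = split_batch(&rows, 4);
    // 10 rows with batch_size = 4 -> chunks of 4, 4, 2
    assert_eq!(parts.len(), 3);
    assert_eq!(parts[0], vec![0, 1, 2, 3]);
    assert_eq!(parts[2], vec![8, 9]);
    println!("split into {} parts", parts.len());
}
```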

Are these changes tested?

Includes unit tests for BatchSplitter

Are there any user-facing changes?

Users can optionally enable enforce_batch_size_in_joins in cases where joins cause out-of-memory errors. No breaking changes.
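Usage would presumably look like the following; the exact key prefix is an assumption based on DataFusion's usual `datafusion.execution.*` config namespace.

```sql
-- Hypothetical: enable bounded join output batches for this session
SET datafusion.execution.enforce_batch_size_in_joins = true;
```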

@github-actions github-actions bot added documentation Improvements or additions to documentation physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) common Related to common crate execution Related to the execution crate labels Oct 16, 2024
@alamb (Contributor) commented Oct 16, 2024

fyi @mhilton

@alamb (Contributor) commented Oct 16, 2024

I will review this later today

@ozankabak (Contributor) left a comment:

I reviewed this carefully and it looks good to me.

This doesn't cause any performance issues but solves the "growing batch sizes" problem in plans with cascaded joins

.iter()
.chain(right_unmatched_indices.iter())
.collect();
let mut new_right_indices = MutableArrayData::new(
Contributor:

I think we could use Vec<u64> and Vec<u32> instead of MutableArrayData

Contributor (Author):

I tried the following code but it's about 200% slower. Is there a more optimized way to use Vec in this context?

// the new left indices: left_indices + null array
let mut new_left_indices = Vec::with_capacity(left_size + unmatched_size);
new_left_indices.extend(left_indices.values().iter().map(|v| Some(*v)));
new_left_indices.append(&mut vec![None; unmatched_size]);
let new_left_indices = UInt64Array::from(new_left_indices);

// the new right indices: right_indices + right_unmatched_indices
let mut new_right_indices = Vec::with_capacity(right_size + unmatched_size);
new_right_indices.extend(right_indices.values().iter());
new_right_indices.extend(right_unmatched_indices.values().iter());
let new_right_indices = UInt32Array::from(new_right_indices);

Contributor:

Hm you're right, sorry I missed that it was producing nulls, in that case this is not very efficient indeed.

Seems though we can just use https://docs.rs/arrow/latest/arrow/array/struct.PrimitiveBuilder.html then instead?
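The builder approach being suggested can be illustrated with a std-only analogy. `U64Builder` and its methods are hypothetical stand-ins for Arrow's `PrimitiveBuilder` API: values are appended in bulk (a memcpy) and validity is tracked separately, which avoids the per-element `Option<T>` handling that made the `Vec<Option<u64>>` version slow.

```rust
// Hypothetical std-only analogy of an Arrow primitive builder:
// a values buffer plus a separate validity buffer (real Arrow
// packs validity into a bitmap).
struct U64Builder {
    values: Vec<u64>,
    validity: Vec<bool>,
}

impl U64Builder {
    fn with_capacity(cap: usize) -> Self {
        Self {
            values: Vec::with_capacity(cap),
            validity: Vec::with_capacity(cap),
        }
    }

    // Bulk append of valid values: one extend_from_slice, no per-element Option.
    fn append_slice(&mut self, vals: &[u64]) {
        self.values.extend_from_slice(vals);
        self.validity.extend(std::iter::repeat(true).take(vals.len()));
    }

    // Nulls only touch the validity buffer (placeholder zeros in values).
    fn append_nulls(&mut self, n: usize) {
        self.values.extend(std::iter::repeat(0).take(n));
        self.validity.extend(std::iter::repeat(false).take(n));
    }
}

fn main() {
    // New left indices: left_indices followed by nulls, as in the join adjustment.
    let left_indices = [3u64, 1, 4];
    let mut b = U64Builder::with_capacity(5);
    b.append_slice(&left_indices);
    b.append_nulls(2);
    assert_eq!(b.values, vec![3, 1, 4, 0, 0]);
    assert_eq!(b.validity, vec![true, true, true, false, false]);
}
```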

Contributor (Author):

Thanks for the suggestion. I ended up using into_builder(), it's incredibly fast now.

Contributor:

Good news: It runs much faster with into_builder.
Bad news: It seems like we actually exercise the internal_err (i.e. ref count > 2 to the underlying buffer) in some cases. We will debug and finalize (hopefully) tomorrow.

Contributor:

Side note: I think we might be even better off in future to pass the (owned) primitive builders to the different methods instead of trying to convert the arrays to builders again, this probably saves some copies in other places as well.

@mhilton (Contributor) commented Oct 17, 2024

Unfortunately this doesn't address the actual problem with creating giant batches, which is that they require a lot of memory, and that memory isn't accounted for in any MemoryPool. Wiring a MemoryReservation into BatchSplitter would probably be enough to address this though.

@ozankabak (Contributor):

> Wiring a MemoryReservation into BatchSplitter would probably be enough to address this though.

Thanks for bringing this to our attention. We will add this in a quick follow-on PR.
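A minimal std-only sketch of what such accounting could look like. `MemoryPool` here is a hypothetical stand-in, not DataFusion's actual `MemoryReservation` API: the splitter would try to reserve the bytes of each output chunk before materializing it, and free them once the chunk is consumed.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical shared pool: reservations fail once the byte limit is reached,
// instead of allocating unaccounted memory.
struct MemoryPool {
    limit: usize,
    used: AtomicUsize,
}

impl MemoryPool {
    fn new(limit: usize) -> Self {
        Self { limit, used: AtomicUsize::new(0) }
    }

    fn try_reserve(&self, bytes: usize) -> Result<(), String> {
        let prev = self.used.fetch_add(bytes, Ordering::SeqCst);
        if prev + bytes > self.limit {
            // Roll back the optimistic reservation and report failure.
            self.used.fetch_sub(bytes, Ordering::SeqCst);
            Err(format!("cannot reserve {bytes} bytes"))
        } else {
            Ok(())
        }
    }

    fn free(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::SeqCst);
    }
}

fn main() {
    let pool = MemoryPool::new(1024);
    assert!(pool.try_reserve(800).is_ok());
    assert!(pool.try_reserve(400).is_err()); // would exceed the limit
    pool.free(800);
    assert!(pool.try_reserve(400).is_ok()); // fits after freeing
}
```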

@andygrove mentioned this pull request Oct 17, 2024
@alamb (Contributor) left a comment:

This is interesting -- thank you @alihan-synnada and @ozankabak

I wonder if you have considered updating the join algorithms themselves to incrementally produce output (rather than generating one large RecordBatch and then slicing it up)?

We found in the GroupBy that the slicing requires non-trivial time -- see #9562 and the POC by @Rachelint in #11943

datafusion/common/src/config.rs (resolved review thread)
indices_cache,
right_side_ordered,
state: NestedLoopJoinStreamState::WaitBuildSide,
batch_transformer: BatchSplitter::new(batch_size),
Contributor:

this is a clever idea to parameterize the joins stream on the transformer 👍

datafusion/physical-plan/src/joins/utils.rs (resolved review thread)
null_equals_null: self.null_equals_null,
state: SHJStreamState::PullRight,
reservation,
batch_transformer: BatchSplitter::new(batch_size),
Contributor:

Given that the overhead of a BatchTransformer is likely small (one function call per output batch), I suggest trying a dyn trait object here and in the other joins instead (e.g. batch_transformer: Box<dyn BatchTransformer>).

I suspect it would not make any noticeable performance difference
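The suggested trait-object pattern can be sketched with hypothetical std-only types (the real trait and join-stream field in DataFusion look different): the stream holds a `Box<dyn BatchTransformer>` field instead of being generic over the transformer, trading one dynamic dispatch per output batch for less monomorphization.

```rust
// Hypothetical trait: a Vec<u32> stands in for a RecordBatch.
trait BatchTransformer {
    fn transform(&mut self, batch: Vec<u32>) -> Vec<Vec<u32>>;
}

// Pass-through transformer for joins that don't need splitting.
struct NoopTransformer;
impl BatchTransformer for NoopTransformer {
    fn transform(&mut self, batch: Vec<u32>) -> Vec<Vec<u32>> {
        vec![batch]
    }
}

// Splits oversized batches into chunks of at most `batch_size` rows.
struct Splitter {
    batch_size: usize,
}
impl BatchTransformer for Splitter {
    fn transform(&mut self, batch: Vec<u32>) -> Vec<Vec<u32>> {
        batch.chunks(self.batch_size).map(|c| c.to_vec()).collect()
    }
}

fn main() {
    // The join stream would hold this field instead of a generic parameter,
    // choosing the implementation at construction time from the config.
    let mut t: Box<dyn BatchTransformer> = Box::new(Splitter { batch_size: 2 });
    assert_eq!(t.transform((0..5).collect()).len(), 3); // chunks of 2, 2, 1

    let mut n: Box<dyn BatchTransformer> = Box::new(NoopTransformer);
    assert_eq!(n.transform(vec![1, 2, 3]).len(), 1);
}
```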

0,
"expected left indices to have no nulls"
);
builder.append_slice(left_indices.values());
Contributor:

I wonder if at this point the left array Arc can be dropped such that the right side can be converted to builder without issue.

@ozankabak (Contributor):

I will go ahead and merge this soon unless someone catches a critical issue.

Here are the things to work on in the near future:

  1. Add MemoryReservation to batch splitting to add rigorous memory accountability.
  2. Exploring if we can pass builders into methods instead of passing arrays around and converting them into builders when possible.
  3. Considering the batch_transformer: Box<dyn BatchTransformer> pattern if it doesn't regress performance

@ozankabak ozankabak merged commit 87e931c into apache:main Oct 18, 2024
25 checks passed
@alamb (Contributor) commented Oct 18, 2024

> I will go ahead and merge this soon unless someone catches a critical issue.

Thank you -- I agree this is better than what was on main

> Here are the things to work on in the near future:
>
>   1. Add MemoryReservation to batch splitting to add rigorous memory accountability.
>   2. Exploring if we can pass builders into methods instead of passing arrays around and converting them into builders when possible.
>   3. Considering the batch_transformer: Box<dyn BatchTransformer> pattern if it doesn't regress performance

I filed #13003 to track adding memory accounting

Successfully merging this pull request may close these issues.

NestedLoopJoinExec can create excessively large record batches
5 participants