
Optimize take/filter from multiple input arrays to a single large output array #6692

Open
alamb opened this issue Nov 5, 2024 · 21 comments

@alamb
Contributor

alamb commented Nov 5, 2024

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Upstream in DataFusion, there is a common pattern where we have multiple input RecordBatches and want to produce an output RecordBatch with some subset of the rows from the input batches. This happens in

  • FilterExec --> CoalesceBatchesExec when filtering
  • RepartitionExec --> CoalesceBatchesExec

The kernels used here are:

  • FilterExec uses filter, which takes a single input Array and produces a single output Array
  • RepartitionExec uses take, which also takes a single input Array and produces a single output Array
  • CoalesceBatchesExec calls concat, which takes multiple Arrays and produces a single Array as output

The use of these kernels and patterns has two downsides:

  1. Performance overhead due to a second copy: Calling filter/take immediately copies the data, which is copied again in CoalesceBatches (see illustration below)
  2. Memory overhead / performance overhead for garbage collecting StringView: Buffering up several RecordBatches with StringView may consume significant amounts of memory for mostly filtered rows, which requires us to run gc periodically, which actually slows some things down (see Reduce copying in CoalesceBatchesExec for StringViews datafusion#11628)

Here is an ascii art picture (from apache/datafusion#7957) that shows the extra copy in action


┌────────────────────┐        Filter                                                                          
│                    │                    ┌────────────────────┐            Coalesce                          
│                    │    ─ ─ ─ ─ ─ ─ ▶   │    RecordBatch     │             Batches                          
│    RecordBatch     │                    │   num_rows = 234   │─ ─ ─ ─ ─ ┐                                   
│  num_rows = 8000   │                    └────────────────────┘                                              
│                    │                                                    │                                   
│                    │                                                                ┌────────────────────┐  
└────────────────────┘                                                    │           │                    │  
┌────────────────────┐                    ┌────────────────────┐                      │                    │  
│                    │        Filter      │                    │          │           │                    │  
│                    │                    │    RecordBatch     │           ─ ─ ─ ─ ─ ▶│                    │  
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   num_rows = 500   │─ ─ ─ ─ ─ ┐           │                    │  
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │  
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │  
│                    │                    └────────────────────┘                      │                    │  
└────────────────────┘                                                                │                    │  
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │  
          ...                   ...                                       │           │                    │  
                                                                                      │                    │  
┌────────────────────┐                                                    │           └────────────────────┘  
│                    │                    ┌────────────────────┐                                              
│                    │       Filter       │                    │          │                                   
│    RecordBatch     │                    │    RecordBatch     │                                              
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   num_rows = 333   │─ ─ ─ ─ ─ ┘                                   
│                    │                    │                    │                                              
│                    │                    └────────────────────┘                                              
└────────────────────┘                                                                                        
                                                                                                              
                      FilterExec                                          CoalesceBatchesExec copies the data
                      creates output batches with copies                  *again* to form final large         
                      of the matching rows (calls take()                  RecordBatches                       
                      to make a copy)                                                                         
                                                                                                              

Describe the solution you'd like

I would like to apply filter/take to each incoming RecordBatch as it arrives, copying the data to an in-progress output array, in a way that is as fast as the filter and take operations. This would avoid the extra copy that is currently required.

Note this is somewhat like the interleave kernel, except that

  1. We only need the output rows to be in the same order as the input batches (so the second usize batch index is not needed)
  2. We don't want to have to buffer all the input
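For reference, the signature of the existing interleave kernel being compared against (this is the real arrow-select API):

pub fn interleave(values: &[&dyn Array], indices: &[(usize, usize)]) -> Result<ArrayRef, ArrowError>

Each output row needs a (batch_index, row_index) pair and all input arrays must be alive at once; here the batch index is implicit (output order matches arrival order) and inputs should be consumable one at a time as they stream in.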

Describe alternatives you've considered

One thing I have thought about is extending the builders so they can append more than one row at a time. For example:

  • Builder::append_filtered
  • Builder::append_take

So for example, to filter a stream of StringViewArrays I might do something like:

let mut builder = StringViewBuilder::new();
while let Some(input) = stream.next() {
  // compute some subset of input rows that make it to the output
  let filter: BooleanArray = compute_filter(&input, ....); 
  // append all rows from input where filter[i] is true
  builder.append_filtered(&input, &filter);
}

And also add an equivalent append_take.
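A minimal sketch of what these two methods could look like for StringViewBuilder (everything here is hypothetical -- the AppendSelected trait and both method names are assumptions, not an existing arrow-rs API; the bodies show the naive row-at-a-time semantics, not an optimized implementation):

use arrow::array::{Array, BooleanArray, StringViewArray, StringViewBuilder, UInt32Array};

/// Hypothetical extension trait sketching the proposed builder methods
trait AppendSelected {
    /// Append all rows of `input` where `filter` is true
    fn append_filtered(&mut self, input: &StringViewArray, filter: &BooleanArray);
    /// Append the rows of `input` at the positions given by `indices`
    fn append_take(&mut self, input: &StringViewArray, indices: &UInt32Array);
}

impl AppendSelected for StringViewBuilder {
    fn append_filtered(&mut self, input: &StringViewArray, filter: &BooleanArray) {
        // Naive reference semantics (assumes `filter` has no nulls);
        // a real kernel would copy whole runs of selected rows at once
        for i in 0..input.len() {
            if filter.value(i) {
                if input.is_null(i) {
                    self.append_null();
                } else {
                    self.append_value(input.value(i));
                }
            }
        }
    }

    fn append_take(&mut self, input: &StringViewArray, indices: &UInt32Array) {
        for i in 0..indices.len() {
            let idx = indices.value(i) as usize;
            if input.is_null(idx) {
                self.append_null();
            } else {
                self.append_value(input.value(idx));
            }
        }
    }
}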

I think if we did this right, it wouldn't be a lot of new code; we could just refactor the existing filter/take implementations. For example, I would expect that the filter kernel would then devolve into something like

fn filter(..) {
  match data_type {
    DataType::Int8 => Int8Builder::with_capacity(...)
      .append_filtered(input, filter)
      .finish(),
...
}

Additional context

@alamb alamb added the enhancement (Any new improvement worthy of an entry in the changelog) label Nov 5, 2024
@alamb alamb changed the title from "Optimize take/filter from multiple input arrays to a single output array" to "Optimize take/filter from multiple input arrays to a single large output array" Nov 5, 2024
@alamb
Contributor Author

alamb commented Nov 5, 2024

FYI

@devanbenz

@alamb just to clarify, your idea is to modify the existing take & filter kernels, not create a new one, right?

@jayzhan211
Contributor

> @alamb just to clarify, your idea is to modify the existing take & filter kernels, not create a new one, right?

If creating a new one helps, no reason not to do it.

@tustvold
Contributor

tustvold commented Nov 7, 2024

I think the idea is sound in principle, but needs a concrete API proposal.

I'm not sure the proposed builder API makes sense, as the typing for nested types like ListBuilder and DictionaryBuilder is not what we want here, and they can't easily be type erased. We also ideally want to avoid overly bloating the arrow-array crate with kernel logic. This isn't even touching on the fact that these kernels don't use the builders, for performance reasons.

I think we'd need to introduce a new type-erased MutableArray abstraction or something, potentially replacing the rather problematic MutableArrayData.

The only remaining challenge concerns dictionaries, as the output dictionary needs to be computed up front. Simply not supporting dictionaries could potentially be a valid workaround though.

@jayzhan211
Contributor

jayzhan211 commented Nov 7, 2024

Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion

@devanbenz

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion

'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

@jayzhan211
Contributor

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion

> 'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

With this approach, we reduce the copying to a single step. Compared to the current approach, where copying happens in multiple stages (filtering, garbage collection, and coalescing), my proposal combines these steps into one. While benchmarks are needed to confirm any performance gains, this method should, at the very least, not perform worse than the existing one.

@devanbenz

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion

> 'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

> With this approach, we reduce the copying to a single step. Compared to the current approach, where copying happens in multiple stages (filtering, garbage collection, and coalescing), my proposal combines these steps into one. While benchmarks are needed to confirm any performance gains, this method should, at the very least, not perform worse than the existing one.

That makes sense. This would be a change downstream within DataFusion then, correct?

@jayzhan211
Contributor

jayzhan211 commented Nov 7, 2024

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion

> 'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

> With this approach, we reduce the copying to a single step. Compared to the current approach, where copying happens in multiple stages (filtering, garbage collection, and coalescing), my proposal combines these steps into one. While benchmarks are needed to confirm any performance gains, this method should, at the very least, not perform worse than the existing one.

> That makes sense. This would be a change downstream within DataFusion then, correct?

yes, you can work on it if you want to

@alamb
Contributor Author

alamb commented Nov 7, 2024

So it sounds like the consensus is to work out how this might look downstream in DataFusion (maybe starting with StringView as that is what is giving us the most trouble now) and then use some of that knowledge to propose something upstream in Arrow -- sounds like a good idea to me

@alamb
Contributor Author

alamb commented Nov 7, 2024

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion

@jayzhan211 yes I think this is effectively what would happen -- however, the actual iteration over filtered values is quite optimized in the filter kernel (check out what the FilterBuilder does) based on how many values are filtered and other aspects

The fact that filter is so fast in arrow means it is quite hard to match it, let alone beat it :)

@alamb
Contributor Author

alamb commented Nov 7, 2024

@tustvold

> I'm not sure the proposed builder API makes sense, as the typing for nested types like ListBuilder and DictionaryBuilder is not what we want here, and they can't easily be type erased. We also ideally want to avoid overly bloating the arrow-array crate with kernel logic.

This is reasonable -- though I could imagine adding type-erased builders like DynListBuilder for this use case

> This isn't even touching on the fact that these kernels don't use the builders, for performance reasons.

Is there some fundamental reason the builders can't be made faster? If we could make the builders fast enough to use for filter, that would seem to be valuable in its own right. But I am likely just dreaming here

> The only remaining challenge concerns dictionaries, as the output dictionary needs to be computed up front. Simply not supporting dictionaries could potentially be a valid workaround though.

A builder based approach could help (e.g. optimize for the case where the input batches had the same dictionary and handle the case where they didn't -- either via deferred computation or on the fly or something else)

@Rachelint
Contributor

Rachelint commented Nov 7, 2024

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion

Does the approach look like this for filter?

  • loop over the input array and the input predicate
  • get the value of each needed row (true in the input predicate), and append_value it to the builder

And at the very least, we can avoid generating multiple small batches and then concat-ing them (a ton of copies) once the output is big enough.

@tustvold
Contributor

tustvold commented Nov 7, 2024

> This is reasonable -- though I could imagine adding type-erased builders like DynListBuilder for this use case

This sort of partially type-erased API seems like the worst of both worlds: you either want something that is completely type-erased (e.g. MutableArrayData) or fully typed (e.g. ListBuilder).

I could see us adding some sort of MutableArray abstraction to arrow-select that allows appending values from arrays based on a mask and/or selection. This would be useful not just for this use-case, but potentially as a MutableBuffer abstraction for databases, etc... However, it would be very complex to implement, especially for dictionaries.
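A minimal sketch of what such a type-erased abstraction might look like (all names here are assumptions for illustration; nothing like this exists in arrow-select today):

use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array};

/// Hypothetical type-erased mutable array: an in-progress output column
/// that absorbs selected rows from many input arrays of the same type
trait MutableArray {
    /// Append the rows of `input` selected by `filter` (a boolean mask)
    fn append_filtered(&mut self, input: &dyn Array, filter: &BooleanArray);
    /// Append the rows of `input` at `indices` (a selection vector)
    fn append_take(&mut self, input: &dyn Array, indices: &UInt32Array);
    /// Number of rows buffered so far
    fn len(&self) -> usize;
    /// Freeze the accumulated rows into an immutable array
    fn finish(&mut self) -> ArrayRef;
}

A downstream coalescer could then hold one Box<dyn MutableArray> per output column and emit a RecordBatch whenever len() reaches the target batch size.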

> Is there some fundamental reason the builders can't be made faster

Not without changing their APIs 😅. For the primitive builders one could simply move the current kernel implementations into the builders, but this doesn't really achieve much IMO.

> A builder based approach could help (e.g. optimize for the case where the input batches had the same dictionary and handle the case where they didn't -- either via deferred computation or on the fly or something else)

Yeah, it gets very complicated and fiddly. A similar challenge likely exists for StringView, although I'm not sure what level of sophistication we've reached w.r.t. automatic GC.

> Does the approach look like this for filter?

That would be a very naive way to implement the filter kernel, I would encourage looking at what the selection kernels actually do.

@Rachelint
Contributor

> That would be a very naive way to implement the filter kernel, I would encourage looking at what the selection kernels actually do.

I agree that it seems like a naive version of filter.

Is it possible to make something like filter_native public, but returning an iterator, so that we can reduce some copies downstream and reuse the well-optimized filter in arrow:

// current
filter --> intermediate buffer in array --> final buffer

// optimized
filter --> final buffer
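Something in this direction already exists: arrow-select publicly exports SlicesIterator, which yields the (start, end) ranges of consecutive selected rows in a BooleanArray. A sketch of reusing it to copy matching runs straight into a builder (append_filtered here is a hypothetical helper, and the loop assumes a filter without nulls):

use arrow::array::{Array, BooleanArray, StringViewArray, StringViewBuilder};
use arrow::compute::kernels::filter::SlicesIterator;

/// Copy the selected rows of `input` directly into `builder`, reusing
/// the run detection that the filter kernel itself performs
fn append_filtered(builder: &mut StringViewBuilder, input: &StringViewArray, filter: &BooleanArray) {
    // Each (start, end) is a run of consecutive rows where the filter is true
    for (start, end) in SlicesIterator::new(filter) {
        for i in start..end {
            if input.is_null(i) {
                builder.append_null();
            } else {
                builder.append_value(input.value(i));
            }
        }
    }
}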

@jayzhan211
Contributor

jayzhan211 commented Nov 8, 2024

> .add_buffers(array.data_buffers().to_vec());

This line of code extends the buffers (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

@alamb
Contributor Author

alamb commented Nov 8, 2024

> .add_buffers(array.data_buffers().to_vec());

> This line of code extends the buffers (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

To be clear -- I think the copy is of a Vec<Buffer> (which is a Vec of pointers to the data)
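For context, this pointer-copy behavior is exactly why gc is needed for StringView: the filtered array keeps the original data buffers alive. An illustration using the real filter kernel and the gc method on GenericByteViewArray in recent arrow-rs (the array contents here are made up):

use arrow::array::{Array, BooleanArray, StringViewArray};
use arrow::compute::filter;

fn main() {
    // 1000 long strings: each element is a 16-byte view pointing into
    // shared data buffers that hold the actual bytes
    let input = StringViewArray::from(vec!["a string longer than twelve bytes"; 1000]);
    let mut mask = vec![false; 1000];
    mask[0] = true; // keep a single row
    let mask = BooleanArray::from(mask);

    // filter copies only the views; the data buffers come along as
    // pointers, so all 1000 payloads stay resident in memory
    let filtered = filter(&input, &mask).unwrap();
    let filtered = filtered.as_any().downcast_ref::<StringViewArray>().unwrap();
    assert_eq!(filtered.len(), 1);

    // gc compacts: it rewrites the surviving payloads into a fresh
    // buffer so the original buffers can be freed
    let compacted = filtered.gc();
    assert_eq!(compacted.len(), 1);
}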

@jayzhan211
Contributor

> This line of code extends the buffers (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

I would try to implement a builder like this in DataFusion

@jayzhan211
Contributor

> This line of code extends the buffers (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

Didn't see an improvement with this approach: apache/datafusion#13450

@alamb
Contributor Author

alamb commented Nov 18, 2024

> This line of code extends the buffers (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

> Didn't see an improvement with this approach: apache/datafusion#13450

I didn't have a chance to fully look at apache/datafusion#13450 -- can you summarize what approach it implemented?

@jayzhan211
Contributor

jayzhan211 commented Nov 19, 2024

The idea is to append the string view array as early as possible, with optimal memory usage, to eliminate the need for garbage collection and to aggregate the filtered rows into single large batches instead of many small batches.

Current implementation

We first do Filter and then Coalesce in two different operators. The coalescer's push_batch tries to do gc for the string view type, and then concats those small batches again.

This PR

I try to combine Filter and Coalesce in the Filter operator, appending the string view data to the coalescer until the batch size (8192) is reached or there are no more incoming batches. The result is then sent to Coalesce, which ideally should do nothing and pass it on to the next operator.

This approach has the additional cost of computing each view and looking up each view's hash again, but eliminates the need for gc.

The result meets my expectation that there is not much difference, but currently I have no further improvements in mind either.
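In pseudocode, the combined operator in that PR has roughly this shape (a sketch only; input_stream, evaluate_predicate, string_view_column, output, and append_filtered are hypothetical stand-ins, not the actual DataFusion code):

// Combined Filter + Coalesce loop (hypothetical API)
let mut builder = StringViewBuilder::new();
while let Some(batch) = input_stream.next().await {
    let mask: BooleanArray = evaluate_predicate(&batch)?;       // hypothetical predicate evaluation
    builder.append_filtered(string_view_column(&batch), &mask); // copy the surviving rows once
    if builder.len() >= 8192 {
        output.push(builder.finish()); // emit a full batch downstream
    }
}
if builder.len() > 0 {
    output.push(builder.finish()); // final partial batch
}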
