Skip to content

Commit

Permalink
Adding GroupedWindowAggStream
Browse files Browse the repository at this point in the history
  • Loading branch information
ameyc committed Aug 15, 2024
1 parent 4aa8f50 commit 4bd931c
Show file tree
Hide file tree
Showing 17 changed files with 2,034 additions and 90 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ rocksdb = "0.22.0"
bincode = "1.3.3"
half = "2.4.1"
delegate = "0.12.0"
ahash = "0.8.11"
hashbrown = "0.14.5"
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::error::Result;

use crate::physical_plan::streaming_window::FranzStreamingWindowExec;
use crate::physical_plan::continuous::streaming_window::FranzStreamingWindowExec;

pub struct CoaslesceBeforeStreamingAggregate {}

Expand Down
126 changes: 126 additions & 0 deletions crates/core/src/physical_plan/continuous/group_values/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
use datafusion::common::Result;
use datafusion::{
logical_expr::EmitTo, physical_expr::binary_map::OutputType,
physical_expr_common::binary_map::ArrowBytesMap,
};

use super::GroupValues;
/// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values
///
/// This specialization is significantly faster than using the more general
/// purpose `Row`s format
pub struct GroupValuesByes<O: OffsetSizeTrait> {
/// Map string/binary values to group index
map: ArrowBytesMap<O, usize>,
/// The total number of groups so far (used to assign group_index)
num_groups: usize,
}

impl<O: OffsetSizeTrait> GroupValuesByes<O> {
pub fn new(output_type: OutputType) -> Self {
Self {
map: ArrowBytesMap::new(output_type),
num_groups: 0,
}
}
}

impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
assert_eq!(cols.len(), 1);

// look up / add entries in the table
let arr = &cols[0];

groups.clear();
self.map.insert_if_new(
arr,
// called for each new group
|_value| {
// assign new group index on each insert
let group_idx = self.num_groups;
self.num_groups += 1;
group_idx
},
// called for each group
|group_idx| {
groups.push(group_idx);
},
);

// ensure we assigned a group to for each row
assert_eq!(groups.len(), arr.len());
Ok(())
}

fn size(&self) -> usize {
self.map.size() + std::mem::size_of::<Self>()
}

fn is_empty(&self) -> bool {
self.num_groups == 0
}

fn len(&self) -> usize {
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();

let group_values = match emit_to {
EmitTo::All => {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) if n == self.len() => {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) => {
// if we only wanted to take the first n, insert the rest back
// into the map we could potentially avoid this reallocation, at
// the expense of much more complex code.
// see https://github.com/apache/datafusion/issues/9195
let emit_group_values = map_contents.slice(0, n);
let remaining_group_values = map_contents.slice(n, map_contents.len() - n);

self.num_groups = 0;
let mut group_indexes = vec![];
self.intern(&[remaining_group_values], &mut group_indexes)?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0]);

emit_group_values
}
};

Ok(vec![group_values])
}

fn clear_shrink(&mut self, _batch: &RecordBatch) {
// in theory we could potentially avoid this reallocation and clear the
// contents of the maps, but for now we just reset the map from the beginning
self.map.take();
}
}
129 changes: 129 additions & 0 deletions crates/core/src/physical_plan/continuous/group_values/bytes_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{Array, ArrayRef, RecordBatch};
use datafusion::{
common::Result, logical_expr::EmitTo, physical_expr::binary_map::OutputType,
physical_expr_common::binary_view_map::ArrowBytesViewMap,
};

use super::GroupValues;
//use datafusion_expr::EmitTo;
//use datafusion_physical_expr::binary_map::OutputType;
//use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap;

/// A [`GroupValues`] storing single column of Utf8View/BinaryView values
///
/// This specialization is significantly faster than using the more general
/// purpose `Row`s format
pub struct GroupValuesBytesView {
/// Map string/binary values to group index
map: ArrowBytesViewMap<usize>,
/// The total number of groups so far (used to assign group_index)
num_groups: usize,
}

impl GroupValuesBytesView {
pub fn new(output_type: OutputType) -> Self {
Self {
map: ArrowBytesViewMap::new(output_type),
num_groups: 0,
}
}
}

impl GroupValues for GroupValuesBytesView {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
assert_eq!(cols.len(), 1);

// look up / add entries in the table
let arr = &cols[0];

groups.clear();
self.map.insert_if_new(
arr,
// called for each new group
|_value| {
// assign new group index on each insert
let group_idx = self.num_groups;
self.num_groups += 1;
group_idx
},
// called for each group
|group_idx| {
groups.push(group_idx);
},
);

// ensure we assigned a group to for each row
assert_eq!(groups.len(), arr.len());
Ok(())
}

fn size(&self) -> usize {
self.map.size() + std::mem::size_of::<Self>()
}

fn is_empty(&self) -> bool {
self.num_groups == 0
}

fn len(&self) -> usize {
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();

let group_values = match emit_to {
EmitTo::All => {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) if n == self.len() => {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) => {
// if we only wanted to take the first n, insert the rest back
// into the map we could potentially avoid this reallocation, at
// the expense of much more complex code.
// see https://github.com/apache/datafusion/issues/9195
let emit_group_values = map_contents.slice(0, n);
let remaining_group_values = map_contents.slice(n, map_contents.len() - n);

self.num_groups = 0;
let mut group_indexes = vec![];
self.intern(&[remaining_group_values], &mut group_indexes)?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0]);

emit_group_values
}
};

Ok(vec![group_values])
}

fn clear_shrink(&mut self, _batch: &RecordBatch) {
// in theory we could potentially avoid this reallocation and clear the
// contents of the maps, but for now we just reset the map from the beginning
self.map.take();
}
}
97 changes: 97 additions & 0 deletions crates/core/src/physical_plan/continuous/group_values/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::record_batch::RecordBatch;
use arrow_array::{downcast_primitive, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
use bytes_view::GroupValuesBytesView;
use datafusion::common::Result;

pub(crate) mod primitive;
use datafusion::logical_expr::EmitTo;
use datafusion::physical_expr::binary_map::OutputType;
use primitive::GroupValuesPrimitive;

mod row;
use row::GroupValuesRows;

mod bytes;
mod bytes_view;
use bytes::GroupValuesByes;
//use datafusion_physical_expr::binary_map::OutputType;

/// An interning store for group keys
pub trait GroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;

/// Returns the number of bytes used by this [`GroupValues`]
fn size(&self) -> usize;

/// Returns true if this [`GroupValues`] is empty
fn is_empty(&self) -> bool;

/// The number of values stored in this [`GroupValues`]
fn len(&self) -> usize;

/// Emits the group values
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
fn clear_shrink(&mut self, batch: &RecordBatch);
}

pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();

macro_rules! downcast_helper {
($t:ty, $d:ident) => {
return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone())))
};
}

downcast_primitive! {
d => (downcast_helper, d),
_ => {}
}

match d {
DataType::Utf8 => {
return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
}
DataType::LargeUtf8 => {
return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Utf8)));
}
DataType::Utf8View => {
return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
}
DataType::Binary => {
return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Binary)));
}
DataType::LargeBinary => {
return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Binary)));
}
DataType::BinaryView => {
return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
}
_ => {}
}
}

Ok(Box::new(GroupValuesRows::try_new(schema)?))
}
Loading

0 comments on commit 4bd931c

Please sign in to comment.