From 4bd931c23002eedc74483d583778cc9a8a132478 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Wed, 14 Aug 2024 17:43:34 -0700 Subject: [PATCH] Adding GroupedWindowAggStream --- Cargo.lock | 2 + crates/core/Cargo.toml | 2 + ...lesce_before_streaming_window_aggregate.rs | 2 +- .../continuous/group_values/bytes.rs | 126 +++++ .../continuous/group_values/bytes_view.rs | 129 +++++ .../continuous/group_values/mod.rs | 97 ++++ .../continuous/group_values/primitive.rs | 224 ++++++++ .../continuous/group_values/row.rs | 254 +++++++++ .../continuous/grouped_window_agg_stream.rs | 535 ++++++++++++++++++ .../core/src/physical_plan/continuous/mod.rs | 106 ++++ .../physical_plan/continuous/order/full.rs | 144 +++++ .../src/physical_plan/continuous/order/mod.rs | 124 ++++ .../physical_plan/continuous/order/partial.rs | 252 +++++++++ .../{ => continuous}/streaming_window.rs | 117 ++-- crates/core/src/physical_plan/mod.rs | 2 +- crates/core/src/planner/streaming_window.rs | 4 +- examples/examples/kafka_rideshare.rs | 4 +- 17 files changed, 2034 insertions(+), 90 deletions(-) create mode 100644 crates/core/src/physical_plan/continuous/group_values/bytes.rs create mode 100644 crates/core/src/physical_plan/continuous/group_values/bytes_view.rs create mode 100644 crates/core/src/physical_plan/continuous/group_values/mod.rs create mode 100644 crates/core/src/physical_plan/continuous/group_values/primitive.rs create mode 100644 crates/core/src/physical_plan/continuous/group_values/row.rs create mode 100644 crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs create mode 100644 crates/core/src/physical_plan/continuous/mod.rs create mode 100644 crates/core/src/physical_plan/continuous/order/full.rs create mode 100644 crates/core/src/physical_plan/continuous/order/mod.rs create mode 100644 crates/core/src/physical_plan/continuous/order/partial.rs rename crates/core/src/physical_plan/{ => continuous}/streaming_window.rs (91%) diff --git a/Cargo.lock b/Cargo.lock index 
9720ae0..4babca3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1134,6 +1134,7 @@ dependencies = [ name = "df-streams-core" version = "0.1.0" dependencies = [ + "ahash", "apache-avro", "arrow", "arrow-array", @@ -1148,6 +1149,7 @@ dependencies = [ "delegate", "futures", "half", + "hashbrown", "itertools 0.13.0", "log", "rdkafka", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 44d23e9..7d68c0d 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -28,3 +28,5 @@ rocksdb = "0.22.0" bincode = "1.3.3" half = "2.4.1" delegate = "0.12.0" +ahash = "0.8.11" +hashbrown = "0.14.5" diff --git a/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs b/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs index 97809bf..4e68422 100644 --- a/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs +++ b/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs @@ -8,7 +8,7 @@ use datafusion::physical_plan::ExecutionPlanProperties; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion::error::Result; -use crate::physical_plan::streaming_window::FranzStreamingWindowExec; +use crate::physical_plan::continuous::streaming_window::FranzStreamingWindowExec; pub struct CoaslesceBeforeStreamingAggregate {} diff --git a/crates/core/src/physical_plan/continuous/group_values/bytes.rs b/crates/core/src/physical_plan/continuous/group_values/bytes.rs new file mode 100644 index 0000000..f04e516 --- /dev/null +++ b/crates/core/src/physical_plan/continuous/group_values/bytes.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch}; +use datafusion::common::Result; +use datafusion::{ + logical_expr::EmitTo, physical_expr::binary_map::OutputType, + physical_expr_common::binary_map::ArrowBytesMap, +}; + +use super::GroupValues; +/// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values +/// +/// This specialization is significantly faster than using the more general +/// purpose `Row`s format +pub struct GroupValuesByes { + /// Map string/binary values to group index + map: ArrowBytesMap, + /// The total number of groups so far (used to assign group_index) + num_groups: usize, +} + +impl GroupValuesByes { + pub fn new(output_type: OutputType) -> Self { + Self { + map: ArrowBytesMap::new(output_type), + num_groups: 0, + } + } +} + +impl GroupValues for GroupValuesByes { + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + assert_eq!(cols.len(), 1); + + // look up / add entries in the table + let arr = &cols[0]; + + groups.clear(); + self.map.insert_if_new( + arr, + // called for each new group + |_value| { + // assign new group index on each insert + let group_idx = self.num_groups; + self.num_groups += 1; + group_idx + }, + // called for each group + |group_idx| { + groups.push(group_idx); + }, + ); + + // ensure we assigned a group to for each row + assert_eq!(groups.len(), arr.len()); + 
Ok(()) + } + + fn size(&self) -> usize { + self.map.size() + std::mem::size_of::() + } + + fn is_empty(&self) -> bool { + self.num_groups == 0 + } + + fn len(&self) -> usize { + self.num_groups + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + // Reset the map to default, and convert it into a single array + let map_contents = self.map.take().into_state(); + + let group_values = match emit_to { + EmitTo::All => { + self.num_groups -= map_contents.len(); + map_contents + } + EmitTo::First(n) if n == self.len() => { + self.num_groups -= map_contents.len(); + map_contents + } + EmitTo::First(n) => { + // if we only wanted to take the first n, insert the rest back + // into the map we could potentially avoid this reallocation, at + // the expense of much more complex code. + // see https://github.com/apache/datafusion/issues/9195 + let emit_group_values = map_contents.slice(0, n); + let remaining_group_values = map_contents.slice(n, map_contents.len() - n); + + self.num_groups = 0; + let mut group_indexes = vec![]; + self.intern(&[remaining_group_values], &mut group_indexes)?; + + // Verify that the group indexes were assigned in the correct order + assert_eq!(0, group_indexes[0]); + + emit_group_values + } + }; + + Ok(vec![group_values]) + } + + fn clear_shrink(&mut self, _batch: &RecordBatch) { + // in theory we could potentially avoid this reallocation and clear the + // contents of the maps, but for now we just reset the map from the beginning + self.map.take(); + } +} diff --git a/crates/core/src/physical_plan/continuous/group_values/bytes_view.rs b/crates/core/src/physical_plan/continuous/group_values/bytes_view.rs new file mode 100644 index 0000000..6b57a3d --- /dev/null +++ b/crates/core/src/physical_plan/continuous/group_values/bytes_view.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{Array, ArrayRef, RecordBatch}; +use datafusion::{ + common::Result, logical_expr::EmitTo, physical_expr::binary_map::OutputType, + physical_expr_common::binary_view_map::ArrowBytesViewMap, +}; + +use super::GroupValues; +//use datafusion_expr::EmitTo; +//use datafusion_physical_expr::binary_map::OutputType; +//use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap; + +/// A [`GroupValues`] storing single column of Utf8View/BinaryView values +/// +/// This specialization is significantly faster than using the more general +/// purpose `Row`s format +pub struct GroupValuesBytesView { + /// Map string/binary values to group index + map: ArrowBytesViewMap, + /// The total number of groups so far (used to assign group_index) + num_groups: usize, +} + +impl GroupValuesBytesView { + pub fn new(output_type: OutputType) -> Self { + Self { + map: ArrowBytesViewMap::new(output_type), + num_groups: 0, + } + } +} + +impl GroupValues for GroupValuesBytesView { + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + assert_eq!(cols.len(), 1); + + // look up / add entries in the table + let arr = &cols[0]; + + groups.clear(); + self.map.insert_if_new( + arr, + // called for each new group + |_value| { + // assign new group index on 
each insert + let group_idx = self.num_groups; + self.num_groups += 1; + group_idx + }, + // called for each group + |group_idx| { + groups.push(group_idx); + }, + ); + + // ensure we assigned a group to for each row + assert_eq!(groups.len(), arr.len()); + Ok(()) + } + + fn size(&self) -> usize { + self.map.size() + std::mem::size_of::() + } + + fn is_empty(&self) -> bool { + self.num_groups == 0 + } + + fn len(&self) -> usize { + self.num_groups + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + // Reset the map to default, and convert it into a single array + let map_contents = self.map.take().into_state(); + + let group_values = match emit_to { + EmitTo::All => { + self.num_groups -= map_contents.len(); + map_contents + } + EmitTo::First(n) if n == self.len() => { + self.num_groups -= map_contents.len(); + map_contents + } + EmitTo::First(n) => { + // if we only wanted to take the first n, insert the rest back + // into the map we could potentially avoid this reallocation, at + // the expense of much more complex code. 
+ // see https://github.com/apache/datafusion/issues/9195 + let emit_group_values = map_contents.slice(0, n); + let remaining_group_values = map_contents.slice(n, map_contents.len() - n); + + self.num_groups = 0; + let mut group_indexes = vec![]; + self.intern(&[remaining_group_values], &mut group_indexes)?; + + // Verify that the group indexes were assigned in the correct order + assert_eq!(0, group_indexes[0]); + + emit_group_values + } + }; + + Ok(vec![group_values]) + } + + fn clear_shrink(&mut self, _batch: &RecordBatch) { + // in theory we could potentially avoid this reallocation and clear the + // contents of the maps, but for now we just reset the map from the beginning + self.map.take(); + } +} diff --git a/crates/core/src/physical_plan/continuous/group_values/mod.rs b/crates/core/src/physical_plan/continuous/group_values/mod.rs new file mode 100644 index 0000000..111bf5f --- /dev/null +++ b/crates/core/src/physical_plan/continuous/group_values/mod.rs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::record_batch::RecordBatch; +use arrow_array::{downcast_primitive, ArrayRef}; +use arrow_schema::{DataType, SchemaRef}; +use bytes_view::GroupValuesBytesView; +use datafusion::common::Result; + +pub(crate) mod primitive; +use datafusion::logical_expr::EmitTo; +use datafusion::physical_expr::binary_map::OutputType; +use primitive::GroupValuesPrimitive; + +mod row; +use row::GroupValuesRows; + +mod bytes; +mod bytes_view; +use bytes::GroupValuesByes; +//use datafusion_physical_expr::binary_map::OutputType; + +/// An interning store for group keys +pub trait GroupValues: Send { + /// Calculates the `groups` for each input row of `cols` + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()>; + + /// Returns the number of bytes used by this [`GroupValues`] + fn size(&self) -> usize; + + /// Returns true if this [`GroupValues`] is empty + fn is_empty(&self) -> bool; + + /// The number of values stored in this [`GroupValues`] + fn len(&self) -> usize; + + /// Emits the group values + fn emit(&mut self, emit_to: EmitTo) -> Result>; + + /// Clear the contents and shrink the capacity to the size of the batch (free up memory usage) + fn clear_shrink(&mut self, batch: &RecordBatch); +} + +pub fn new_group_values(schema: SchemaRef) -> Result> { + if schema.fields.len() == 1 { + let d = schema.fields[0].data_type(); + + macro_rules! downcast_helper { + ($t:ty, $d:ident) => { + return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone()))) + }; + } + + downcast_primitive! 
{ + d => (downcast_helper, d), + _ => {} + } + + match d { + DataType::Utf8 => { + return Ok(Box::new(GroupValuesByes::::new(OutputType::Utf8))); + } + DataType::LargeUtf8 => { + return Ok(Box::new(GroupValuesByes::::new(OutputType::Utf8))); + } + DataType::Utf8View => { + return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View))); + } + DataType::Binary => { + return Ok(Box::new(GroupValuesByes::::new(OutputType::Binary))); + } + DataType::LargeBinary => { + return Ok(Box::new(GroupValuesByes::::new(OutputType::Binary))); + } + DataType::BinaryView => { + return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView))); + } + _ => {} + } + } + + Ok(Box::new(GroupValuesRows::try_new(schema)?)) +} diff --git a/crates/core/src/physical_plan/continuous/group_values/primitive.rs b/crates/core/src/physical_plan/continuous/group_values/primitive.rs new file mode 100644 index 0000000..f9dc68f --- /dev/null +++ b/crates/core/src/physical_plan/continuous/group_values/primitive.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use ahash::RandomState; +use arrow::array::BooleanBufferBuilder; +use arrow::buffer::NullBuffer; +use arrow::datatypes::{i256, IntervalDayTime, IntervalMonthDayNano}; +use arrow::record_batch::RecordBatch; +use arrow_array::cast::AsArray; +use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray}; +//use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; +use arrow_schema::DataType; +use datafusion::common::utils::proxy::VecAllocExt; +use datafusion::common::Result; +use datafusion::logical_expr::EmitTo; +//use datafusion_common::Result; +//use datafusion_execution::memory_pool::proxy::VecAllocExt; +//use datafusion_expr::EmitTo; +use half::f16; +use hashbrown::raw::RawTable; +use std::sync::Arc; + +use super::GroupValues; + +/// A trait to allow hashing of floating point numbers +pub(crate) trait HashValue { + fn hash(&self, state: &RandomState) -> u64; +} + +macro_rules! hash_integer { + ($($t:ty),+) => { + $(impl HashValue for $t { + #[cfg(not(feature = "force_hash_collisions"))] + fn hash(&self, state: &RandomState) -> u64 { + state.hash_one(self) + } + + #[cfg(feature = "force_hash_collisions")] + fn hash(&self, _state: &RandomState) -> u64 { + 0 + } + })+ + }; +} +hash_integer!(i8, i16, i32, i64, i128, i256); +hash_integer!(u8, u16, u32, u64); +hash_integer!(IntervalDayTime, IntervalMonthDayNano); + +macro_rules! 
hash_float { + ($($t:ty),+) => { + $(impl HashValue for $t { + #[cfg(not(feature = "force_hash_collisions"))] + fn hash(&self, state: &RandomState) -> u64 { + state.hash_one(self.to_bits()) + } + + #[cfg(feature = "force_hash_collisions")] + fn hash(&self, _state: &RandomState) -> u64 { + 0 + } + })+ + }; +} + +hash_float!(f16, f32, f64); + +/// A [`GroupValues`] storing a single column of primitive values +/// +/// This specialization is significantly faster than using the more general +/// purpose `Row`s format +pub struct GroupValuesPrimitive { + /// The data type of the output array + data_type: DataType, + /// Stores the group index based on the hash of its value + /// + /// We don't store the hashes as hashing fixed width primitives + /// is fast enough for this not to benefit performance + map: RawTable, + /// The group index of the null value if any + null_group: Option, + /// The values for each group index + values: Vec, + /// The random state used to generate hashes + random_state: RandomState, +} + +impl GroupValuesPrimitive { + pub fn new(data_type: DataType) -> Self { + assert!(PrimitiveArray::::is_compatible(&data_type)); + Self { + data_type, + map: RawTable::with_capacity(128), + values: Vec::with_capacity(128), + null_group: None, + random_state: Default::default(), + } + } +} + +impl GroupValues for GroupValuesPrimitive +where + T::Native: HashValue, +{ + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + assert_eq!(cols.len(), 1); + groups.clear(); + + for v in cols[0].as_primitive::() { + let group_id = match v { + None => *self.null_group.get_or_insert_with(|| { + let group_id = self.values.len(); + self.values.push(Default::default()); + group_id + }), + Some(key) => { + let state = &self.random_state; + let hash = key.hash(state); + let insert = self.map.find_or_find_insert_slot( + hash, + |g| unsafe { self.values.get_unchecked(*g).is_eq(key) }, + |g| unsafe { self.values.get_unchecked(*g).hash(state) }, + ); + + // 
SAFETY: No mutation occurred since find_or_find_insert_slot + unsafe { + match insert { + Ok(v) => *v.as_ref(), + Err(slot) => { + let g = self.values.len(); + self.map.insert_in_slot(hash, slot, g); + self.values.push(key); + g + } + } + } + } + }; + groups.push(group_id) + } + Ok(()) + } + + fn size(&self) -> usize { + self.map.capacity() * std::mem::size_of::() + self.values.allocated_size() + } + + fn is_empty(&self) -> bool { + self.values.is_empty() + } + + fn len(&self) -> usize { + self.values.len() + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + fn build_primitive( + values: Vec, + null_idx: Option, + ) -> PrimitiveArray { + let nulls = null_idx.map(|null_idx| { + let mut buffer = BooleanBufferBuilder::new(values.len()); + buffer.append_n(values.len(), true); + buffer.set_bit(null_idx, false); + unsafe { NullBuffer::new_unchecked(buffer.finish(), 1) } + }); + PrimitiveArray::::new(values.into(), nulls) + } + + let array: PrimitiveArray = match emit_to { + EmitTo::All => { + self.map.clear(); + build_primitive(std::mem::take(&mut self.values), self.null_group.take()) + } + EmitTo::First(n) => { + // SAFETY: self.map outlives iterator and is not modified concurrently + unsafe { + for bucket in self.map.iter() { + // Decrement group index by n + match bucket.as_ref().checked_sub(n) { + // Group index was >= n, shift value down + Some(sub) => *bucket.as_mut() = sub, + // Group index was < n, so remove from table + None => self.map.erase(bucket), + } + } + } + let null_group = match &mut self.null_group { + Some(v) if *v >= n => { + *v -= n; + None + } + Some(_) => self.null_group.take(), + None => None, + }; + let mut split = self.values.split_off(n); + std::mem::swap(&mut self.values, &mut split); + build_primitive(split, null_group) + } + }; + Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) + } + + fn clear_shrink(&mut self, batch: &RecordBatch) { + let count = batch.num_rows(); + self.values.clear(); + 
self.values.shrink_to(count); + self.map.clear(); + self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared + } +} diff --git a/crates/core/src/physical_plan/continuous/group_values/row.rs b/crates/core/src/physical_plan/continuous/group_values/row.rs new file mode 100644 index 0000000..9a3167c --- /dev/null +++ b/crates/core/src/physical_plan/continuous/group_values/row.rs @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use ahash::RandomState; +use arrow::compute::cast; +use arrow::record_batch::RecordBatch; +use arrow::row::{RowConverter, Rows, SortField}; +use arrow_array::{Array, ArrayRef}; +use arrow_schema::{DataType, SchemaRef}; +use datafusion::common::Result; +use datafusion::common::{ + hash_utils::create_hashes, + utils::proxy::{RawTableAllocExt, VecAllocExt}, +}; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::EmitTo; +use hashbrown::raw::RawTable; + +use super::GroupValues; + +/// A [`GroupValues`] making use of [`Rows`] +pub struct GroupValuesRows { + /// The output schema + schema: SchemaRef, + + /// Converter for the group values + row_converter: RowConverter, + + /// Logically maps group values to a group_index in + /// [`Self::group_values`] and in each accumulator + /// + /// Uses the raw API of hashbrown to avoid actually storing the + /// keys (group values) in the table + /// + /// keys: u64 hashes of the GroupValue + /// values: (hash, group_index) + map: RawTable<(u64, usize)>, + + /// The size of `map` in bytes + map_size: usize, + + /// The actual group by values, stored in arrow [`Row`] format. + /// `group_values[i]` holds the group value for group_index `i`. + /// + /// The row format is used to compare group keys quickly and store + /// them efficiently in memory. Quick comparison is especially + /// important for multi-column group keys. 
+ /// + /// [`Row`]: arrow::row::Row + group_values: Option, + + /// reused buffer to store hashes + hashes_buffer: Vec, + + /// reused buffer to store rows + rows_buffer: Rows, + + /// Random state for creating hashes + random_state: RandomState, +} + +impl GroupValuesRows { + pub fn try_new(schema: SchemaRef) -> Result { + let row_converter = RowConverter::new( + schema + .fields() + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; + + let map = RawTable::with_capacity(0); + + let starting_rows_capacity = 1000; + let starting_data_capacity = 64 * starting_rows_capacity; + let rows_buffer = row_converter.empty_rows(starting_rows_capacity, starting_data_capacity); + Ok(Self { + schema, + row_converter, + map, + map_size: 0, + group_values: None, + hashes_buffer: Default::default(), + rows_buffer, + random_state: Default::default(), + }) + } +} + +impl GroupValues for GroupValuesRows { + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + // Convert the group keys into the row format + let group_rows = &mut self.rows_buffer; + group_rows.clear(); + self.row_converter.append(group_rows, cols)?; + let n_rows = group_rows.num_rows(); + + let mut group_values = match self.group_values.take() { + Some(group_values) => group_values, + None => self.row_converter.empty_rows(0, 0), + }; + + // tracks to which group each of the input rows belongs + groups.clear(); + + // 1.1 Calculate the group keys for the group values + let batch_hashes = &mut self.hashes_buffer; + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(cols, &self.random_state, batch_hashes)?; + + for (row, &target_hash) in batch_hashes.iter().enumerate() { + let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| { + // Somewhat surprisingly, this closure can be called even if the + // hash doesn't match, so check the hash first with an integer + // comparison first avoid the more expensive comparison with + // group value. 
https://github.com/apache/datafusion/pull/11718 + target_hash == *exist_hash + // verify that the group that we are inserting with hash is + // actually the same key value as the group in + // existing_idx (aka group_values @ row) + && group_rows.row(row) == group_values.row(*group_idx) + }); + + let group_idx = match entry { + // Existing group_index for this group value + Some((_hash, group_idx)) => *group_idx, + // 1.2 Need to create new entry for the group + None => { + // Add new entry to aggr_state and save newly created index + let group_idx = group_values.num_rows(); + group_values.push(group_rows.row(row)); + + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (target_hash, group_idx), + |(hash, _group_index)| *hash, + &mut self.map_size, + ); + group_idx + } + }; + groups.push(group_idx); + } + + self.group_values = Some(group_values); + + Ok(()) + } + + fn size(&self) -> usize { + let group_values_size = self.group_values.as_ref().map(|v| v.size()).unwrap_or(0); + self.row_converter.size() + + group_values_size + + self.map_size + + self.rows_buffer.size() + + self.hashes_buffer.allocated_size() + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn len(&self) -> usize { + self.group_values + .as_ref() + .map(|group_values| group_values.num_rows()) + .unwrap_or(0) + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + let mut group_values = self + .group_values + .take() + .expect("Can not emit from empty rows"); + + let mut output = match emit_to { + EmitTo::All => { + let output = self.row_converter.convert_rows(&group_values)?; + group_values.clear(); + output + } + EmitTo::First(n) => { + let groups_rows = group_values.iter().take(n); + let output = self.row_converter.convert_rows(groups_rows)?; + // Clear out first n group keys by copying them to a new Rows. + // TODO file some ticket in arrow-rs to make this more efficient? 
+ let mut new_group_values = self.row_converter.empty_rows(0, 0); + for row in group_values.iter().skip(n) { + new_group_values.push(row); + } + std::mem::swap(&mut new_group_values, &mut group_values); + + // SAFETY: self.map outlives iterator and is not modified concurrently + unsafe { + for bucket in self.map.iter() { + // Decrement group index by n + match bucket.as_ref().1.checked_sub(n) { + // Group index was >= n, shift value down + Some(sub) => bucket.as_mut().1 = sub, + // Group index was < n, so remove from table + None => self.map.erase(bucket), + } + } + } + output + } + }; + + // TODO: Materialize dictionaries in group keys (#7647) + for (field, array) in self.schema.fields.iter().zip(&mut output) { + let expected = field.data_type(); + if let DataType::Dictionary(_, v) = expected { + let actual = array.data_type(); + if v.as_ref() != actual { + return Err(DataFusionError::Internal(format!( + "Converted group rows expected dictionary of {v} got {actual}" + ))); + } + *array = cast(array.as_ref(), expected)?; + } + } + + self.group_values = Some(group_values); + Ok(output) + } + + fn clear_shrink(&mut self, batch: &RecordBatch) { + let count = batch.num_rows(); + self.group_values = self.group_values.take().map(|mut rows| { + rows.clear(); + rows + }); + self.map.clear(); + self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared + self.map_size = self.map.capacity() * std::mem::size_of::<(u64, usize)>(); + self.hashes_buffer.clear(); + self.hashes_buffer.shrink_to(count); + } +} diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs new file mode 100644 index 0000000..c87a3e3 --- /dev/null +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -0,0 +1,535 @@ +use std::{ + collections::BTreeMap, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + 
+use arrow::array::*; +use arrow::{ + compute::{concat_batches, filter_record_batch}, + datatypes::TimestampMillisecondType, +}; + +use arrow_array::{ArrayRef, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray}; +use arrow_ord::cmp; +use arrow_schema::{Schema, SchemaRef}; +use datafusion::{ + common::{utils::proxy::VecAllocExt, DataFusionError, Result}, + execution::{ + memory_pool::{MemoryConsumer, MemoryReservation}, + runtime_env::RuntimeEnv, + }, + logical_expr::EmitTo, + physical_plan::{aggregates::PhysicalGroupBy, PhysicalExpr}, +}; +use datafusion::{ + execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}, + physical_plan::{ + aggregates::{aggregate_expressions, AggregateMode}, + metrics::BaselineMetrics, + AggregateExpr, + }, +}; +use futures::{Stream, StreamExt}; + +use crate::physical_plan::utils::{ + accumulators::{create_accumulators, AccumulatorItem}, + time::RecordBatchWatermark, +}; + +use super::{ + add_window_columns_to_record_batch, add_window_columns_to_schema, create_group_accumulator, + group_values::{new_group_values, GroupValues}, + order::GroupOrdering, + streaming_window::{ + get_windows_for_watermark, FranzStreamingWindowExec, FranzStreamingWindowType, + FranzWindowFrame, + }, + GroupsAccumulatorItem, +}; + +pub struct GroupedWindowAggStream { + pub schema: SchemaRef, + input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + exec_aggregate_expressions: Vec>, + aggregate_expressions: Vec>>, + filter_expressions: Vec>>, + latest_watermark: Arc>>, + window_frames: BTreeMap, + window_type: FranzStreamingWindowType, + aggregation_mode: AggregateMode, + group_by: PhysicalGroupBy, + group_schema: Arc, + context: Arc, +} + +fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { + let group_fields = schema.fields()[0..group_count].to_vec(); + Arc::new(Schema::new(group_fields)) +} + +#[allow(dead_code)] +impl GroupedWindowAggStream { + pub fn new( + exec_operator: 
&FranzStreamingWindowExec, + context: Arc, + partition: usize, + watermark: Arc>>, + window_type: FranzStreamingWindowType, + aggregation_mode: AggregateMode, + ) -> Result { + let agg_schema = Arc::clone(&exec_operator.schema); + let agg_filter_expr = exec_operator.filter_expressions.clone(); + + let baseline_metrics = BaselineMetrics::new(&exec_operator.metrics, partition); + let input = exec_operator + .input + .execute(partition, Arc::clone(&context))?; + + let aggregate_expressions = + aggregate_expressions(&exec_operator.aggregate_expressions, &exec_operator.mode, 0)?; + let filter_expressions = match exec_operator.mode { + AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned => { + agg_filter_expr + } + AggregateMode::Final | AggregateMode::FinalPartitioned => { + vec![None; exec_operator.aggregate_expressions.len()] + } + }; + + let group_by = exec_operator.group_by.clone(); + let group_schema = group_schema(&agg_schema, group_by.expr.len()); + Ok(Self { + schema: agg_schema, + input, + baseline_metrics, + exec_aggregate_expressions: exec_operator.aggregate_expressions.clone(), + aggregate_expressions, + filter_expressions, + latest_watermark: watermark, + window_frames: BTreeMap::new(), + window_type, + aggregation_mode, + group_by, + group_schema, + context, + }) + } + + pub fn output_schema_with_window(&self) -> SchemaRef { + Arc::new(add_window_columns_to_schema(self.schema.clone())) + } + + fn trigger_windows(&mut self) -> Result { + let mut results: Vec = Vec::new(); + let watermark_lock: std::sync::MutexGuard<'_, Option> = + self.latest_watermark.lock().unwrap(); + + if let Some(watermark) = *watermark_lock { + let mut window_frames_to_remove: Vec = Vec::new(); + + for (timestamp, frame) in self.window_frames.iter_mut() { + if watermark >= frame.window_end_time { + let rb = frame.evaluate()?; + let result = add_window_columns_to_record_batch( + rb, + frame.window_start_time, + frame.window_end_time, + ); + 
results.push(result); + window_frames_to_remove.push(*timestamp); + } + } + + for timestamp in window_frames_to_remove { + self.window_frames.remove(×tamp); + } + } + concat_batches(&self.output_schema_with_window(), &results) + .map_err(|err| DataFusionError::ArrowError(err, None)) + } + + fn process_watermark(&mut self, watermark: RecordBatchWatermark) { + // should this be within a mutex? + let mut watermark_lock: std::sync::MutexGuard> = + self.latest_watermark.lock().unwrap(); + + if let Some(current_watermark) = *watermark_lock { + if current_watermark <= watermark.min_timestamp { + *watermark_lock = Some(watermark.min_timestamp) + } + } else { + *watermark_lock = Some(watermark.min_timestamp) + } + } + + fn get_window_length(&mut self) -> Duration { + match self.window_type { + FranzStreamingWindowType::Session(duration) => duration, + FranzStreamingWindowType::Sliding(duration, _) => duration, + FranzStreamingWindowType::Tumbling(duration) => duration, + } + } + + fn ensure_window_frames_for_ranges( + &mut self, + ranges: &Vec<(SystemTime, SystemTime)>, + ) -> Result<(), DataFusionError> { + for (start_time, end_time) in ranges { + self.window_frames.entry(*start_time).or_insert({ + let accumulators: Vec<_> = self + .exec_aggregate_expressions + .iter() + .map(create_group_accumulator) + .collect::>()?; + let elapsed = start_time.elapsed().unwrap().as_millis(); + let name = format!("GroupedHashAggregateStream WindowStart[{elapsed}]"); + // Threading in Memory Reservation for now. We are currently not supporting spilling to disk. 
+ let reservation = MemoryConsumer::new(name) + .with_can_spill(false) + .register(self.context.memory_pool()); + let group_values = new_group_values(self.group_schema.clone())?; + + GroupedAggWindowFrame::new( + *start_time, + *end_time, + "canonical_timestamp".to_string(), + accumulators, + self.aggregate_expressions.clone(), + self.filter_expressions.clone(), + self.group_by.clone(), + self.schema.clone(), + self.baseline_metrics.clone(), + group_values, + Default::default(), + GroupOrdering::None, + reservation, + ) + }); + } + Ok(()) + } + + #[inline] + fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { + let result: std::prelude::v1::Result = match self + .input + .poll_next_unpin(cx) + { + Poll::Ready(rdy) => match rdy { + Some(Ok(batch)) => { + if batch.num_rows() > 0 { + let watermark: RecordBatchWatermark = + RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?; + let ranges = get_windows_for_watermark(&watermark, self.window_type); + let _ = self.ensure_window_frames_for_ranges(&ranges); + for range in ranges { + let frame = self.window_frames.get_mut(&range.0).unwrap(); + let _ = frame.push(&batch); + } + self.process_watermark(watermark); + + self.trigger_windows() + } else { + Ok(RecordBatch::new_empty(self.output_schema_with_window())) + } + } + Some(Err(e)) => Err(e), + None => Ok(RecordBatch::new_empty(self.output_schema_with_window())), + }, + Poll::Pending => { + return Poll::Pending; + } + }; + Poll::Ready(Some(result)) + } +} + +impl RecordBatchStream for GroupedWindowAggStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for GroupedWindowAggStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let poll: Poll>> = + self.poll_next_inner(cx); + self.baseline_metrics.record_poll(poll) + } + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} + +pub struct GroupedAggWindowFrame { + pub window_start_time: SystemTime, + 
pub window_end_time: SystemTime, + pub timestamp_column: String, + pub accumulators: Vec, + pub aggregate_expressions: Vec>>, + pub filter_expressions: Vec>>, + pub group_by: PhysicalGroupBy, + /// GROUP BY expressions + /// An interning store of group keys + group_values: Box, + /// scratch space for the current input [`RecordBatch`] being + /// processed. Reused across batches here to avoid reallocations + current_group_indices: Vec, + + /// Optional ordering information, that might allow groups to be + /// emitted from the hash table prior to seeing the end of the + /// input + group_ordering: GroupOrdering, + reservation: MemoryReservation, + + pub schema: SchemaRef, + pub baseline_metrics: BaselineMetrics, +} + +impl GroupedAggWindowFrame { + pub(crate) fn new( + window_start_time: SystemTime, + window_end_time: SystemTime, + timestamp_column: String, + accumulators: Vec, + aggregate_expressions: Vec>>, + filter_expressions: Vec>>, + group_by: PhysicalGroupBy, + schema: SchemaRef, + baseline_metrics: BaselineMetrics, + group_values: Box, + current_group_indices: Vec, + group_ordering: GroupOrdering, + reservation: MemoryReservation, + ) -> Self { + Self { + window_start_time, + window_end_time, + timestamp_column, + accumulators, + aggregate_expressions, + filter_expressions, + group_by, + group_values, + current_group_indices, + group_ordering, + reservation, + schema, + baseline_metrics, + } + } + + fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> { + // Evaluate the grouping expressions + let group_by_values = evaluate_group_by(&self.group_by, &batch)?; + // Evaluate the aggregation expressions. 
+ let input_values = evaluate_many(&self.aggregate_expressions, &batch)?; + + // Evaluate the filter expressions, if any, against the inputs + let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; + for group_values in &group_by_values { + // calculate the group indices for each input row + let starting_num_groups = self.group_values.len(); + self.group_values + .intern(group_values, &mut self.current_group_indices)?; + let group_indices = &self.current_group_indices; + + // Update ordering information if necessary + let total_num_groups = self.group_values.len(); + if total_num_groups > starting_num_groups { + self.group_ordering + .new_groups(group_values, group_indices, total_num_groups)?; + } + + // Gather the inputs to call the actual accumulator + let t = self + .accumulators + .iter_mut() + .zip(input_values.iter()) + .zip(filter_values.iter()); + + for ((acc, values), opt_filter) in t { + let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean()); + + acc.update_batch(values, group_indices, opt_filter, total_num_groups)?; + } + } + self.update_memory_reservation() + } + + fn update_memory_reservation(&mut self) -> Result<()> { + let acc = self.accumulators.iter().map(|x| x.size()).sum::(); + self.reservation.try_resize( + acc + self.group_values.size() + + self.group_ordering.size() + + self.current_group_indices.allocated_size(), + ) + } + + pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> { + let metadata = batch + .column_by_name("_streaming_internal_metadata") + .unwrap(); + let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); + + let ts_column = metadata_struct + .column_by_name("canonical_timestamp") + .unwrap(); + + let ts_array = ts_column + .as_any() + .downcast_ref::>() + .unwrap() + .to_owned(); + + let start_time_duration = self + .window_start_time + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + let gte_cmp_filter = cmp::gt_eq( + &ts_array, + 
&TimestampMillisecondArray::new_scalar(start_time_duration), + )?; + + let filtered_batch: RecordBatch = filter_record_batch(batch, >e_cmp_filter)?; + + let end_time_duration = self + .window_end_time + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + let metadata = filtered_batch + .column_by_name("_streaming_internal_metadata") + .unwrap(); + let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); + + let ts_column = metadata_struct + .column_by_name("canonical_timestamp") + .unwrap(); + + let ts_array = ts_column + .as_any() + .downcast_ref::>() + .unwrap() + .to_owned(); + + let lt_cmp_filter = cmp::lt( + &ts_array, + &TimestampMillisecondArray::new_scalar(end_time_duration), + )?; + let final_batch = filter_record_batch(&filtered_batch, <_cmp_filter)?; + + let _ = self.group_aggregate_batch(final_batch); + + Ok(()) + } + + /// Create an output RecordBatch with the group keys and + /// accumulator states/values specified in emit_to + fn evaluate(&mut self) -> Result { + //let timer = self.baseline_metrics.elapsed_compute().timer(); + + let schema = self.schema.clone(); + if self.group_values.is_empty() { + return Ok(RecordBatch::new_empty(schema)); + } + + let mut output = self.group_values.emit(EmitTo::All)?; + + // Next output each aggregate value + for acc in self.accumulators.iter_mut() { + output.push(acc.evaluate(EmitTo::All)?) + } + + // emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is + // over the target memory size after emission, we can emit again rather than returning Err. 
+ let _ = self.update_memory_reservation(); + let batch = RecordBatch::try_new(schema, output)?; + Ok(batch) + } +} + +pub(crate) fn evaluate_group_by( + group_by: &PhysicalGroupBy, + batch: &RecordBatch, +) -> Result>> { + let exprs: Vec = group_by + .expr + .iter() + .map(|(expr, _)| { + let value = expr.evaluate(batch)?; + value.into_array(batch.num_rows()) + }) + .collect::>>()?; + + let null_exprs: Vec = group_by + .null_expr + .iter() + .map(|(expr, _)| { + let value = expr.evaluate(batch)?; + value.into_array(batch.num_rows()) + }) + .collect::>>()?; + + Ok(group_by + .groups + .iter() + .map(|group| { + group + .iter() + .enumerate() + .map(|(idx, is_null)| { + if *is_null { + Arc::clone(&null_exprs[idx]) + } else { + Arc::clone(&exprs[idx]) + } + }) + .collect() + }) + .collect()) +} + +/// Evaluates expressions against a record batch. +fn evaluate(expr: &[Arc], batch: &RecordBatch) -> Result> { + expr.iter() + .map(|expr| { + expr.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) + .collect() +} + +/// Evaluates expressions against a record batch. 
+pub(crate) fn evaluate_many( + expr: &[Vec>], + batch: &RecordBatch, +) -> Result>> { + expr.iter().map(|expr| evaluate(expr, batch)).collect() +} + +fn evaluate_optional( + expr: &[Option>], + batch: &RecordBatch, +) -> Result>> { + expr.iter() + .map(|expr| { + expr.as_ref() + .map(|expr| { + expr.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) + .transpose() + }) + .collect() +} diff --git a/crates/core/src/physical_plan/continuous/mod.rs b/crates/core/src/physical_plan/continuous/mod.rs new file mode 100644 index 0000000..e980eb3 --- /dev/null +++ b/crates/core/src/physical_plan/continuous/mod.rs @@ -0,0 +1,106 @@ +use std::{ + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use arrow::{ + array::PrimitiveBuilder, compute::filter_record_batch, datatypes::TimestampMillisecondType, +}; +use arrow_array::{Array, BooleanArray, RecordBatch}; +use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit}; +use datafusion::{ + common::{downcast_value, DataFusionError, Result}, + logical_expr::GroupsAccumulator, + physical_expr::GroupsAccumulatorAdapter, + physical_plan::PhysicalExpr, +}; +pub mod grouped_window_agg_stream; +pub mod streaming_window; + +mod group_values; +mod order; +use datafusion::physical_expr::AggregateExpr; +use log::debug; + +pub(crate) type GroupsAccumulatorItem = Box; + +pub(crate) fn create_group_accumulator( + agg_expr: &Arc, +) -> Result> { + if agg_expr.groups_accumulator_supported() { + agg_expr.create_groups_accumulator() + } else { + // Note in the log when the slow path is used + debug!( + "Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}", + agg_expr.name() + ); + let agg_expr_captured = Arc::clone(agg_expr); + let factory = move || agg_expr_captured.create_accumulator(); + Ok(Box::new(GroupsAccumulatorAdapter::new(factory))) + } +} + +fn add_window_columns_to_schema(schema: SchemaRef) -> Schema { + let fields = schema.flattened_fields().to_owned(); + + let mut builder = 
SchemaBuilder::new(); + + for field in fields { + builder.push(field.clone()); + } + builder.push(Field::new( + "window_start_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + builder.push(Field::new( + "window_end_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + + builder.finish() +} + +fn add_window_columns_to_record_batch( + record_batch: RecordBatch, + start_time: SystemTime, + end_time: SystemTime, +) -> RecordBatch { + let start_time_duration = start_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64; + let end_time_duration = end_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64; + + let mut start_builder = PrimitiveBuilder::::new(); + let mut end_builder = PrimitiveBuilder::::new(); + + for _ in 0..record_batch.num_rows() { + start_builder.append_value(start_time_duration); + end_builder.append_value(end_time_duration); + } + + let start_array = start_builder.finish(); + let end_array = end_builder.finish(); + + let new_schema = add_window_columns_to_schema(record_batch.schema()); + let mut new_columns = record_batch.columns().to_vec(); + new_columns.push(Arc::new(start_array)); + new_columns.push(Arc::new(end_array)); + + RecordBatch::try_new(Arc::new(new_schema), new_columns).unwrap() +} + +pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray> { + Ok(downcast_value!(array, BooleanArray)) +} + +fn batch_filter(batch: &RecordBatch, predicate: &Arc) -> Result { + predicate + .evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + .and_then(|array| { + Ok(as_boolean_array(&array)?) 
+ // apply filter array to record batch + .and_then(|filter_array| Ok(filter_record_batch(batch, filter_array)?)) + }) +} diff --git a/crates/core/src/physical_plan/continuous/order/full.rs b/crates/core/src/physical_plan/continuous/order/full.rs new file mode 100644 index 0000000..65af814 --- /dev/null +++ b/crates/core/src/physical_plan/continuous/order/full.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::logical_expr::EmitTo; + +/// Tracks grouping state when the data is ordered entirely by its +/// group keys +/// +/// When the group values are sorted, as soon as we see group `n+1` we +/// know we will never see any rows for group `n again and thus they +/// can be emitted. +/// +/// For example, given `SUM(amt) GROUP BY id` if the input is sorted +/// by `id` as soon as a new `id` value is seen all previous values +/// can be emitted. +/// +/// The state is tracked like this: +/// +/// ```text +/// ┌─────┐ ┌──────────────────┐ +/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓ +/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃ +/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛ +/// │ ... │ │ ... 
│ │ +/// │┌───┐│ │ ┌──────────────┐ │ │ current +/// ││12 ││ │ │ 234 │ │ │ +/// │├───┤│ │ ├──────────────┤ │ │ +/// ││12 ││ │ │ 234 │ │ │ +/// │├───┤│ │ ├──────────────┤ │ │ +/// ││13 ││ │ │ 456 │◀┼───┘ +/// │└───┘│ │ └──────────────┘ │ +/// └─────┘ └──────────────────┘ +/// +/// group indices group_values current tracks the most +/// (in group value recent group index +/// order) +/// ``` +/// +/// In this diagram, the current group is `13`, and thus groups +/// `0..12` can be emitted. Note that `13` can not yet be emitted as +/// there may be more values in the next batch with the same group_id. +#[derive(Debug)] +pub(crate) struct GroupOrderingFull { + state: State, +} + +#[derive(Debug)] +enum State { + /// Seen no input yet + Start, + + /// Data is in progress. `current is the current group for which + /// values are being generated. Can emit `current` - 1 + InProgress { current: usize }, + + /// Seen end of input: all groups can be emitted + Complete, +} + +impl GroupOrderingFull { + pub fn new() -> Self { + Self { + state: State::Start, + } + } + + // How many groups be emitted, or None if no data can be emitted + pub fn emit_to(&self) -> Option { + match &self.state { + State::Start => None, + State::InProgress { current, .. } => { + if *current == 0 { + // Can not emit if still on the first row + None + } else { + // otherwise emit all rows prior to the current group + Some(EmitTo::First(*current)) + } + } + State::Complete { .. } => Some(EmitTo::All), + } + } + + /// remove the first n groups from the internal state, shifting + /// all existing indexes down by `n` + pub fn remove_groups(&mut self, n: usize) { + match &mut self.state { + State::Start => panic!("invalid state: start"), + State::InProgress { current } => { + // shift down by n + assert!(*current >= n); + *current -= n; + } + State::Complete { .. 
} => panic!("invalid state: complete"), + } + } + + /// Note that the input is complete so any outstanding groups are done as well + pub fn input_done(&mut self) { + self.state = State::Complete; + } + + /// Called when new groups are added in a batch. See documentation + /// on [`super::GroupOrdering::new_groups`] + pub fn new_groups(&mut self, total_num_groups: usize) { + assert_ne!(total_num_groups, 0); + + // Update state + let max_group_index = total_num_groups - 1; + self.state = match self.state { + State::Start => State::InProgress { + current: max_group_index, + }, + State::InProgress { current } => { + // expect to see new group indexes when called again + assert!(current <= max_group_index, "{current} <= {max_group_index}"); + State::InProgress { + current: max_group_index, + } + } + State::Complete { .. } => { + panic!("Saw new group after input was complete"); + } + }; + } + + pub(crate) fn size(&self) -> usize { + std::mem::size_of::() + } +} diff --git a/crates/core/src/physical_plan/continuous/order/mod.rs b/crates/core/src/physical_plan/continuous/order/mod.rs new file mode 100644 index 0000000..921452a --- /dev/null +++ b/crates/core/src/physical_plan/continuous/order/mod.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::ArrayRef; +use arrow_schema::Schema; +use datafusion::{ + common::Result, logical_expr::EmitTo, physical_expr::PhysicalSortExpr, + physical_plan::InputOrderMode, +}; + +mod full; +mod partial; + +pub(crate) use full::GroupOrderingFull; +pub(crate) use partial::GroupOrderingPartial; + +/// Ordering information for each group in the hash table +#[derive(Debug)] +pub(crate) enum GroupOrdering { + /// Groups are not ordered + None, + /// Groups are ordered by some pre-set of the group keys + Partial(GroupOrderingPartial), + /// Groups are entirely contiguous, + Full(GroupOrderingFull), +} + +impl GroupOrdering { + /// Create a `GroupOrdering` for the specified ordering + pub fn try_new( + input_schema: &Schema, + mode: &InputOrderMode, + ordering: &[PhysicalSortExpr], + ) -> Result { + match mode { + InputOrderMode::Linear => Ok(GroupOrdering::None), + InputOrderMode::PartiallySorted(order_indices) => { + GroupOrderingPartial::try_new(input_schema, order_indices, ordering) + .map(GroupOrdering::Partial) + } + InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())), + } + } + + // How many groups be emitted, or None if no data can be emitted + pub fn emit_to(&self) -> Option { + match self { + GroupOrdering::None => None, + GroupOrdering::Partial(partial) => partial.emit_to(), + GroupOrdering::Full(full) => full.emit_to(), + } + } + + /// Updates the state the input is done + pub fn input_done(&mut self) { + match self { + GroupOrdering::None => {} + GroupOrdering::Partial(partial) => partial.input_done(), + GroupOrdering::Full(full) => full.input_done(), + } + } + + /// remove the first n groups from the internal state, shifting + /// all existing indexes down by `n` + pub fn remove_groups(&mut self, n: usize) { + match self { + GroupOrdering::None => {} + GroupOrdering::Partial(partial) => partial.remove_groups(n), + 
GroupOrdering::Full(full) => full.remove_groups(n), + } + } + + /// Called when new groups are added in a batch + /// + /// * `total_num_groups`: total number of groups (so max + /// group_index is total_num_groups - 1). + /// + /// * `group_values`: group key values for *each row* in the batch + /// + /// * `group_indices`: indices for each row in the batch + /// + /// * `hashes`: hash values for each row in the batch + pub fn new_groups( + &mut self, + batch_group_values: &[ArrayRef], + group_indices: &[usize], + total_num_groups: usize, + ) -> Result<()> { + match self { + GroupOrdering::None => {} + GroupOrdering::Partial(partial) => { + partial.new_groups(batch_group_values, group_indices, total_num_groups)?; + } + GroupOrdering::Full(full) => { + full.new_groups(total_num_groups); + } + }; + Ok(()) + } + + /// Return the size of memory used by the ordering state, in bytes + pub(crate) fn size(&self) -> usize { + std::mem::size_of::() + + match self { + GroupOrdering::None => 0, + GroupOrdering::Partial(partial) => partial.size(), + GroupOrdering::Full(full) => full.size(), + } + } +} diff --git a/crates/core/src/physical_plan/continuous/order/partial.rs b/crates/core/src/physical_plan/continuous/order/partial.rs new file mode 100644 index 0000000..e941c10 --- /dev/null +++ b/crates/core/src/physical_plan/continuous/order/partial.rs @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::row::{OwnedRow, RowConverter, Rows, SortField}; +use arrow_array::ArrayRef; +use arrow_schema::Schema; +use datafusion::{ + common::utils::proxy::VecAllocExt, common::Result, logical_expr::EmitTo, + physical_expr::PhysicalSortExpr, +}; + +use std::sync::Arc; + +/// Tracks grouping state when the data is ordered by some subset of +/// the group keys. +/// +/// Once the next *sort key* value is seen, never see groups with that +/// sort key again, so we can emit all groups with the previous sort +/// key and earlier. +/// +/// For example, given `SUM(amt) GROUP BY id, state` if the input is +/// sorted by `state, when a new value of `state` is seen, all groups +/// with prior values of `state` can be emitted. +/// +/// The state is tracked like this: +/// +/// ```text +/// ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓ +/// ┌─────┐ ┌───────────────────┐ ┌─────┃ 9 ┃ ┃ "MD" ┃ +/// │┌───┐│ │ ┌──────────────┐ │ │ ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛ +/// ││ 0 ││ │ │ 123, "MA" │ │ │ current_sort sort_key +/// │└───┘│ │ └──────────────┘ │ │ +/// │ ... │ │ ... 
│ │ current_sort tracks the +/// │┌───┐│ │ ┌──────────────┐ │ │ smallest group index that had +/// ││ 8 ││ │ │ 765, "MA" │ │ │ the same sort_key as current +/// │├───┤│ │ ├──────────────┤ │ │ +/// ││ 9 ││ │ │ 923, "MD" │◀─┼─┘ +/// │├───┤│ │ ├──────────────┤ │ ┏━━━━━━━━━━━━━━┓ +/// ││10 ││ │ │ 345, "MD" │ │ ┌─────┃ 11 ┃ +/// │├───┤│ │ ├──────────────┤ │ │ ┗━━━━━━━━━━━━━━┛ +/// ││11 ││ │ │ 124, "MD" │◀─┼──┘ current +/// │└───┘│ │ └──────────────┘ │ +/// └─────┘ └───────────────────┘ +/// +/// group indices +/// (in group value group_values current tracks the most +/// order) recent group index +///``` +#[derive(Debug)] +pub(crate) struct GroupOrderingPartial { + /// State machine + state: State, + + /// The indexes of the group by columns that form the sort key. + /// For example if grouping by `id, state` and ordered by `state` + /// this would be `[1]`. + order_indices: Vec, + + /// Converter for the sort key (used on the group columns + /// specified in `order_indexes`) + row_converter: RowConverter, +} + +#[derive(Debug, Default)] +enum State { + /// The ordering was temporarily taken. `Self::Taken` is left + /// when state must be temporarily taken to satisfy the borrow + /// checker. If an error happens before the state can be restored, + /// the ordering information is lost and execution can not + /// proceed, but there is no undefined behavior. + #[default] + Taken, + + /// Seen no input yet + Start, + + /// Data is in progress. 
+ InProgress { + /// Smallest group index with the sort_key + current_sort: usize, + /// The sort key of group_index `current_sort` + sort_key: OwnedRow, + /// index of the current group for which values are being + /// generated + current: usize, + }, + + /// Seen end of input, all groups can be emitted + Complete, +} + +impl GroupOrderingPartial { + pub fn try_new( + input_schema: &Schema, + order_indices: &[usize], + ordering: &[PhysicalSortExpr], + ) -> Result { + assert!(!order_indices.is_empty()); + assert!(order_indices.len() <= ordering.len()); + + // get only the section of ordering, that consist of group by expressions. + let fields = ordering[0..order_indices.len()] + .iter() + .map(|sort_expr| { + Ok(SortField::new_with_options( + sort_expr.expr.data_type(input_schema)?, + sort_expr.options, + )) + }) + .collect::>>()?; + + Ok(Self { + state: State::Start, + order_indices: order_indices.to_vec(), + row_converter: RowConverter::new(fields)?, + }) + } + + /// Creates sort keys from the group values + /// + /// For example, if group_values had `A, B, C` but the input was + /// only sorted on `B` and `C` this should return rows for (`B`, + /// `C`) + fn compute_sort_keys(&mut self, group_values: &[ArrayRef]) -> Result { + // Take only the columns that are in the sort key + let sort_values: Vec<_> = self + .order_indices + .iter() + .map(|&idx| Arc::clone(&group_values[idx])) + .collect(); + + Ok(self.row_converter.convert_columns(&sort_values)?) + } + + /// How many groups be emitted, or None if no data can be emitted + pub fn emit_to(&self) -> Option { + match &self.state { + State::Taken => unreachable!("State previously taken"), + State::Start => None, + State::InProgress { current_sort, .. 
} => { + // Can not emit if we are still on the first row sort + // row otherwise we can emit all groups that had earlier sort keys + // + if *current_sort == 0 { + None + } else { + Some(EmitTo::First(*current_sort)) + } + } + State::Complete => Some(EmitTo::All), + } + } + + /// remove the first n groups from the internal state, shifting + /// all existing indexes down by `n` + pub fn remove_groups(&mut self, n: usize) { + match &mut self.state { + State::Taken => unreachable!("State previously taken"), + State::Start => panic!("invalid state: start"), + State::InProgress { + current_sort, + current, + sort_key: _, + } => { + // shift indexes down by n + assert!(*current >= n); + *current -= n; + assert!(*current_sort >= n); + *current_sort -= n; + } + State::Complete { .. } => panic!("invalid state: complete"), + } + } + + /// Note that the input is complete so any outstanding groups are done as well + pub fn input_done(&mut self) { + self.state = match self.state { + State::Taken => unreachable!("State previously taken"), + _ => State::Complete, + }; + } + + /// Called when new groups are added in a batch. See documentation + /// on [`super::GroupOrdering::new_groups`] + pub fn new_groups( + &mut self, + batch_group_values: &[ArrayRef], + group_indices: &[usize], + total_num_groups: usize, + ) -> Result<()> { + assert!(total_num_groups > 0); + assert!(!batch_group_values.is_empty()); + + let max_group_index = total_num_groups - 1; + + // compute the sort key values for each group + let sort_keys = self.compute_sort_keys(batch_group_values)?; + + let old_state = std::mem::take(&mut self.state); + let (mut current_sort, mut sort_key) = match &old_state { + State::Taken => unreachable!("State previously taken"), + State::Start => (0, sort_keys.row(0)), + State::InProgress { + current_sort, + sort_key, + .. 
+ } => (*current_sort, sort_key.row()), + State::Complete => { + panic!("Saw new group after the end of input"); + } + }; + + // Find latest sort key + let iter = group_indices.iter().zip(sort_keys.iter()); + for (&group_index, group_sort_key) in iter { + // Does this group have seen a new sort_key? + if sort_key != group_sort_key { + current_sort = group_index; + sort_key = group_sort_key; + } + } + + self.state = State::InProgress { + current_sort, + sort_key: sort_key.owned(), + current: max_group_index, + }; + + Ok(()) + } + + /// Return the size of memory allocated by this structure + pub(crate) fn size(&self) -> usize { + std::mem::size_of::() + + self.order_indices.allocated_size() + + self.row_converter.size() + } +} diff --git a/crates/core/src/physical_plan/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs similarity index 91% rename from crates/core/src/physical_plan/streaming_window.rs rename to crates/core/src/physical_plan/continuous/streaming_window.rs index 0eeb0b2..93f0e57 100644 --- a/crates/core/src/physical_plan/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -41,9 +41,12 @@ use datafusion::physical_plan::{ use futures::{Stream, StreamExt}; use tracing::debug; -use super::utils::{ - accumulators::{create_accumulators, AccumulatorItem}, - time::RecordBatchWatermark, +use crate::physical_plan::{ + continuous::grouped_window_agg_stream::GroupedWindowAggStream, + utils::{ + accumulators::{create_accumulators, AccumulatorItem}, + time::RecordBatchWatermark, + }, }; pub struct FranzWindowFrame { @@ -78,6 +81,8 @@ impl DisplayAs for FranzWindowFrame { use datafusion::common::Result; +use super::{add_window_columns_to_record_batch, add_window_columns_to_schema, batch_filter}; + impl FranzWindowFrame { pub fn new( window_start_time: SystemTime, @@ -194,11 +199,11 @@ pub struct FranzStreamingWindowExec { pub filter_expressions: Vec>>, /// Schema after the window is run pub 
group_by: PhysicalGroupBy, - schema: SchemaRef, + pub schema: SchemaRef, pub input_schema: SchemaRef, pub watermark: Arc>>, - metrics: ExecutionPlanMetricsSet, + pub(crate) metrics: ExecutionPlanMetricsSet, cache: PlanProperties, pub mode: AggregateMode, pub window_type: FranzStreamingWindowType, @@ -407,15 +412,27 @@ impl ExecutionPlan for FranzStreamingWindowExec { partition: usize, context: Arc, ) -> Result { - let stream: Pin> = Box::pin(WindowAggStream::new( - self, - context, - partition, - self.watermark.clone(), - self.window_type, - self.mode, - )?); - Ok(stream) + if self.group_by.is_empty() { + debug!("GROUP BY expression is empty creating a SimpleWindowAggStream"); + Ok(Box::pin(WindowAggStream::new( + self, + context, + partition, + self.watermark.clone(), + self.window_type, + self.mode, + )?)) + } else { + debug!("Creating a GroupedWindowAggStream"); + Ok(Box::pin(GroupedWindowAggStream::new( + self, + context, + partition, + self.watermark.clone(), + self.window_type, + self.mode, + )?)) + } } fn metrics(&self) -> Option { @@ -573,12 +590,6 @@ impl WindowAggStream { window_type: FranzStreamingWindowType, aggregation_mode: AggregateMode, ) -> Result { - // TODO: 6/12/2024 Why was this commented out? - - // In WindowAggExec all partition by columns should be ordered. 
- //if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() { - // return internal_err!("All partition by columns should have an ordering"); - //} let agg_schema = Arc::clone(&exec_operator.schema); let agg_filter_expr = exec_operator.filter_expressions.clone(); @@ -746,7 +757,7 @@ impl Stream for WindowAggStream { } } -fn get_windows_for_watermark( +pub fn get_windows_for_watermark( watermark: &RecordBatchWatermark, window_type: FranzStreamingWindowType, ) -> Vec<(SystemTime, SystemTime)> { @@ -878,67 +889,3 @@ pub fn aggregate_batch( Ok(allocated) } - -fn add_window_columns_to_schema(schema: SchemaRef) -> Schema { - let fields = schema.flattened_fields().to_owned(); - - let mut builder = SchemaBuilder::new(); - - for field in fields { - builder.push(field.clone()); - } - builder.push(Field::new( - "window_start_time", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )); - builder.push(Field::new( - "window_end_time", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )); - - builder.finish() -} - -fn add_window_columns_to_record_batch( - record_batch: RecordBatch, - start_time: SystemTime, - end_time: SystemTime, -) -> RecordBatch { - let start_time_duration = start_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64; - let end_time_duration = end_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64; - - let mut start_builder = PrimitiveBuilder::<TimestampMillisecondType>::new(); - let mut end_builder = PrimitiveBuilder::<TimestampMillisecondType>::new(); - - for _ in 0..record_batch.num_rows() { - start_builder.append_value(start_time_duration); - end_builder.append_value(end_time_duration); - } - - let start_array = start_builder.finish(); - let end_array = end_builder.finish(); - - let new_schema = add_window_columns_to_schema(record_batch.schema()); - let mut new_columns = record_batch.columns().to_vec(); - new_columns.push(Arc::new(start_array)); - new_columns.push(Arc::new(end_array)); - - RecordBatch::try_new(Arc::new(new_schema),
new_columns).unwrap() -} - -pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray> { - Ok(downcast_value!(array, BooleanArray)) -} - -fn batch_filter(batch: &RecordBatch, predicate: &Arc<dyn PhysicalExpr>) -> Result<RecordBatch> { - predicate - .evaluate(batch) - .and_then(|v| v.into_array(batch.num_rows())) - .and_then(|array| { - Ok(as_boolean_array(&array)?) - // apply filter array to record batch - .and_then(|filter_array| Ok(filter_record_batch(batch, filter_array)?)) - }) -} diff --git a/crates/core/src/physical_plan/mod.rs b/crates/core/src/physical_plan/mod.rs index f4a9642..44831a7 100644 --- a/crates/core/src/physical_plan/mod.rs +++ b/crates/core/src/physical_plan/mod.rs @@ -1,2 +1,2 @@ -pub mod streaming_window; +pub mod continuous; pub mod utils; diff --git a/crates/core/src/planner/streaming_window.rs b/crates/core/src/planner/streaming_window.rs index 34ec2d2..a054e41 100644 --- a/crates/core/src/planner/streaming_window.rs +++ b/crates/core/src/planner/streaming_window.rs @@ -17,7 +17,9 @@ use datafusion::physical_planner::{ }; use crate::logical_plan::streaming_window::{StreamingWindowPlanNode, StreamingWindowType}; -use crate::physical_plan::streaming_window::{FranzStreamingWindowExec, FranzStreamingWindowType}; +use crate::physical_plan::continuous::streaming_window::{ + FranzStreamingWindowExec, FranzStreamingWindowType, +}; /// Physical planner for TopK nodes pub struct StreamingWindowPlanner {} diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs index 90902bd..47b5bed 100644 --- a/examples/examples/kafka_rideshare.rs +++ b/examples/examples/kafka_rideshare.rs @@ -17,7 +17,7 @@ async fn main() -> Result<()> { tracing_log::LogTracer::init().expect("Failed to set up log tracer"); let subscriber = FmtSubscriber::builder() - .with_max_level(tracing::Level::INFO) + .with_max_level(tracing::Level::DEBUG) .with_span_events(FmtSpan::CLOSE | FmtSpan::ENTER) .finish(); tracing::subscriber::set_global_default(subscriber).expect("setting
default subscriber failed"); @@ -69,7 +69,7 @@ async fn main() -> Result<()> { .await?; let ds = ctx.from_topic(source_topic).await?.window( - vec![], + vec![col("driver_id")], vec![ max(col("imu_measurement").field("gps").field("speed")), min(col("imu_measurement").field("gps").field("altitude")),