Skip to content

Commit

Permalink
Adding serde utils for Arrow Arrays
Browse files: browse the repository at this point in the history
  • Loading branch information
ameyc committed Sep 26, 2024
1 parent e9b6e3f commit 0958f35
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ half = "2.4.1"
delegate = "0.12.0"
ahash = "0.8.11"
hashbrown = "0.14.5"
flatbuffers = "24.3.25"
6 changes: 4 additions & 2 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion::execution::{

use crate::datasource::kafka::TopicReader;
use crate::datastream::DataStream;
use crate::physical_optimizer::CoaslesceBeforeStreamingAggregate;
use crate::physical_optimizer::EnsureHashPartititionOnGroupByForStreamingAggregates;
use crate::query_planner::StreamingQueryPlanner;
use crate::utils::get_default_optimizer_rules;

Expand Down Expand Up @@ -41,7 +41,9 @@ impl Context {
.with_runtime_env(runtime)
.with_query_planner(Arc::new(StreamingQueryPlanner {}))
.with_optimizer_rules(get_default_optimizer_rules())
.with_physical_optimizer_rule(Arc::new(CoaslesceBeforeStreamingAggregate::new()))
.with_physical_optimizer_rule(Arc::new(
EnsureHashPartititionOnGroupByForStreamingAggregates::new(),
))
.build();

Ok(Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl StreamingLogicalPlanBuilder for LogicalPlanBuilder {
let plan = self.plan().clone();

Aggregate::try_new(Arc::new(plan.clone()), group_expr, aggr_expr)
.map(|new_aggr| {
.map(|new_aggr: Aggregate| {
LogicalPlan::Extension(Extension {
node: Arc::new(StreamingWindowPlanNode {
window_type: window,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use datafusion::error::Result;

use crate::physical_plan::continuous::streaming_window::StreamingWindowExec;

pub struct CoaslesceBeforeStreamingAggregate {}
pub struct EnsureHashPartititionOnGroupByForStreamingAggregates {}

impl Default for CoaslesceBeforeStreamingAggregate {
impl Default for EnsureHashPartititionOnGroupByForStreamingAggregates {
fn default() -> Self {
Self::new()
}
}

impl CoaslesceBeforeStreamingAggregate {
impl EnsureHashPartititionOnGroupByForStreamingAggregates {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
Expand All @@ -29,7 +29,7 @@ impl CoaslesceBeforeStreamingAggregate {
// Franz optimizer rule, added to ensure coalescing of partitions before a global aggregate
// window. This rule may be removed once we support two-stage (partial and final)
// aggregation, as vanilla DataFusion does.
impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate {
impl PhysicalOptimizerRule for EnsureHashPartititionOnGroupByForStreamingAggregates {
fn optimize(
&self,
plan: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod coalesce_before_streaming_window_aggregate;

pub use coalesce_before_streaming_window_aggregate::CoaslesceBeforeStreamingAggregate;
pub use coalesce_before_streaming_window_aggregate::EnsureHashPartititionOnGroupByForStreamingAggregates;
1 change: 1 addition & 0 deletions crates/core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
pub mod arrow_helpers;
mod default_optimizer_rules;
pub mod row_encoder;
pub mod serialization;

pub use default_optimizer_rules::get_default_optimizer_rules;
Loading

0 comments on commit 0958f35

Please sign in to comment.