Commit: add comments + cleanup

emgeee committed Aug 13, 2024
1 parent abc637e commit c805c2d
Showing 6 changed files with 169 additions and 53 deletions.
92 changes: 92 additions & 0 deletions Cargo.lock


89 changes: 51 additions & 38 deletions crates/core/src/datastream.rs
@@ -15,48 +15,18 @@ use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
use crate::logical_plan::StreamingLogicalPlanBuilder;
use crate::physical_plan::utils::time::TimestampUnit;

/// The primary interface for building a streaming job
///
/// Wraps the DataFusion DataFrame and context objects and provides methods
/// for constructing and executing streaming pipelines.
#[derive(Clone)]
pub struct DataStream {
pub df: Arc<DataFrame>,
pub(crate) context: Arc<Context>,
}
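As a usage sketch of this interface (hedged: the Context constructor is not shown in this diff and topic setup is elided; the method names follow the simple_aggregation and kafka_rideshare examples further down):

    use df_streams_core::context::Context;

    // Hypothetical pipeline: read a Kafka topic as a stream and print it.
    let ctx = Context::new()?;                    // assumed constructor
    let ds = ctx.from_topic(source_topic).await?; // source_topic built elsewhere
    ds.print_stream().await?;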

pub trait Joinable {
fn get_plan(self) -> LogicalPlan;
}
impl Joinable for DataFrame {
fn get_plan(self) -> LogicalPlan {
let (_, plan) = self.into_parts();
plan
}
}
impl Joinable for DataStream {
fn get_plan(self) -> LogicalPlan {
let (_, plan) = self.df.as_ref().clone().into_parts();
plan
}
}

impl DataStream {
/// Return the schema of the DataFrame that backs the DataStream
pub fn schema(&self) -> &DFSchema {
self.df.schema()
}

/// Prints the schema of the underlying DataFrame.
/// Useful for debugging chained method calls.
pub fn print_schema(self) -> Result<Self, DataFusionError> {
println!("{}", self.df.schema());
Ok(self)
}

/// Prints the underlying logical_plan.
/// Useful for debugging chained method calls.
pub fn print_plan(self) -> Result<Self, DataFusionError> {
println!("{}", self.df.logical_plan().display_indent());
Ok(self)
}

/// Select columns in the output stream
pub fn select(self, expr_list: Vec<Expr>) -> Result<Self, DataFusionError> {
let (session_state, plan) = self.df.as_ref().clone().into_parts();

@@ -74,6 +44,7 @@ impl DataStream {
})
}

/// Apply a filter predicate to the stream
pub fn filter(self, predicate: Expr) -> Result<Self> {
let (session_state, plan) = self.df.as_ref().clone().into_parts();

@@ -85,8 +56,7 @@ impl DataStream {
})
}
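A chaining sketch for select and filter (the sensor_name column and the threshold are hypothetical; reading appears in the sample schema used by the examples below):

    use datafusion::logical_expr::{col, lit};

    // Keep rows with high readings, then project two columns.
    let ds = ds
        .filter(col("reading").gt(lit(70)))?
        .select(vec![col("sensor_name"), col("reading")])?;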

// drop_columns, sync, columns: &[&str]
// count
/// Join two streams using the specified expression
pub fn join_on(
self,
right: impl Joinable,
@@ -106,6 +76,8 @@ impl DataStream {
})
}
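The full join_on signature is elided above; assuming it mirrors DataFusion's DataFrame::join_on (a join type plus equality expressions), usage would look roughly like this, with all names hypothetical:

    use datafusion::common::JoinType;
    use datafusion::logical_expr::col;

    // Inner-join two streams on a shared key expression.
    let joined = left_stream.join_on(
        right_stream,
        JoinType::Inner,
        vec![col("left.sensor_id").eq(col("right.sensor_id"))],
    )?;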

/// Join two streams together using explicitly specified columns
/// Also supports joining a DataStream with a DataFrame object
pub fn join(
self,
right: impl Joinable,
@@ -132,6 +104,7 @@ impl DataStream {
})
}
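Likewise for the column-based join; assuming the elided parameters follow DataFusion's DataFrame::join (join type, left/right key columns, optional filter), a hypothetical enrichment join against a static DataFrame:

    use datafusion::common::JoinType;

    // Join the stream against a lookup DataFrame by key columns.
    let enriched = ds.join(
        lookup_df,      // any Joinable, e.g. a DataFusion DataFrame
        JoinType::Left,
        &["sensor_id"], // key columns on the stream side
        &["id"],        // key columns on the lookup side
        None,           // no additional join filter
    )?;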

/// Create a streaming window
pub fn window(
self,
group_expr: Vec<Expr>,
@@ -150,6 +123,8 @@ impl DataStream {
})
}
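A window call mirroring the simple_aggregation example at the bottom of this diff (empty grouping, a one-second window; the final None is assumed to be the slide interval, making this a tumbling window):

    use std::time::Duration;
    use datafusion::functions_aggregate::count::count;
    use datafusion::logical_expr::col;

    let ds = ds.window(
        vec![],                                     // no grouping keys
        vec![count(col("reading")).alias("count")], // per-window aggregates
        Duration::from_millis(1_000),               // window length
        None,                                       // slide (assumed); None => tumbling
    )?;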

/// Execute the stream and print the results to stdout.
/// Mainly used for development and debugging.
pub async fn print_stream(self) -> Result<(), DataFusionError> {
let mut stream: SendableRecordBatchStream =
self.df.as_ref().clone().execute_stream().await?;
@@ -176,7 +151,27 @@ impl DataStream {
}
}

pub async fn write_table(
/// Return the schema of the DataFrame that backs the DataStream
pub fn schema(&self) -> &DFSchema {
self.df.schema()
}

/// Prints the schema of the underlying DataFrame.
/// Useful for debugging chained method calls.
pub fn print_schema(self) -> Result<Self, DataFusionError> {
println!("{}", self.df.schema());
Ok(self)
}

/// Prints the underlying logical_plan.
/// Useful for debugging chained method calls.
pub fn print_plan(self) -> Result<Self, DataFusionError> {
println!("{}", self.df.logical_plan().display_indent());
Ok(self)
}

/// Execute the stream and write the results to a given Kafka topic.
pub async fn sink_kafka(
self,
bootstrap_servers: String,
topic: String,
@@ -206,3 +201,21 @@ impl DataStream {
Ok(())
}
}
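Usage matches the renamed call in examples/examples/kafka_rideshare.rs below:

    // Execute the pipeline and write each batch to the output topic.
    ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
        .await?;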

/// Trait that allows both DataStream and DataFrame objects to be joined to
/// the current DataStream
pub trait Joinable {
fn get_plan(self) -> LogicalPlan;
}
impl Joinable for DataFrame {
fn get_plan(self) -> LogicalPlan {
let (_, plan) = self.into_parts();
plan
}
}
impl Joinable for DataStream {
fn get_plan(self) -> LogicalPlan {
let (_, plan) = self.df.as_ref().clone().into_parts();
plan
}
}
3 changes: 2 additions & 1 deletion crates/core/src/logical_plan/mod.rs
@@ -12,6 +12,7 @@ use datafusion::logical_expr::{Aggregate, Expr};
pub mod streaming_window;
use streaming_window::{StreamingWindowPlanNode, StreamingWindowSchema, StreamingWindowType};

/// Extend the DataFusion logical plan builder with streaming-specific functionality
pub trait StreamingLogicalPlanBuilder {
fn streaming_window(
self,
@@ -24,7 +25,7 @@ pub trait StreamingLogicalPlanBuilder {

// Extend the LogicalPlanBuilder with functions to add streaming operators to the plan
impl StreamingLogicalPlanBuilder for LogicalPlanBuilder {
/// Apply franz window functions to extend the schema
/// Apply a streaming window function to extend the schema
fn streaming_window(
self,
group_expr: impl IntoIterator<Item = impl Into<Expr>>,
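For completeness, a hedged sketch of calling the builder extension directly; the trailing parameters are cut off above and are assumed to match DataStream::window (window length, then an optional slide):

    use std::time::Duration;
    use datafusion::functions_aggregate::average::avg;
    use datafusion::logical_expr::{col, LogicalPlanBuilder};
    use df_streams_core::logical_plan::StreamingLogicalPlanBuilder; // extension trait (crate path assumed)

    // Hypothetical: wrap an existing plan and append a streaming window node.
    let plan = LogicalPlanBuilder::from(input_plan)
        .streaming_window(
            vec![],                    // group expressions
            vec![avg(col("reading"))], // aggregate expressions
            Duration::from_secs(1),    // window length (assumed)
            None,                      // slide (assumed)
        )?
        .build()?;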
2 changes: 2 additions & 0 deletions examples/Cargo.toml
@@ -22,3 +22,5 @@ tempfile = { version = "3" }
rdkafka = { workspace = true }
rand = "0.8.5"
tokio-postgres = { version = "0.7.11", features = ["with-serde_json-1"] }
log = { workspace = true }
env_logger = "0.11.5"
2 changes: 1 addition & 1 deletion examples/examples/kafka_rideshare.rs
@@ -82,7 +82,7 @@ async fn main() -> Result<()> {

// ds.clone().print_stream().await?;

ds.write_table(bootstrap_servers.clone(), String::from("out_topic"))
ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
.await?;

Ok(())
34 changes: 21 additions & 13 deletions examples/examples/simple_aggregation.rs
@@ -3,7 +3,7 @@ use std::time::Duration;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::count::count;
// use datafusion::logical_expr::lit;
use datafusion::logical_expr::lit;
use datafusion::logical_expr::{col, max, min};

use df_streams_core::context::Context;
@@ -14,6 +14,10 @@ use df_streams_examples::get_sample_json;

#[tokio::main]
async fn main() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Warn)
.init();

let sample_event = get_sample_json();

let bootstrap_servers = String::from("localhost:9092");
@@ -32,18 +36,22 @@ async fn main() -> Result<()> {
]))
.await?;

let ds = ctx.from_topic(source_topic).await?.window(
vec![],
vec![
count(col("reading")).alias("count"),
min(col("reading")).alias("min"),
max(col("reading")).alias("max"),
avg(col("reading")).alias("average"),
],
Duration::from_millis(1_000),
None,
)?;
// .filter(col("max").gt(lit(114)))?;
let ds = ctx
.from_topic(source_topic)
.await?
// .filter(col("reading").gt(lit(70)))?
.window(
vec![],
vec![
count(col("reading")).alias("count"),
min(col("reading")).alias("min"),
max(col("reading")).alias("max"),
avg(col("reading")).alias("average"),
],
Duration::from_millis(1_000),
None,
)?
.filter(col("max").lt(lit(113)))?;

ds.clone().print_stream().await?;

