Commit abc637e: update examples

emgeee committed Aug 12, 2024
1 parent c6ef592

Showing 6 changed files with 234 additions and 161 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/datasource/kafka/kafka_config.rs
@@ -85,7 +85,7 @@ impl KafkaWriteConfig {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct KafkaTopicBuilder {
     bootstrap_servers: String,
     topic: Option<String>,
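Note: deriving Clone lets one configured KafkaTopicBuilder serve several topics. A minimal sketch of the pattern this unlocks, assuming the chained builder methods take &mut self (as the usage in simple_aggregation.rs below suggests); the topic names are illustrative:

use df_streams_core::datasource::kafka::KafkaTopicBuilder;
use df_streams_core::physical_plan::utils::time::TimestampUnit;

// Sketch: one base config, cloned per topic instead of rebuilt.
fn shared_config() -> datafusion::error::Result<()> {
    let mut base = KafkaTopicBuilder::new(String::from("localhost:9092"));
    base.with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
        .with_encoding("json")?;

    let _temperature_builder = base.clone();
    let _humidity_builder = base.clone();
    Ok(())
}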
114 changes: 104 additions & 10 deletions crates/core/src/datastream.rs
@@ -1,11 +1,14 @@
+use datafusion::logical_expr::LogicalPlan;
 use futures::StreamExt;
 use std::{sync::Arc, time::Duration};
 
 use datafusion::common::{DFSchema, DataFusionError, Result};
 pub use datafusion::dataframe::DataFrame;
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::execution::SendableRecordBatchStream;
-use datafusion::logical_expr::{logical_plan::LogicalPlanBuilder, Expr};
+use datafusion::logical_expr::{
+    logical_plan::LogicalPlanBuilder, utils::find_window_exprs, Expr, JoinType,
+};
 
 use crate::context::Context;
 use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
@@ -18,12 +21,60 @@ pub struct DataStream {
     pub(crate) context: Arc<Context>,
 }
 
+pub trait Joinable {
+    fn get_plan(self) -> LogicalPlan;
+}
+impl Joinable for DataFrame {
+    fn get_plan(self) -> LogicalPlan {
+        let (_, plan) = self.into_parts();
+        plan
+    }
+}
+impl Joinable for DataStream {
+    fn get_plan(self) -> LogicalPlan {
+        let (_, plan) = self.df.as_ref().clone().into_parts();
+        plan
+    }
+}
+
 impl DataStream {
     /// Return the schema of DataFrame that backs the DataStream
     pub fn schema(&self) -> &DFSchema {
         self.df.schema()
     }
 
-    pub fn filter(&self, predicate: Expr) -> Result<Self> {
+    /// Prints the schema of the underlying dataframe
+    /// Useful for debugging chained method calls.
+    pub fn print_schema(self) -> Result<Self, DataFusionError> {
+        println!("{}", self.df.schema());
+        Ok(self)
+    }
+
+    /// Prints the underlying logical_plan.
+    /// Useful for debugging chained method calls.
+    pub fn print_plan(self) -> Result<Self, DataFusionError> {
+        println!("{}", self.df.logical_plan().display_indent());
+        Ok(self)
+    }
+
+    pub fn select(self, expr_list: Vec<Expr>) -> Result<Self, DataFusionError> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+
+        let window_func_exprs = find_window_exprs(&expr_list);
+        let plan = if window_func_exprs.is_empty() {
+            plan
+        } else {
+            LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
+        };
+        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, project_plan)),
+            context: self.context.clone(),
+        })
+    }
+
+    pub fn filter(self, predicate: Expr) -> Result<Self> {
         let (session_state, plan) = self.df.as_ref().clone().into_parts();
 
         let plan = LogicalPlanBuilder::from(plan).filter(predicate)?.build()?;
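Note: because print_schema and print_plan consume and return Self, they can be dropped into the middle of a method chain. A minimal sketch, assuming the module path df_streams_core::datastream::DataStream from the crate layout and an illustrative "reading" column:

use datafusion::logical_expr::{col, lit};
use df_streams_core::datastream::DataStream;

// Sketch: debugging helpers threaded through a chain.
fn debug_chain(ds: DataStream) -> datafusion::error::Result<DataStream> {
    ds.select(vec![col("reading")])?
        .print_schema()? // schema after the projection
        .filter(col("reading").gt(lit(100.0)))?
        .print_plan() // full logical plan built so far
}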
@@ -36,9 +87,53 @@ impl DataStream {
 
     // drop_columns, sync, columns: &[&str]
     // count
+    pub fn join_on(
+        self,
+        right: impl Joinable,
+        join_type: JoinType,
+        on_exprs: impl IntoIterator<Item = Expr>,
+    ) -> Result<Self, DataFusionError> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+        let right_plan = right.get_plan();
+
+        let plan = LogicalPlanBuilder::from(plan)
+            .join_on(right_plan, join_type, on_exprs)?
+            .build()?;
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, plan)),
+            context: self.context.clone(),
+        })
+    }
+
+    pub fn join(
+        self,
+        right: impl Joinable,
+        join_type: JoinType,
+        left_cols: &[&str],
+        right_cols: &[&str],
+        filter: Option<Expr>,
+    ) -> Result<Self, DataFusionError> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+        let right_plan = right.get_plan();
+
+        let plan = LogicalPlanBuilder::from(plan)
+            .join(
+                right_plan,
+                join_type,
+                (left_cols.to_vec(), right_cols.to_vec()),
+                filter,
+            )?
+            .build()?;
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, plan)),
+            context: self.context.clone(),
+        })
+    }
 
     pub fn window(
-        &self,
+        self,
         group_expr: Vec<Expr>,
         aggr_expr: Vec<Expr>,
         window_length: Duration,
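Note: since join_on accepts impl Joinable, the right side can be another DataStream or a plain DataFusion DataFrame. A hypothetical sketch joining the two example streams; the stream names and column qualifiers are assumptions, not part of this commit:

use datafusion::logical_expr::{col, JoinType};
use df_streams_core::datastream::DataStream;

// Sketch: equi-join two streams on sensor_name.
fn join_streams(
    temperature: DataStream,
    humidity: DataStream,
) -> datafusion::error::Result<DataStream> {
    temperature.join_on(
        humidity,
        JoinType::Inner,
        [col("temperature.sensor_name").eq(col("humidity.sensor_name"))],
    )
}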
@@ -55,19 +150,18 @@
         })
     }
 
-    pub async fn print_stream(&self) -> Result<(), DataFusionError> {
+    pub async fn print_stream(self) -> Result<(), DataFusionError> {
         let mut stream: SendableRecordBatchStream =
             self.df.as_ref().clone().execute_stream().await?;
         loop {
             match stream.next().await.transpose() {
                 Ok(Some(batch)) => {
-                    if batch.num_rows() > 0 {
+                    for i in 0..batch.num_rows() {
+                        let row = batch.slice(i, 1);
                         println!(
                             "{}",
-                            datafusion::common::arrow::util::pretty::pretty_format_batches(&[
-                                batch
-                            ])
-                            .unwrap()
+                            datafusion::common::arrow::util::pretty::pretty_format_batches(&[row])
+                                .unwrap()
                         );
                     }
                 }
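Note: print_stream now prints one table per row via RecordBatch::slice(i, 1), a zero-copy one-row view, instead of one table per batch. A standalone sketch of the formatting pattern (arrow is re-exported through datafusion; the schema and values here are illustrative):

use std::sync::Arc;
use datafusion::common::arrow::array::Float64Array;
use datafusion::common::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::arrow::record_batch::RecordBatch;
use datafusion::common::arrow::util::pretty::pretty_format_batches;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("reading", DataType::Float64, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Float64Array::from(vec![1.0, 2.0]))]).unwrap();

    // slice(i, 1) borrows the underlying buffers, so per-row printing stays cheap.
    for i in 0..batch.num_rows() {
        let row = batch.slice(i, 1);
        println!("{}", pretty_format_batches(&[row]).unwrap());
    }
}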
@@ -83,7 +177,7 @@
     }
 
     pub async fn write_table(
-        &self,
+        self,
         bootstrap_servers: String,
         topic: String,
     ) -> Result<(), DataFusionError> {
41 changes: 29 additions & 12 deletions examples/examples/emit_measurements.rs
@@ -1,35 +1,52 @@
 use datafusion::error::Result;
+use rand::seq::SliceRandom;
 use rdkafka::producer::FutureProducer;
-use serde::{Deserialize, Serialize};
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use rdkafka::config::ClientConfig;
 use rdkafka::producer::FutureRecord;
 use rdkafka::util::Timeout;
 
-#[derive(Serialize, Deserialize)]
-pub struct Measurment {
-    occurred_at_ms: u64,
-    temperature: f64,
-}
+use df_streams_examples::Measurment;
 
 /// docker run -p 9092:9092 --name kafka apache/kafka
 #[tokio::main]
 async fn main() -> Result<()> {
+    let mut rng = rand::thread_rng();
+
     let producer: FutureProducer = ClientConfig::new()
         .set("bootstrap.servers", String::from("localhost:9092"))
         .set("message.timeout.ms", "60000")
         .create()
         .expect("Producer creation error");
 
-    let topic = "temperature".to_string();
+    let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"];
 
     loop {
-        let msg = serde_json::to_vec(&Measurment {
-            occurred_at_ms: get_timestamp_ms(),
-            temperature: rand::random::<f64>() * 115.0,
-        })
-        .unwrap();
+        let sensor_name = sensors.choose(&mut rng).unwrap().to_string();
+
+        // Alternate between sending random temperature and humidity readings
+        let (topic, msg) = if rand::random::<f64>() < 0.4 {
+            (
+                "temperature".to_string(),
+                serde_json::to_vec(&Measurment {
+                    occurred_at_ms: get_timestamp_ms(),
+                    sensor_name,
+                    reading: rand::random::<f64>() * 115.0,
+                })
+                .unwrap(),
+            )
+        } else {
+            (
+                "humidity".to_string(),
+                serde_json::to_vec(&Measurment {
+                    occurred_at_ms: get_timestamp_ms(),
+                    sensor_name,
+                    reading: rand::random::<f64>(),
+                })
+                .unwrap(),
+            )
+        };
 
         producer
             .send(
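Note: the Measurment struct (spelling as in the source) now lives in the shared df_streams_examples crate, whose contents are not part of this diff. A hypothetical reconstruction, consistent with the fields used above and with the get_timestamp_ms() helper the example calls:

// Hypothetical sketch of the shared definitions in df_streams_examples.
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Serialize, Deserialize)]
pub struct Measurment {
    pub occurred_at_ms: u64,
    pub sensor_name: String,
    pub reading: f64,
}

pub fn get_timestamp_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}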
40 changes: 19 additions & 21 deletions examples/examples/simple_aggregation.rs
@@ -2,50 +2,48 @@ use std::time::Duration;
 
 use datafusion::error::Result;
 use datafusion::functions_aggregate::average::avg;
-use datafusion::logical_expr::lit;
+use datafusion::functions_aggregate::count::count;
+// use datafusion::logical_expr::lit;
 use datafusion::logical_expr::{col, max, min};
 
 use df_streams_core::context::Context;
 use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
 use df_streams_core::physical_plan::utils::time::TimestampUnit;
 
+use df_streams_examples::get_sample_json;
+
 #[tokio::main]
 async fn main() -> Result<()> {
-    let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;
+    let sample_event = get_sample_json();
 
     let bootstrap_servers = String::from("localhost:9092");
 
     let ctx = Context::new()?;
 
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
     let source_topic = topic_builder
         .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
         .with_encoding("json")?
         .with_topic(String::from("temperature"))
-        .infer_schema_from_json(sample_event)?
+        .infer_schema_from_json(sample_event.as_str())?
         .build_reader(ConnectionOpts::from([
-            ("auto.offset.reset".to_string(), "earliest".to_string()),
+            ("auto.offset.reset".to_string(), "latest".to_string()),
             ("group.id".to_string(), "sample_pipeline".to_string()),
         ]))
         .await?;
 
-    let ds = ctx
-        .from_topic(source_topic)
-        .await?
-        .window(
-            vec![],
-            vec![
-                min(col("temperature")).alias("min"),
-                max(col("temperature")).alias("max"),
-                avg(col("temperature")).alias("average"),
-            ],
-            Duration::from_millis(1_000), // 5 second window
-            None,
-        )?
-        .filter(col("max").gt(lit(114)))?;
-
-    println!("{}", ds.df.logical_plan().display_indent());
+    let ds = ctx.from_topic(source_topic).await?.window(
+        vec![],
+        vec![
+            count(col("reading")).alias("count"),
+            min(col("reading")).alias("min"),
+            max(col("reading")).alias("max"),
+            avg(col("reading")).alias("average"),
+        ],
+        Duration::from_millis(1_000),
+        None,
+    )?;
+    // .filter(col("max").gt(lit(114)))?;
 
     ds.clone().print_stream().await?;
 
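Note: get_sample_json() also comes from df_streams_examples and is not shown in this diff. A hypothetical sketch, consistent with the new Measurment fields used for schema inference above:

// Hypothetical: a one-record JSON sample matching the new Measurment shape,
// used only to infer the Kafka topic's schema.
pub fn get_sample_json() -> String {
    String::from(r#"{"occurred_at_ms": 1715201766763, "sensor_name": "sensor_0", "reading": 87.2}"#)
}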
(The remaining two changed files in this commit are not shown.)