Commit
add new example
emgeee committed Aug 6, 2024
1 parent e76189f commit 6d0da86
Showing 8 changed files with 163 additions and 72 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -50,7 +50,7 @@ futures = "0.3"
tracing = "0.1.40"
tracing-log = "0.2.0"
tracing-subscriber = "0.3.18"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
tokio = { version = "1.36", features = ["macros", "rt", "sync", "rt-multi-thread"] }
async-trait = "0.1.81"
rdkafka = "0.36.2"
log = "^0.4"
37 changes: 26 additions & 11 deletions README.md
@@ -9,17 +9,32 @@ It currently supports sourcing and sinking to kafka, windowed aggregations, and

This repo is still a *work-in-progress* and we are actively seeking design partners. If you have a specific use-case you'd like to discuss, please drop us a line via a GitHub issue or email [email protected].

## Building Denormalized

Simply run `cargo build`

## Running Examples

See our [benchmarking repo](https://github.com/probably-nothing-labs/benchmarking) for local Kafka setup and data generation.

With the data generation in place, run:

`cargo run --example kafka_rideshare`
## Quickstart

### Prerequisites
- Docker + docker compose
- Rust/Cargo installed

### Running an example
1. Start Kafka in Docker: `docker run -p 9092:9092 --name kafka apache/kafka`
2. Start emitting some sample data: `cargo run --example emit_measurements`
3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation` (see the snippet below for a quick way to confirm data is flowing)
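
The `emit_measurements` example writes JSON messages like `{"occurred_at_ms": 1715201766763, "temperature": 87.2}` to a `temperature` topic. As a quick check that data is flowing, you can tail the topic with the console consumer bundled in the apache/kafka image (a minimal sketch, assuming the scripts live under `/opt/kafka/bin` in that image):

```sh
# Print the first few measurements from the temperature topic
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic temperature \
  --from-beginning \
  --max-messages 5
```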

## More examples

A more powerful example can be seen in our [kafka ridesharing example](./docs/kafka_rideshare_example.md)

## Roadmap
- [x] Stream aggregation
- [x] Stream joins
- [ ] Checkpointing / restoration
- [ ] Session windows
- [ ] Stateful UDF API
- [ ] DuckDB support
- [ ] Reading/writing from Postgres
- [ ] Python bindings
- [ ] Typescript bindings
- [ ] UI

## Credits

35 changes: 35 additions & 0 deletions docs/kafka_rideshare_example.md
@@ -0,0 +1,35 @@
# Kafka Rideshare Example

This example application performs streaming aggregations over a more involved Kafka setup.

### Configure Kafka Cluster

Clone our [docker compose files for running kafka](https://github.com/probably-nothing-labs/kafka-monitoring-stack-docker-compose). If you already have a different kafka cluster running, you can skip this step.
```sh
git clone [email protected]:probably-nothing-labs/kafka-monitoring-stack-docker-compose.git
cd kafka-monitoring-stack-docker-compose
docker compose -f denormalized-benchmark-cluster.yml up
```

This will spin up a 3-node Kafka cluster in Docker along with an instance of kafka-ui that can be viewed at http://localhost:8080/.
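
Optionally, before generating data you can confirm the brokers came up with a standard Docker Compose status check (a quick sanity check, not part of the original instructions):

```sh
# List the containers started by the benchmark cluster compose file
docker compose -f denormalized-benchmark-cluster.yml ps
```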

### Generate some sample data to the kafka cluster

We wrote a [small Rust tool](https://github.com/probably-nothing-labs/benchmarking) that sends fake traffic to the locally running Rust program.
```sh
git clone [email protected]:probably-nothing-labs/benchmarking.git
cd benchmarking
cargo run -- -d 60 -a 1000
```

This will run a 60-second simulation and create two topics, `driver-imu-data` and `trips`, which should receive roughly 58k and 500 messages respectively.
Several other knobs can be tuned to change the amount of traffic; view them with `cargo run -- --help`.
A few additional knobs are not exposed on the CLI but can be changed in the [src/main.rs](https://github.com/probably-nothing-labs/benchmarking/blob/main/src/main.rs#L104-L108) file.

### Run a Streaming Datafusion job

```sh
cargo run --example kafka_rideshare
```

Once everything is set up and one of the two streaming jobs is running, it is recommended to re-run the Kafka data generation tool so that live data is produced. This is because watermark tracking of streaming data makes it difficult to properly aggregate older data that already lives in the Kafka topic.
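
For example, with the streaming job from the step above still running, re-run the generator from the benchmarking repo using the same flags as in the data-generation step:

```sh
cd benchmarking
cargo run -- -d 60 -a 1000
```
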
2 changes: 2 additions & 0 deletions examples/Cargo.toml
@@ -20,7 +20,9 @@ tracing = { workspace = true }
futures = { workspace = true }
tracing-log = { workspace = true }
tracing-subscriber = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tempfile = { version = "3" }
rdkafka = { workspace = true }
rand = "0.8.5"
51 changes: 51 additions & 0 deletions examples/examples/emit_measurements.rs
@@ -0,0 +1,51 @@
use datafusion::error::Result;
use rdkafka::producer::FutureProducer;
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureRecord;
use rdkafka::util::Timeout;

#[derive(Serialize, Deserialize)]
pub struct Measurement {
    occurred_at_ms: u64,
    temperature: f64,
}

/// Emits fake temperature measurements to a local Kafka topic.
/// Start Kafka first with: docker run -p 9092:9092 --name kafka apache/kafka
#[tokio::main]
async fn main() -> Result<()> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", String::from("localhost:9092"))
        .set("message.timeout.ms", "60000")
        .create()
        .expect("Producer creation error");

    let topic = "temperature".to_string();

    loop {
        // Serialize a random temperature reading as a JSON payload.
        let msg = serde_json::to_vec(&Measurement {
            occurred_at_ms: get_timestamp_ms(),
            temperature: rand::random::<f64>() * 115.0,
        })
        .unwrap();

        producer
            .send(
                FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg),
                Timeout::Never,
            )
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    }
}

/// Current wall-clock time in milliseconds since the Unix epoch.
fn get_timestamp_ms() -> u64 {
    let now = SystemTime::now();
    let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");

    since_the_epoch.as_millis() as u64
}
46 changes: 46 additions & 0 deletions examples/examples/simple_aggregation.rs
@@ -0,0 +1,46 @@
use std::time::Duration;

use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion_expr::{col, max, min};

use df_streams_core::context::Context;
use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
use df_streams_core::physical_plan::utils::time::TimestampUnit;

#[tokio::main]
async fn main() -> Result<()> {
    // Sample event used to infer the schema of the incoming JSON messages.
    let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;

    let bootstrap_servers = String::from("localhost:9092");

    let ctx = Context::new()?;

    let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

    let source_topic = topic_builder
        .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
        .with_encoding("json")?
        .with_topic(String::from("temperature"))
        .infer_schema_from_json(sample_event)?
        .build_reader(ConnectionOpts::from([
            ("auto.offset.reset".to_string(), "earliest".to_string()),
            ("group.id".to_string(), "sample_pipeline".to_string()),
        ]))
        .await?;

    // Compute min, max, and average temperature over a windowed aggregation.
    let ds = ctx.from_topic(source_topic).await?.streaming_window(
        vec![],
        vec![
            min(col("temperature")).alias("min"),
            max(col("temperature")).alias("max"),
            avg(col("temperature")).alias("average"),
        ],
        Duration::from_millis(1_000), // 1 second window
        None,
    )?;

    ds.clone().print_stream().await?;

    Ok(())
}
60 changes: 0 additions & 60 deletions examples/examples/test.rs

This file was deleted.
