diff --git a/.gitignore b/.gitignore
index 9026c77..a1ab9bc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
/target
.vscode
+.DS_Store
diff --git a/Cargo.lock b/Cargo.lock
index 99d8f0c..ef8c7e8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1110,7 +1110,9 @@ dependencies = [
"datafusion-physical-expr",
"df-streams-core",
"futures",
+ "rand",
"rdkafka",
+ "serde",
"serde_json",
"tempfile",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 1e83231..34cd93d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,7 +50,7 @@ futures = "0.3"
tracing = "0.1.40"
tracing-log = "0.2.0"
tracing-subscriber = "0.3.18"
-tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
+tokio = { version = "1.36", features = ["macros", "rt", "sync", "rt-multi-thread"] }
async-trait = "0.1.81"
rdkafka = "0.36.2"
log = "^0.4"
diff --git a/README.md b/README.md
index 2ec0b23..2dcf250 100644
--- a/README.md
+++ b/README.md
@@ -1,23 +1,42 @@
-# Denormalized
+
Denormalized is a fast embeddable stream processing engine built on Apache DataFusion.
+It currently supports sourcing from and sinking to Kafka, windowed aggregations, and stream joins.
-While this repo is very much a *work-in-progress*, we currently support windowed aggregations and joins on streams of data with a
-connector available for Kafka.
+This repo is still a *work-in-progress* and we are actively seeking design partners. If you have a specific use case you'd like to discuss, please drop us a line via a GitHub issue or email hello@denormalized.io.
-## Building Denormalized
+## Quickstart
-Simply run `cargo build`
+### Prerequisites
+- Docker
+- Rust/Cargo installed
-## Running Examples
+### Running an example
+1. Start Kafka in Docker: `docker run -p 9092:9092 --name kafka apache/kafka`
+2. Start emitting some sample data: `cargo run --example emit_measurements`
+3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using Denormalized: `cargo run --example simple_aggregation` (see the walkthrough below)
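+
+The aggregation example is small enough to walk through here; it follows [simple_aggregation.rs](./examples/examples/simple_aggregation.rs) from this repo, pointed at the `temperature` topic and local broker used in the quickstart above:
+
+```rust
+use std::time::Duration;
+
+use datafusion::error::Result;
+use datafusion::functions_aggregate::average::avg;
+use datafusion_expr::{col, max, min};
+
+use df_streams_core::context::Context;
+use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
+use df_streams_core::physical_plan::utils::time::TimestampUnit;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // A sample event used to infer the schema of the "temperature" topic.
+    let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;
+
+    let ctx = Context::new()?;
+
+    // Build a Kafka source that reads JSON from the "temperature" topic,
+    // using "occurred_at_ms" as the event-time column.
+    let mut topic_builder = KafkaTopicBuilder::new("localhost:9092".to_string());
+    let source_topic = topic_builder
+        .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
+        .with_encoding("json")?
+        .with_topic(String::from("temperature"))
+        .infer_schema_from_json(sample_event)?
+        .build_reader(ConnectionOpts::from([
+            ("auto.offset.reset".to_string(), "earliest".to_string()),
+            ("group.id".to_string(), "sample_pipeline".to_string()),
+        ]))
+        .await?;
+
+    // Compute the min, max, and average temperature over a 1 second window
+    // and print each window's results to stdout.
+    let ds = ctx.from_topic(source_topic).await?.streaming_window(
+        vec![],
+        vec![
+            min(col("temperature")).alias("min"),
+            max(col("temperature")).alias("max"),
+            avg(col("temperature")).alias("average"),
+        ],
+        Duration::from_millis(1_000),
+        None,
+    )?;
+
+    ds.clone().print_stream().await?;
+
+    Ok(())
+}
+```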
-See our [benchmarking repo](https://github.com/probably-nothing-labs/benchmarking) for local Kafka setup and data generation.
+## More examples
-With the data generation in place, run -
+A more involved example can be found in our [Kafka rideshare example](./docs/kafka_rideshare_example.md).
-`cargo run --example kafka_rideshare`
+## Roadmap
+- [x] Stream aggregation
+- [x] Stream joins
+- [ ] Checkpointing / restoration
+- [ ] Session windows
+- [ ] Stateful UDF API
+- [ ] DuckDB support
+- [ ] Reading/writing from Postgres
+- [ ] Python bindings
+- [ ] Typescript bindings
+- [ ] UI
## Credits
-Denormalized is built and maintained by [Denormalized Inc](www.denormalized.io) from San Francisco. Please drop in a line to
-hello@denormalized.io or simply open up a GitHub Issue.
+Denormalized is built and maintained by [Denormalized](https://www.denormalized.io) in San Francisco. Please drop us a line at
+hello@denormalized.io or simply open up a GitHub issue!
diff --git a/docs/images/denormalized_dark.png b/docs/images/denormalized_dark.png
new file mode 100644
index 0000000..1c3e188
Binary files /dev/null and b/docs/images/denormalized_dark.png differ
diff --git a/docs/images/denormalized_logo.png b/docs/images/denormalized_logo.png
new file mode 100644
index 0000000..8bfac09
Binary files /dev/null and b/docs/images/denormalized_logo.png differ
diff --git a/docs/kafka_rideshare_example.md b/docs/kafka_rideshare_example.md
new file mode 100644
index 0000000..607c45c
--- /dev/null
+++ b/docs/kafka_rideshare_example.md
@@ -0,0 +1,35 @@
+# Kafka Rideshare Example
+
+This example application runs a more involved streaming aggregation against a multi-node Kafka cluster fed with simulated rideshare data.
+
+### Configure Kafka Cluster
+
+Clone our [Docker Compose files for running Kafka](https://github.com/probably-nothing-labs/kafka-monitoring-stack-docker-compose). If you already have a different Kafka cluster running, you can skip this step.
+```sh
+git clone git@github.com:probably-nothing-labs/kafka-monitoring-stack-docker-compose.git
+cd kafka-monitoring-stack-docker-compose
+docker compose -f denormalized-benchmark-cluster.yml up
+```
+
+This will spin up a 3-node Kafka cluster in Docker, along with an instance of kafka-ui that can be viewed at http://localhost:8080/.
+
+### Generate some sample data to the kafka cluster
+
+We wrote a [small Rust tool](https://github.com/probably-nothing-labs/benchmarking) that sends fake traffic to the locally running Rust program.
+```sh
+git clone git@github.com:probably-nothing-labs/benchmarking.git
+cd benchmarking
+cargo run -- -d 60 -a 1000
+```
+
+This will run a simulation for 60s and create two topics, `driver-imu-data` and `trips`, which should receive around ~58k and ~500 messages respectively.
+Several knobs can be tuned to change the amount of traffic; these can be viewed with `cargo run -- --help`.
+A few additional knobs are not exposed as flags but can be changed in the [src/main.rs](https://github.com/probably-nothing-labs/benchmarking/blob/main/src/main.rs#L104-L108) file.
+
+### Run a Streaming Datafusion job
+
+```sh
+cargo run --example kafka_rideshare
+```
+
+Once everything is set up and the streaming job is running, we recommend re-running the Kafka data generation tool so that live data is produced. This is because watermark tracking of streaming data makes it difficult to properly aggregate older data that lives in the Kafka topic.
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 66611e9..a94c834 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -20,7 +20,9 @@ tracing = { workspace = true }
futures = { workspace = true }
tracing-log = { workspace = true }
tracing-subscriber = { workspace = true }
+serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tempfile = { version = "3" }
rdkafka = { workspace = true }
+rand = "0.8.5"
diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs
new file mode 100644
index 0000000..311bbcd
--- /dev/null
+++ b/examples/examples/emit_measurements.rs
@@ -0,0 +1,51 @@
+use datafusion::error::Result;
+use rdkafka::producer::FutureProducer;
+use serde::{Deserialize, Serialize};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use rdkafka::config::ClientConfig;
+use rdkafka::producer::FutureRecord;
+use rdkafka::util::Timeout;
+
+#[derive(Serialize, Deserialize)]
+pub struct Measurment {
+ occurred_at_ms: u64,
+ temperature: f64,
+}
+
+/// Continuously emits fake temperature readings to a local Kafka broker.
+/// Start Kafka first with: `docker run -p 9092:9092 --name kafka apache/kafka`
+#[tokio::main]
+async fn main() -> Result<()> {
+ let producer: FutureProducer = ClientConfig::new()
+ .set("bootstrap.servers", String::from("localhost:9092"))
+ .set("message.timeout.ms", "60000")
+ .create()
+ .expect("Producer creation error");
+
+ let topic = "temperature".to_string();
+
+ loop {
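+        // Serialize a measurement with the current timestamp and a random temperature.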
+ let msg = serde_json::to_vec(&Measurment {
+ occurred_at_ms: get_timestamp_ms(),
+            temperature: rand::random::<f64>() * 115.0,
+ })
+ .unwrap();
+
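+        // Produce the JSON payload to the "temperature" topic; no key is set.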
+ producer
+ .send(
+                FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg),
+ Timeout::Never,
+ )
+ .await
+ .unwrap();
+
+ tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+ }
+}
+
+fn get_timestamp_ms() -> u64 {
+ let now = SystemTime::now();
+ let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
+
+ since_the_epoch.as_millis() as u64
+}
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
new file mode 100644
index 0000000..3ce7878
--- /dev/null
+++ b/examples/examples/simple_aggregation.rs
@@ -0,0 +1,46 @@
+use std::time::Duration;
+
+use datafusion::error::Result;
+use datafusion::functions_aggregate::average::avg;
+use datafusion_expr::{col, max, min};
+
+use df_streams_core::context::Context;
+use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
+use df_streams_core::physical_plan::utils::time::TimestampUnit;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;
+
+ let bootstrap_servers = String::from("localhost:9092");
+
+ let ctx = Context::new()?;
+
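+    // Build a Kafka source that reads JSON from the "temperature" topic, using "occurred_at_ms" as the event-time column.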
+ let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
+
+ let source_topic = topic_builder
+ .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
+ .with_encoding("json")?
+ .with_topic(String::from("temperature"))
+ .infer_schema_from_json(sample_event)?
+ .build_reader(ConnectionOpts::from([
+ ("auto.offset.reset".to_string(), "earliest".to_string()),
+ ("group.id".to_string(), "sample_pipeline".to_string()),
+ ]))
+ .await?;
+
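+    // Compute the min, max, and average temperature over a 1 second window.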
+ let ds = ctx.from_topic(source_topic).await?.streaming_window(
+ vec![],
+ vec![
+ min(col("temperature")).alias("min"),
+ max(col("temperature")).alias("max"),
+ avg(col("temperature")).alias("average"),
+ ],
+        Duration::from_millis(1_000), // 1 second window
+ None,
+ )?;
+
+ ds.clone().print_stream().await?;
+
+ Ok(())
+}
diff --git a/examples/examples/test.rs b/examples/examples/test.rs
deleted file mode 100644
index d4d20b0..0000000
--- a/examples/examples/test.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-#![allow(dead_code)]
-#![allow(unused_variables)]
-#![allow(unused_imports)]
-
-use datafusion::error::Result;
-use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaReadConfig, KafkaTopicBuilder};
-use std::{sync::Arc, time::Duration};
-
-use rdkafka::admin::AdminClient;
-use rdkafka::admin::AdminOptions;
-use rdkafka::admin::ConfigResource;
-use rdkafka::admin::ConfigResourceResult;
-use rdkafka::admin::ResourceSpecifier;
-use rdkafka::config::ClientConfig;
-use rdkafka::config::FromClientConfig;
-use rdkafka::consumer::Consumer;
-use rdkafka::consumer::StreamConsumer;
-use rdkafka::metadata::MetadataTopic;
-use tracing::field::debug;
-
-#[tokio::main]
-async fn main() -> Result<()> {
- let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092");
- let mut client_config = ClientConfig::new();
- client_config.set("bootstrap.servers", bootstrap_servers.to_string());
-
- let admin = AdminClient::from_config(&client_config).unwrap();
-
- // let res: Vec = admin
- // .describe_configs(
- // &vec![ResourceSpecifier::Topic("out_topic")],
- // &AdminOptions::default(),
- // )
- // .await
- // .unwrap()
- // .into_iter()
- // .map(|v| v.unwrap())
- // .collect();
- //
- // for (k, v) in res[0].entry_map().into_iter() {
- // println!("{}: {:?}", k, v.value);
- // }
-
- let mut client_config = ClientConfig::new();
-
- client_config.set("bootstrap.servers", bootstrap_servers.to_string());
-
- let consumer: StreamConsumer = client_config.create().expect("Consumer creation failed");
-
- let data = consumer
- .fetch_metadata(Some("out_topic"), Duration::from_millis(5_000))
- .unwrap();
- let topic_metadata = data.topics();
- let md = &topic_metadata[0];
- let partitions = md.partitions();
-
- println!("{:?}", partitions.len());
-
- Ok(())
-}