Update Readme #12

Merged · 2 commits · Aug 7, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
/target
.vscode
.DS_Store
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -50,7 +50,7 @@ futures = "0.3"
tracing = "0.1.40"
tracing-log = "0.2.0"
tracing-subscriber = "0.3.18"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
tokio = { version = "1.36", features = ["macros", "rt", "sync", "rt-multi-thread"] }
async-trait = "0.1.81"
rdkafka = "0.36.2"
log = "^0.4"
41 changes: 30 additions & 11 deletions README.md
@@ -1,23 +1,42 @@
# Denormalized
<h1>
<a href="https://www.denormalized.io">
<img src="./docs/images/denormalized_dark.png" alt="Denormalized Logo" width="512">
</a>
</h1>

Denormalized is a fast embeddable stream processing engine built on Apache DataFusion.
It currently supports sourcing and sinking to Kafka, windowed aggregations, and stream joins.

While this repo is very much a *work-in-progress*, we currently support windowed aggregations and joins on streams of data with a
connector available for Kafka.
This repo is still a *work-in-progress* and we are actively seeking design partners. If you have a specific use case you'd like to discuss, please drop us a line via a GitHub issue or email [email protected].

## Building Denormalized
## Quickstart

Simply run `cargo build`
### Prerequisites
- Docker
- Rust/Cargo installed

## Running Examples
### Running an example
1. Start Kafka in Docker: `docker run -p 9092:9092 --name kafka apache/kafka`
2. Start emitting some sample data: `cargo run --example emit_measurements`
3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using Denormalized: `cargo run --example simple_aggregation`

See our [benchmarking repo](https://github.com/probably-nothing-labs/benchmarking) for local Kafka setup and data generation.
## More examples

With the data generation in place, run -
A more powerful example can be seen in our [kafka ridesharing example](./docs/kafka_rideshare_example.md)

`cargo run --example kafka_rideshare`
## Roadmap
- [x] Stream aggregation
- [x] Stream joins
- [ ] Checkpointing / restoration
- [ ] Session windows
- [ ] Stateful UDF API
- [ ] DuckDB support
- [ ] Reading/writing from Postgres
- [ ] Python bindings
- [ ] Typescript bindings
- [ ] UI

## Credits

Denormalized is built and maintained by [Denormalized Inc](www.denormalized.io) from San Francisco. Please drop in a line to
[email protected] or simply open up a GitHub Issue.
Denormalized is built and maintained by [Denormalized](https://www.denormalized.io) in San Francisco. Please drop us a line at [email protected] or simply open a GitHub issue!
Binary file added docs/images/denormalized_dark.png
Binary file added docs/images/denormalized_logo.png
35 changes: 35 additions & 0 deletions docs/kafka_rideshare_example.md
@@ -0,0 +1,35 @@
# Kafka Rideshare Example

This example application aggregates data in a more involved, multi-service setup.

### Configure Kafka Cluster

Clone our [docker compose files for running Kafka](https://github.com/probably-nothing-labs/kafka-monitoring-stack-docker-compose). If you already have a different Kafka cluster running, you can skip this step.
```sh
git clone [email protected]:probably-nothing-labs/kafka-monitoring-stack-docker-compose.git
cd kafka-monitoring-stack-docker-compose
docker compose -f denormalized-benchmark-cluster.yml up
```

This will spin up a 3-node Kafka cluster in Docker, along with an instance of kafka-ui that can be viewed at http://localhost:8080/
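
Before moving on, it can be worth confirming that the containers came up healthy. A plain Docker Compose status check (nothing specific to this repo) is enough:

```sh
# List the cluster's containers along with their state and exposed ports
docker compose -f denormalized-benchmark-cluster.yml ps
```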

### Generate some sample data to the kafka cluster

We wrote a [small Rust tool](https://github.com/probably-nothing-labs/benchmarking) that produces fake traffic to the local Kafka cluster for the streaming job to consume.
```sh
git clone [email protected]:probably-nothing-labs/benchmarking.git
cd benchmarking
cargo run -- -d 60 -a 1000
```

This will run a 60 second simulation and create two topics, `driver-imu-data` and `trips`, which should end up with roughly 58k and 500 messages, respectively.
Several knobs that control the amount of traffic can be viewed with `cargo run -- --help`.
A few additional knobs are not exposed on the command line but can be changed in the [src/main.rs](https://github.com/probably-nothing-labs/benchmarking/blob/main/src/main.rs#L104-L108) file.
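
To spot-check that the topics were actually populated, any generic Kafka CLI client works. The sketch below uses `kcat`, assuming it is installed and that the compose cluster exposes a broker on `localhost:9092` (adjust the address to your setup, or simply browse the topics in kafka-ui):

```sh
# Print a few records from each generated topic, then exit
kcat -b localhost:9092 -t driver-imu-data -C -c 5
kcat -b localhost:9092 -t trips -C -c 5
```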

### Run a Streaming DataFusion job

```sh
cargo run --example kafka_rideshare
```

Once everything is set up and one of the two streaming jobs is running, it is recommended to re-run the Kafka data generation tool so that live data is produced. This is because watermark tracking of streaming data makes it difficult to properly aggregate older data that already lives in the Kafka topic.
2 changes: 2 additions & 0 deletions examples/Cargo.toml
@@ -20,7 +20,9 @@ tracing = { workspace = true }
futures = { workspace = true }
tracing-log = { workspace = true }
tracing-subscriber = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tempfile = { version = "3" }
rdkafka = { workspace = true }
rand = "0.8.5"
51 changes: 51 additions & 0 deletions examples/examples/emit_measurements.rs
@@ -0,0 +1,51 @@
use datafusion::error::Result;
use rdkafka::producer::FutureProducer;
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureRecord;
use rdkafka::util::Timeout;

#[derive(Serialize, Deserialize)]
pub struct Measurment {
    occurred_at_ms: u64,
    temperature: f64,
}

/// docker run -p 9092:9092 --name kafka apache/kafka
#[tokio::main]
async fn main() -> Result<()> {
    // Create a Kafka producer pointed at the local broker.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", String::from("localhost:9092"))
        .set("message.timeout.ms", "60000")
        .create()
        .expect("Producer creation error");

    let topic = "temperature".to_string();

    loop {
        // Serialize a random temperature reading stamped with the current time.
        let msg = serde_json::to_vec(&Measurment {
            occurred_at_ms: get_timestamp_ms(),
            temperature: rand::random::<f64>() * 115.0,
        })
        .unwrap();

        // Send the record to the `temperature` topic.
        producer
            .send(
                FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg),
                Timeout::Never,
            )
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    }
}

fn get_timestamp_ms() -> u64 {
    let now = SystemTime::now();
    let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");

    since_the_epoch.as_millis() as u64
}
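
As a quick sanity check that records are landing in the `temperature` topic, a standalone consumer along these lines can be run against the same broker. This is a hedged sketch using the `rdkafka` `BaseConsumer`, not part of this PR; the group id `spot-check` is arbitrary:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::Message;

fn main() {
    // Plain blocking consumer pointed at the same local broker as the producer above.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "spot-check") // arbitrary throwaway consumer group
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("Consumer creation error");

    consumer
        .subscribe(&["temperature"])
        .expect("Failed to subscribe to topic");

    // Print the first three payloads; each should be a JSON measurement like
    // {"occurred_at_ms": 1715201766763, "temperature": 87.2}
    for result in consumer.iter().take(3) {
        let msg = result.expect("Kafka error");
        if let Some(payload) = msg.payload() {
            println!("{}", String::from_utf8_lossy(payload));
        }
    }
}
```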
46 changes: 46 additions & 0 deletions examples/examples/simple_aggregation.rs
@@ -0,0 +1,46 @@
use std::time::Duration;

use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion_expr::{col, max, min};

use df_streams_core::context::Context;
use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
use df_streams_core::physical_plan::utils::time::TimestampUnit;

#[tokio::main]
async fn main() -> Result<()> {
    // Sample event used to infer the schema of the incoming JSON messages.
    let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;

    let bootstrap_servers = String::from("localhost:9092");

    let ctx = Context::new()?;

    let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

    // Build a Kafka source that reads JSON from the `temperature` topic,
    // using `occurred_at_ms` as the event-time column.
    let source_topic = topic_builder
        .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
        .with_encoding("json")?
        .with_topic(String::from("temperature"))
        .infer_schema_from_json(sample_event)?
        .build_reader(ConnectionOpts::from([
            ("auto.offset.reset".to_string(), "earliest".to_string()),
            ("group.id".to_string(), "sample_pipeline".to_string()),
        ]))
        .await?;

    // Compute min/max/average temperature over a tumbling window.
    let ds = ctx.from_topic(source_topic).await?.streaming_window(
        vec![],
        vec![
            min(col("temperature")).alias("min"),
            max(col("temperature")).alias("max"),
            avg(col("temperature")).alias("average"),
        ],
        Duration::from_millis(1_000), // 1 second window
        None,
    )?;

    ds.clone().print_stream().await?;

    Ok(())
}
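
The window length is controlled by the `Duration` argument; if an actual 5 second tumbling window were wanted, only that argument would change. A sketch, assuming the rest of the call stays exactly as above and the trailing `None` is left untouched:

```rust
let ds = ctx.from_topic(source_topic).await?.streaming_window(
    vec![],
    vec![
        min(col("temperature")).alias("min"),
        max(col("temperature")).alias("max"),
        avg(col("temperature")).alias("average"),
    ],
    Duration::from_secs(5), // 5 second window
    None,
)?;
```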
60 changes: 0 additions & 60 deletions examples/examples/test.rs

This file was deleted.