diff --git a/README.md b/README.md index fb5b526..a4d5e1a 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,83 @@ Denormalized is a fast embeddable stream processing engine built on Apache DataFusion. -It currently supports kafka as a real-time source and a sink, windowed aggregations, and stream joins. +It currently supports kafka as a real-time source and sink, windowed aggregations, and stream joins. -## Quick Start +Denormalized is *work-in-progress* and we are actively seeking design partners. If you have have a specific use-case you'd like to discuss please drop us a line via a [github issue](https://github.com/probably-nothing-labs/denormalized/issues) or email `hello@denormalized.io`. + + +Here's an example job that aggregates sensor values from a kafka topic: + +```rust +// Connect to source topic +let source_topic = topic_builder + .with_topic(String::from("temperature")) + .infer_schema_from_json(get_sample_json().as_str())? + .with_encoding("json")? + .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis) + .build_reader(ConnectionOpts::from([ + ("auto.offset.reset".to_string(), "latest".to_string()), + ("group.id".to_string(), "sample_pipeline".to_string()), + ])) + .await?; + +ctx.from_topic(source_topic) + .await? + .window( + vec![col("sensor_name")], + vec![ + count(col("reading")).alias("count"), + min(col("reading")).alias("min"), + max(col("reading")).alias("max"), + avg(col("reading")).alias("average"), + ], + Duration::from_millis(1_000), // aggregate every 1 second + None, // None means tumbling window + )? + .filter(col("max").gt(lit(113)))? + .print_stream() // Print out the results + .await?; +``` + +Denormalized also has python bindings in the [py-denormalized/](py-denormalized/) folder. Here is the same example using python: + +```python +import json +from denormalized import Context +from denormalized.datafusion import col +from denormalized.datafusion import functions as f +from denormalized.datafusion import lit + +sample_event = { + "occurred_at_ms": 100, + "sensor_name": "foo", + "reading": 0.0, +} + +def print_batch(rb): + print(rb) + +ctx = Context() +ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092") + +ds.window( + [col("sensor_name")], + [ + f.count(col("reading"), distinct=False, filter=None).alias("count"), + f.min(col("reading")).alias("min"), + f.max(col("reading")).alias("max"), + f.avg(col("reading")).alias("average"), + ], + 1000, + None, +).filter(col("max") > (lit(113))).sink(print_batch) +``` + +The python version is available on [pypi](https://pypi.org/project/denormalized/0.0.4/): `pip install denormalized` + +Details about developing the python bindings can be found in [py-denormalized/README.md](py-denormalized/README.md) + +## Rust Quick Start ### Prerequisites @@ -24,21 +98,6 @@ It currently supports kafka as a real-time source and a sink, windowed aggregati A more powerful example can be seen in our [Kafka ridesharing example](./docs/kafka_rideshare_example.md) -## Roadmap - -- [x] Stream aggregation -- [x] Stream joins -- [ ] Checkpointing / restoration -- [ ] Session windows -- [ ] Stateful UDF API -- [ ] DuckDB support -- [ ] Reading/writing from Postgres -- [ ] Python bindings -- [ ] Typescript bindings -- [ ] UI - ## Credits Denormalized is built and maintained by [Denormalized](https://www.denormalized.io) in San Francisco. - -This repo is still a *work-in-progress* and we are actively seeking design partners. If you have have a specific use-case you'd like to discuss please drop us a line via a [github issue](https://github.com/probably-nothing-labs/denormalized/issues) or email `hello@denormalized.io`. \ No newline at end of file diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs index b520a7b..ff8089c 100644 --- a/examples/examples/kafka_rideshare.rs +++ b/examples/examples/kafka_rideshare.rs @@ -78,8 +78,6 @@ async fn main() -> Result<()> { Some(Duration::from_millis(1_000)), // 1 second slide )?; - // ds.clone().print_stream().await?; - ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic")) .await?; diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 98e396f..27481e8 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -18,16 +18,13 @@ async fn main() -> Result<()> { .filter_level(log::LevelFilter::Info) .init(); - let sample_event = get_sample_json(); - - let bootstrap_servers = String::from("localhost:9092"); - let ctx = Context::new()?; - let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); + let mut topic_builder = KafkaTopicBuilder::new("localhost:9092".to_string()); + // Connect to source topic let source_topic = topic_builder .with_topic(String::from("temperature")) - .infer_schema_from_json(sample_event.as_str())? + .infer_schema_from_json(get_sample_json().as_str())? .with_encoding("json")? .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis) .build_reader(ConnectionOpts::from([ @@ -36,8 +33,7 @@ async fn main() -> Result<()> { ])) .await?; - let ds = ctx - .from_topic(source_topic) + ctx.from_topic(source_topic) .await? .window( vec![col("sensor_name")], @@ -47,12 +43,12 @@ async fn main() -> Result<()> { max(col("reading")).alias("max"), avg(col("reading")).alias("average"), ], - Duration::from_millis(1_000), - None, + Duration::from_millis(1_000), // aggregate every 1 second + None, // None means tumbling window )? - .filter(col("max").gt(lit(113)))?; - ds.clone().print_physical_plan().await?; - ds.clone().print_stream().await?; + .filter(col("max").gt(lit(113)))? + .print_stream() // Print out the results + .await?; Ok(()) } diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index f96d0ac..7c1c548 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -1,18 +1,20 @@ """stream_aggregate example.""" + import json +import signal +import sys -import pyarrow as pa from denormalized import Context -from denormalized.datafusion import lit, col +from denormalized.datafusion import col from denormalized.datafusion import functions as f +from denormalized.datafusion import lit -import signal -import sys def signal_handler(sig, frame): - print('You pressed Ctrl+C!') + print("You pressed Ctrl+C!") sys.exit(0) + signal.signal(signal.SIGINT, signal_handler) bootstrap_server = "localhost:9092" @@ -23,9 +25,11 @@ def signal_handler(sig, frame): "reading": 0.0, } + def print_batch(rb): print(rb) + ctx = Context() ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) @@ -33,15 +37,11 @@ def print_batch(rb): ds.window( [col("sensor_name")], [ - f.count(col("reading"), distinct=False, filter=None).alias( - "count" - ), + f.count(col("reading"), distinct=False, filter=None).alias("count"), f.min(col("reading")).alias("min"), f.max(col("reading")).alias("max"), f.avg(col("reading")).alias("average"), ], 1000, None, -).filter( - col("max") > (lit(113)) -).sink(print_batch) +).filter(col("max") > (lit(113))).sink(print_batch)