Commit b77cf6d: Update README

emgeee committed Oct 8, 2024
1 parent 8c28354 commit b77cf6d
Showing 4 changed files with 96 additions and 43 deletions.
93 changes: 76 additions & 17 deletions README.md
@@ -5,9 +5,83 @@
</h1>

Denormalized is a fast embeddable stream processing engine built on Apache DataFusion.
-It currently supports kafka as a real-time source and a sink, windowed aggregations, and stream joins.
+It currently supports kafka as a real-time source and sink, windowed aggregations, and stream joins.

-## Quick Start
Denormalized is *work-in-progress* and we are actively seeking design partners. If you have a specific use case you'd like to discuss, please drop us a line via a [GitHub issue](https://github.com/probably-nothing-labs/denormalized/issues) or email `[email protected]`.


Here's an example job that aggregates sensor values from a Kafka topic:

```rust
// Connect to source topic
let source_topic = topic_builder
    .with_topic(String::from("temperature"))
    .infer_schema_from_json(get_sample_json().as_str())?
    .with_encoding("json")?
    .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
    .build_reader(ConnectionOpts::from([
        ("auto.offset.reset".to_string(), "latest".to_string()),
        ("group.id".to_string(), "sample_pipeline".to_string()),
    ]))
    .await?;

ctx.from_topic(source_topic)
    .await?
    .window(
        vec![col("sensor_name")],
        vec![
            count(col("reading")).alias("count"),
            min(col("reading")).alias("min"),
            max(col("reading")).alias("max"),
            avg(col("reading")).alias("average"),
        ],
        Duration::from_millis(1_000), // aggregate every 1 second
        None,                         // None means tumbling window
    )?
    .filter(col("max").gt(lit(113)))?
    .print_stream() // Print out the results
    .await?;
```
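
To emit overlapping results, pass `Some(..)` as the slide interval to make the window sliding, and write the output back to Kafka with `sink_kafka` instead of printing it. Below is a minimal sketch adapted from the rideshare example elsewhere in this repo; the `bootstrap_servers` value and the `out_topic` name are illustrative assumptions:

```rust
// Sliding-window variant: a 1-second window that emits every 100 ms,
// sinking results to a Kafka topic instead of stdout.
// `bootstrap_servers` and "out_topic" are placeholders for this sketch.
let bootstrap_servers = String::from("localhost:9092");

ctx.from_topic(source_topic)
    .await?
    .window(
        vec![col("sensor_name")],
        vec![
            count(col("reading")).alias("count"),
            max(col("reading")).alias("max"),
        ],
        Duration::from_millis(1_000),     // 1 second window
        Some(Duration::from_millis(100)), // Some(..) makes the window sliding
    )?
    .sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
    .await?;
```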

Denormalized also has Python bindings in the [py-denormalized/](py-denormalized/) folder. Here is the same example in Python:

```python
import json

from denormalized import Context
from denormalized.datafusion import col
from denormalized.datafusion import functions as f
from denormalized.datafusion import lit

sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}

def print_batch(rb):
    print(rb)

ctx = Context()
ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092")

ds.window(
    [col("sensor_name")],
    [
        f.count(col("reading"), distinct=False, filter=None).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
        f.avg(col("reading")).alias("average"),
    ],
    1000,
    None,
).filter(col("max") > (lit(113))).sink(print_batch)
```

The Python package is available on [PyPI](https://pypi.org/project/denormalized/0.0.4/): `pip install denormalized`

Details about developing the Python bindings can be found in [py-denormalized/README.md](py-denormalized/README.md).

## Rust Quick Start

### Prerequisites

@@ -24,21 +98,6 @@

A more powerful example can be seen in our [Kafka ridesharing example](./docs/kafka_rideshare_example.md)

-## Roadmap
-
-- [x] Stream aggregation
-- [x] Stream joins
-- [ ] Checkpointing / restoration
-- [ ] Session windows
-- [ ] Stateful UDF API
-- [ ] DuckDB support
-- [ ] Reading/writing from Postgres
-- [ ] Python bindings
-- [ ] Typescript bindings
-- [ ] UI

## Credits

Denormalized is built and maintained by [Denormalized](https://www.denormalized.io) in San Francisco.

-This repo is still a *work-in-progress* and we are actively seeking design partners. If you have a specific use-case you'd like to discuss please drop us a line via a [github issue](https://github.com/probably-nothing-labs/denormalized/issues) or email `[email protected]`.
2 changes: 0 additions & 2 deletions examples/examples/kafka_rideshare.rs
@@ -78,8 +78,6 @@ async fn main() -> Result<()> {
        Some(Duration::from_millis(1_000)), // 1 second slide
    )?;

-    // ds.clone().print_stream().await?;
-
    ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
        .await?;

22 changes: 9 additions & 13 deletions examples/examples/simple_aggregation.rs
@@ -18,16 +18,13 @@ async fn main() -> Result<()> {
        .filter_level(log::LevelFilter::Info)
        .init();

-    let sample_event = get_sample_json();
-
-    let bootstrap_servers = String::from("localhost:9092");

    let ctx = Context::new()?;
-    let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
+    let mut topic_builder = KafkaTopicBuilder::new("localhost:9092".to_string());

    // Connect to source topic
    let source_topic = topic_builder
        .with_topic(String::from("temperature"))
-        .infer_schema_from_json(sample_event.as_str())?
+        .infer_schema_from_json(get_sample_json().as_str())?
        .with_encoding("json")?
        .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
        .build_reader(ConnectionOpts::from([
@@ -36,8 +33,7 @@
        ]))
        .await?;

-    let ds = ctx
-        .from_topic(source_topic)
+    ctx.from_topic(source_topic)
        .await?
        .window(
            vec![col("sensor_name")],
@@ -47,12 +43,12 @@
                max(col("reading")).alias("max"),
                avg(col("reading")).alias("average"),
            ],
-            Duration::from_millis(1_000),
-            None,
+            Duration::from_millis(1_000), // aggregate every 1 second
+            None, // None means tumbling window
        )?
-        .filter(col("max").gt(lit(113)))?;
-    ds.clone().print_physical_plan().await?;
-    ds.clone().print_stream().await?;
+        .filter(col("max").gt(lit(113)))?
+        .print_stream() // Print out the results
+        .await?;

    Ok(())
}
22 changes: 11 additions & 11 deletions py-denormalized/python/examples/stream_aggregate.py
@@ -1,18 +1,20 @@
"""stream_aggregate example."""

import json
+import signal
+import sys

import pyarrow as pa
from denormalized import Context
-from denormalized.datafusion import lit, col
+from denormalized.datafusion import col
from denormalized.datafusion import functions as f
+from denormalized.datafusion import lit

-import signal
-import sys

def signal_handler(sig, frame):
-    print('You pressed Ctrl+C!')
+    print("You pressed Ctrl+C!")
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

bootstrap_server = "localhost:9092"
@@ -23,25 +25,23 @@ def signal_handler(sig, frame):
"reading": 0.0,
}


def print_batch(rb):
    print(rb)


ctx = Context()
ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server)


ds.window(
    [col("sensor_name")],
    [
-        f.count(col("reading"), distinct=False, filter=None).alias(
-            "count"
-        ),
+        f.count(col("reading"), distinct=False, filter=None).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
        f.avg(col("reading")).alias("average"),
    ],
    1000,
    None,
-).filter(
-    col("max") > (lit(113))
-).sink(print_batch)
+).filter(col("max") > (lit(113))).sink(print_batch)
