diff --git a/py-denormalized/README.md b/py-denormalized/README.md index a5dfe00..29e3b18 100644 --- a/py-denormalized/README.md +++ b/py-denormalized/README.md @@ -8,12 +8,13 @@ Denormalized is a single node stream processing engine written in Rust. This dir 1. Install denormalized `pip install denormalized` 2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest` -3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example +3. Copy the [stream_aggregate.py](./python/examples/stream_aggregate.py) example This script will connect to the kafka instance running in docker and aggregate the metrics in realtime. -There are several other examples in the [examples folder](python/examples/) that demonstrate other capabilities including stream joins and UDAFs. +There are several other examples in the [examples folder](./python/examples/) that demonstrate other capabilities including stream joins and UDAFs. +[API Docs](https://probably-nothing-labs.github.io/denormalized/denormalized.html) ## Development diff --git a/py-denormalized/python/denormalized/__init__.py b/py-denormalized/python/denormalized/__init__.py index 409c791..435eb4b 100644 --- a/py-denormalized/python/denormalized/__init__.py +++ b/py-denormalized/python/denormalized/__init__.py @@ -1,14 +1,49 @@ """ -.. include:: ../../README.md - :start-line: 1 - :end-before: Development +[Denormalized](https://www.denormalized.io/) is a single node stream processing engine written in Rust and powered by Apache DataFusion 🚀 + +1. Install denormalized `pip install denormalized` +2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest` + +```python +sample_event = { + "occurred_at_ms": 100, + "sensor_name": "foo", + "reading": 0.0, +} + +def print_batch(rb): + pp.pprint(rb.to_pydict()) + +ds = Context().from_topic( + "temperature", + json.dumps(sample_event), + "localhost:9092", + "occurred_at_ms", +) + +ds.window( + [col("sensor_name")], + [ + f.count(col("reading"), distinct=False, filter=None).alias("count"), + f.min(col("reading")).alias("min"), + f.max(col("reading")).alias("max"), + ], + 1000, + None, +).sink(print_batch) +``` + + +Head on over to the [examples folder](https://github.com/probably-nothing-labs/denormalized/tree/main/py-denormalized/python/examples) to see more examples that demonstrate additional functionality including stream joins and user defined (aggregate) functions. + """ from .context import Context from .data_stream import DataStream -from .datafusion import col, column, lit, literal, udf, udaf -from .datafusion.expr import Expr +from .datafusion import col, column from .datafusion import functions as Functions +from .datafusion import lit, literal, udaf, udf +from .datafusion.expr import Expr __all__ = [ "Context", diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index 2e228b4..f2c2d1c 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -157,11 +157,13 @@ def window( ) -> "DataStream": """Apply a windowing operation to the DataStream. + If `slide_millis` is `None` a tumbling window will be created otherwise a sliding window will be created. + Args: group_exprs: List of expressions to group by aggr_exprs: List of aggregation expressions to apply window_length_millis: Length of the window in milliseconds - slide_millis: Optional slide interval in milliseconds (defaults to window_length) + slide_millis: Optional slide interval in milliseconds (defaults to None) Returns: DataStream: A new DataStream with the windowing operation applied