Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update docs #61

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions py-denormalized/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ Denormalized is a single node stream processing engine written in Rust. This dir

1. Install denormalized `pip install denormalized`
2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example
3. Copy the [stream_aggregate.py](./python/examples/stream_aggregate.py) example

This script will connect to the kafka instance running in docker and aggregate the metrics in realtime.

There are several other examples in the [examples folder](python/examples/) that demonstrate other capabilities including stream joins and UDAFs.
There are several other examples in the [examples folder](./python/examples/) that demonstrate other capabilities including stream joins and UDAFs.

[API Docs](https://probably-nothing-labs.github.io/denormalized/denormalized.html)

## Development

Expand Down
45 changes: 40 additions & 5 deletions py-denormalized/python/denormalized/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,49 @@
"""
.. include:: ../../README.md
:start-line: 1
:end-before: Development
[Denormalized](https://www.denormalized.io/) is a single node stream processing engine written in Rust and powered by Apache DataFusion 🚀

1. Install denormalized `pip install denormalized`
2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`

```python
sample_event = {
"occurred_at_ms": 100,
"sensor_name": "foo",
"reading": 0.0,
}

def print_batch(rb):
pp.pprint(rb.to_pydict())

ds = Context().from_topic(
"temperature",
json.dumps(sample_event),
"localhost:9092",
"occurred_at_ms",
)

ds.window(
[col("sensor_name")],
[
f.count(col("reading"), distinct=False, filter=None).alias("count"),
f.min(col("reading")).alias("min"),
f.max(col("reading")).alias("max"),
],
1000,
None,
).sink(print_batch)
```


Head on over to the [examples folder](https://github.com/probably-nothing-labs/denormalized/tree/main/py-denormalized/python/examples) to see more examples that demonstrate additional functionality including stream joins and user defined (aggregate) functions.

"""

from .context import Context
from .data_stream import DataStream
from .datafusion import col, column, lit, literal, udf, udaf
from .datafusion.expr import Expr
from .datafusion import col, column
from .datafusion import functions as Functions
from .datafusion import lit, literal, udaf, udf
from .datafusion.expr import Expr

__all__ = [
"Context",
Expand Down
4 changes: 3 additions & 1 deletion py-denormalized/python/denormalized/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ def window(
) -> "DataStream":
"""Apply a windowing operation to the DataStream.

If `slide_millis` is `None` a tumbling window will be created otherwise a sliding window will be created.

Args:
group_exprs: List of expressions to group by
aggr_exprs: List of aggregation expressions to apply
window_length_millis: Length of the window in milliseconds
slide_millis: Optional slide interval in milliseconds (defaults to window_length)
slide_millis: Optional slide interval in milliseconds (defaults to None)

Returns:
DataStream: A new DataStream with the windowing operation applied
Expand Down
Loading