Add ability to specify timestamp_column on kafka stream (#57)
emgeee authored Nov 12, 2024
1 parent 82bcac2 commit 031616c
Showing 11 changed files with 67 additions and 23 deletions.
10 changes: 7 additions & 3 deletions README.md
@@ -61,8 +61,12 @@ sample_event = {
def print_batch(rb):
print(rb)

ctx = Context()
ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092")
ds = Context().from_topic(
"temperature",
json.dumps(sample_event),
"localhost:9092",
"occurred_at_ms",
)

ds.window(
[col("sensor_name")],
@@ -90,7 +94,7 @@ Details about developing the python bindings can be found in [py-denormalized/RE

### Running an example

1. Start the custom docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka: `docker build -t emgeee/kafka_emit_measurements:latest .`
1. Start the custom docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka: `docker run --rm -p 9092:9092 --name emit_measurements emgeee/kafka_emit_measurements:latest`
2. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation`

### Checkpointing
2 changes: 1 addition & 1 deletion py-denormalized/README.md
@@ -8,7 +8,7 @@ Denormalized is a single node stream processing engine written in Rust. This dir
## Getting Started

1. Install denormalized `pip install denormalized`
2. Start the custom docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka: `docker build -t emgeee/kafka_emit_measurements:latest .`
2. Start the custom docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka: `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example

This script will connect to the kafka instance running in docker and aggregate the metrics in realtime.
1 change: 1 addition & 0 deletions py-denormalized/pyproject.toml
@@ -29,6 +29,7 @@ dev-dependencies = [
"ipython>=8.26.0",
"pytest>=8.3.2",
"maturin>=1.7.4",
"pyarrow-stubs>=17.11",
]

# Enable docstring linting using the google style guide
19 changes: 17 additions & 2 deletions py-denormalized/python/denormalized/context.py
@@ -1,6 +1,8 @@
from denormalized._d_internal import PyContext

from .data_stream import DataStream


class Context:
"""Context."""

@@ -14,9 +16,22 @@ def __repr__(self):
def __str__(self):
return self.ctx.__str__()

def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> DataStream:
def from_topic(
self,
topic: str,
sample_json: str,
bootstrap_servers: str,
timestamp_column: str,
group_id: str = "default_group",
) -> DataStream:
"""Create a new context from a topic."""
py_ds = self.ctx.from_topic(topic, sample_json, bootstrap_servers)
py_ds = self.ctx.from_topic(
topic,
sample_json,
bootstrap_servers,
timestamp_column,
group_id,
)
ds = DataStream(py_ds)

return ds
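
For reference, a minimal sketch of calling the updated signature from user code — it assumes `Context` is importable from the top-level `denormalized` package (as in the README snippet above), and the topic, server, and group names are illustrative:

```python
import json

from denormalized import Context

# The sample event is only used to infer the schema of the topic.
sample_event = {"occurred_at_ms": 100, "sensor_name": "foo", "reading": 0.0}

ds = Context().from_topic(
    "temperature",                      # Kafka topic to read from
    json.dumps(sample_event),           # sample JSON used for schema inference
    "localhost:9092",                   # bootstrap servers
    timestamp_column="occurred_at_ms",  # event-time field (new in this change)
    group_id="sensor_pipeline",         # optional; defaults to "default_group"
)
```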
8 changes: 4 additions & 4 deletions py-denormalized/python/denormalized/data_stream.py
@@ -1,9 +1,10 @@
from typing import Callable

import pyarrow as pa
from denormalized._d_internal import PyDataStream
from denormalized.datafusion import Expr
from denormalized.utils import to_internal_expr, to_internal_exprs

from typing import Callable

class DataStream:
"""Represents a stream of data that can be manipulated using various operations."""
@@ -73,10 +74,9 @@ def with_column(self, name: str, predicate: Expr) -> "DataStream":
DataStream: A new DataStream with the additional column.
"""
return DataStream(self.ds.with_column(name, to_internal_expr(predicate)))

def drop_columns(self, columns: list[str]) -> "DataStream":
"""Drops columns from the DataStream.
"""
"""Drops columns from the DataStream."""
return DataStream(self.ds.drop_columns(columns))

def join_on(
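
For context, a minimal sketch of the two methods touched above, chained the same way the stream_join example below does — it assumes `col` is available from `denormalized.datafusion` and that `ds` is an existing `DataStream`:

```python
from denormalized.datafusion import col  # assumed re-export of DataFusion's col()

# Add a renamed copy of sensor_name, then drop the original column.
renamed_ds = (
    ds.with_column("humidity_sensor", col("sensor_name"))
      .drop_columns(["sensor_name"])
)
```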
9 changes: 7 additions & 2 deletions py-denormalized/python/examples/stream_aggregate.py
@@ -21,6 +21,7 @@ def signal_handler(sig, frame):
signal.signal(signal.SIGINT, signal_handler)

bootstrap_server = "localhost:9092"
timestamp_column = "occurred_at_ms"

sample_event = {
"occurred_at_ms": 100,
@@ -33,8 +34,12 @@ def print_batch(rb):
pp.pprint(rb.to_pydict())


ctx = Context()
ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server)
ds = Context().from_topic(
"temperature",
json.dumps(sample_event),
bootstrap_server,
timestamp_column,
)


ds.window(
14 changes: 10 additions & 4 deletions py-denormalized/python/examples/stream_join.py
@@ -1,6 +1,6 @@
"""stream_aggregate example.
docker build -t emgeee/kafka_emit_measurements:latest .
docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest
"""

import json
@@ -13,13 +13,14 @@
from denormalized.datafusion import functions as f


def signal_handler(sig, frame):
def signal_handler(_sig, _frame):
sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

bootstrap_server = "localhost:9092"
timestamp_column = "occurred_at_ms"

sample_event = {
"occurred_at_ms": 100,
@@ -34,11 +35,16 @@ def print_batch(rb):

ctx = Context()
temperature_ds = ctx.from_topic(
"temperature", json.dumps(sample_event), bootstrap_server
"temperature", json.dumps(sample_event), bootstrap_server, timestamp_column
)

humidity_ds = (
ctx.from_topic("humidity", json.dumps(sample_event), bootstrap_server)
ctx.from_topic(
"humidity",
json.dumps(sample_event),
bootstrap_server,
timestamp_column,
)
.with_column("humidity_sensor", col("sensor_name"))
.drop_columns(["sensor_name"])
.window(
13 changes: 11 additions & 2 deletions py-denormalized/python/examples/udf_example.py
@@ -15,29 +15,38 @@
def signal_handler(sig, frame):
sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

bootstrap_server = "localhost:9092"
timestamp_column = "occurred_at_ms"

sample_event = {
"occurred_at_ms": 100,
"sensor_name": "foo",
"reading": 0.0,
}


def gt(lhs: pa.Array, rhs: pa.Scalar) -> pa.Array:
return pc.greater(lhs, rhs)


greater_than_udf = udf(gt, [pa.float64(), pa.float64()], pa.bool_(), "stable")


def print_batch(rb: pa.RecordBatch):
if not len(rb):
return
print(rb)


ctx = Context()
ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server)
ds = Context().from_topic(
"temperature",
json.dumps(sample_event),
bootstrap_server,
timestamp_column,
)

ds.window(
[col("sensor_name")],
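
The hunk above is truncated before `greater_than_udf` is actually applied, so as a hypothetical illustration only: a scalar UDF registered via `udf(...)` is callable on expressions and could be used in a filter. The `lit` helper and the `DataStream.filter()` method are assumptions here, not part of this commit:

```python
from denormalized.datafusion import col, lit  # assumed re-exports of DataFusion helpers

# Hypothetical: keep only rows whose reading exceeds a threshold,
# assuming DataStream exposes a filter(Expr) method.
hot_readings = ds.filter(greater_than_udf(col("reading"), lit(30.0)))
```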
4 changes: 3 additions & 1 deletion py-denormalized/requirements-dev.lock
@@ -3,7 +3,7 @@
#
# last locked with the following flags:
# pre: false
# features: []
# features: ["dev"]
# all-features: false
# with-sources: false
# generate-hashes: false
@@ -46,6 +46,8 @@ pure-eval==0.2.3
pyarrow==17.0.0
# via datafusion
# via denormalized
# via pyarrow-stubs
pyarrow-stubs==17.11
pygments==2.18.0
# via ipython
pytest==8.3.2
2 changes: 1 addition & 1 deletion py-denormalized/requirements.lock
@@ -3,7 +3,7 @@
#
# last locked with the following flags:
# pre: false
# features: []
# features: ["dev"]
# all-features: false
# with-sources: false
# generate-hashes: false
8 changes: 5 additions & 3 deletions py-denormalized/src/context.rs
@@ -73,10 +73,12 @@ impl PyContext {

pub fn from_topic(
&self,
py: Python,
topic: String,
sample_json: String,
bootstrap_servers: String,
py: Python,
timestamp_column: String,
group_id: String,
) -> PyResult<PyDataStream> {
let context = self.context.clone();
let rt = &get_tokio_runtime(py).0;
@@ -85,13 +87,13 @@
let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

let source_topic = topic_builder
.with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
.with_timestamp(timestamp_column, TimestampUnit::Int64Millis)
.with_encoding("json")?
.with_topic(topic)
.infer_schema_from_json(sample_json.as_str())?
.build_reader(ConnectionOpts::from([
("auto.offset.reset".to_string(), "latest".to_string()),
("group.id".to_string(), "sample_pipeline".to_string()),
("group.id".to_string(), group_id.to_string()),
]))
.await?;

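
On the Python side, the new `group_id` argument (previously hard-coded as `"sample_pipeline"`) is passed straight through to Kafka's `group.id`, so separate pipelines can consume the same topic with independent offsets. A small sketch reusing the `ctx`, `sample_event`, and `bootstrap_server` names from the examples above; the group names are illustrative:

```python
# Each consumer group tracks its own offsets, so these two streams
# read the "temperature" topic independently of one another.
dashboard_ds = ctx.from_topic(
    "temperature", json.dumps(sample_event), bootstrap_server,
    "occurred_at_ms", group_id="dashboard",
)
alerting_ds = ctx.from_topic(
    "temperature", json.dumps(sample_event), bootstrap_server,
    "occurred_at_ms", group_id="alerting",
)
```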
