From 031616c207eef166af2a4b0219a1595af3f7e17f Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 12 Nov 2024 14:04:58 -0800 Subject: [PATCH] Add ability to specify timestamp_column on kafka stream (#57) --- README.md | 10 +++++++--- py-denormalized/README.md | 2 +- py-denormalized/pyproject.toml | 1 + .../python/denormalized/context.py | 19 +++++++++++++++++-- .../python/denormalized/data_stream.py | 8 ++++---- .../python/examples/stream_aggregate.py | 9 +++++++-- .../python/examples/stream_join.py | 14 ++++++++++---- .../python/examples/udf_example.py | 13 +++++++++++-- py-denormalized/requirements-dev.lock | 4 +++- py-denormalized/requirements.lock | 2 +- py-denormalized/src/context.rs | 8 +++++--- 11 files changed, 67 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 55ad026..ce1626d 100644 --- a/README.md +++ b/README.md @@ -61,8 +61,12 @@ sample_event = { def print_batch(rb): print(rb) -ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092") +ds = Context().from_topic( + "temperature", + json.dumps(sample_event), + "localhost:9092", + "occurred_at_ms", +) ds.window( [col("sensor_name")], @@ -90,7 +94,7 @@ Details about developing the python bindings can be found in [py-denormalized/RE ### Running an example -1. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker build -t emgeee/kafka_emit_measurements:latest .` +1. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 --name emit_measuremetns emgeee/kafka_emit_measurements:latest` 2. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation` ### Checkpointing diff --git a/py-denormalized/README.md b/py-denormalized/README.md index 1bc40a0..886273a 100644 --- a/py-denormalized/README.md +++ b/py-denormalized/README.md @@ -8,7 +8,7 @@ Denormalized is a single node stream processing engine written in Rust. This dir ## Getting Started 1. Install denormalized `pip install denormalized` -2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker build -t emgeee/kafka_emit_measurements:latest .` +2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest` 3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example This script will connect to the kafka instance running in docker and aggregate the metrics in realtime. diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index de58d3e..9af3a12 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -29,6 +29,7 @@ dev-dependencies = [ "ipython>=8.26.0", "pytest>=8.3.2", "maturin>=1.7.4", + "pyarrow-stubs>=17.11", ] # Enable docstring linting using the google style guide diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py index 1b3c5df..e3f7c84 100644 --- a/py-denormalized/python/denormalized/context.py +++ b/py-denormalized/python/denormalized/context.py @@ -1,6 +1,8 @@ from denormalized._d_internal import PyContext + from .data_stream import DataStream + class Context: """Context.""" @@ -14,9 +16,22 @@ def __repr__(self): def __str__(self): return self.ctx.__str__() - def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> DataStream: + def from_topic( + self, + topic: str, + sample_json: str, + bootstrap_servers: str, + timestamp_column: str, + group_id: str = "default_group", + ) -> DataStream: """Create a new context from a topic.""" - py_ds = self.ctx.from_topic(topic, sample_json, bootstrap_servers) + py_ds = self.ctx.from_topic( + topic, + sample_json, + bootstrap_servers, + timestamp_column, + group_id, + ) ds = DataStream(py_ds) return ds diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index ca28dfb..e3790e0 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -1,9 +1,10 @@ +from typing import Callable + import pyarrow as pa from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from denormalized.utils import to_internal_expr, to_internal_exprs -from typing import Callable class DataStream: """Represents a stream of data that can be manipulated using various operations.""" @@ -73,10 +74,9 @@ def with_column(self, name: str, predicate: Expr) -> "DataStream": DataStream: A new DataStream with the additional column. """ return DataStream(self.ds.with_column(name, to_internal_expr(predicate))) - + def drop_columns(self, columns: list[str]) -> "DataStream": - """Drops columns from the DataStream. - """ + """Drops columns from the DataStream.""" return DataStream(self.ds.drop_columns(columns)) def join_on( diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index 6e7d5fd..6a88982 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -21,6 +21,7 @@ def signal_handler(sig, frame): signal.signal(signal.SIGINT, signal_handler) bootstrap_server = "localhost:9092" +timestamp_column = "occurred_at_ms" sample_event = { "occurred_at_ms": 100, @@ -33,8 +34,12 @@ def print_batch(rb): pp.pprint(rb.to_pydict()) -ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) +ds = Context().from_topic( + "temperature", + json.dumps(sample_event), + bootstrap_server, + timestamp_column, +) ds.window( diff --git a/py-denormalized/python/examples/stream_join.py b/py-denormalized/python/examples/stream_join.py index 10804aa..70e76c2 100644 --- a/py-denormalized/python/examples/stream_join.py +++ b/py-denormalized/python/examples/stream_join.py @@ -1,6 +1,6 @@ """stream_aggregate example. -docker build -t emgeee/kafka_emit_measurements:latest . +docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest """ import json @@ -13,13 +13,14 @@ from denormalized.datafusion import functions as f -def signal_handler(sig, frame): +def signal_handler(_sig, _frame): sys.exit(0) signal.signal(signal.SIGINT, signal_handler) bootstrap_server = "localhost:9092" +timestamp_column = "occurred_at_ms" sample_event = { "occurred_at_ms": 100, @@ -34,11 +35,16 @@ def print_batch(rb): ctx = Context() temperature_ds = ctx.from_topic( - "temperature", json.dumps(sample_event), bootstrap_server + "temperature", json.dumps(sample_event), bootstrap_server, timestamp_column ) humidity_ds = ( - ctx.from_topic("humidity", json.dumps(sample_event), bootstrap_server) + ctx.from_topic( + "humidity", + json.dumps(sample_event), + bootstrap_server, + timestamp_column, + ) .with_column("humidity_sensor", col("sensor_name")) .drop_columns(["sensor_name"]) .window( diff --git a/py-denormalized/python/examples/udf_example.py b/py-denormalized/python/examples/udf_example.py index 4993cfe..a198705 100644 --- a/py-denormalized/python/examples/udf_example.py +++ b/py-denormalized/python/examples/udf_example.py @@ -15,9 +15,11 @@ def signal_handler(sig, frame): sys.exit(0) + signal.signal(signal.SIGINT, signal_handler) bootstrap_server = "localhost:9092" +timestamp_column = "occurred_at_ms" sample_event = { "occurred_at_ms": 100, @@ -25,19 +27,26 @@ def signal_handler(sig, frame): "reading": 0.0, } + def gt(lhs: pa.Array, rhs: pa.Scalar) -> pa.Array: return pc.greater(lhs, rhs) + greater_than_udf = udf(gt, [pa.float64(), pa.float64()], pa.bool_(), "stable") + def print_batch(rb: pa.RecordBatch): if not len(rb): return print(rb) -ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) +ds = Context().from_topic( + "temperature", + json.dumps(sample_event), + bootstrap_server, + timestamp_column, +) ds.window( [col("sensor_name")], diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock index c1d8781..92923c9 100644 --- a/py-denormalized/requirements-dev.lock +++ b/py-denormalized/requirements-dev.lock @@ -3,7 +3,7 @@ # # last locked with the following flags: # pre: false -# features: [] +# features: ["dev"] # all-features: false # with-sources: false # generate-hashes: false @@ -46,6 +46,8 @@ pure-eval==0.2.3 pyarrow==17.0.0 # via datafusion # via denormalized + # via pyarrow-stubs +pyarrow-stubs==17.11 pygments==2.18.0 # via ipython pytest==8.3.2 diff --git a/py-denormalized/requirements.lock b/py-denormalized/requirements.lock index 5980bbc..216bf46 100644 --- a/py-denormalized/requirements.lock +++ b/py-denormalized/requirements.lock @@ -3,7 +3,7 @@ # # last locked with the following flags: # pre: false -# features: [] +# features: ["dev"] # all-features: false # with-sources: false # generate-hashes: false diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs index e6e6ffa..cee7a88 100644 --- a/py-denormalized/src/context.rs +++ b/py-denormalized/src/context.rs @@ -73,10 +73,12 @@ impl PyContext { pub fn from_topic( &self, + py: Python, topic: String, sample_json: String, bootstrap_servers: String, - py: Python, + timestamp_column: String, + group_id: String, ) -> PyResult { let context = self.context.clone(); let rt = &get_tokio_runtime(py).0; @@ -85,13 +87,13 @@ impl PyContext { let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); let source_topic = topic_builder - .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis) + .with_timestamp(timestamp_column, TimestampUnit::Int64Millis) .with_encoding("json")? .with_topic(topic) .infer_schema_from_json(sample_json.as_str())? .build_reader(ConnectionOpts::from([ ("auto.offset.reset".to_string(), "latest".to_string()), - ("group.id".to_string(), "sample_pipeline".to_string()), + ("group.id".to_string(), group_id.to_string()), ])) .await?;