From 15bf8f9e7d675838e75f910ead1e478db945cabc Mon Sep 17 00:00:00 2001 From: Matt Green Date: Fri, 1 Nov 2024 14:42:40 -0700 Subject: [PATCH 01/10] Add example python udaf --- .gitignore | 2 + py-denormalized/pyproject.toml | 8 +- .../python/examples/udaf_example.py | 106 +++++++++ py-denormalized/requirements-dev.lock | 214 ++++++++++++++++++ 4 files changed, 329 insertions(+), 1 deletion(-) create mode 100644 py-denormalized/python/examples/udaf_example.py diff --git a/.gitignore b/.gitignore index a1ab9bc..dd1acb4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target .vscode .DS_Store +.ipynb_checkpoints/ +Untitled.ipynb diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 171eae2..dc49053 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -23,7 +23,13 @@ features = ["pyo3/extension-module"] module-name = "denormalized._internal" [tool.rye] -dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"] +dev-dependencies = [ + "pip>=24.2", + "ipython>=8.26.0", + "pytest>=8.3.2", + "pandas>=2.2.3", + "jupyterlab>=4.3.0", +] # Enable docstring linting using the google style guide [tool.ruff.lint] diff --git a/py-denormalized/python/examples/udaf_example.py b/py-denormalized/python/examples/udaf_example.py new file mode 100644 index 0000000..797f77b --- /dev/null +++ b/py-denormalized/python/examples/udaf_example.py @@ -0,0 +1,106 @@ +"""stream_aggregate example.""" + +import json +import signal +import sys +from collections import Counter + +import pyarrow as pa +from denormalized import Context +from denormalized.datafusion import Accumulator, col +from denormalized.datafusion import functions as f +from denormalized.datafusion import lit, udaf + + +def signal_handler(sig, frame): + sys.exit(0) + + +signal.signal(signal.SIGINT, signal_handler) + +bootstrap_server = "localhost:9092" + +sample_event = { + "occurred_at_ms": 100, + "sensor_name": "foo", + "reading": 0.0, +} + + +class TotalValuesRead(Accumulator): + # Define the state type as a struct containing a map + acc_state_type = pa.struct([("counts", pa.map_(pa.string(), pa.int64()))]) + + def __init__(self): + self.counts = Counter() + + def update(self, values: pa.Array) -> None: + # Update counter with new values + if values is not None: + self.counts.update(values.to_pylist()) + + def merge(self, states: pa.Array) -> None: + # Merge multiple states into this accumulator + for state in states: + if state is not None: + count_map = state["counts"] + # Iterate through the map's keys and values + for k, v in zip(count_map.keys(), count_map.values()): + self.counts[k.as_py()] += v.as_py() + + def state(self) -> pa.Array: + # Convert current state to Arrow array format + if not self.counts: + # Handle empty state + return pa.array( + [{"counts": pa.array([], type=pa.map_(pa.string(), pa.int64()))}], + type=self.acc_state_type, + ) + + # Convert counter to key-value pairs + keys, values = zip(*self.counts.items()) + + # Create a single-element array containing our state struct + return pa.array( + [ + { + "counts": pa.array( + list(zip(keys, values)), type=pa.map_(pa.string(), pa.int64()) + ) + } + ], + type=self.acc_state_type, + ) + + def evaluate(self) -> pa.Array: + # Convert final state to output format + if not self.counts: + return pa.array([], type=pa.map_(pa.string(), pa.int64())) + + keys, values = zip(*self.counts.items()) + return pa.array(list(zip(keys, values)), type=pa.map_(pa.string(), pa.int64())) + + +input_type = [pa.string()] +return_type 
= pa.string() +state_type = [TotalValuesRead.acc_state_type] +sample_udaf = udaf(TotalValuesRead, input_type, return_type, state_type, "stable") + + +def print_batch(rb: pa.RecordBatch): + if not len(rb): + return + print(rb) + + +ctx = Context() +ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) + +ds.window( + [], + [ + sample_udaf(col("sensor_name")).alias("count"), + ], + 1000, + None, +).sink(print_batch) diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock index 1cfbc8f..d8d69b3 100644 --- a/py-denormalized/requirements-dev.lock +++ b/py-denormalized/requirements-dev.lock @@ -10,52 +10,266 @@ # universal: false -e file:. +anyio==4.6.2.post1 + # via httpx + # via jupyter-server +appnope==0.1.4 + # via ipykernel +argon2-cffi==23.1.0 + # via jupyter-server +argon2-cffi-bindings==21.2.0 + # via argon2-cffi +arrow==1.3.0 + # via isoduration asttokens==2.4.1 # via stack-data +async-lru==2.0.4 + # via jupyterlab +attrs==24.2.0 + # via jsonschema + # via referencing +babel==2.16.0 + # via jupyterlab-server +beautifulsoup4==4.12.3 + # via nbconvert +bleach==6.2.0 + # via nbconvert +certifi==2024.8.30 + # via httpcore + # via httpx + # via requests +cffi==1.17.1 + # via argon2-cffi-bindings +charset-normalizer==3.4.0 + # via requests +comm==0.2.2 + # via ipykernel datafusion==40.1.0 # via denormalized +debugpy==1.8.7 + # via ipykernel decorator==5.1.1 # via ipython +defusedxml==0.7.1 + # via nbconvert executing==2.0.1 # via stack-data +fastjsonschema==2.20.0 + # via nbformat +fqdn==1.5.1 + # via jsonschema +h11==0.14.0 + # via httpcore +httpcore==1.0.6 + # via httpx +httpx==0.27.2 + # via jupyterlab +idna==3.10 + # via anyio + # via httpx + # via jsonschema + # via requests iniconfig==2.0.0 # via pytest +ipykernel==6.29.5 + # via jupyterlab ipython==8.26.0 + # via ipykernel +isoduration==20.11.0 + # via jsonschema jedi==0.19.1 # via ipython +jinja2==3.1.4 + # via jupyter-server + # via jupyterlab + # via jupyterlab-server + # via nbconvert +json5==0.9.25 + # via jupyterlab-server +jsonpointer==3.0.0 + # via jsonschema +jsonschema==4.23.0 + # via jupyter-events + # via jupyterlab-server + # via nbformat +jsonschema-specifications==2024.10.1 + # via jsonschema +jupyter-client==8.6.3 + # via ipykernel + # via jupyter-server + # via nbclient +jupyter-core==5.7.2 + # via ipykernel + # via jupyter-client + # via jupyter-server + # via jupyterlab + # via nbclient + # via nbconvert + # via nbformat +jupyter-events==0.10.0 + # via jupyter-server +jupyter-lsp==2.2.5 + # via jupyterlab +jupyter-server==2.14.2 + # via jupyter-lsp + # via jupyterlab + # via jupyterlab-server + # via notebook-shim +jupyter-server-terminals==0.5.3 + # via jupyter-server +jupyterlab==4.3.0 +jupyterlab-pygments==0.3.0 + # via nbconvert +jupyterlab-server==2.27.3 + # via jupyterlab +markupsafe==3.0.2 + # via jinja2 + # via nbconvert matplotlib-inline==0.1.7 + # via ipykernel # via ipython +mistune==3.0.2 + # via nbconvert +nbclient==0.10.0 + # via nbconvert +nbconvert==7.16.4 + # via jupyter-server +nbformat==5.10.4 + # via jupyter-server + # via nbclient + # via nbconvert +nest-asyncio==1.6.0 + # via ipykernel +notebook-shim==0.2.4 + # via jupyterlab numpy==2.1.0 + # via pandas # via pyarrow +overrides==7.7.0 + # via jupyter-server packaging==24.1 + # via ipykernel + # via jupyter-server + # via jupyterlab + # via jupyterlab-server + # via nbconvert # via pytest +pandas==2.2.3 +pandocfilters==1.5.1 + # via nbconvert parso==0.8.4 # via jedi pexpect==4.9.0 # via 
ipython pip==24.2 +platformdirs==4.3.6 + # via jupyter-core pluggy==1.5.0 # via pytest +prometheus-client==0.21.0 + # via jupyter-server prompt-toolkit==3.0.47 # via ipython +psutil==6.1.0 + # via ipykernel ptyprocess==0.7.0 # via pexpect + # via terminado pure-eval==0.2.3 # via stack-data pyarrow==17.0.0 # via datafusion # via denormalized +pycparser==2.22 + # via cffi pygments==2.18.0 # via ipython + # via nbconvert pytest==8.3.2 +python-dateutil==2.9.0.post0 + # via arrow + # via jupyter-client + # via pandas +python-json-logger==2.0.7 + # via jupyter-events +pytz==2024.2 + # via pandas +pyyaml==6.0.2 + # via jupyter-events +pyzmq==26.2.0 + # via ipykernel + # via jupyter-client + # via jupyter-server +referencing==0.35.1 + # via jsonschema + # via jsonschema-specifications + # via jupyter-events +requests==2.32.3 + # via jupyterlab-server +rfc3339-validator==0.1.4 + # via jsonschema + # via jupyter-events +rfc3986-validator==0.1.1 + # via jsonschema + # via jupyter-events +rpds-py==0.20.1 + # via jsonschema + # via referencing +send2trash==1.8.3 + # via jupyter-server +setuptools==75.3.0 + # via jupyterlab six==1.16.0 # via asttokens + # via python-dateutil + # via rfc3339-validator +sniffio==1.3.1 + # via anyio + # via httpx +soupsieve==2.6 + # via beautifulsoup4 stack-data==0.6.3 # via ipython +terminado==0.18.1 + # via jupyter-server + # via jupyter-server-terminals +tinycss2==1.4.0 + # via nbconvert +tornado==6.4.1 + # via ipykernel + # via jupyter-client + # via jupyter-server + # via jupyterlab + # via terminado traitlets==5.14.3 + # via comm + # via ipykernel # via ipython + # via jupyter-client + # via jupyter-core + # via jupyter-events + # via jupyter-server + # via jupyterlab # via matplotlib-inline + # via nbclient + # via nbconvert + # via nbformat +types-python-dateutil==2.9.0.20241003 + # via arrow typing-extensions==4.12.2 # via datafusion +tzdata==2024.2 + # via pandas +uri-template==1.3.0 + # via jsonschema +urllib3==2.2.3 + # via requests wcwidth==0.2.13 # via prompt-toolkit +webcolors==24.8.0 + # via jsonschema +webencodings==0.5.1 + # via bleach + # via tinycss2 +websocket-client==1.8.0 + # via jupyter-server From 0a84261f8cd1f72343fd6fd6a15fe971905ef43f Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Thu, 7 Nov 2024 10:50:03 -0800 Subject: [PATCH 02/10] Fixing the streaming join example (#54) * Fixing the streaming join example * format * add drop_columns * update python internal package name --------- Co-authored-by: Matt Green --- crates/core/src/datastream.rs | 9 ++++ examples/examples/stream_join.rs | 41 ++++++++++++------- py-denormalized/pyproject.toml | 2 +- .../python/denormalized/context.py | 3 +- .../python/denormalized/data_stream.py | 2 +- .../denormalized/datafusion/__init__.py | 2 +- .../python/denormalized/datafusion/catalog.py | 2 +- .../python/denormalized/datafusion/common.py | 2 +- .../python/denormalized/datafusion/context.py | 12 +++--- .../denormalized/datafusion/dataframe.py | 4 +- .../python/denormalized/datafusion/expr.py | 6 +-- .../denormalized/datafusion/functions.py | 2 +- .../denormalized/datafusion/object_store.py | 2 +- .../denormalized/datafusion/record_batch.py | 2 +- .../python/denormalized/datafusion/udf.py | 2 +- .../python/denormalized/feast_data_stream.py | 2 +- py-denormalized/python/denormalized/utils.py | 2 +- py-denormalized/src/lib.rs | 2 +- 18 files changed, 59 insertions(+), 40 deletions(-) diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 8b83ec4..42a8f4f 100644 --- 
a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -113,6 +113,15 @@ impl DataStream { }) } + pub fn drop_columns(self, columns: &[&str]) -> Result { + Ok(Self { + df: Arc::new(self.df.as_ref().clone().drop_columns(columns)?), + context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), + }) + } + // Join two streams using the specified expression pub fn join_on( self, diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs index ac7dae4..5c6c672 100644 --- a/examples/examples/stream_join.rs +++ b/examples/examples/stream_join.rs @@ -17,7 +17,10 @@ async fn main() -> Result<()> { let bootstrap_servers = String::from("localhost:9092"); - let ctx = Context::new()?; + let ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1")) + .await; + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); let source_topic_builder = topic_builder @@ -29,7 +32,7 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("temperature")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?; @@ -40,30 +43,38 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("humidity")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?, ) - .await?; + .await? + .with_column("humidity_sensor", col("sensor_name"))? + .drop_columns(&["sensor_name"])? + .window( + vec![col("humidity_sensor")], + vec![avg(col("reading")).alias("avg_humidity")], + Duration::from_millis(1_000), + None, + )? + .with_column("humidity_window_start_time", col("window_start_time"))? + .with_column("humidity_window_end_time", col("window_end_time"))? + .drop_columns(&["window_start_time", "window_end_time"])?; let joined_ds = ctx .from_topic(temperature_topic) .await? + .window( + vec![col("sensor_name")], + vec![avg(col("reading")).alias("avg_temperature")], + Duration::from_millis(1_000), + None, + )? .join( humidity_ds, JoinType::Inner, - &["sensor_name"], - &["sensor_name"], - None, - )? 
- .window( - vec![], - vec![ - avg(col("temperature.reading")).alias("avg_temperature"), - avg(col("humidity.reading")).alias("avg_humidity"), - ], - Duration::from_millis(1_000), + &["sensor_name", "window_start_time"], + &["humidity_sensor", "humidity_window_start_time"], None, )?; diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index dc49053..812718c 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -20,7 +20,7 @@ feast = ["feast"] [tool.maturin] python-source = "python" features = ["pyo3/extension-module"] -module-name = "denormalized._internal" +module-name = "denormalized._d_internal" [tool.rye] dev-dependencies = [ diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py index 462ce21..1b3c5df 100644 --- a/py-denormalized/python/denormalized/context.py +++ b/py-denormalized/python/denormalized/context.py @@ -1,4 +1,4 @@ -from denormalized._internal import PyContext +from denormalized._d_internal import PyContext from .data_stream import DataStream class Context: @@ -20,4 +20,3 @@ def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> Da ds = DataStream(py_ds) return ds - diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index 418c520..a2a5805 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -1,5 +1,5 @@ import pyarrow as pa -from denormalized._internal import PyDataStream +from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from denormalized.utils import to_internal_expr, to_internal_exprs diff --git a/py-denormalized/python/denormalized/datafusion/__init__.py b/py-denormalized/python/denormalized/datafusion/__init__.py index 7419ad7..715a94d 100644 --- a/py-denormalized/python/denormalized/datafusion/__init__.py +++ b/py-denormalized/python/denormalized/datafusion/__init__.py @@ -36,7 +36,7 @@ from .catalog import Catalog, Database, Table # The following imports are okay to remain as opaque to the user. -from denormalized._internal import Config, LogicalPlan, ExecutionPlan, runtime +from denormalized._d_internal import Config, LogicalPlan, ExecutionPlan, runtime from .record_batch import RecordBatchStream, RecordBatch diff --git a/py-denormalized/python/denormalized/datafusion/catalog.py b/py-denormalized/python/denormalized/datafusion/catalog.py index d8c9092..faa5058 100644 --- a/py-denormalized/python/denormalized/datafusion/catalog.py +++ b/py-denormalized/python/denormalized/datafusion/catalog.py @@ -19,7 +19,7 @@ from __future__ import annotations -import denormalized._internal as df_internal +import denormalized._d_internal as df_internal from typing import TYPE_CHECKING diff --git a/py-denormalized/python/denormalized/datafusion/common.py b/py-denormalized/python/denormalized/datafusion/common.py index 73ed7c4..082b979 100644 --- a/py-denormalized/python/denormalized/datafusion/common.py +++ b/py-denormalized/python/denormalized/datafusion/common.py @@ -16,7 +16,7 @@ # under the License. 
"""Common data types used throughout the DataFusion project.""" -from denormalized._internal import common as common_internal +from denormalized._d_internal import common as common_internal from enum import Enum # TODO these should all have proper wrapper classes diff --git a/py-denormalized/python/denormalized/datafusion/context.py b/py-denormalized/python/denormalized/datafusion/context.py index 19c0760..f3eed51 100644 --- a/py-denormalized/python/denormalized/datafusion/context.py +++ b/py-denormalized/python/denormalized/datafusion/context.py @@ -19,13 +19,13 @@ from __future__ import annotations -from denormalized._internal import SessionConfig as SessionConfigInternal -from denormalized._internal import RuntimeConfig as RuntimeConfigInternal -from denormalized._internal import SQLOptions as SQLOptionsInternal -from denormalized._internal import SessionContext as SessionContextInternal -from denormalized._internal import LogicalPlan, ExecutionPlan +from denormalized._d_internal import SessionConfig as SessionConfigInternal +from denormalized._d_internal import RuntimeConfig as RuntimeConfigInternal +from denormalized._d_internal import SQLOptions as SQLOptionsInternal +from denormalized._d_internal import SessionContext as SessionContextInternal +from denormalized._d_internal import LogicalPlan, ExecutionPlan -from denormalized._internal import AggregateUDF +from denormalized._d_internal import AggregateUDF from denormalized.datafusion.catalog import Catalog, Table from denormalized.datafusion.dataframe import DataFrame from denormalized.datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list diff --git a/py-denormalized/python/denormalized/datafusion/dataframe.py b/py-denormalized/python/denormalized/datafusion/dataframe.py index 4a50545..63b41e2 100644 --- a/py-denormalized/python/denormalized/datafusion/dataframe.py +++ b/py-denormalized/python/denormalized/datafusion/dataframe.py @@ -32,9 +32,9 @@ import pathlib from typing import Callable -from denormalized._internal import DataFrame as DataFrameInternal +from denormalized._d_internal import DataFrame as DataFrameInternal from denormalized.datafusion.expr import Expr, SortExpr, sort_or_default -from denormalized._internal import ( +from denormalized._d_internal import ( LogicalPlan, ExecutionPlan, ) diff --git a/py-denormalized/python/denormalized/datafusion/expr.py b/py-denormalized/python/denormalized/datafusion/expr.py index a858a66..69f2505 100644 --- a/py-denormalized/python/denormalized/datafusion/expr.py +++ b/py-denormalized/python/denormalized/datafusion/expr.py @@ -28,9 +28,9 @@ from denormalized.datafusion.common import DataTypeMap, NullTreatment, RexType from typing_extensions import deprecated -from denormalized._internal import LogicalPlan -from denormalized._internal import expr as expr_internal -from denormalized._internal import functions as functions_internal +from denormalized._d_internal import LogicalPlan +from denormalized._d_internal import expr as expr_internal +from denormalized._d_internal import functions as functions_internal # The following are imported from the internal representation. We may choose to # give these all proper wrappers, or to simply leave as is. 
These were added diff --git a/py-denormalized/python/denormalized/datafusion/functions.py b/py-denormalized/python/denormalized/datafusion/functions.py index 291c578..d564672 100644 --- a/py-denormalized/python/denormalized/datafusion/functions.py +++ b/py-denormalized/python/denormalized/datafusion/functions.py @@ -18,7 +18,7 @@ from __future__ import annotations -from denormalized._internal import functions as f +from denormalized._d_internal import functions as f from denormalized.datafusion.expr import ( CaseBuilder, Expr, diff --git a/py-denormalized/python/denormalized/datafusion/object_store.py b/py-denormalized/python/denormalized/datafusion/object_store.py index 3a3371e..54610c0 100644 --- a/py-denormalized/python/denormalized/datafusion/object_store.py +++ b/py-denormalized/python/denormalized/datafusion/object_store.py @@ -16,7 +16,7 @@ # under the License. """Object store functionality.""" -from denormalized._internal import object_store +from denormalized._d_internal import object_store AmazonS3 = object_store.AmazonS3 GoogleCloud = object_store.GoogleCloud diff --git a/py-denormalized/python/denormalized/datafusion/record_batch.py b/py-denormalized/python/denormalized/datafusion/record_batch.py index e0e436e..7f7b7ef 100644 --- a/py-denormalized/python/denormalized/datafusion/record_batch.py +++ b/py-denormalized/python/denormalized/datafusion/record_batch.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: import pyarrow - import denormalized._internal as df_internal + import denormalized._d_internal as df_internal import typing_extensions diff --git a/py-denormalized/python/denormalized/datafusion/udf.py b/py-denormalized/python/denormalized/datafusion/udf.py index c1d45f9..c7f4d26 100644 --- a/py-denormalized/python/denormalized/datafusion/udf.py +++ b/py-denormalized/python/denormalized/datafusion/udf.py @@ -19,7 +19,7 @@ from __future__ import annotations -import denormalized._internal as df_internal +import denormalized._d_internal as df_internal from datafusion.expr import Expr from typing import Callable, TYPE_CHECKING, TypeVar from abc import ABCMeta, abstractmethod diff --git a/py-denormalized/python/denormalized/feast_data_stream.py b/py-denormalized/python/denormalized/feast_data_stream.py index 252c2bb..e289b8c 100644 --- a/py-denormalized/python/denormalized/feast_data_stream.py +++ b/py-denormalized/python/denormalized/feast_data_stream.py @@ -2,7 +2,7 @@ from typing import Any, TypeVar, Union, cast, get_type_hints import pyarrow as pa -from denormalized._internal import PyDataStream +from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from feast import FeatureStore, Field from feast.data_source import PushMode diff --git a/py-denormalized/python/denormalized/utils.py b/py-denormalized/python/denormalized/utils.py index 13a5dbf..adb8e10 100644 --- a/py-denormalized/python/denormalized/utils.py +++ b/py-denormalized/python/denormalized/utils.py @@ -1,4 +1,4 @@ -from denormalized._internal import expr as internal_exprs +from denormalized._d_internal import expr as internal_exprs from denormalized.datafusion import Expr diff --git a/py-denormalized/src/lib.rs b/py-denormalized/src/lib.rs index ddb4734..230171b 100644 --- a/py-denormalized/src/lib.rs +++ b/py-denormalized/src/lib.rs @@ -11,7 +11,7 @@ pub mod utils; pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// A Python module implemented in Rust. 
-#[pymodule(name="_internal")] +#[pymodule(name = "_d_internal")] fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; From 26695f06242311af238ca7f6d8c945e33bd0c63c Mon Sep 17 00:00:00 2001 From: Matt Green Date: Thu, 7 Nov 2024 13:39:11 -0800 Subject: [PATCH 03/10] Implement stream join in python (#51) * Implement stream join in python * update * fmt * clippy fmt * example works * fmt --- .../src/datasource/kafka/kafka_stream_read.rs | 2 +- .../continuous/grouped_window_agg_stream.rs | 6 +- examples/examples/simple_aggregation.rs | 2 +- py-denormalized/.gitignore | 3 + py-denormalized/pyproject.toml | 1 + .../python/denormalized/data_stream.py | 5 ++ .../python/examples/stream_join.py | 72 +++++++++++++++++++ py-denormalized/src/datastream.rs | 41 ++++++++--- 8 files changed, 117 insertions(+), 15 deletions(-) create mode 100644 py-denormalized/python/examples/stream_join.py diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs index 26dc488..9d484ee 100644 --- a/crates/core/src/datasource/kafka/kafka_stream_read.rs +++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs @@ -253,7 +253,7 @@ impl PartitionStream for KafkaStreamRead { max_timestamp, offsets_read, }; - let _ = state_backend + state_backend .as_ref() .put(channel_tag.as_bytes().to_vec(), off.to_bytes().unwrap()); debug!("checkpointed offsets {:?}", off); diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index 4114584..da43430 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -168,7 +168,7 @@ impl GroupedWindowAggStream { context, epoch: 0, partition, - channel_tag: channel_tag, + channel_tag, receiver, state_backend, }; @@ -184,7 +184,7 @@ impl GroupedWindowAggStream { .collect(); let _ = stream.ensure_window_frames_for_ranges(&ranges); state.frames.iter().for_each(|f| { - let _ = stream.update_accumulators_for_frame(f.window_start_time, &f); + let _ = stream.update_accumulators_for_frame(f.window_start_time, f); }); let state_watermark = state.watermark.unwrap(); stream.process_watermark(RecordBatchWatermark { @@ -387,7 +387,7 @@ impl GroupedWindowAggStream { let watermark = { let watermark_lock = self.latest_watermark.lock().unwrap(); - watermark_lock.clone() + *watermark_lock }; let checkpointed_state = CheckpointedGroupedWindowAggStream { diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 2fe1306..d785d65 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -32,7 +32,7 @@ async fn main() -> Result<()> { ])) .await?; - let _ctx = Context::new()? + Context::new()? 
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) .await .from_topic(source_topic) diff --git a/py-denormalized/.gitignore b/py-denormalized/.gitignore index c8f0442..9d2a03d 100644 --- a/py-denormalized/.gitignore +++ b/py-denormalized/.gitignore @@ -70,3 +70,6 @@ docs/_build/ # Pyenv .python-version + +.ipynb_checkpoints/ +Untitled.ipynb diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 812718c..86cf198 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ [project.optional-dependencies] tests = ["pytest"] feast = ["feast"] +dev = [] [tool.maturin] python-source = "python" diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index a2a5805..ca28dfb 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -73,6 +73,11 @@ def with_column(self, name: str, predicate: Expr) -> "DataStream": DataStream: A new DataStream with the additional column. """ return DataStream(self.ds.with_column(name, to_internal_expr(predicate))) + + def drop_columns(self, columns: list[str]) -> "DataStream": + """Drops columns from the DataStream. + """ + return DataStream(self.ds.drop_columns(columns)) def join_on( self, right: "DataStream", join_type: str, on_exprs: list[Expr] diff --git a/py-denormalized/python/examples/stream_join.py b/py-denormalized/python/examples/stream_join.py new file mode 100644 index 0000000..1155a62 --- /dev/null +++ b/py-denormalized/python/examples/stream_join.py @@ -0,0 +1,72 @@ +"""stream_aggregate example.""" + +import json +import signal +import sys +import pprint as pp + +from denormalized import Context +from denormalized.datafusion import col, expr +from denormalized.datafusion import functions as f +from denormalized.datafusion import lit + + +def signal_handler(sig, frame): + print("You pressed Ctrl+C!") + sys.exit(0) + + +signal.signal(signal.SIGINT, signal_handler) + +bootstrap_server = "localhost:9092" + +sample_event = { + "occurred_at_ms": 100, + "sensor_name": "foo", + "reading": 0.0, +} + + +def print_batch(rb): + pp.pprint(rb.to_pydict()) + + +ctx = Context() +temperature_ds = ctx.from_topic( + "temperature", json.dumps(sample_event), bootstrap_server +) + +humidity_ds = ( + ctx.from_topic("humidity", json.dumps(sample_event), bootstrap_server) + .with_column("humidity_sensor", col("sensor_name")) + .drop_columns(["sensor_name"]) + .window( + [col("humidity_sensor")], + [ + f.count(col("reading")).alias("avg_humidity"), + ], + 4000, + None, + ) + .with_column("humidity_window_start_time", col("window_start_time")) + .with_column("humidity_window_end_time", col("window_end_time")) + .drop_columns(["window_start_time", "window_end_time"]) +) + +joined_ds = ( + temperature_ds.window( + [col("sensor_name")], + [ + f.avg(col("reading")).alias("avg_temperature"), + ], + 4000, + None, + ) + .join( + humidity_ds, + "inner", + ["sensor_name", "window_start_time"], + ["humidity_sensor", "humidity_window_start_time"], + ) + .sink(print_batch) +) diff --git a/py-denormalized/src/datastream.rs b/py-denormalized/src/datastream.rs index da9de9d..c3b712f 100644 --- a/py-denormalized/src/datastream.rs +++ b/py-denormalized/src/datastream.rs @@ -9,13 +9,14 @@ use tokio::task::JoinHandle; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::PyArrowType; use datafusion::arrow::pyarrow::ToPyArrow; +use 
datafusion::common::JoinType; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion_python::expr::{join::PyJoinType, PyExpr}; use denormalized::datastream::DataStream; -use crate::errors::{py_denormalized_err, Result}; +use crate::errors::{py_denormalized_err, DenormalizedError, Result}; use crate::utils::{get_tokio_runtime, python_print, wait_for_future}; #[pyclass(name = "PyDataStream", module = "denormalized", subclass)] @@ -86,6 +87,13 @@ impl PyDataStream { Ok(Self::new(ds)) } + pub fn drop_columns(&self, columns: Vec) -> Result { + let columns_ref: Vec<&str> = columns.iter().map(|s| s.as_str()).collect(); + + let ds = self.ds.as_ref().clone().drop_columns(&columns_ref)?; + Ok(Self::new(ds)) + } + pub fn join_on( &self, _right: PyDataStream, @@ -95,29 +103,42 @@ impl PyDataStream { todo!() } - #[pyo3(signature = (right, join_type, left_cols, right_cols, filter=None))] + #[pyo3(signature = (right, how, left_cols, right_cols, filter=None))] pub fn join( &self, right: PyDataStream, - join_type: PyJoinType, + how: &str, left_cols: Vec, right_cols: Vec, filter: Option, ) -> PyResult { let right_ds = right.ds.as_ref().clone(); + let join_type = match how { + "inner" => JoinType::Inner, + "left" => JoinType::Left, + "right" => JoinType::Right, + "full" => JoinType::Full, + "semi" => JoinType::LeftSemi, + "anti" => JoinType::LeftAnti, + how => { + return Err(DenormalizedError::Common(format!( + "The join type {how} does not exist or is not implemented" + )) + .into()); + } + }; + let filter = filter.map(|f| f.into()); let left_cols = left_cols.iter().map(|s| s.as_ref()).collect::>(); let right_cols = right_cols.iter().map(|s| s.as_ref()).collect::>(); - let ds = self.ds.as_ref().clone().join( - right_ds, - join_type.into(), - &left_cols, - &right_cols, - filter, - )?; + let ds = + self.ds + .as_ref() + .clone() + .join(right_ds, join_type, &left_cols, &right_cols, filter)?; Ok(Self::new(ds)) } From b6d7a6b51667556c28d15b46c4381dd1c9a34fe0 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Thu, 7 Nov 2024 13:57:09 -0800 Subject: [PATCH 04/10] Adding config option for checkpointing (#50) * Adding config option for checkpointing * Add maturin build step for ci (#52) * fix: correct python module name * Fixing the streaming join example (#54) * Fixing the streaming join example * format * add drop_columns * update python internal package name --------- Co-authored-by: Matt Green * merge with main * Adding config option for checkpointing * merge with main * Cargo fmt --------- Co-authored-by: Matt Green --- README.md | 7 ++-- crates/core/src/context.rs | 24 +++++++---- .../src/datasource/kafka/kafka_stream_read.rs | 27 +++++++----- crates/core/src/datastream.rs | 19 ++++++--- .../continuous/grouped_window_agg_stream.rs | 42 ++++++++++++------- .../continuous/streaming_window.rs | 25 +++++++---- crates/orchestrator/src/orchestrator.rs | 2 - examples/examples/simple_aggregation.rs | 9 ++-- examples/examples/stream_join.rs | 13 +++--- 9 files changed, 108 insertions(+), 60 deletions(-) diff --git a/README.md b/README.md index eaf2b0d..3ba7348 100644 --- a/README.md +++ b/README.md @@ -96,11 +96,12 @@ Details about developing the python bindings can be found in [py-denormalized/RE ### Checkpointing -We use SlateDB for state backend. Initialize your Job Context to a path to local directory - +We use SlateDB for state backend. 
Initialize your Job Context with a custom config and a path for SlateDB backend to store state - ``` - let ctx = Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + let config = Context::default_config().set_bool("denormalized_config.checkpoint", true); + let ctx = Context::with_config(config)? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg/job1")) .await; ``` diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 3419720..f3b078d 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -6,6 +6,7 @@ use datafusion::execution::{ session_state::SessionStateBuilder, }; +use crate::config_extensions::denormalized_config::DenormalizedConfig; use crate::datasource::kafka::TopicReader; use crate::datastream::DataStream; use crate::physical_optimizer::EnsureHashPartititionOnGroupByForStreamingAggregates; @@ -17,12 +18,13 @@ use denormalized_common::error::{DenormalizedError, Result}; #[derive(Clone)] pub struct Context { - pub session_conext: Arc, + pub session_context: Arc, } impl Context { - pub fn new() -> Result { - let config = SessionConfig::new() + pub fn default_config() -> SessionConfig { + let ext_config = DenormalizedConfig::default(); + let mut config = SessionConfig::new() .set( "datafusion.execution.batch_size", &datafusion::common::ScalarValue::UInt64(Some(32)), @@ -34,8 +36,16 @@ impl Context { &datafusion::common::ScalarValue::Boolean(Some(false)), ); - let runtime = Arc::new(RuntimeEnv::default()); + let _ = config.options_mut().extensions.insert(ext_config); + config + } + pub fn new() -> Result { + Context::with_config(Context::default_config()) + } + + pub fn with_config(config: SessionConfig) -> Result { + let runtime = Arc::new(RuntimeEnv::default()); let state = SessionStateBuilder::new() .with_default_features() .with_config(config) @@ -48,7 +58,7 @@ impl Context { .build(); Ok(Self { - session_conext: Arc::new(SessionContext::new_with_state(state)), + session_context: Arc::new(SessionContext::new_with_state(state)), }) } @@ -56,7 +66,7 @@ impl Context { let topic_name = topic.0.topic.clone(); self.register_table(topic_name.clone(), Arc::new(topic)) .await?; - let df = self.session_conext.table(topic_name.as_str()).await?; + let df = self.session_context.table(topic_name.as_str()).await?; let ds = DataStream::new(Arc::new(df), Arc::new(self.clone())); Ok(ds) } @@ -66,7 +76,7 @@ impl Context { name: String, table: Arc, ) -> Result<(), DenormalizedError> { - self.session_conext + self.session_context .register_table(name.as_str(), table.clone())?; Ok(()) diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs index 9d484ee..caed655 100644 --- a/crates/core/src/datasource/kafka/kafka_stream_read.rs +++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs @@ -7,7 +7,7 @@ use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, Str use arrow_schema::{DataType, Field, SchemaRef, TimeUnit}; use crossbeam::channel; use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver}; -use denormalized_orchestrator::orchestrator::{self, OrchestrationMessage}; +use denormalized_orchestrator::orchestrator::OrchestrationMessage; use futures::executor::block_on; use log::{debug, error}; use serde::{Deserialize, Serialize}; @@ -83,13 +83,13 @@ impl PartitionStream for KafkaStreamRead { } fn execute(&self, ctx: Arc) -> SendableRecordBatchStream { - let _config_options = 
ctx + let config_options = ctx .session_config() .options() .extensions .get::(); - let mut should_checkpoint = false; //config_options.map_or(false, |c| c.checkpoint); + let should_checkpoint = config_options.map_or(false, |c| c.checkpoint); let node_id = self.exec_node_id.unwrap(); let partition_tag = self @@ -101,13 +101,16 @@ impl PartitionStream for KafkaStreamRead { let channel_tag = format!("{}_{}", node_id, partition_tag); let mut serialized_state: Option> = None; - let state_backend = get_global_slatedb().unwrap(); + let mut state_backend = None; let mut starting_offsets: HashMap = HashMap::new(); - if orchestrator::SHOULD_CHECKPOINT { + + if should_checkpoint { create_channel(channel_tag.as_str(), 10); + let backend = get_global_slatedb().unwrap(); debug!("checking for last checkpointed offsets"); - serialized_state = block_on(state_backend.clone().get(channel_tag.as_bytes().to_vec())); + serialized_state = block_on(backend.get(channel_tag.as_bytes().to_vec())); + state_backend = Some(backend); } if let Some(serialized_state) = serialized_state { @@ -151,25 +154,26 @@ impl PartitionStream for KafkaStreamRead { builder.spawn(async move { let mut epoch = 0; let mut receiver: Option> = None; - if orchestrator::SHOULD_CHECKPOINT { + if should_checkpoint { let orchestrator_sender = get_sender("orchestrator"); let msg: OrchestrationMessage = OrchestrationMessage::RegisterStream(channel_tag.clone()); orchestrator_sender.as_ref().unwrap().send(msg).unwrap(); receiver = take_receiver(channel_tag.as_str()); } + let mut checkpoint_batch = false; loop { //let mut checkpoint_barrier: Option = None; let mut _checkpoint_barrier: Option = None; - if orchestrator::SHOULD_CHECKPOINT { + if should_checkpoint { let r = receiver.as_ref().unwrap(); for message in r.try_iter() { debug!("received checkpoint barrier for {:?}", message); if let OrchestrationMessage::CheckpointBarrier(epoch_ts) = message { epoch = epoch_ts; - should_checkpoint = true; + checkpoint_batch = true; } } } @@ -245,7 +249,7 @@ impl PartitionStream for KafkaStreamRead { let tx_result = tx.send(Ok(timestamped_record_batch)).await; match tx_result { Ok(_) => { - if should_checkpoint { + if checkpoint_batch { debug!("about to checkpoint offsets"); let off = BatchReadMetadata { epoch, @@ -255,9 +259,10 @@ impl PartitionStream for KafkaStreamRead { }; state_backend .as_ref() + .unwrap() .put(channel_tag.as_bytes().to_vec(), off.to_bytes().unwrap()); debug!("checkpointed offsets {:?}", off); - should_checkpoint = false; + checkpoint_batch = false; } } Err(err) => error!("result err {:?}. 
shutdown signal detected.", err), diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 42a8f4f..ea86b72 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -1,7 +1,6 @@ use datafusion::common::runtime::SpawnedTask; use datafusion::logical_expr::LogicalPlan; use datafusion::physical_plan::ExecutionPlanProperties; -use denormalized_orchestrator::orchestrator; use futures::StreamExt; use log::debug; use log::info; @@ -18,6 +17,7 @@ use datafusion::logical_expr::{ }; use datafusion::physical_plan::display::DisplayableExecutionPlan; +use crate::config_extensions::denormalized_config::DenormalizedConfig; use crate::context::Context; use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; use crate::logical_plan::StreamingLogicalPlanBuilder; @@ -240,7 +240,12 @@ impl DataStream { let mut maybe_orchestrator_handle = None; - if orchestrator::SHOULD_CHECKPOINT { + let config = self.context.session_context.copied_config(); + let config_options = config.options().extensions.get::(); + + let should_checkpoint = config_options.map_or(false, |c| c.checkpoint); + + if should_checkpoint { let mut orchestrator = Orchestrator::default(); let cloned_shutdown_rx = self.shutdown_rx.clone(); let orchestrator_handle = @@ -286,10 +291,12 @@ impl DataStream { log::info!("Stream processing stopped. Cleaning up..."); - let state_backend = get_global_slatedb(); - if let Ok(db) = state_backend { - log::info!("Closing the state backend (slatedb)..."); - db.close().await.unwrap(); + if should_checkpoint { + let state_backend = get_global_slatedb(); + if let Ok(db) = state_backend { + log::info!("Closing the state backend (slatedb)..."); + db.close().await.unwrap(); + } } // Join the orchestrator handle if it exists, ensuring it is joined and awaited diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index da43430..0388a23 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -37,14 +37,14 @@ use datafusion::{ }; use denormalized_orchestrator::{ - channel_manager::take_receiver, - orchestrator::{self, OrchestrationMessage}, + channel_manager::take_receiver, orchestrator::OrchestrationMessage, }; use futures::{executor::block_on, Stream, StreamExt}; use log::debug; use serde::{Deserialize, Serialize}; use crate::{ + config_extensions::denormalized_config::DenormalizedConfig, physical_plan::utils::time::RecordBatchWatermark, state_backend::slatedb::{get_global_slatedb, SlateDBWrapper}, utils::serialization::ArrayContainer, @@ -73,11 +73,11 @@ pub struct GroupedWindowAggStream { group_by: PhysicalGroupBy, group_schema: Arc, context: Arc, - epoch: i64, + checkpoint: bool, partition: usize, channel_tag: String, receiver: Option>, - state_backend: Arc, + state_backend: Option>, } #[derive(Serialize, Deserialize)] @@ -147,11 +147,23 @@ impl GroupedWindowAggStream { .and_then(|tag| take_receiver(tag.as_str())); let channel_tag: String = channel_tag.unwrap_or(String::from("")); - let state_backend = get_global_slatedb().unwrap(); - let serialized_state = block_on(state_backend.get(channel_tag.as_bytes().to_vec())); + let config_options = context + .session_config() + .options() + .extensions + .get::(); + + let checkpoint = config_options.map_or(false, |c| c.checkpoint); + + let mut serialized_state: Option> = None; + let mut state_backend = None; + if 
checkpoint { + let backend = get_global_slatedb().unwrap(); + serialized_state = block_on(backend.get(channel_tag.as_bytes().to_vec())); + state_backend = Some(backend); + } - //let window_frames: BTreeMap = BTreeMap::new(); let mut stream = Self { schema: agg_schema, input, @@ -166,7 +178,7 @@ impl GroupedWindowAggStream { group_by, group_schema, context, - epoch: 0, + checkpoint, partition, channel_tag, receiver, @@ -340,19 +352,19 @@ impl GroupedWindowAggStream { return Poll::Pending; } }; - self.epoch += 1; - if orchestrator::SHOULD_CHECKPOINT { + let mut checkpoint_batch = false; + + if self.checkpoint { let r = self.receiver.as_ref().unwrap(); - let mut epoch: u128 = 0; for message in r.try_iter() { debug!("received checkpoint barrier for {:?}", message); - if let OrchestrationMessage::CheckpointBarrier(epoch_ts) = message { - epoch = epoch_ts; + if let OrchestrationMessage::CheckpointBarrier(_epoch_ts) = message { + checkpoint_batch = true; } } - if epoch != 0 { + if checkpoint_batch { // Prepare data for checkpointing // Clone or extract necessary data @@ -400,7 +412,7 @@ impl GroupedWindowAggStream { let key = self.channel_tag.as_bytes().to_vec(); // Clone or use `Arc` for `state_backend` - let state_backend = self.state_backend.clone(); + let state_backend = self.state_backend.clone().unwrap(); state_backend.put(key, serialized_checkpoint); } diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs index e70261b..c6ea378 100644 --- a/crates/core/src/physical_plan/continuous/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -40,16 +40,19 @@ use datafusion::{ }; use denormalized_orchestrator::{ channel_manager::{create_channel, get_sender}, - orchestrator::{self, OrchestrationMessage}, + orchestrator::OrchestrationMessage, }; use futures::{Stream, StreamExt}; use tracing::debug; -use crate::physical_plan::{ - continuous::grouped_window_agg_stream::GroupedWindowAggStream, - utils::{ - accumulators::{create_accumulators, AccumulatorItem}, - time::{system_time_from_epoch, RecordBatchWatermark}, +use crate::{ + config_extensions::denormalized_config::DenormalizedConfig, + physical_plan::{ + continuous::grouped_window_agg_stream::GroupedWindowAggStream, + utils::{ + accumulators::{create_accumulators, AccumulatorItem}, + time::{system_time_from_epoch, RecordBatchWatermark}, + }, }, }; @@ -427,7 +430,15 @@ impl ExecutionPlan for StreamingWindowExec { .node_id() .expect("expected node id to be set."); - let channel_tag = if orchestrator::SHOULD_CHECKPOINT { + let config_options = context + .session_config() + .options() + .extensions + .get::(); + + let checkpoint = config_options.map_or(false, |c| c.checkpoint); + + let channel_tag = if checkpoint { let tag = format!("{}_{}", node_id, partition); create_channel(tag.as_str(), 10); let orchestrator_sender = get_sender("orchestrator"); diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs index 6acfeed..ae9c1f0 100644 --- a/crates/orchestrator/src/orchestrator.rs +++ b/crates/orchestrator/src/orchestrator.rs @@ -20,8 +20,6 @@ pub struct Orchestrator { senders: HashMap>, } -pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG - /** * 1. Keep track of checkpoint per source. * 2. Tell each downstream which checkpoints it needs to know. 
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index d785d65..9341529 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -18,6 +18,9 @@ async fn main() -> Result<()> { .init(); let bootstrap_servers = String::from("localhost:9092"); + + let config = Context::default_config().set_bool("denormalized_config.checkpoint", false); + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers); // Connect to source topic @@ -32,9 +35,9 @@ async fn main() -> Result<()> { ])) .await?; - Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) - .await + let _ctx = Context::with_config(config)? + //.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + //.await .from_topic(source_topic) .await? .window( diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs index 5c6c672..2edc25e 100644 --- a/examples/examples/stream_join.rs +++ b/examples/examples/stream_join.rs @@ -17,10 +17,7 @@ async fn main() -> Result<()> { let bootstrap_servers = String::from("localhost:9092"); - let ctx = Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1")) - .await; - + let ctx = Context::new()?; let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); let source_topic_builder = topic_builder @@ -73,8 +70,12 @@ async fn main() -> Result<()> { .join( humidity_ds, JoinType::Inner, - &["sensor_name", "window_start_time"], - &["humidity_sensor", "humidity_window_start_time"], + &["sensor_name", "window_start_time", "window_end_time"], + &[ + "humidity_sensor", + "humidity_window_start_time", + "humidity_window_end_time", + ], None, )?; From 3df33e2964b6e524a36e558f241a0a78020c0b80 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Mon, 11 Nov 2024 13:14:59 -0800 Subject: [PATCH 05/10] Rm hardcoded checkpoint (#55) --- py-denormalized/src/context.rs | 7 ++----- py-denormalized/src/utils.rs | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs index 7e2372f..e6e6ffa 100644 --- a/py-denormalized/src/context.rs +++ b/py-denormalized/src/context.rs @@ -53,11 +53,8 @@ impl PyContext { #[new] pub fn new(py: Python) -> PyResult { let rt = &get_tokio_runtime(py).0; - let fut: JoinHandle> = rt.spawn(async move { - Ok(Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) - .await) - }); + let fut: JoinHandle> = + rt.spawn(async move { Ok(Context::new()?) 
}); let context = wait_for_future(py, fut).map_err(py_denormalized_err)??; diff --git a/py-denormalized/src/utils.rs b/py-denormalized/src/utils.rs index a7f41f6..2fc9732 100644 --- a/py-denormalized/src/utils.rs +++ b/py-denormalized/src/utils.rs @@ -6,7 +6,7 @@ use tokio::runtime::Runtime; /// Utility to get the Tokio Runtime from Python pub(crate) fn get_tokio_runtime(py: Python) -> PyRef { - let datafusion = py.import_bound("denormalized._internal").unwrap(); + let datafusion = py.import_bound("denormalized._d_internal").unwrap(); let tmp = datafusion.getattr("runtime").unwrap(); match tmp.extract::>() { Ok(runtime) => runtime, From d292927982237d1f9f00eba19ef54a5cae305c21 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Tue, 12 Nov 2024 16:16:08 -0800 Subject: [PATCH 06/10] Add dockerfile to run emit_measurements and kafka (#56) * Add dockerfile to run emit_measurements and kafka --- .dockerignore | 6 ++ Cargo.lock | 1 + Cargo.toml | 1 + Dockerfile | 100 ++++++++++++++++++ README.md | 5 +- examples/Cargo.toml | 1 + examples/examples/emit_measurements.rs | 6 +- py-denormalized/README.md | 13 +++ py-denormalized/pyproject.toml | 5 + .../python/examples/stream_aggregate.py | 8 +- .../python/examples/stream_join.py | 11 +- py-denormalized/requirements-dev.lock | 1 + 12 files changed, 144 insertions(+), 14 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..4ff6a5b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +target/ +.git/ +.gitignore +Dockerfile +.dockerignore +**/*.rs.bk diff --git a/Cargo.lock b/Cargo.lock index ab6bf26..9e042d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1599,6 +1599,7 @@ dependencies = [ name = "denormalized-examples" version = "0.0.1" dependencies = [ + "anyhow", "arrow", "arrow-array", "arrow-schema", diff --git a/Cargo.toml b/Cargo.toml index f3900f1..b263184 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ license = "Apache-2.0" readme = "README.md" repository = "https://github.com/probably-nothing-labs/denormalized.git" version = "0.0.1" +rust-version = "1.81.0" description = "Embeddable stream processing engine" [workspace.dependencies] diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c1ecd5d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,100 @@ +# This file builds an image that runs Kakfa and the emit_measurments.rs scripts for generating fake data +# +# docker build -t emgeee/kafka_emit_measurements:latest . +# + +# Stage 1: Build the Rust application +FROM rust:1.81.0-slim-bookworm AS builder +WORKDIR /usr/src/app + +# Install build dependencies and zig +RUN apt-get update && apt-get install -y \ + cmake \ + g++ \ + libssl-dev \ + pkg-config \ + wget \ + xz-utils \ + && wget https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz \ + && tar -xf zig-linux-x86_64-0.13.0.tar.xz \ + && mv zig-linux-x86_64-0.13.0 /usr/local/zig \ + && rm zig-linux-x86_64-0.13.0.tar.xz \ + && rm -rf /var/lib/apt/lists/* + +# Add zig to PATH +ENV PATH="/usr/local/zig/:$PATH" + +# Install cargo-zigbuild +RUN cargo install --locked cargo-zigbuild && \ + rustup target add x86_64-unknown-linux-musl + +# Copy and build +COPY . . 
+RUN cargo zigbuild --target x86_64-unknown-linux-musl --release --example emit_measurements && \ + cp target/x86_64-unknown-linux-musl/release/examples/emit_measurements /tmp/ && \ + rm -rf /usr/src/app/* + +# Stage 2: Final image with Kafka and Rust binary +FROM confluentinc/cp-kafka:7.5.1 +USER root + +# Install minimal dependencies +RUN yum update -y && \ + yum install -y openssl-devel && \ + yum clean all && \ + rm -rf /var/cache/yum + +# Copy only the binary from builder stage +COPY --from=builder /tmp/emit_measurements /usr/local/bin/ + +# Create startup script +COPY </dev/null; do + echo "Waiting for Kafka to be ready..." + sleep 5 +done + +# Create topics with 1GB retention +echo "Creating temperature topic..." +kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic temperature --partitions 1 --replication-factor 1 --config retention.bytes=1073741824 + +echo "Creating humidity topic..." +kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic humidity --partitions 1 --replication-factor 1 --config retention.bytes=1073741824 + +# Start the Rust application +/usr/local/bin/emit_measurements +EOF + +RUN chmod +x /startup.sh + +# Kafka configuration +ENV KAFKA_NODE_ID=1 \ + KAFKA_PROCESS_ROLES=broker,controller \ + KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ + KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ + KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \ + KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \ + KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 + +# Expose Kafka port +EXPOSE 9092 + +CMD ["/startup.sh"] diff --git a/README.md b/README.md index 3ba7348..55ad026 100644 --- a/README.md +++ b/README.md @@ -90,9 +90,8 @@ Details about developing the python bindings can be found in [py-denormalized/RE ### Running an example -1. Start kafka in docker `docker run -p 9092:9092 --name kafka apache/kafka` -2. Start emitting some sample data: `cargo run --example emit_measurements` -3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation` +1. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker build -t emgeee/kafka_emit_measurements:latest .` +2. 
Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation` ### Checkpointing diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 2c1a95d..1363491 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -12,6 +12,7 @@ datafusion = { workspace = true } arrow = { workspace = true, features = ["prettyprint"] } arrow-schema = { workspace = true } arrow-array = { workspace = true } +anyhow = "1.0.86" tracing = { workspace = true } futures = { workspace = true } tracing-log = { workspace = true } diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index 6285243..c68b044 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -1,11 +1,11 @@ use rand::seq::SliceRandom; -use rdkafka::producer::FutureProducer; +use std::result::Result; use std::time::{SystemTime, UNIX_EPOCH}; use rdkafka::config::ClientConfig; +use rdkafka::producer::FutureProducer; use rdkafka::producer::FutureRecord; -use denormalized::prelude::*; use denormalized_examples::Measurment; /// This script emits test data to a kafka cluster @@ -14,7 +14,7 @@ use denormalized_examples::Measurment; /// Sample sensor data will then be emitted to two topics: `temperature` and `humidity` /// This data is read processed by other exmpales #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), anyhow::Error> { let mut tasks = tokio::task::JoinSet::new(); let producer: FutureProducer = ClientConfig::new() diff --git a/py-denormalized/README.md b/py-denormalized/README.md index 1c40bd6..1bc40a0 100644 --- a/py-denormalized/README.md +++ b/py-denormalized/README.md @@ -3,6 +3,19 @@ denormalized-python Python bindings for [denormalized](https://github.com/probably-nothing-labs/denormalized) +Denormalized is a single node stream processing engine written in Rust. This directory contains the bindings for building pipelines using python. + +## Getting Started + +1. Install denormalized `pip install denormalized` +2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker build -t emgeee/kafka_emit_measurements:latest .` +3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example + +This script will connect to the kafka instance running in docker and aggregate the metrics in realtime. + +There are several other examples in the [examples/ folder](python/examples/) that demonstrate other capabilities including stream joins and UDAFs. + + ## Development Make sure you're in the `py-denormalized/` directory. diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 86cf198..208ab9e 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -25,9 +25,14 @@ module-name = "denormalized._d_internal" [tool.rye] dev-dependencies = [ + "pip>=24.2", + "ipython>=8.26.0", + "pytest>=8.3.2", + "maturin>=1.7.4", +, "pandas>=2.2.3", "jupyterlab>=4.3.0", ] diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index 99c769d..6e7d5fd 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -1,9 +1,12 @@ -"""stream_aggregate example.""" +"""stream_aggregate example. + +docker build -t emgeee/kafka_emit_measurements:latest . 
+""" import json +import pprint as pp import signal import sys -import pprint as pp from denormalized import Context from denormalized.datafusion import col @@ -12,7 +15,6 @@ def signal_handler(sig, frame): - print("You pressed Ctrl+C!") sys.exit(0) diff --git a/py-denormalized/python/examples/stream_join.py b/py-denormalized/python/examples/stream_join.py index 1155a62..10804aa 100644 --- a/py-denormalized/python/examples/stream_join.py +++ b/py-denormalized/python/examples/stream_join.py @@ -1,18 +1,19 @@ -"""stream_aggregate example.""" +"""stream_aggregate example. + +docker build -t emgeee/kafka_emit_measurements:latest . +""" import json +import pprint as pp import signal import sys -import pprint as pp from denormalized import Context -from denormalized.datafusion import col, expr +from denormalized.datafusion import col from denormalized.datafusion import functions as f -from denormalized.datafusion import lit def signal_handler(sig, frame): - print("You pressed Ctrl+C!") sys.exit(0) diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock index d8d69b3..d56849e 100644 --- a/py-denormalized/requirements-dev.lock +++ b/py-denormalized/requirements-dev.lock @@ -142,6 +142,7 @@ nest-asyncio==1.6.0 # via ipykernel notebook-shim==0.2.4 # via jupyterlab +maturin==1.7.4 numpy==2.1.0 # via pandas # via pyarrow From 695f0861c29bc9678f99262321b36b365ffd30ff Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 12 Nov 2024 14:04:58 -0800 Subject: [PATCH 07/10] Add ability to specify timestamp_column on kafka stream (#57) --- README.md | 10 +++++++--- py-denormalized/README.md | 2 +- py-denormalized/pyproject.toml | 14 ++++---------- .../python/denormalized/context.py | 19 +++++++++++++++++-- .../python/denormalized/data_stream.py | 8 ++++---- .../python/examples/stream_aggregate.py | 9 +++++++-- .../python/examples/stream_join.py | 14 ++++++++++---- .../python/examples/udf_example.py | 13 +++++++++++-- py-denormalized/requirements-dev.lock | 4 +++- py-denormalized/requirements.lock | 2 +- py-denormalized/src/context.rs | 8 +++++--- 11 files changed, 70 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 55ad026..ce1626d 100644 --- a/README.md +++ b/README.md @@ -61,8 +61,12 @@ sample_event = { def print_batch(rb): print(rb) -ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092") +ds = Context().from_topic( + "temperature", + json.dumps(sample_event), + "localhost:9092", + "occurred_at_ms", +) ds.window( [col("sensor_name")], @@ -90,7 +94,7 @@ Details about developing the python bindings can be found in [py-denormalized/RE ### Running an example -1. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker build -t emgeee/kafka_emit_measurements:latest .` +1. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 --name emit_measuremetns emgeee/kafka_emit_measurements:latest` 2. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation` ### Checkpointing diff --git a/py-denormalized/README.md b/py-denormalized/README.md index 1bc40a0..886273a 100644 --- a/py-denormalized/README.md +++ b/py-denormalized/README.md @@ -8,7 +8,7 @@ Denormalized is a single node stream processing engine written in Rust. This dir ## Getting Started 1. 
Install denormalized `pip install denormalized` -2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker build -t emgeee/kafka_emit_measurements:latest .` +2. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest` 3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example This script will connect to the kafka instance running in docker and aggregate the metrics in realtime. diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 208ab9e..b76a343 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -5,13 +5,10 @@ build-backend = "maturin" [project] name = "denormalized" requires-python = ">=3.12" -classifiers = [ ] -dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml +classifiers = [] +dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml description = "" -dependencies = [ - "pyarrow>=17.0.0", - "datafusion>=40.1.0", -] +dependencies = ["pyarrow>=17.0.0", "datafusion>=40.1.0"] [project.optional-dependencies] tests = ["pytest"] @@ -25,14 +22,11 @@ module-name = "denormalized._d_internal" [tool.rye] dev-dependencies = [ - "pip>=24.2", - "ipython>=8.26.0", - "pytest>=8.3.2", "maturin>=1.7.4", -, + "pyarrow-stubs>=17.11", "pandas>=2.2.3", "jupyterlab>=4.3.0", ] diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py index 1b3c5df..e3f7c84 100644 --- a/py-denormalized/python/denormalized/context.py +++ b/py-denormalized/python/denormalized/context.py @@ -1,6 +1,8 @@ from denormalized._d_internal import PyContext + from .data_stream import DataStream + class Context: """Context.""" @@ -14,9 +16,22 @@ def __repr__(self): def __str__(self): return self.ctx.__str__() - def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> DataStream: + def from_topic( + self, + topic: str, + sample_json: str, + bootstrap_servers: str, + timestamp_column: str, + group_id: str = "default_group", + ) -> DataStream: """Create a new context from a topic.""" - py_ds = self.ctx.from_topic(topic, sample_json, bootstrap_servers) + py_ds = self.ctx.from_topic( + topic, + sample_json, + bootstrap_servers, + timestamp_column, + group_id, + ) ds = DataStream(py_ds) return ds diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index ca28dfb..e3790e0 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -1,9 +1,10 @@ +from typing import Callable + import pyarrow as pa from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from denormalized.utils import to_internal_expr, to_internal_exprs -from typing import Callable class DataStream: """Represents a stream of data that can be manipulated using various operations.""" @@ -73,10 +74,9 @@ def with_column(self, name: str, predicate: Expr) -> "DataStream": DataStream: A new DataStream with the additional column. """ return DataStream(self.ds.with_column(name, to_internal_expr(predicate))) - + def drop_columns(self, columns: list[str]) -> "DataStream": - """Drops columns from the DataStream. 
- """ + """Drops columns from the DataStream.""" return DataStream(self.ds.drop_columns(columns)) def join_on( diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index 6e7d5fd..6a88982 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -21,6 +21,7 @@ def signal_handler(sig, frame): signal.signal(signal.SIGINT, signal_handler) bootstrap_server = "localhost:9092" +timestamp_column = "occurred_at_ms" sample_event = { "occurred_at_ms": 100, @@ -33,8 +34,12 @@ def print_batch(rb): pp.pprint(rb.to_pydict()) -ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) +ds = Context().from_topic( + "temperature", + json.dumps(sample_event), + bootstrap_server, + timestamp_column, +) ds.window( diff --git a/py-denormalized/python/examples/stream_join.py b/py-denormalized/python/examples/stream_join.py index 10804aa..70e76c2 100644 --- a/py-denormalized/python/examples/stream_join.py +++ b/py-denormalized/python/examples/stream_join.py @@ -1,6 +1,6 @@ """stream_aggregate example. -docker build -t emgeee/kafka_emit_measurements:latest . +docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest """ import json @@ -13,13 +13,14 @@ from denormalized.datafusion import functions as f -def signal_handler(sig, frame): +def signal_handler(_sig, _frame): sys.exit(0) signal.signal(signal.SIGINT, signal_handler) bootstrap_server = "localhost:9092" +timestamp_column = "occurred_at_ms" sample_event = { "occurred_at_ms": 100, @@ -34,11 +35,16 @@ def print_batch(rb): ctx = Context() temperature_ds = ctx.from_topic( - "temperature", json.dumps(sample_event), bootstrap_server + "temperature", json.dumps(sample_event), bootstrap_server, timestamp_column ) humidity_ds = ( - ctx.from_topic("humidity", json.dumps(sample_event), bootstrap_server) + ctx.from_topic( + "humidity", + json.dumps(sample_event), + bootstrap_server, + timestamp_column, + ) .with_column("humidity_sensor", col("sensor_name")) .drop_columns(["sensor_name"]) .window( diff --git a/py-denormalized/python/examples/udf_example.py b/py-denormalized/python/examples/udf_example.py index 4993cfe..a198705 100644 --- a/py-denormalized/python/examples/udf_example.py +++ b/py-denormalized/python/examples/udf_example.py @@ -15,9 +15,11 @@ def signal_handler(sig, frame): sys.exit(0) + signal.signal(signal.SIGINT, signal_handler) bootstrap_server = "localhost:9092" +timestamp_column = "occurred_at_ms" sample_event = { "occurred_at_ms": 100, @@ -25,19 +27,26 @@ def signal_handler(sig, frame): "reading": 0.0, } + def gt(lhs: pa.Array, rhs: pa.Scalar) -> pa.Array: return pc.greater(lhs, rhs) + greater_than_udf = udf(gt, [pa.float64(), pa.float64()], pa.bool_(), "stable") + def print_batch(rb: pa.RecordBatch): if not len(rb): return print(rb) -ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) +ds = Context().from_topic( + "temperature", + json.dumps(sample_event), + bootstrap_server, + timestamp_column, +) ds.window( [col("sensor_name")], diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock index d56849e..135f630 100644 --- a/py-denormalized/requirements-dev.lock +++ b/py-denormalized/requirements-dev.lock @@ -3,7 +3,7 @@ # # last locked with the following flags: # pre: false -# features: [] +# features: ["dev"] # all-features: false # with-sources: false # generate-hashes: false @@ -181,6 +181,8 @@ 
pure-eval==0.2.3 pyarrow==17.0.0 # via datafusion # via denormalized + # via pyarrow-stubs +pyarrow-stubs==17.11 pycparser==2.22 # via cffi pygments==2.18.0 diff --git a/py-denormalized/requirements.lock b/py-denormalized/requirements.lock index 5980bbc..216bf46 100644 --- a/py-denormalized/requirements.lock +++ b/py-denormalized/requirements.lock @@ -3,7 +3,7 @@ # # last locked with the following flags: # pre: false -# features: [] +# features: ["dev"] # all-features: false # with-sources: false # generate-hashes: false diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs index e6e6ffa..cee7a88 100644 --- a/py-denormalized/src/context.rs +++ b/py-denormalized/src/context.rs @@ -73,10 +73,12 @@ impl PyContext { pub fn from_topic( &self, + py: Python, topic: String, sample_json: String, bootstrap_servers: String, - py: Python, + timestamp_column: String, + group_id: String, ) -> PyResult { let context = self.context.clone(); let rt = &get_tokio_runtime(py).0; @@ -85,13 +87,13 @@ impl PyContext { let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); let source_topic = topic_builder - .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis) + .with_timestamp(timestamp_column, TimestampUnit::Int64Millis) .with_encoding("json")? .with_topic(topic) .infer_schema_from_json(sample_json.as_str())? .build_reader(ConnectionOpts::from([ ("auto.offset.reset".to_string(), "latest".to_string()), - ("group.id".to_string(), "sample_pipeline".to_string()), + ("group.id".to_string(), group_id.to_string()), ])) .await?; From dc82a1055773c660addeacebd5c45a9c08488873 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Tue, 12 Nov 2024 16:17:43 -0800 Subject: [PATCH 08/10] Add example python udaf --- py-denormalized/requirements-dev.lock | 3 +++ 1 file changed, 3 insertions(+) diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock index 135f630..2174ce9 100644 --- a/py-denormalized/requirements-dev.lock +++ b/py-denormalized/requirements-dev.lock @@ -181,10 +181,13 @@ pure-eval==0.2.3 pyarrow==17.0.0 # via datafusion # via denormalized +<<<<<<< HEAD # via pyarrow-stubs pyarrow-stubs==17.11 pycparser==2.22 # via cffi +======= +>>>>>>> a684603 (Add example python udaf) pygments==2.18.0 # via ipython # via nbconvert From 733e4c9ad4e3b465f4de698b9b94ce05819362a0 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Tue, 12 Nov 2024 16:14:32 -0800 Subject: [PATCH 09/10] Fixing the UDAF example --- .../core/src/logical_plan/streaming_window.rs | 2 +- .../core/src/physical_plan/continuous/mod.rs | 2 +- .../python/examples/udaf_example.py | 60 ++++++------------- 3 files changed, 20 insertions(+), 44 deletions(-) diff --git a/crates/core/src/logical_plan/streaming_window.rs b/crates/core/src/logical_plan/streaming_window.rs index 8bb2a93..29ae2df 100644 --- a/crates/core/src/logical_plan/streaming_window.rs +++ b/crates/core/src/logical_plan/streaming_window.rs @@ -83,7 +83,7 @@ pub struct StreamingWindowSchema { impl StreamingWindowSchema { pub fn try_new(aggr_expr: Aggregate) -> Result { let inner_schema = aggr_expr.schema.inner().clone(); - let fields = inner_schema.flattened_fields().to_owned(); + let fields = inner_schema.fields(); let mut builder = SchemaBuilder::new(); diff --git a/crates/core/src/physical_plan/continuous/mod.rs b/crates/core/src/physical_plan/continuous/mod.rs index 664aada..5970890 100644 --- a/crates/core/src/physical_plan/continuous/mod.rs +++ b/crates/core/src/physical_plan/continuous/mod.rs @@ 
-40,7 +40,7 @@ pub(crate) fn create_group_accumulator( } fn add_window_columns_to_schema(schema: SchemaRef) -> Schema { - let fields = schema.flattened_fields().to_owned(); + let fields = schema.fields(); let mut builder = SchemaBuilder::new(); diff --git a/py-denormalized/python/examples/udaf_example.py b/py-denormalized/python/examples/udaf_example.py index 797f77b..724fdf0 100644 --- a/py-denormalized/python/examples/udaf_example.py +++ b/py-denormalized/python/examples/udaf_example.py @@ -4,12 +4,13 @@ import signal import sys from collections import Counter - +from typing import List import pyarrow as pa + from denormalized import Context from denormalized.datafusion import Accumulator, col from denormalized.datafusion import functions as f -from denormalized.datafusion import lit, udaf +from denormalized.datafusion import udaf def signal_handler(sig, frame): @@ -26,7 +27,6 @@ def signal_handler(sig, frame): "reading": 0.0, } - class TotalValuesRead(Accumulator): # Define the state type as a struct containing a map acc_state_type = pa.struct([("counts", pa.map_(pa.string(), pa.int64()))]) @@ -41,48 +41,25 @@ def update(self, values: pa.Array) -> None: def merge(self, states: pa.Array) -> None: # Merge multiple states into this accumulator + if states is None or len(states) == 0: + return for state in states: if state is not None: - count_map = state["counts"] - # Iterate through the map's keys and values - for k, v in zip(count_map.keys(), count_map.values()): - self.counts[k.as_py()] += v.as_py() + counts_map = state.to_pylist()[0] # will always be one element struct + for k, v in counts_map["counts"]: + self.counts[k] += v - def state(self) -> pa.Array: + def state(self) -> List[pa.Scalar]: # Convert current state to Arrow array format - if not self.counts: - # Handle empty state - return pa.array( - [{"counts": pa.array([], type=pa.map_(pa.string(), pa.int64()))}], - type=self.acc_state_type, - ) - - # Convert counter to key-value pairs - keys, values = zip(*self.counts.items()) - - # Create a single-element array containing our state struct - return pa.array( - [ - { - "counts": pa.array( - list(zip(keys, values)), type=pa.map_(pa.string(), pa.int64()) - ) - } - ], - type=self.acc_state_type, - ) - - def evaluate(self) -> pa.Array: - # Convert final state to output format - if not self.counts: - return pa.array([], type=pa.map_(pa.string(), pa.int64())) - - keys, values = zip(*self.counts.items()) - return pa.array(list(zip(keys, values)), type=pa.map_(pa.string(), pa.int64())) + result = {"counts": dict(self.counts.items())} + return [pa.scalar(result, type=pa.struct([("counts", pa.map_(pa.string(), pa.int64()))]))] + + def evaluate(self) -> pa.Scalar: + return self.state()[0] input_type = [pa.string()] -return_type = pa.string() +return_type = TotalValuesRead.acc_state_type state_type = [TotalValuesRead.acc_state_type] sample_udaf = udaf(TotalValuesRead, input_type, return_type, state_type, "stable") @@ -92,15 +69,14 @@ def print_batch(rb: pa.RecordBatch): return print(rb) - ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) +ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server, "occurred_at_ms") -ds.window( +ds = ds.window( [], [ sample_udaf(col("sensor_name")).alias("count"), ], - 1000, + 2000, None, ).sink(print_batch) From fcef54e8b43ae88c23bc9f006cfdf30651e7eea6 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Tue, 12 Nov 2024 16:20:03 -0800 Subject: [PATCH 10/10] merge main --- 
py-denormalized/requirements-dev.lock | 3 --- 1 file changed, 3 deletions(-) diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock index 2174ce9..135f630 100644 --- a/py-denormalized/requirements-dev.lock +++ b/py-denormalized/requirements-dev.lock @@ -181,13 +181,10 @@ pure-eval==0.2.3 pyarrow==17.0.0 # via datafusion # via denormalized -<<<<<<< HEAD # via pyarrow-stubs pyarrow-stubs==17.11 pycparser==2.22 # via cffi -======= ->>>>>>> a684603 (Add example python udaf) pygments==2.18.0 # via ipython # via nbconvert
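
Taken together, patches 07-10 change the Python entrypoint: `Context.from_topic` now takes a `timestamp_column` plus an optional `group_id` (defaulting to "default_group"), and a pipeline is attached with `.window(...).sink(...)`. Below is a minimal sketch of the resulting API, assuming the sample Kafka image from this series is running on localhost:9092 and that the aggregate helper `f.count` re-exported from datafusion behaves as in the upstream bindings; the explicit consumer group name is illustrative only.

import json

import pyarrow as pa
from denormalized import Context
from denormalized.datafusion import col
from denormalized.datafusion import functions as f

sample_event = {"occurred_at_ms": 100, "sensor_name": "foo", "reading": 0.0}


def print_batch(rb: pa.RecordBatch) -> None:
    # Each closed window arrives as an Arrow RecordBatch.
    print(rb)


ds = Context().from_topic(
    "temperature",
    json.dumps(sample_event),
    "localhost:9092",
    "occurred_at_ms",        # timestamp_column, added in PATCH 07/10
    group_id="my_pipeline",  # illustrative; defaults to "default_group"
)

ds.window(
    [col("sensor_name")],                      # group keys
    [f.count(col("reading")).alias("count")],  # aggregate (f.count assumed from datafusion)
    1000,                                      # window size in milliseconds
    None,
).sink(print_batch)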
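
The reworked UDAF in PATCH 09/10 encodes its accumulator state as a single struct-of-map scalar in `state()` and decodes it again in `merge()`. The following self-contained sketch (pyarrow only, independent of the accumulator class; variable names mirror the example) is meant to make that round trip explicit, on the assumption that map fields convert to lists of (key, value) tuples via `to_pylist()`.

from collections import Counter

import pyarrow as pa

acc_state_type = pa.struct([("counts", pa.map_(pa.string(), pa.int64()))])
counts = Counter({"foo": 3, "bar": 1})

# state(): one struct scalar whose "counts" field is a string -> int64 map
state = pa.scalar({"counts": dict(counts)}, type=acc_state_type)

# merge(): states arrive as an array of such structs; each map field
# round-trips through to_pylist() as a list of (key, value) tuples
restored = Counter()
for row in pa.array([state], type=acc_state_type).to_pylist():
    for k, v in row["counts"]:
        restored[k] += v

assert restored == counts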