From e9f2f7f50ffbe38d93cee4272c5c2cad3c116f6e Mon Sep 17 00:00:00 2001 From: Matt Green Date: Thu, 3 Oct 2024 16:56:19 -0700 Subject: [PATCH] sink_python() method -> sink() --- py-denormalized/pyproject.toml | 9 +++------ py-denormalized/python/denormalized/datastream.py | 3 ++- py-denormalized/python/examples/stream_aggregate.py | 4 ++-- py-denormalized/python/examples/udf_example.py | 6 +++--- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 4eb7bb8..bde07b2 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -5,12 +5,9 @@ build-backend = "maturin" [project] name = "denormalized" requires-python = ">=3.12" -classifiers = [ - "Programming Language :: Rust", - "Programming Language :: Python :: Implementation :: CPython", - "Programming Language :: Python :: Implementation :: PyPy", -] -dynamic = ["version"] +classifiers = [ ] +dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml +description = "" dependencies = [ "pyarrow>=17.0.0", "datafusion>=40.1.0", diff --git a/py-denormalized/python/denormalized/datastream.py b/py-denormalized/python/denormalized/datastream.py index d961e75..418c520 100644 --- a/py-denormalized/python/denormalized/datastream.py +++ b/py-denormalized/python/denormalized/datastream.py @@ -3,6 +3,7 @@ from denormalized.datafusion import Expr from denormalized.utils import to_internal_expr, to_internal_exprs +from typing import Callable class DataStream: """Represents a stream of data that can be manipulated using various operations.""" @@ -177,6 +178,6 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None: """ self.ds.sink_kafka(bootstrap_servers, topic) - def sink_python(self, func) -> None: + def sink(self, func: Callable[[pa.RecordBatch], None]) -> None: """Sink the DataStream to a Python function.""" self.ds.sink_python(func) diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index dbf96b5..f96d0ac 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -23,7 +23,7 @@ def signal_handler(sig, frame): "reading": 0.0, } -def sample_sink_func(rb): +def print_batch(rb): print(rb) ctx = Context() @@ -44,4 +44,4 @@ def sample_sink_func(rb): None, ).filter( col("max") > (lit(113)) -).sink_python(sample_sink_func) +).sink(print_batch) diff --git a/py-denormalized/python/examples/udf_example.py b/py-denormalized/python/examples/udf_example.py index 563bd99..4993cfe 100644 --- a/py-denormalized/python/examples/udf_example.py +++ b/py-denormalized/python/examples/udf_example.py @@ -30,7 +30,7 @@ def gt(lhs: pa.Array, rhs: pa.Scalar) -> pa.Array: greater_than_udf = udf(gt, [pa.float64(), pa.float64()], pa.bool_(), "stable") -def sample_sink_func(rb: pa.RecordBatch): +def print_batch(rb: pa.RecordBatch): if not len(rb): return print(rb) @@ -55,6 +55,6 @@ def sample_sink_func(rb: pa.RecordBatch): col("count"), lit(1400.0), ), -).sink_python( - sample_sink_func +).sink( + print_batch )