Skip to content

Commit

Permalink
sink_python() method -> sink() (#44)
Browse files: browse the repository at this point in the history
  • Loading branch information
emgeee authored Oct 4, 2024
1 parent 0b88531 commit 8c28354
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 12 deletions.
9 changes: 3 additions & 6 deletions py-denormalized/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ build-backend = "maturin"
[project]
name = "denormalized"
requires-python = ">=3.12"
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dynamic = ["version"]
classifiers = [ ]
dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml
description = ""
dependencies = [
"pyarrow>=17.0.0",
"datafusion>=40.1.0",
Expand Down
3 changes: 2 additions & 1 deletion py-denormalized/python/denormalized/datastream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from denormalized.datafusion import Expr
from denormalized.utils import to_internal_expr, to_internal_exprs

from typing import Callable

class DataStream:
"""Represents a stream of data that can be manipulated using various operations."""
Expand Down Expand Up @@ -177,6 +178,6 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
"""
self.ds.sink_kafka(bootstrap_servers, topic)

def sink_python(self, func) -> None:
def sink(self, func: Callable[[pa.RecordBatch], None]) -> None:
"""Sink the DataStream to a Python function."""
self.ds.sink_python(func)
4 changes: 2 additions & 2 deletions py-denormalized/python/examples/stream_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def signal_handler(sig, frame):
"reading": 0.0,
}

def sample_sink_func(rb):
def print_batch(rb):
print(rb)

ctx = Context()
Expand All @@ -44,4 +44,4 @@ def sample_sink_func(rb):
None,
).filter(
col("max") > (lit(113))
).sink_python(sample_sink_func)
).sink(print_batch)
6 changes: 3 additions & 3 deletions py-denormalized/python/examples/udf_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def gt(lhs: pa.Array, rhs: pa.Scalar) -> pa.Array:

greater_than_udf = udf(gt, [pa.float64(), pa.float64()], pa.bool_(), "stable")

def sample_sink_func(rb: pa.RecordBatch):
def print_batch(rb: pa.RecordBatch):
if not len(rb):
return
print(rb)
Expand All @@ -55,6 +55,6 @@ def sample_sink_func(rb: pa.RecordBatch):
col("count"),
lit(1400.0),
),
).sink_python(
sample_sink_func
).sink(
print_batch
)

0 comments on commit 8c28354

Please sign in to comment.