From 5f5416e4724d05bb77599b5a4ac4517d29bf69f5 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 17 Sep 2024 11:00:33 -0700 Subject: [PATCH] add python sink_kafka method --- crates/core/src/datastream.rs | 2 +- .../python/examples/stream_aggregate.py | 6 ++-- py-denormalized/src/datastream.rs | 31 ++++++++++++------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 45e48fe..03c1e96 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -167,7 +167,7 @@ impl DataStream { /// execute the stream and print the results to stdout. /// Mainly used for development and debugging - pub async fn print_stream(&self) -> Result<()> { + pub async fn print_stream(self) -> Result<()> { if orchestrator::SHOULD_CHECKPOINT { let plan = self.df.as_ref().clone().create_physical_plan().await?; let node_ids = extract_node_ids_and_partitions(&plan); diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index e05b777..a875bc9 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -6,6 +6,8 @@ from denormalized._internal import expr from denormalized._internal import functions as f +bootstrap_server = "localhost:9092" + sample_event = { "occurred_at_ms": 100, "sensor_name": "foo", @@ -13,7 +15,7 @@ } ctx = Context() -ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092") +ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server) ds.window( @@ -30,4 +32,4 @@ None, ).filter( expr.Expr.column("max") > (expr.Expr.literal(pa.scalar(113))) -).print_physical_plan().print_plan().print_schema() +).sink_kafka(bootstrap_server, "out_py_topic") diff --git a/py-denormalized/src/datastream.rs b/py-denormalized/src/datastream.rs index 8aa91ed..8a1f47f 100644 --- a/py-denormalized/src/datastream.rs +++ b/py-denormalized/src/datastream.rs @@ -13,7 +13,7 @@ use datafusion_python::expr::{join::PyJoinType, PyExpr}; use denormalized::datastream::DataStream; use crate::errors::py_denormalized_err; -use crate::utils::{get_tokio_runtime, wait_for_future, python_print}; +use crate::utils::{get_tokio_runtime, python_print, wait_for_future}; #[pyclass(name = "PyDataStream", module = "denormalized", subclass)] #[derive(Clone)] @@ -161,13 +161,12 @@ impl PyDataStream { pub fn print_physical_plan(&self, py: Python) -> PyResult { let ds = self.ds.clone(); let rt = &get_tokio_runtime(py).0; - let fut: JoinHandle> = - rt.spawn(async move { - let physical_plan = ds.df.as_ref().clone().create_physical_plan().await?; - let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref()); + let fut: JoinHandle> = rt.spawn(async move { + let physical_plan = ds.df.as_ref().clone().create_physical_plan().await?; + let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref()); - Ok(format!("{}", displayable_plan.indent(true))) - }); + Ok(format!("{}", displayable_plan.indent(true))) + }); let str = wait_for_future(py, fut).map_err(py_denormalized_err)??; python_print(py, str)?; @@ -176,8 +175,7 @@ impl PyDataStream { } pub fn print_stream(&self, py: Python) -> PyResult<()> { - // Implement the method using the original Rust code - let ds = self.ds.clone(); + let ds = self.ds.as_ref().clone(); let rt = &get_tokio_runtime(py).0; let fut: JoinHandle> = rt.spawn(async move { ds.print_stream().await }); @@ -187,8 +185,17 @@ impl PyDataStream { Ok(()) } - pub fn sink_kafka(&self, _bootstrap_servers: String, _topic: String) -> PyResult<()> { - // Implement the method using the original Rust code - todo!() + pub fn sink_kafka(&self, bootstrap_servers: String, topic: String, py: Python) -> PyResult<()> { + let ds = self.ds.as_ref().clone(); + // let bootstrap_servers = bootstrap_servers.clone(); + // let topic = topic.clone(); + + let rt = &get_tokio_runtime(py).0; + let fut: JoinHandle> = + rt.spawn(async move { ds.sink_kafka(bootstrap_servers, topic).await }); + + let _ = wait_for_future(py, fut).map_err(py_denormalized_err)??; + + Ok(()) } }