From dc9331a096656f57dd179e2c40e32235bd838a48 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Wed, 18 Sep 2024 16:21:16 -0700 Subject: [PATCH] example of ctrl_c to stop pipeline --- Cargo.lock | 1 + crates/common/src/error/py_err.rs | 6 +++ crates/orchestrator/Cargo.toml | 2 +- py-denormalized/Cargo.toml | 14 +++++- .../python/denormalized/datastream.py | 4 ++ .../python/examples/stream_aggregate.py | 11 ++++- py-denormalized/src/datastream.rs | 47 ++++++++++++++++--- py-denormalized/src/lib.rs | 18 ++++++- py-denormalized/src/utils.rs | 3 +- 9 files changed, 93 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41de9c1..4bdfc34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1352,6 +1352,7 @@ dependencies = [ "datafusion", "datafusion-python", "denormalized", + "futures", "pyo3", "tokio", ] diff --git a/crates/common/src/error/py_err.rs b/crates/common/src/error/py_err.rs index 7a8b9ab..33249ac 100644 --- a/crates/common/src/error/py_err.rs +++ b/crates/common/src/error/py_err.rs @@ -8,3 +8,9 @@ impl From for PyErr { PyRuntimeError::new_err(format!("{:?}", error)) } } + +impl From for DenormalizedError { + fn from(error: PyErr) -> Self { + DenormalizedError::Other(error.into()) + } +} diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index ba7c0af..70eb9c7 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -4,7 +4,7 @@ version = { workspace = true } edition = { workspace = true } [dependencies] -crossbeam = "0.8.4" +crossbeam = { version = "0.8.4" } datafusion = { workspace = true } log.workspace = true once_cell = "1.19.0" diff --git a/py-denormalized/Cargo.toml b/py-denormalized/Cargo.toml index dc79b8c..57585c3 100644 --- a/py-denormalized/Cargo.toml +++ b/py-denormalized/Cargo.toml @@ -10,7 +10,11 @@ name = "denormalized_python" crate-type = ["cdylib"] [dependencies] -pyo3 = { workspace = true, features = ["extension-module", "abi3", "abi3-py311"] } +pyo3 = { workspace = true, features = [ + "extension-module", + "abi3", + "abi3-py311", +] } chrono = { workspace = true } denormalized = { workspace = true, features = ["python"] } datafusion = { workspace = true, features = [ @@ -19,4 +23,10 @@ datafusion = { workspace = true, features = [ "unicode_expressions", ] } datafusion-python = { path = "../../datafusion-python/" } -tokio = { workspace = true } +tokio = { workspace = true, features = [ + "macros", + "rt", + "rt-multi-thread", + "sync", +] } +futures = { workspace = true } diff --git a/py-denormalized/python/denormalized/datastream.py b/py-denormalized/python/denormalized/datastream.py index 4729da4..2af0d2f 100644 --- a/py-denormalized/python/denormalized/datastream.py +++ b/py-denormalized/python/denormalized/datastream.py @@ -159,3 +159,7 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None: topic (str): The Kafka topic to sink the data to. """ self.ds.sink_kafka(bootstrap_servers, topic) + + def sink_python(self) -> None: + """Sink the DataStream to a Python function.""" + self.ds.sink_python() diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index a875bc9..0b9b7dc 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -6,6 +6,15 @@ from denormalized._internal import expr from denormalized._internal import functions as f +import signal +import sys + +def signal_handler(sig, frame): + print('You pressed Ctrl+C!') + sys.exit(0) + +signal.signal(signal.SIGINT, signal_handler) + bootstrap_server = "localhost:9092" sample_event = { @@ -32,4 +41,4 @@ None, ).filter( expr.Expr.column("max") > (expr.Expr.literal(pa.scalar(113))) -).sink_kafka(bootstrap_server, "out_py_topic") +).sink_python() diff --git a/py-denormalized/src/datastream.rs b/py-denormalized/src/datastream.rs index 60dc483..34b1e9c 100644 --- a/py-denormalized/src/datastream.rs +++ b/py-denormalized/src/datastream.rs @@ -1,5 +1,6 @@ use pyo3::prelude::*; +use futures::StreamExt; use std::sync::Arc; use std::time::Duration; @@ -7,6 +8,7 @@ use tokio::task::JoinHandle; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::PyArrowType; +use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion_python::expr::{join::PyJoinType, PyExpr}; @@ -187,19 +189,52 @@ impl PyDataStream { pub fn sink_kafka(&self, bootstrap_servers: String, topic: String, py: Python) -> PyResult<()> { let ds = self.ds.as_ref().clone(); - // let bootstrap_servers = bootstrap_servers.clone(); - // let topic = topic.clone(); - let rt = &get_tokio_runtime(py).0; + let fut: JoinHandle> = rt.spawn(async move { ds.sink_kafka(bootstrap_servers, topic).await }); - let _ = wait_for_future(py, fut).map_err(py_denormalized_err)??; Ok(()) } - pub fn sink_python(&self, _py: Python) -> PyResult<()> { - todo!("Iterate over the datastream and call a python function for each record batch") + pub fn sink_python(&self, py: Python) -> PyResult<()> { + let ds = self.ds.as_ref().clone(); + let rt = &get_tokio_runtime(py).0; + + let fut: JoinHandle> = rt.spawn(async move { + let mut stream: SendableRecordBatchStream = + ds.df.as_ref().clone().execute_stream().await?; + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => break, // Explicitly check for ctrl-c and exit + // loop if it occurs + message = stream.next() => { + match message.transpose(){ + Ok(Some(batch)) => { + println!( + "{}", + datafusion::common::arrow::util::pretty::pretty_format_batches(&[ + batch + ]) + .unwrap() + ); + }, + Ok(None) => {}, + Err(err) => { + return Err(err.into()); + }, + } + } + } + } + + Ok(()) + }); + + let _ = wait_for_future(py, fut).map_err(py_denormalized_err)??; + + Ok(()) } } diff --git a/py-denormalized/src/lib.rs b/py-denormalized/src/lib.rs index feb7a92..4e1c285 100644 --- a/py-denormalized/src/lib.rs +++ b/py-denormalized/src/lib.rs @@ -1,5 +1,7 @@ use pyo3::prelude::*; +use datafusion_python::{expr, functions}; + pub mod context; pub mod datastream; @@ -13,10 +15,24 @@ pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// A Python module implemented in Rust. #[pymodule] fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { + // Register the Tokio Runtime as a module attribute so we can reuse it + m.add( + "runtime", + TokioRuntime(tokio::runtime::Runtime::new().unwrap()), + )?; + m.add_class::()?; m.add_class::()?; - datafusion_python::_internal(py, m)?; + // Register `expr` as a submodule. Matching `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/ + let expr = PyModule::new_bound(py, "expr")?; + expr::init_module(&expr)?; + m.add_submodule(&expr)?; + + // Register the functions as a submodule + let funcs = PyModule::new_bound(py, "functions")?; + functions::init_module(&funcs)?; + m.add_submodule(&funcs)?; Ok(()) } diff --git a/py-denormalized/src/utils.rs b/py-denormalized/src/utils.rs index 3349c6d..a7f41f6 100644 --- a/py-denormalized/src/utils.rs +++ b/py-denormalized/src/utils.rs @@ -1,4 +1,3 @@ -// use crate::errors::DataFusionError; use crate::TokioRuntime; // use datafusion::logical_expr::Volatility; use pyo3::prelude::*; @@ -26,10 +25,10 @@ where F::Output: Send, { let runtime: &Runtime = &get_tokio_runtime(py).0; + // allow_threads explicitly releases the GIL until the future returns py.allow_threads(|| runtime.block_on(f)) } - /// Print a string to the python console pub fn python_print(py: Python, str: String) -> PyResult<()> { // Import the Python 'builtins' module to access the print function