Skip to content

Commit

Permalink
example of ctrl_c to stop pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Sep 18, 2024
1 parent 5cc1209 commit dc9331a
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/common/src/error/py_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ impl From<DenormalizedError> for PyErr {
PyRuntimeError::new_err(format!("{:?}", error))
}
}

impl From<PyErr> for DenormalizedError {
fn from(error: PyErr) -> Self {
DenormalizedError::Other(error.into())
}
}
2 changes: 1 addition & 1 deletion crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = { workspace = true }
edition = { workspace = true }

[dependencies]
crossbeam = "0.8.4"
crossbeam = { version = "0.8.4" }
datafusion = { workspace = true }
log.workspace = true
once_cell = "1.19.0"
Expand Down
14 changes: 12 additions & 2 deletions py-denormalized/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ name = "denormalized_python"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { workspace = true, features = ["extension-module", "abi3", "abi3-py311"] }
pyo3 = { workspace = true, features = [
"extension-module",
"abi3",
"abi3-py311",
] }
chrono = { workspace = true }
denormalized = { workspace = true, features = ["python"] }
datafusion = { workspace = true, features = [
Expand All @@ -19,4 +23,10 @@ datafusion = { workspace = true, features = [
"unicode_expressions",
] }
datafusion-python = { path = "../../datafusion-python/" }
tokio = { workspace = true }
tokio = { workspace = true, features = [
"macros",
"rt",
"rt-multi-thread",
"sync",
] }
futures = { workspace = true }
4 changes: 4 additions & 0 deletions py-denormalized/python/denormalized/datastream.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,7 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
topic (str): The Kafka topic to sink the data to.
"""
self.ds.sink_kafka(bootstrap_servers, topic)

def sink_python(self) -> None:
"""Sink the DataStream to a Python function."""
self.ds.sink_python()
11 changes: 10 additions & 1 deletion py-denormalized/python/examples/stream_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
from denormalized._internal import expr
from denormalized._internal import functions as f

import signal
import sys

def signal_handler(sig, frame):
print('You pressed Ctrl+C!')
sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

bootstrap_server = "localhost:9092"

sample_event = {
Expand All @@ -32,4 +41,4 @@
None,
).filter(
expr.Expr.column("max") > (expr.Expr.literal(pa.scalar(113)))
).sink_kafka(bootstrap_server, "out_py_topic")
).sink_python()
47 changes: 41 additions & 6 deletions py-denormalized/src/datastream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use pyo3::prelude::*;

use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;

use tokio::task::JoinHandle;

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion_python::expr::{join::PyJoinType, PyExpr};

Expand Down Expand Up @@ -187,19 +189,52 @@ impl PyDataStream {

pub fn sink_kafka(&self, bootstrap_servers: String, topic: String, py: Python) -> PyResult<()> {
let ds = self.ds.as_ref().clone();
// let bootstrap_servers = bootstrap_servers.clone();
// let topic = topic.clone();

let rt = &get_tokio_runtime(py).0;

let fut: JoinHandle<denormalized::common::error::Result<()>> =
rt.spawn(async move { ds.sink_kafka(bootstrap_servers, topic).await });

let _ = wait_for_future(py, fut).map_err(py_denormalized_err)??;

Ok(())
}

pub fn sink_python(&self, _py: Python) -> PyResult<()> {
todo!("Iterate over the datastream and call a python function for each record batch")
pub fn sink_python(&self, py: Python) -> PyResult<()> {
let ds = self.ds.as_ref().clone();
let rt = &get_tokio_runtime(py).0;

let fut: JoinHandle<denormalized::common::error::Result<()>> = rt.spawn(async move {
let mut stream: SendableRecordBatchStream =
ds.df.as_ref().clone().execute_stream().await?;

loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => break, // Explicitly check for ctrl-c and exit
// loop if it occurs
message = stream.next() => {
match message.transpose(){
Ok(Some(batch)) => {
println!(
"{}",
datafusion::common::arrow::util::pretty::pretty_format_batches(&[
batch
])
.unwrap()
);
},
Ok(None) => {},
Err(err) => {
return Err(err.into());
},
}
}
}
}

Ok(())
});

let _ = wait_for_future(py, fut).map_err(py_denormalized_err)??;

Ok(())
}
}
18 changes: 17 additions & 1 deletion py-denormalized/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use pyo3::prelude::*;

use datafusion_python::{expr, functions};

pub mod context;
pub mod datastream;

Expand All @@ -13,10 +15,24 @@ pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
/// A Python module implemented in Rust.
#[pymodule]
fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
// Register the Tokio Runtime as a module attribute so we can reuse it
m.add(
"runtime",
TokioRuntime(tokio::runtime::Runtime::new().unwrap()),
)?;

m.add_class::<datastream::PyDataStream>()?;
m.add_class::<context::PyContext>()?;

datafusion_python::_internal(py, m)?;
// Register `expr` as a submodule. Matching `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
let expr = PyModule::new_bound(py, "expr")?;
expr::init_module(&expr)?;
m.add_submodule(&expr)?;

// Register the functions as a submodule
let funcs = PyModule::new_bound(py, "functions")?;
functions::init_module(&funcs)?;
m.add_submodule(&funcs)?;

Ok(())
}
3 changes: 1 addition & 2 deletions py-denormalized/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// use crate::errors::DataFusionError;
use crate::TokioRuntime;
// use datafusion::logical_expr::Volatility;
use pyo3::prelude::*;
Expand Down Expand Up @@ -26,10 +25,10 @@ where
F::Output: Send,
{
let runtime: &Runtime = &get_tokio_runtime(py).0;
// allow_threads explicitly releases the GIL until the future returns
py.allow_threads(|| runtime.block_on(f))
}


/// Print a string to the python console
pub fn python_print(py: Python, str: String) -> PyResult<()> {
// Import the Python 'builtins' module to access the print function
Expand Down

0 comments on commit dc9331a

Please sign in to comment.