Skip to content

Commit

Permalink
make sink_python work
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Sep 20, 2024
1 parent 8694d96 commit 3c36be7
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 14 deletions.
4 changes: 2 additions & 2 deletions py-denormalized/python/denormalized/datastream.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,6 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
"""
self.ds.sink_kafka(bootstrap_servers, topic)

def sink_python(self) -> None:
def sink_python(self, func) -> None:
"""Sink the DataStream to a Python function."""
self.ds.sink_python()
self.ds.sink_python(func)
6 changes: 5 additions & 1 deletion py-denormalized/python/examples/stream_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def signal_handler(sig, frame):
"reading": 0.0,
}

def sample_func(rb):
    """Example sink callback: print a fixed greeting, then ``len(rb)``."""
    greeting = "hello world2!"
    print(greeting)
    size = len(rb)
    print(size)

ctx = Context()
ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server)

Expand All @@ -41,4 +45,4 @@ def signal_handler(sig, frame):
None,
).filter(
expr.Expr.column("max") > (expr.Expr.literal(pa.scalar(113)))
).sink_python()
).sink_python(sample_func)
23 changes: 12 additions & 11 deletions py-denormalized/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use tokio::task::JoinHandle;

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::pyarrow::ToPyArrow;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion_python::expr::{join::PyJoinType, PyExpr};

use denormalized::datastream::DataStream;

use crate::errors::py_denormalized_err;
use crate::errors::{py_denormalized_err, Result};
use crate::utils::{get_tokio_runtime, python_print, wait_for_future};

#[pyclass(name = "PyDataStream", module = "denormalized", subclass)]
Expand Down Expand Up @@ -198,11 +199,12 @@ impl PyDataStream {
Ok(())
}

pub fn sink_python(&self, py: Python) -> PyResult<()> {
/// Execute the DataFrame and pass each resulting RecordBatch to a Python function
pub fn sink_python(&self, func: PyObject, py: Python) -> PyResult<()> {
let ds = self.ds.as_ref().clone();
let rt = &get_tokio_runtime(py).0;

let fut: JoinHandle<denormalized::common::error::Result<()>> = rt.spawn(async move {
let fut: JoinHandle<Result<()>> = rt.spawn(async move {
let mut stream: SendableRecordBatchStream =
ds.df.as_ref().clone().execute_stream().await?;

Expand All @@ -211,15 +213,13 @@ impl PyDataStream {
_ = tokio::signal::ctrl_c() => break, // Explicitly check for ctrl-c and exit
// loop if it occurs
message = stream.next() => {
match message.transpose(){
match message.transpose() {
Ok(Some(batch)) => {
println!(
"{}",
datafusion::common::arrow::util::pretty::pretty_format_batches(&[
batch
])
.unwrap()
);
Python::with_gil(|py| -> PyResult<()> {
let batch = batch.clone().to_pyarrow(py)?;
func.call1(py, (batch,))?;
Ok(())
})?;
},
Ok(None) => {},
Err(err) => {
Expand All @@ -233,6 +233,7 @@ impl PyDataStream {
Ok(())
});

// rt.block_on(fut).map_err(py_denormalized_err)??;
let _ = wait_for_future(py, fut).map_err(py_denormalized_err)??;

Ok(())
Expand Down
9 changes: 9 additions & 0 deletions py-denormalized/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::error::Error;
use std::fmt::Debug;

use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
use denormalized::common::error::DenormalizedError as InnerDenormalizedError;
use pyo3::{exceptions::PyException, PyErr};

Expand All @@ -16,6 +17,7 @@ pub enum DenormalizedError {
ArrowError(ArrowError),
Common(String),
PythonError(PyErr),
DataFusionError(DataFusionError),
}

impl fmt::Display for DenormalizedError {
Expand All @@ -25,6 +27,7 @@ impl fmt::Display for DenormalizedError {
DenormalizedError::ArrowError(e) => write!(f, "Arrow error: {e:?}"),
DenormalizedError::PythonError(e) => write!(f, "Python error {e:?}"),
DenormalizedError::Common(e) => write!(f, "{e}"),
DenormalizedError::DataFusionError(e) => write!(f, "DataFusionError{e}"),
}
}
}
Expand Down Expand Up @@ -56,6 +59,12 @@ impl From<DenormalizedError> for PyErr {
}
}

impl From<DataFusionError> for DenormalizedError {
fn from(err: DataFusionError) -> DenormalizedError {
DenormalizedError::DataFusionError(err)
}
}

impl Error for DenormalizedError {}

pub fn py_type_err(e: impl Debug) -> PyErr {
Expand Down

0 comments on commit 3c36be7

Please sign in to comment.