add python sink_kafka method
emgeee committed Sep 17, 2024
1 parent bf12e6d commit 5f5416e
Showing 3 changed files with 24 additions and 15 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/datastream.rs
@@ -167,7 +167,7 @@ impl DataStream {

     /// execute the stream and print the results to stdout.
     /// Mainly used for development and debugging
-    pub async fn print_stream(&self) -> Result<()> {
+    pub async fn print_stream(self) -> Result<()> {
         if orchestrator::SHOULD_CHECKPOINT {
             let plan = self.df.as_ref().clone().create_physical_plan().await?;
             let node_ids = extract_node_ids_and_partitions(&plan);
6 changes: 4 additions & 2 deletions py-denormalized/python/examples/stream_aggregate.py
@@ -6,14 +6,16 @@
 from denormalized._internal import expr
 from denormalized._internal import functions as f

+bootstrap_server = "localhost:9092"
+
 sample_event = {
     "occurred_at_ms": 100,
     "sensor_name": "foo",
     "reading": 0.0,
 }

 ctx = Context()
-ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092")
+ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server)


 ds.window(
@@ -30,4 +32,4 @@
     None,
 ).filter(
     expr.Expr.column("max") > (expr.Expr.literal(pa.scalar(113)))
-).print_physical_plan().print_plan().print_schema()
+).sink_kafka(bootstrap_server, "out_py_topic")
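
For a local run of this example, the "temperature" topic needs JSON events shaped like sample_event. A minimal producer sketch, assuming the kafka-python package and a broker at localhost:9092 (neither is provided by this commit):

import json
import time

from kafka import KafkaProducer  # assumption: kafka-python, not part of this repo

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send a few readings shaped like sample_event; values above 113 will pass the
# example's filter on the windowed "max" aggregate.
for i in range(10):
    producer.send(
        "temperature",
        {"occurred_at_ms": int(time.time() * 1000), "sensor_name": "foo", "reading": 100.0 + 2 * i},
    )
    time.sleep(0.1)

producer.flush()

The aggregated, filtered results are then written by sink_kafka to the out_py_topic topic on the same broker.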
31 changes: 19 additions & 12 deletions py-denormalized/src/datastream.rs
@@ -13,7 +13,7 @@ use datafusion_python::expr::{join::PyJoinType, PyExpr};
 use denormalized::datastream::DataStream;

 use crate::errors::py_denormalized_err;
-use crate::utils::{get_tokio_runtime, wait_for_future, python_print};
+use crate::utils::{get_tokio_runtime, python_print, wait_for_future};

 #[pyclass(name = "PyDataStream", module = "denormalized", subclass)]
 #[derive(Clone)]
@@ -161,13 +161,12 @@ impl PyDataStream {
     pub fn print_physical_plan(&self, py: Python) -> PyResult<Self> {
         let ds = self.ds.clone();
         let rt = &get_tokio_runtime(py).0;
-        let fut: JoinHandle<denormalized::common::error::Result<String>> =
-            rt.spawn(async move {
-                let physical_plan = ds.df.as_ref().clone().create_physical_plan().await?;
-                let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref());
+        let fut: JoinHandle<denormalized::common::error::Result<String>> = rt.spawn(async move {
+            let physical_plan = ds.df.as_ref().clone().create_physical_plan().await?;
+            let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref());

-                Ok(format!("{}", displayable_plan.indent(true)))
-            });
+            Ok(format!("{}", displayable_plan.indent(true)))
+        });

         let str = wait_for_future(py, fut).map_err(py_denormalized_err)??;
         python_print(py, str)?;
@@ -176,8 +175,7 @@
     }

     pub fn print_stream(&self, py: Python) -> PyResult<()> {
-        // Implement the method using the original Rust code
-        let ds = self.ds.clone();
+        let ds = self.ds.as_ref().clone();
         let rt = &get_tokio_runtime(py).0;
         let fut: JoinHandle<denormalized::common::error::Result<()>> =
             rt.spawn(async move { ds.print_stream().await });
@@ -187,8 +185,17 @@
         Ok(())
     }

-    pub fn sink_kafka(&self, _bootstrap_servers: String, _topic: String) -> PyResult<()> {
-        // Implement the method using the original Rust code
-        todo!()
+    pub fn sink_kafka(&self, bootstrap_servers: String, topic: String, py: Python) -> PyResult<()> {
+        let ds = self.ds.as_ref().clone();
+        // let bootstrap_servers = bootstrap_servers.clone();
+        // let topic = topic.clone();
+
+        let rt = &get_tokio_runtime(py).0;
+        let fut: JoinHandle<denormalized::common::error::Result<()>> =
+            rt.spawn(async move { ds.sink_kafka(bootstrap_servers, topic).await });
+
+        let _ = wait_for_future(py, fut).map_err(py_denormalized_err)??;
+
+        Ok(())
+    }
 }
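
The new binding spawns ds.sink_kafka on the embedded tokio runtime and blocks the calling Python thread via wait_for_future until the stream finishes, so the sinked records are easiest to inspect from a separate process. A minimal consumer sketch for the example's output topic, assuming the kafka-python package (not part of this repo); the payload format is whatever the Rust sink writes:

from kafka import KafkaConsumer  # assumption: kafka-python, not part of this repo

# Read whatever sink_kafka wrote to the example's output topic.
consumer = KafkaConsumer(
    "out_py_topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
)

for message in consumer:
    print(message.value)  # raw bytes; decode/parse according to the sink's serialization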
