From 97e11f3433d7b940e98b26e6750a4ff514ba9a31 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 21 Oct 2024 16:07:07 -0700 Subject: [PATCH] configure slatedb backend in python bindings --- crates/orchestrator/src/orchestrator.rs | 2 +- examples/examples/emit_measurements.rs | 10 +++++----- examples/examples/simple_aggregation.rs | 9 ++++----- py-denormalized/src/context.rs | 22 +++++++++++++--------- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs index 8529848..6acfeed 100644 --- a/crates/orchestrator/src/orchestrator.rs +++ b/crates/orchestrator/src/orchestrator.rs @@ -20,7 +20,7 @@ pub struct Orchestrator { senders: HashMap>, } -pub const SHOULD_CHECKPOINT: bool = true; // THIS WILL BE MOVED INTO CONFIG +pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG /** * 1. Keep track of checkpoint per source. diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index af43e43..0556a47 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -33,11 +33,11 @@ async fn main() -> Result<()> { "sensor_2", "sensor_3", "sensor_4", - "sensor_10", - "sensor_11", - "sensor_12", - "sensor_13", - "sensor_14", + "sensor_5", + "sensor_6", + "sensor_7", + "sensor_8", + "sensor_9", ]; loop { diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 60c3221..2fe1306 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -18,10 +18,6 @@ async fn main() -> Result<()> { .init(); let bootstrap_servers = String::from("localhost:9092"); - - let ctx = Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) - .await; let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers); // Connect to source topic @@ -36,7 +32,10 @@ async fn main() -> Result<()> { ])) .await?; - ctx.from_topic(source_topic) + let _ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await + .from_topic(source_topic) .await? .window( vec![col("sensor_name")], diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs index 6e1740f..7e2372f 100644 --- a/py-denormalized/src/context.rs +++ b/py-denormalized/src/context.rs @@ -51,23 +51,27 @@ impl From for Arc { impl PyContext { /// creates a new PyDataFrame #[new] - pub fn new() -> PyResult { + pub fn new(py: Python) -> PyResult { + let rt = &get_tokio_runtime(py).0; + let fut: JoinHandle> = rt.spawn(async move { + Ok(Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await) + }); + + let context = wait_for_future(py, fut).map_err(py_denormalized_err)??; + Ok(Self { - context: Arc::new(Context::new()?), + context: Arc::new(context), }) } - fn foo(&self, _py: Python) -> PyResult { - println!("Fooooo"); - Ok("foo wtf".to_string()) - } - fn __repr__(&self, _py: Python) -> PyResult { - Ok("__repr__ PyContext".to_string()) + Ok("PyContext".to_string()) } fn __str__(&self, _py: Python) -> PyResult { - Ok("__str__ PyContext".to_string()) + Ok("PyContext".to_string()) } pub fn from_topic(