diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs index 8529848..6acfeed 100644 --- a/crates/orchestrator/src/orchestrator.rs +++ b/crates/orchestrator/src/orchestrator.rs @@ -20,7 +20,7 @@ pub struct Orchestrator { senders: HashMap>, } -pub const SHOULD_CHECKPOINT: bool = true; // THIS WILL BE MOVED INTO CONFIG +pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG /** * 1. Keep track of checkpoint per source. diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 60c3221..2fe1306 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -18,10 +18,6 @@ async fn main() -> Result<()> { .init(); let bootstrap_servers = String::from("localhost:9092"); - - let ctx = Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) - .await; let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers); // Connect to source topic @@ -36,7 +32,10 @@ async fn main() -> Result<()> { ])) .await?; - ctx.from_topic(source_topic) + let _ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await + .from_topic(source_topic) .await? .window( vec![col("sensor_name")], diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs index 6e1740f..7e2372f 100644 --- a/py-denormalized/src/context.rs +++ b/py-denormalized/src/context.rs @@ -51,23 +51,27 @@ impl From for Arc { impl PyContext { /// creates a new PyDataFrame #[new] - pub fn new() -> PyResult { + pub fn new(py: Python) -> PyResult { + let rt = &get_tokio_runtime(py).0; + let fut: JoinHandle> = rt.spawn(async move { + Ok(Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await) + }); + + let context = wait_for_future(py, fut).map_err(py_denormalized_err)??; + Ok(Self { - context: Arc::new(Context::new()?), + context: Arc::new(context), }) } - fn foo(&self, _py: Python) -> PyResult { - println!("Fooooo"); - Ok("foo wtf".to_string()) - } - fn __repr__(&self, _py: Python) -> PyResult { - Ok("__repr__ PyContext".to_string()) + Ok("PyContext".to_string()) } fn __str__(&self, _py: Python) -> PyResult { - Ok("__str__ PyContext".to_string()) + Ok("PyContext".to_string()) } pub fn from_topic(