Skip to content

Commit

Permalink
configure slatedb backend in python bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Oct 21, 2024
1 parent 8eae191 commit 13fbc49
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
2 changes: 1 addition & 1 deletion crates/orchestrator/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Orchestrator {
senders: HashMap<String, channel::Sender<OrchestrationMessage>>,
}

pub const SHOULD_CHECKPOINT: bool = true; // THIS WILL BE MOVED INTO CONFIG
pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG

/**
* 1. Keep track of checkpoint per source.
Expand Down
9 changes: 4 additions & 5 deletions examples/examples/simple_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ async fn main() -> Result<()> {
.init();

let bootstrap_servers = String::from("localhost:9092");

let ctx = Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
.await;
let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers);

// Connect to source topic
Expand All @@ -36,7 +32,10 @@ async fn main() -> Result<()> {
]))
.await?;

ctx.from_topic(source_topic)
let _ctx = Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
.await
.from_topic(source_topic)
.await?
.window(
vec![col("sensor_name")],
Expand Down
22 changes: 13 additions & 9 deletions py-denormalized/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,27 @@ impl From<PyContext> for Arc<Context> {
impl PyContext {
/// creates a new PyDataFrame
#[new]
pub fn new() -> PyResult<Self> {
pub fn new(py: Python) -> PyResult<Self> {
let rt = &get_tokio_runtime(py).0;
let fut: JoinHandle<denormalized::common::error::Result<Context>> = rt.spawn(async move {
Ok(Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
.await)
});

let context = wait_for_future(py, fut).map_err(py_denormalized_err)??;

Ok(Self {
context: Arc::new(Context::new()?),
context: Arc::new(context),
})
}

fn foo(&self, _py: Python) -> PyResult<String> {
println!("Fooooo");
Ok("foo wtf".to_string())
}

fn __repr__(&self, _py: Python) -> PyResult<String> {
Ok("__repr__ PyContext".to_string())
Ok("PyContext".to_string())
}

fn __str__(&self, _py: Python) -> PyResult<String> {
Ok("__str__ PyContext".to_string())
Ok("PyContext".to_string())
}

pub fn from_topic(
Expand Down

0 comments on commit 13fbc49

Please sign in to comment.