Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Aug 29, 2024
1 parent b803b23 commit 9655c90
Showing 1 changed file with 46 additions and 18 deletions.
64 changes: 46 additions & 18 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,52 @@ pub async fn serve(
// TODO: so we shouldn't block here, but rather collect all the rx.await() futures
// for this frame and then wait for all of them to finish before moving on to the
let value = rx.await.unwrap().unwrap();
match value {
Value::Nothing { .. } => (),
_ => {
let _ = store
.append(
&handler.meta.topic,
Some(
store
.cas_insert(&value_to_json(&value).to_string())
.await
.unwrap(),
),
Some(serde_json::json!({
"handler_id": handler.id.to_string(),
"frame_id": frame.id.to_string(),
})),
)
.await;

if handler.meta.stateful.unwrap_or(false) {
match value {
Value::Nothing { .. } => (),
Value::Record { ref val, .. } => {
if let Some(state) = val.get("state") {
handler.state = Some(state.clone());
}
let _ = store
.append(
&handler.meta.topic,
Some(
store
.cas_insert(&value_to_json(&value).to_string())
.await
.unwrap(),
),
Some(serde_json::json!({
"handler_id": handler.id.to_string(),
"frame_id": frame.id.to_string(),
})),
)
.await;
}
_ => panic!("unexpected value type"),
}
} else {
match value {
Value::Nothing { .. } => (),
_ => {
let _ = store
.append(
&handler.meta.topic,
Some(
store
.cas_insert(&value_to_json(&value).to_string())
.await
.unwrap(),
),
Some(serde_json::json!({
"handler_id": handler.id.to_string(),
"frame_id": frame.id.to_string(),
})),
)
.await;
}
}
}
}
Expand Down

0 comments on commit 9655c90

Please sign in to comment.