Skip to content

Commit

Permalink
feat: xs.handler.register is now <topic>.register
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Sep 3, 2024
1 parent 9a7a431 commit 4b715a3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 36 deletions.
6 changes: 3 additions & 3 deletions examples/discord-bot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ open examples/discord-bot/beat.nu | .append "discord.heartbeat.register" --meta
}
pulse: 1000
}
```


# to enable a `./roll <n>d<m>` command
open examples/discord-bot/roller.nu | .append "discord.roller.register"
```
77 changes: 44 additions & 33 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use crate::nu::value_to_json;
use crate::store::{FollowOption, ReadOptions, Store};
use crate::thread_pool::ThreadPool;

#[derive(Clone, Debug, serde::Deserialize)]
#[derive(Clone, Debug, serde::Deserialize, Default)]
pub struct HandlerMeta {
topic: String,
stateful: Option<bool>,
initial_state: Option<serde_json::Value>,
pulse: Option<u64>,
Expand All @@ -29,14 +28,21 @@ pub struct HandlerMeta {
#[derive(Clone)]
struct HandlerTask {
id: Scru128Id,
topic: String,
meta: HandlerMeta,
engine: nu::Engine,
closure: Closure,
state: Option<Value>,
}

impl HandlerTask {
fn new(id: Scru128Id, meta: HandlerMeta, mut engine: nu::Engine, expression: String) -> Self {
fn new(
id: Scru128Id,
topic: String,
meta: HandlerMeta,
mut engine: nu::Engine,
expression: String,
) -> Self {
// TODO: need to establish the different points at which a handler will pick its starting
// point in the stream

Expand All @@ -59,6 +65,7 @@ impl HandlerTask {

Self {
id,
topic,
meta: meta.clone(),
engine,
closure,
Expand Down Expand Up @@ -154,7 +161,7 @@ async fn spawn(
}
let _ = store
.append(
&handler.meta.topic,
&format!("{}.state", &handler.topic),
Some(
store
.cas_insert(&value_to_json(&value).to_string())
Expand All @@ -176,7 +183,7 @@ async fn spawn(
_ => {
let _ = store
.append(
&handler.meta.topic,
&handler.topic,
Some(
store
.cas_insert(&value_to_json(&value).to_string())
Expand All @@ -198,7 +205,7 @@ async fn spawn(

let _ = store
.append(
&format!("{}.registered", &handler.meta.topic),
&format!("{}.registered", &handler.topic),
None,
Some(serde_json::json!({
"handler_id": handler.id.to_string(),
Expand All @@ -224,28 +231,33 @@ pub async fn serve(
let mut handlers: HashMap<String, HandlerTask> = HashMap::new();

while let Some(frame) = recver.recv().await {
if frame.topic == "xs.handler.register" {
if let Some(topic) = frame.topic.strip_suffix(".register") {
let meta = frame
.meta
.clone()
.and_then(|meta| serde_json::from_value::<HandlerMeta>(meta).ok());

if let Some(meta) = meta {
// TODO: emit a .err event on any of these unwraps
let hash = frame.hash.unwrap();
let reader = store.cas_reader(hash).await.unwrap();
let mut expression = String::new();
reader
.compat()
.read_to_string(&mut expression)
.await
.unwrap();

let handler = HandlerTask::new(frame.id, meta.clone(), engine.clone(), expression);
handlers.insert(meta.topic.clone(), handler.clone());
// TODO: this tx is to send commands to the spawned handler, e.g. update / stop it
let _tx = spawn(store.clone(), handler, pool.clone()).await;
}
.and_then(|meta| serde_json::from_value::<HandlerMeta>(meta).ok())
.unwrap_or_else(HandlerMeta::default);

// TODO: emit a .err event on any of these unwraps
let hash = frame.hash.unwrap();
let reader = store.cas_reader(hash).await.unwrap();
let mut expression = String::new();
reader
.compat()
.read_to_string(&mut expression)
.await
.unwrap();

let handler = HandlerTask::new(
frame.id,
topic.to_string(),
meta.clone(),
engine.clone(),
expression,
);
handlers.insert(topic.to_string(), handler.clone());
// TODO: this tx is to send commands to the spawned handler, e.g. update / stop it
let _tx = spawn(store.clone(), handler, pool.clone()).await;
continue;
}
}
Expand Down Expand Up @@ -274,7 +286,7 @@ mod tests {

let frame_handler = store
.append(
"xs.handler.register",
"action.register",
Some(
store
.cas_insert(
Expand All @@ -286,7 +298,7 @@ mod tests {
.await
.unwrap(),
),
Some(serde_json::json!({"topic": "action"})),
None,
)
.await;

Expand All @@ -300,7 +312,7 @@ mod tests {

assert_eq!(
recver.recv().await.unwrap().topic,
"xs.handler.register".to_string()
"action.register".to_string()
);
assert_eq!(
recver.recv().await.unwrap().topic,
Expand Down Expand Up @@ -346,7 +358,7 @@ mod tests {

let frame_handler = store
.append(
"xs.handler.register",
"counter.register",
Some(
store
.cas_insert(
Expand All @@ -361,7 +373,6 @@ mod tests {
.unwrap(),
),
Some(serde_json::json!({
"topic": "counter",
"stateful": true,
"initial_state": { "count": 0 }
})),
Expand All @@ -378,7 +389,7 @@ mod tests {

assert_eq!(
recver.recv().await.unwrap().topic,
"xs.handler.register".to_string()
"counter.register".to_string()
);
assert_eq!(
recver.recv().await.unwrap().topic,
Expand All @@ -395,7 +406,7 @@ mod tests {
assert_eq!(recver.recv().await.unwrap().topic, "count.me".to_string());

let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "counter".to_string());
assert_eq!(frame.topic, "counter.state".to_string());
let meta = frame.meta.unwrap();
assert_eq!(meta["handler_id"], frame_handler.id.to_string());
assert_eq!(meta["frame_id"], frame_count1.id.to_string());
Expand All @@ -407,7 +418,7 @@ mod tests {
assert_eq!(recver.recv().await.unwrap().topic, "count.me".to_string());

let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "counter".to_string());
assert_eq!(frame.topic, "counter.state".to_string());
let meta = frame.meta.unwrap();
assert_eq!(meta["handler_id"], frame_handler.id.to_string());
assert_eq!(meta["frame_id"], frame_count2.id.to_string());
Expand Down

0 comments on commit 4b715a3

Please sign in to comment.