Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the concept of ephemeral events #15

Merged
merged 13 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ crossbeam-channel = "0.5.13"
tracing = "0.1.40"
console = "0.15.8"
chrono = "0.4.38"
bon = "2.2"
bon = "2.3"

[dev-dependencies]
assert_cmd = "2.0.14"
Expand Down
24 changes: 21 additions & 3 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use hyper_util::rt::TokioIo;

use crate::listener::Listener;
use crate::nu;
use crate::store::{self, ReadOptions, Store};
use crate::store::{self, Frame, ReadOptions, Store, TTL};
use crate::thread_pool::ThreadPool;

type BoxError = Box<dyn std::error::Error + Send + Sync>;
Expand Down Expand Up @@ -156,6 +156,12 @@ async fn handle_stream_append(
) -> HTTPResult {
let (parts, mut body) = req.into_parts();

// Parse TTL from query parameters
let ttl = match TTL::from_query(parts.uri.query()) {
Ok(ttl) => ttl,
Err(e) => return response_400(e),
};

let hash = {
let writer = store.cas_writer().await?;
let mut writer = writer.compat_write();
Expand Down Expand Up @@ -188,7 +194,15 @@ async fn handle_stream_append(
Err(e) => return response_400(e.to_string()),
};

let frame = store.append(&topic, hash, meta).await;
let frame = store
.append(
Frame::with_topic(topic)
.maybe_hash(hash)
.maybe_meta(meta)
.ttl(ttl)
.build(),
)
.await;

Ok(Response::builder()
.status(StatusCode::OK)
Expand Down Expand Up @@ -256,7 +270,11 @@ pub async fn serve(
addr: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let _ = store
.append("xs.start", None, serde_json::json!({"addr": addr}).into())
.append(
Frame::with_topic("xs.start")
.meta(serde_json::json!({"addr": addr}))
.build(),
)
.await;

let mut listener = Listener::bind(addr).await?;
Expand Down
8 changes: 7 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use hyper::{Method, Request, StatusCode};
use hyper_util::rt::TokioIo;

use crate::listener::AsyncReadWriteBox;
use crate::store::TTL;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

Expand Down Expand Up @@ -135,6 +136,7 @@ pub async fn append<R>(
topic: &str,
data: R,
meta: Option<&Value>,
ttl: Option<TTL>,
) -> Result<Bytes, BoxError>
where
R: AsyncRead + Unpin + Send + 'static,
Expand All @@ -148,7 +150,11 @@ where
}
});

let uri = format!("http://localhost/{}", topic);
let mut uri = format!("http://localhost/{}", topic);
if let Some(ttl) = ttl {
uri = format!("{}?{}", uri, ttl.to_query());
}

let mut req = Request::builder().method(Method::POST).uri(uri);

if let Some(meta_value) = meta {
Expand Down
39 changes: 20 additions & 19 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ async fn spawn(
if frame.topic == format!("{}.register", &handler.topic) && frame.id != handler.id {
let _ = store
.append(
&format!("{}.unregistered", &handler.topic),
None,
Some(serde_json::json!({
"handler_id": handler.id.to_string(),
"frame_id": frame.id.to_string(),
})),
Frame::with_topic(format!("{}.unregistered", &handler.topic))
.meta(serde_json::json!({
"handler_id": handler.id.to_string(),
"frame_id": frame.id.to_string(),
}))
.build(),
)
.await;
break;
Expand All @@ -140,13 +140,14 @@ async fn spawn(

let _ = store
.append(
&format!("{}.registered", &handler.topic),
None,
Some(serde_json::json!({
"handler_id": handler.id.to_string(),
})),
Frame::with_topic(format!("{}.registered", &handler.topic))
.meta(serde_json::json!({
"handler_id": handler.id.to_string(),
}))
.build(),
)
.await;

Ok(tx_command)
}

Expand Down Expand Up @@ -340,8 +341,8 @@ mod tests {
"action.registered".to_string()
);

let _ = store.append("topic1", None, None).await;
let frame_topic2 = store.append("topic2", None, None).await;
let _ = store.append(Frame::with_topic("topic1").build()).await;
let frame_topic2 = store.append(Frame::with_topic("topic2").build()).await;
assert_eq!(recver.recv().await.unwrap().topic, "topic1".to_string());
assert_eq!(recver.recv().await.unwrap().topic, "topic2".to_string());

Expand All @@ -355,7 +356,7 @@ mod tests {
let content = store.cas_read(&frame.hash.unwrap()).await.unwrap();
assert_eq!(content, r#""ran action""#.as_bytes());

let _ = store.append("topic3", None, None).await;
let _ = store.append(Frame::with_topic("topic3").build()).await;
assert_eq!(recver.recv().await.unwrap().topic, "topic3".to_string());
}

Expand Down Expand Up @@ -405,8 +406,8 @@ mod tests {
"counter.registered".to_string()
);

let _ = store.append("topic1", None, None).await;
let frame_count1 = store.append("count.me", None, None).await;
let _ = store.append(Frame::with_topic("topic1").build()).await;
let frame_count1 = store.append(Frame::with_topic("count.me").build()).await;
assert_eq!(recver.recv().await.unwrap().topic, "topic1".to_string());
assert_eq!(recver.recv().await.unwrap().topic, "count.me".to_string());

Expand All @@ -419,7 +420,7 @@ mod tests {
let value = serde_json::from_slice::<serde_json::Value>(&content).unwrap();
assert_eq!(value, serde_json::json!({ "state": { "count": 1 } }));

let frame_count2 = store.append("count.me", None, None).await;
let frame_count2 = store.append(Frame::with_topic("count.me").build()).await;
assert_eq!(recver.recv().await.unwrap().topic, "count.me".to_string());

let frame = recver.recv().await.unwrap();
Expand Down Expand Up @@ -475,7 +476,7 @@ mod tests {
"action.registered".to_string()
);

let _ = store.append("pew", None, None).await;
let _ = store.append(Frame::with_topic("pew").build()).await;
let frame_pew = recver.recv().await.unwrap();
assert_eq!(frame_pew.topic, "pew".to_string());

Expand Down Expand Up @@ -532,7 +533,7 @@ mod tests {
);
// fin assertions on these two frames

let _ = store.append("pew", None, None).await;
let _ = store.append(Frame::with_topic("pew").build()).await;
let frame_pew = recver.recv().await.unwrap();
assert_eq!(frame_pew.topic, "pew".to_string());

Expand Down
9 changes: 5 additions & 4 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use hyper::service::service_fn;
use hyper_util::rt::TokioIo;

use crate::listener::Listener;
use crate::store::{FollowOption, ReadOptions, Store};
use crate::store::{FollowOption, Frame, ReadOptions, Store};

#[derive(Serialize, Deserialize, Debug)]
pub struct Request {
Expand Down Expand Up @@ -115,9 +115,10 @@ async fn handle(

let frame = store
.append(
"http.request",
hash,
Some(serde_json::to_value(&req_meta).unwrap()),
Frame::with_topic("http.request")
.maybe_hash(hash)
.maybe_meta(serde_json::to_value(&req_meta).ok())
.build(),
)
.await;

Expand Down
18 changes: 17 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ struct CommandAppend {
/// JSON metadata to include with the append
#[clap(long, value_parser)]
meta: Option<String>,

/// Time-to-live for the event (forever, temporary, ephemeral, or duration in milliseconds)
#[clap(long, value_parser)]
ttl: Option<String>,
}

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -200,6 +204,18 @@ async fn append(args: CommandAppend) -> Result<(), Box<dyn std::error::Error + S
.map(|meta_str| serde_json::from_str(&meta_str))
.transpose()?;

let ttl = match args.ttl {
Some(ttl_str) => {
let query = if let Ok(duration) = u64::from_str(&ttl_str) {
format!("ttl=time&duration={}", duration)
} else {
format!("ttl={}", ttl_str)
};
Some(xs::store::TTL::from_query(Some(&query))?)
}
None => None,
};

let input = if !std::io::stdin().is_terminal() {
// Stdin is a pipe, use it as input
Box::new(stdin()) as Box<dyn AsyncRead + Unpin + Send>
Expand All @@ -208,7 +224,7 @@ async fn append(args: CommandAppend) -> Result<(), Box<dyn std::error::Error + S
Box::new(tokio::io::empty()) as Box<dyn AsyncRead + Unpin + Send>
};

let response = xs::client::append(&args.addr, &args.topic, input, meta.as_ref()).await?;
let response = xs::client::append(&args.addr, &args.topic, input, meta.as_ref(), ttl).await?;

tokio::io::stdout().write_all(&response).await?;
Ok(())
Expand Down
15 changes: 12 additions & 3 deletions src/nu/commands/append_command.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::nu::util;
use crate::store::Store;
use async_std::io::WriteExt;

use nu_engine::CallExt;
use nu_protocol::engine::{Call, Command, EngineState, Stack};
use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};

use crate::nu::util;
use crate::store::{Frame, Store};

#[derive(Clone)]
pub struct AppendCommand {
store: Store,
Expand Down Expand Up @@ -95,7 +97,14 @@ impl Command for AppendCommand {
PipelineData::Empty => Ok(None),
}?;

let frame = store.append(topic.as_str(), hash, meta).await;
let frame = store
.append(
Frame::with_topic(topic)
.maybe_hash(hash)
.maybe_meta(meta)
.build(),
)
.await;
Ok::<_, ShellError>(frame)
})?;

Expand Down
Loading