Skip to content

Commit

Permalink
feat: add Server-Sent Events support for stream cat (#16)
Browse files Browse the repository at this point in the history
- Adds Server-Sent Events (SSE) support to stream cat
- CLI: New `--sse` flag for `cat` command
- API: Responds to `Accept: text/event-stream` header
- Streams full frame data in SSE format:
  - event: topic
  - id: frame id
  - data: JSON-encoded full frame (including hash, but not the actual content)
- Enables real-time, push-based updates from server to client
  • Loading branch information
cablehead committed Sep 27, 2024
1 parent cec069e commit 1cd7c72
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 18 deletions.
74 changes: 60 additions & 14 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use http_body_util::StreamBody;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::body::Bytes;
use hyper::header::ACCEPT;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
Expand All @@ -27,8 +28,14 @@ use crate::thread_pool::ThreadPool;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;

#[derive(Debug, PartialEq, Clone)]
enum AcceptType {
Ndjson,
EventStream,
}

enum Routes {
StreamCat,
StreamCat(AcceptType),
StreamAppend(String),
StreamItemGet(Scru128Id),
StreamItemRemove(Scru128Id),
Expand All @@ -38,9 +45,15 @@ enum Routes {
NotFound,
}

fn match_route(method: &Method, path: &str) -> Routes {
fn match_route(method: &Method, path: &str, headers: &hyper::HeaderMap) -> Routes {
match (method, path) {
(&Method::GET, "/") => Routes::StreamCat,
(&Method::GET, "/") => {
let accept_type = match headers.get(ACCEPT) {
Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
_ => AcceptType::Ndjson,
};
Routes::StreamCat(accept_type)
}

(&Method::POST, p) if p.starts_with("/pipe/") => {
if let Some(id_str) = p.strip_prefix("/pipe/") {
Expand Down Expand Up @@ -100,23 +113,16 @@ async fn handle(
) -> HTTPResult {
let method = req.method();
let path = req.uri().path();
let headers = req.headers().clone();

match match_route(method, path) {
Routes::StreamCat => {
match match_route(method, path, &headers) {
Routes::StreamCat(accept_type) => {
let options = match ReadOptions::from_query(req.uri().query()) {
Ok(opts) => opts,
Err(err) => return response_400(err.to_string()),
};

let rx = store.read(options).await;
let stream = ReceiverStream::new(rx);
let stream = stream.map(|frame| {
let mut encoded = serde_json::to_vec(&frame).unwrap();
encoded.push(b'\n');
Ok(hyper::body::Frame::data(bytes::Bytes::from(encoded)))
});
let body = StreamBody::new(stream).boxed();
Ok(Response::new(body))
handle_stream_cat(&mut store, options, accept_type).await
}

Routes::StreamAppend(topic) => handle_stream_append(&mut store, req, topic).await,
Expand Down Expand Up @@ -149,6 +155,46 @@ async fn handle(
}
}

async fn handle_stream_cat(
store: &mut Store,
options: ReadOptions,
accept_type: AcceptType,
) -> HTTPResult {
let rx = store.read(options).await;
let stream = ReceiverStream::new(rx);

let accept_type_clone = accept_type.clone();
let stream = stream.map(move |frame| {
let bytes = match accept_type_clone {
AcceptType::Ndjson => {
let mut encoded = serde_json::to_vec(&frame).unwrap();
encoded.push(b'\n');
encoded
}
AcceptType::EventStream => format!(
"event: {}\nid: {}\ndata: {}\n\n",
frame.topic,
frame.id,
serde_json::to_string(&frame).unwrap_or_default()
)
.into_bytes(),
};
Ok(hyper::body::Frame::data(Bytes::from(bytes)))
});

let body = StreamBody::new(stream).boxed();

let content_type = match accept_type {
AcceptType::Ndjson => "application/x-ndjson",
AcceptType::EventStream => "text/event-stream",
};

Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", content_type)
.body(body)?)
}

async fn handle_stream_append(
store: &mut Store,
req: Request<hyper::body::Incoming>,
Expand Down
12 changes: 8 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub async fn cat(
tail: bool,
last_id: Option<String>,
limit: Option<u64>,
sse: bool,
) -> Result<Receiver<Bytes>, BoxError> {
let stream = connect(addr).await?;
let io = TokioIo::new(stream);
Expand Down Expand Up @@ -91,10 +92,13 @@ pub async fn cat(
"http://localhost/".to_string()
};

let req = Request::builder()
.method(Method::GET)
.uri(uri)
.body(empty())?;
let mut req = Request::builder().method(Method::GET).uri(uri);

if sse {
req = req.header("Accept", "text/event-stream");
}

let req = req.body(empty())?;

let res = sender.send_request(req).await?;

Expand Down
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ struct CommandCat {
/// Limit the number of events
#[clap(long)]
limit: Option<u64>,

/// Use Server-Sent Events format
#[clap(long)]
sse: bool,
}

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -184,6 +188,7 @@ async fn cat(args: CommandCat) -> Result<(), Box<dyn std::error::Error + Send +
args.tail,
args.last_id.clone(),
args.limit,
args.sse,
)
.await?;
let mut stdout = tokio::io::stdout();
Expand Down

0 comments on commit 1cd7c72

Please sign in to comment.