Skip to content

Commit

Permalink
Add test of server sent events
Browse files Browse the repository at this point in the history
  • Loading branch information
kahnclusions committed Aug 2, 2024
1 parent ba52291 commit 66cbf45
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 3 deletions.
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["app", "frontend", "server", "fnord_ui", "qbittorrent_rs", "qbittorrent_rs_proto"]
members = ["app", "frontend", "server", "fnord_ui", "qbittorrent_rs", "qbittorrent_rs_proto", "leptos_sse"]

# need to be applied only to wasm build
[profile.release]
Expand Down
4 changes: 3 additions & 1 deletion app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ bincode = { version = "1.3.3", optional = true }
base64 = { version = "0.22.1", optional = true }
anyhow.workspace = true

leptos_sse = { path = "../leptos_sse" }

[features]
default = []
hydrate = ["leptos/hydrate", "dep:wasm-bindgen", "dep:wasm-bindgen-futures", "fnord-ui/hydrate"]
ssr = ["leptos/ssr", "leptos_meta/ssr", "leptos_router/ssr", "dep:leptos_axum", "dep:simple_crypt", "dep:bincode", "dep:base64", "fnord-ui/ssr", "qbittorrent-rs"]
ssr = ["leptos/ssr", "leptos_meta/ssr", "leptos_router/ssr", "dep:leptos_axum", "dep:simple_crypt", "dep:bincode", "dep:base64", "fnord-ui/ssr", "qbittorrent-rs", "leptos_sse/ssr"]

21 changes: 20 additions & 1 deletion app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use leptos_meta::*;
use leptos_router::{components::*, StaticSegment};

use fnord_ui::components::{Navbar, NavbarBrand};
use leptos_sse::sse_signal;
use serde::{Deserialize, Serialize};

pub mod error_template;

Expand Down Expand Up @@ -73,7 +75,7 @@ fn HomePage(is_auth: Signal<bool>, action: ServerAction<Login>) -> impl IntoView
let res = move || {
if is_auth() {
Either::Left(view! {
<div>"Hello"</div>
<div><Dashboard /></div>
})
} else {
Either::Right(view! {
Expand Down Expand Up @@ -105,3 +107,20 @@ fn HomePage(is_auth: Signal<bool>, action: ServerAction<Login>) -> impl IntoView
<div>{res()}</div>
}
}

#[component]
fn Dashboard() -> impl IntoView {
// Provide websocket connection
// leptos_sse::provide_sse("http://localhost:3000/sse").unwrap();

// Create sse signal
let count = sse_signal::<Count>("http://localhost:3010/sse");
view! {
<div>Count: {move || { view! { <span>{count.get().value.to_string()}</span>}}}</div>
}
}

#[derive(Clone, Default, Serialize, Deserialize)]
pub struct Count {
pub value: i32,
}
42 changes: 42 additions & 0 deletions leptos_sse/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[package]
name = "leptos_sse"
version = "0.4.0"
edition = "2021"
description = "Leptos server signals synced through server-sent-events (SSE)"
repository = "https://github.com/messense/leptos_sse"
license = "MIT"
keywords = ["leptos", "server", "signal", "sse"]
categories = [
"wasm",
"web-programming",
"web-programming::http-client",
"web-programming::http-server",
]

[dependencies]
cfg-if = "1"
js-sys = "0.3.61"
json-patch = "1.0.0"
leptos.workspace = true
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0"
wasm-bindgen = { version = "0.2.84", default-features = false }
web-sys = { version = "0.3.61", features = ["EventSource", "MessageEvent"] }
pin-project-lite = "0.2.12"
tokio = { version = "1.36.0", optional = true }
tokio-stream = { version = "0.1.14", optional = true }
tracing.workspace = true

axum = { version = "0.7", default-features = false, features = [
"tokio",
"json",
], optional = true }
futures = { version = "0.3.28", default-features = false, optional = true }

[features]
default = []
ssr = ["dep:axum", "dep:futures", "dep:tokio", "dep:tokio-stream"]

[package.metadata.docs.rs]
features = ["ssr"]
rustdoc-args = ["--cfg", "docsrs"]
118 changes: 118 additions & 0 deletions leptos_sse/src/axum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::borrow::Cow;
use std::pin::Pin;
use std::task::Poll;

use axum::response::sse::Event;
use futures::stream::{Stream, StreamExt, TryStream};
use pin_project_lite::pin_project;
use serde::Serialize;
use serde_json::Value;
use tokio::sync::mpsc;
pub use tokio::sync::mpsc::error::{SendError, TrySendError};
use tokio_stream::wrappers::ReceiverStream;

use crate::ServerSignalUpdate;

pin_project! {
/// A signal owned by the server which writes to the SSE when mutated.
#[derive(Clone, Debug)]
pub struct ServerSentEvents<S> {
name: Cow<'static, str>,
#[pin]
stream: S,
json_value: Value,
}
}

impl<S> ServerSentEvents<S> {
/// Create a new [`ServerSentEvents`] a stream, initializing `T` to default.
///
/// This function can fail if serilization of `T` fails.
pub fn new<T>(name: impl Into<Cow<'static, str>>, stream: S) -> Result<Self, serde_json::Error>
where
T: Default + Serialize,
S: TryStream<Ok = T, Error = axum::BoxError>,
{
Ok(ServerSentEvents {
name: name.into(),
stream,
json_value: serde_json::to_value(T::default())?,
})
}

/// Create a server-sent-events (SSE) channel pair.
///
/// The `buffer` argument controls how many unsent messages can be stored without waiting.
///
/// The first item in the tuple is the MPSC channel sender half.
pub fn channel<T>(
name: impl Into<Cow<'static, str>>,
buffer: usize,
) -> Result<
(
Sender<T>,
ServerSentEvents<impl TryStream<Ok = T, Error = axum::BoxError>>,
),
serde_json::Error,
>
where
T: Default + Serialize,
{
let (sender, receiver) = mpsc::channel::<T>(buffer);
let stream = ReceiverStream::new(receiver).map(Ok);
Ok((Sender(sender), ServerSentEvents::new(name, stream)?))
}
}

impl<S> Stream for ServerSentEvents<S>
where
S: TryStream<Error = axum::BoxError>,
S::Ok: Serialize,
{
type Item = Result<Event, axum::BoxError>;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.stream.try_poll_next(cx) {
Poll::Ready(Some(Ok(value))) => {
let new_json = serde_json::to_value(value)?;
let update = ServerSignalUpdate::new_from_json::<S::Item>(
this.name.clone(),
this.json_value,
&new_json,
);
*this.json_value = new_json;
let event = Event::default().json_data(update)?;
Poll::Ready(Some(Ok(event)))
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

/// Sender half of a server-sent events stream.
#[derive(Clone, Debug)]
pub struct Sender<T>(mpsc::Sender<T>);

impl<T> Sender<T> {
/// Send an SSE message.
pub async fn send(&self, value: T) -> Result<(), SendError<T>>
where
T: Serialize,
{
self.0.send(value).await
}

/// Attempts to immediately send an SSE message.
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>>
where
T: Serialize,
{
self.0.try_send(value)
}
}
Loading

0 comments on commit 66cbf45

Please sign in to comment.