diff --git a/crates/core/src/http/response.rs b/crates/core/src/http/response.rs index 424f6196a..7d4e7b053 100644 --- a/crates/core/src/http/response.rs +++ b/crates/core/src/http/response.rs @@ -514,10 +514,10 @@ mod test { #[tokio::test] async fn test_body_stream2() { - let mut body = ResBody::Stream(Box::pin(iter(vec![ + let mut body = ResBody::stream(iter(vec![ Result::<_, Box>::Ok(BytesMut::from("Hello").freeze()), Result::<_, Box>::Ok(BytesMut::from(" World").freeze()), - ]))); + ])); let mut result = bytes::BytesMut::new(); while let Some(Ok(data)) = body.next().await { diff --git a/crates/core/src/tower_compat.rs b/crates/core/src/tower_compat.rs index f94396840..30bc49176 100644 --- a/crates/core/src/tower_compat.rs +++ b/crates/core/src/tower_compat.rs @@ -116,6 +116,7 @@ impl FlowCtrlOutContext { } #[doc(hidden)] +#[derive(Clone, Debug, Default)] pub struct FlowCtrlService; impl Service> for FlowCtrlService { type Response = hyper::Response; @@ -243,8 +244,8 @@ where mod tests { use super::*; - use crate::prelude::*; use crate::test::{ResponseExt, TestClient}; + use crate::{handler, Router}; #[tokio::test] async fn test_tower_layer() { @@ -252,7 +253,7 @@ mod tests { inner: S, } - impl Service for TestService + impl tower::Service for TestService where S: Service, { @@ -284,11 +285,13 @@ mod tests { "Hello World" } let router = Router::new().hoop(MyServiceLayer.compat()).get(hello); - asset_eq!( + assert_eq!( TestClient::get("http://127.0.0.1:5800") .send(router) + .await .take_string() - .await, + .await + .unwrap(), "Hello World" ); } diff --git a/examples/with-socketioxide/Cargo.toml b/examples/with-socketioxide/Cargo.toml new file mode 100644 index 000000000..6269c1663 --- /dev/null +++ b/examples/with-socketioxide/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "example-with-socketioxide" +version.workspace = true +edition.workspace = true +publish.workspace = true + +[dependencies] +hyper = { workspace = true, features = ["server"] } +salvo = { workspace = true, features = ["tower-compat"] } +tokio = { workspace = true, features = ["macros"] } +tower = { version = "0.4" } +tower-http = { version = "0.4" } +serde = { workspace = true, features = ["derive"] } +socketioxide = "0.5" +tracing.workspace = true +tracing-subscriber.workspace = true diff --git a/examples/with-socketioxide/src/chat.html b/examples/with-socketioxide/src/chat.html new file mode 100644 index 000000000..2411cdf40 --- /dev/null +++ b/examples/with-socketioxide/src/chat.html @@ -0,0 +1,104 @@ + + + + + + + + Chat app example + + + + + + + +
+ + +
+ + + \ No newline at end of file diff --git a/examples/with-socketioxide/src/main.rs b/examples/with-socketioxide/src/main.rs new file mode 100644 index 000000000..962b451f8 --- /dev/null +++ b/examples/with-socketioxide/src/main.rs @@ -0,0 +1,120 @@ +use std::sync::Arc; + +use salvo::prelude::*; +use serde::Deserialize; +use socketioxide::{adapter::LocalAdapter, Socket}; +use socketioxide::{Namespace, SocketIoLayer}; +use tokio::time::Duration; +use tower::ServiceBuilder; +use tracing::info; + +#[handler] +fn index() -> Text<&'static str> { + Text::Html(include_str!("chat.html")) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt().init(); + + let acceptor = TcpListener::new("0.0.0.0:5800").bind().await; + let ns = Namespace::builder().add("/", handle_socket).build(); + let router = Router::new() + .push(Router::new().get(index)) + .push(Router::new().post(SocketIoLayer::new(ns).compat())); + Server::new(acceptor).serve(router).await; +} + +#[derive(Deserialize, Clone, Debug)] +struct Nickname(String); + +#[derive(Deserialize)] +struct Auth { + pub nickname: Nickname, +} + +pub async fn handle_socket(socket: Arc>) { + info!("Socket connected on / with id: {}", socket.sid); + if let Ok(data) = socket.handshake.data::() { + info!("Nickname: {:?}", data.nickname); + socket.extensions.insert(data.nickname); + socket.emit("message", "Welcome to the chat!").ok(); + socket.join("default").unwrap(); + } else { + info!("No nickname provided, disconnecting..."); + socket.disconnect().ok(); + return; + } + + socket.on( + "message", + |socket, (room, message): (String, String), _, _| async move { + let Nickname(ref nickname) = *socket.extensions.get().unwrap(); + info!("transfering message from {nickname} to {room}: {message}"); + info!("Sockets in room: {:?}", socket.local().sockets().unwrap()); + if let Some(dest) = socket + .to("default") + .sockets() + .unwrap() + .iter() + .find(|s| s.extensions.get::().map(|n| n.0 == room).unwrap_or_default()) + { + info!("Sending message to {}", room); + dest.emit("message", format!("{}: {}", nickname, message)).ok(); + } + + socket + .to(room) + .emit("message", format!("{}: {}", nickname, message)) + .ok(); + }, + ); + + socket.on("join", |socket, room: String, _, _| async move { + info!("Joining room {}", room); + socket.join(room).unwrap(); + }); + + socket.on("leave", |socket, room: String, _, _| async move { + info!("Leaving room {}", room); + socket.leave(room).unwrap(); + }); + + socket.on("list", |socket, room: Option, _, _| async move { + if let Some(room) = room { + info!("Listing sockets in room {}", room); + let sockets = socket + .within(room) + .sockets() + .unwrap() + .iter() + .filter_map(|s| s.extensions.get::()) + .fold("".to_string(), |a, b| a + &b.0 + ", ") + .trim_end_matches(", ") + .to_string(); + socket.emit("message", sockets).ok(); + } else { + let rooms = socket.rooms().unwrap(); + info!("Listing rooms: {:?}", &rooms); + socket.emit("message", rooms).ok(); + } + }); + + socket.on("nickname", |socket, nickname: Nickname, _, _| async move { + let previous = socket.extensions.insert(nickname.clone()); + info!("Nickname changed from {:?} to {:?}", &previous, &nickname); + let msg = format!( + "{} changed his nickname to {}", + previous.map(|n| n.0).unwrap_or_default(), + nickname.0 + ); + socket.to("default").emit("message", msg).ok(); + }); + + socket.on_disconnect(|socket, reason| async move { + info!("Socket disconnected: {} {}", socket.sid, reason); + let Nickname(ref nickname) = *socket.extensions.get().unwrap(); + let msg = format!("{} left the chat", nickname); + socket.to("default").emit("message", msg).ok(); + }); +} diff --git a/examples/work-with-tower/Cargo.toml b/examples/with-tower/Cargo.toml similarity index 91% rename from examples/work-with-tower/Cargo.toml rename to examples/with-tower/Cargo.toml index 1155e13c4..dfaa88324 100644 --- a/examples/work-with-tower/Cargo.toml +++ b/examples/with-tower/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "example-work-with-tower" +name = "example-with-tower" version.workspace = true edition.workspace = true publish.workspace = true diff --git a/examples/work-with-tower/src/main.rs b/examples/with-tower/src/main.rs similarity index 100% rename from examples/work-with-tower/src/main.rs rename to examples/with-tower/src/main.rs