Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislearn committed Oct 1, 2023
1 parent 598779e commit c7d0e32
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 7 deletions.
4 changes: 2 additions & 2 deletions crates/core/src/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,10 @@ mod test {

#[tokio::test]
async fn test_body_stream2() {
let mut body = ResBody::Stream(Box::pin(iter(vec![
let mut body = ResBody::stream(iter(vec![
Result::<_, Box<dyn Error + Send + Sync>>::Ok(BytesMut::from("Hello").freeze()),
Result::<_, Box<dyn Error + Send + Sync>>::Ok(BytesMut::from(" World").freeze()),
])));
]));

let mut result = bytes::BytesMut::new();
while let Some(Ok(data)) = body.next().await {
Expand Down
11 changes: 7 additions & 4 deletions crates/core/src/tower_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl FlowCtrlOutContext {
}

#[doc(hidden)]
#[derive(Clone, Debug, Default)]
pub struct FlowCtrlService;
impl Service<hyper::Request<ReqBody>> for FlowCtrlService {
type Response = hyper::Response<ResBody>;
Expand Down Expand Up @@ -243,16 +244,16 @@ where
mod tests {

use super::*;
use crate::prelude::*;
use crate::test::{ResponseExt, TestClient};
use crate::{handler, Router};

#[tokio::test]
async fn test_tower_layer() {
struct TestService<S> {
inner: S,
}

impl<S, Req> Service<Req> for TestService<S>
impl<S, Req> tower::Service<Req> for TestService<S>
where
S: Service<Req>,
{
Expand Down Expand Up @@ -284,11 +285,13 @@ mod tests {
"Hello World"
}
let router = Router::new().hoop(MyServiceLayer.compat()).get(hello);
asset_eq!(
assert_eq!(
TestClient::get("http://127.0.0.1:5800")
.send(router)
.await
.take_string()
.await,
.await
.unwrap(),
"Hello World"
);
}
Expand Down
16 changes: 16 additions & 0 deletions examples/with-socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "example-with-socketioxide"
version.workspace = true
edition.workspace = true
publish.workspace = true

[dependencies]
hyper = { workspace = true, features = ["server"] }
salvo = { workspace = true, features = ["tower-compat"] }
tokio = { workspace = true, features = ["macros"] }
tower = { version = "0.4" }
tower-http = { version = "0.4" }
serde = { workspace = true, features = ["derive"] }
socketioxide = "0.5"
tracing.workspace = true
tracing-subscriber.workspace = true
104 changes: 104 additions & 0 deletions examples/with-socketioxide/src/chat.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<!DOCTYPE html>
<html lang="en">

<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Chat app example</title>
<script src="https://cdn.socket.io/4.6.0/socket.io.min.js"
integrity="sha384-c79GN5VsunZvi+Q/WObgk2in0CbZsHnjEqvFxC5DxHn9lTfNce2WW6h2pH6u/kF+"
crossorigin="anonymous"></script>

<script>
document.addEventListener("DOMContentLoaded", () => {
const logs = document.querySelector("textarea");
const form = document.querySelector("form");

const nickname = prompt("Enter your name");
const socket = io("http://localhost:3000", {
auth: {
nickname
}
});
function appendLog(val) {
logs.value += `${val}\n`;
logs.scrollTop = logs.scrollHeight;
}

socket.on("connect", () => {
appendLog(`[info]: Connected to server as ${nickname}`);
});

socket.on("disconnect", () => {
appendLog(`[info]: Disconnected from server`);
});

socket.on("message", (...data) => {
appendLog(`[server]: ${data}`);
});

form.addEventListener("submit", e => {
e.preventDefault();
/**
* @type {string}
*/
const input = document.querySelector("input").value;
if (input.startsWith("/")) {
switch (input.split(" ")[0].slice(1)) {
case "nick":
appendLog(`[info]: Changed nickname to ${input.split(" ")[1]}`);
socket.emit("nickname", input.split(" ")[1]);
break;
case "join":
appendLog(`[info]: Joined room ${input.split(" ")[1]}`);
socket.emit("join", input.split(" ")[1]);
break;
case "leave":
appendLog(`[info]: Left room ${input.split(" ")[1]}`);
socket.emit("leave", input.split(" ")[1]);
break;
case "msg":
const [_, to, ...msg] = input.split(" ");
appendLog(`[info]: Sent message to ${to} : ${msg.join(" ")}`);
socket.emit("message", [to, msg.join(" ")]);
break;
case "list":
appendLog(`[info]: Listing rooms`);
socket.emit("list", input.split(" ")[1]);
break;
case "quit":
appendLog(`[info]: Disconnected from server`);
socket.disconnect();
default:
appendLog("[info]: Unknown command");
}
} else {
socket.emit("message", input);
}

document.querySelector("input").value = "";
})
});
</script>
</head>

<body>
<textarea readonly cols="80" rows="20">
Help :
/nick <name> - change nickname
/join <room> - join room
/leave <room> - leave room
/msg <name|room>? <message> - send a message
/list - list all rooms
/list <room> - list all users in room
/quit - disconnect

</textarea>
<form>
<input style="width: 300px;" type="text" />
<button type="submit">Send</button>
</form>
</body>

</html>
120 changes: 120 additions & 0 deletions examples/with-socketioxide/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::sync::Arc;

use salvo::prelude::*;
use serde::Deserialize;
use socketioxide::{adapter::LocalAdapter, Socket};
use socketioxide::{Namespace, SocketIoLayer};
use tokio::time::Duration;
use tower::ServiceBuilder;
use tracing::info;

#[handler]
fn index() -> Text<&'static str> {
Text::Html(include_str!("chat.html"))
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt().init();

let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
let ns = Namespace::builder().add("/", handle_socket).build();
let router = Router::new()
.push(Router::new().get(index))
.push(Router::new().post(SocketIoLayer::new(ns).compat()));
Server::new(acceptor).serve(router).await;
}

#[derive(Deserialize, Clone, Debug)]
struct Nickname(String);

#[derive(Deserialize)]
struct Auth {
pub nickname: Nickname,
}

pub async fn handle_socket(socket: Arc<Socket<LocalAdapter>>) {
info!("Socket connected on / with id: {}", socket.sid);
if let Ok(data) = socket.handshake.data::<Auth>() {
info!("Nickname: {:?}", data.nickname);
socket.extensions.insert(data.nickname);
socket.emit("message", "Welcome to the chat!").ok();
socket.join("default").unwrap();
} else {
info!("No nickname provided, disconnecting...");
socket.disconnect().ok();
return;
}

socket.on(
"message",
|socket, (room, message): (String, String), _, _| async move {
let Nickname(ref nickname) = *socket.extensions.get().unwrap();
info!("transfering message from {nickname} to {room}: {message}");
info!("Sockets in room: {:?}", socket.local().sockets().unwrap());
if let Some(dest) = socket
.to("default")
.sockets()
.unwrap()
.iter()
.find(|s| s.extensions.get::<Nickname>().map(|n| n.0 == room).unwrap_or_default())
{
info!("Sending message to {}", room);
dest.emit("message", format!("{}: {}", nickname, message)).ok();
}

socket
.to(room)
.emit("message", format!("{}: {}", nickname, message))
.ok();
},
);

socket.on("join", |socket, room: String, _, _| async move {
info!("Joining room {}", room);
socket.join(room).unwrap();
});

socket.on("leave", |socket, room: String, _, _| async move {
info!("Leaving room {}", room);
socket.leave(room).unwrap();
});

socket.on("list", |socket, room: Option<String>, _, _| async move {
if let Some(room) = room {
info!("Listing sockets in room {}", room);
let sockets = socket
.within(room)
.sockets()
.unwrap()
.iter()
.filter_map(|s| s.extensions.get::<Nickname>())
.fold("".to_string(), |a, b| a + &b.0 + ", ")
.trim_end_matches(", ")
.to_string();
socket.emit("message", sockets).ok();
} else {
let rooms = socket.rooms().unwrap();
info!("Listing rooms: {:?}", &rooms);
socket.emit("message", rooms).ok();
}
});

socket.on("nickname", |socket, nickname: Nickname, _, _| async move {
let previous = socket.extensions.insert(nickname.clone());
info!("Nickname changed from {:?} to {:?}", &previous, &nickname);
let msg = format!(
"{} changed his nickname to {}",
previous.map(|n| n.0).unwrap_or_default(),
nickname.0
);
socket.to("default").emit("message", msg).ok();
});

socket.on_disconnect(|socket, reason| async move {
info!("Socket disconnected: {} {}", socket.sid, reason);
let Nickname(ref nickname) = *socket.extensions.get().unwrap();
let msg = format!("{} left the chat", nickname);
socket.to("default").emit("message", msg).ok();
});
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "example-work-with-tower"
name = "example-with-tower"
version.workspace = true
edition.workspace = true
publish.workspace = true
Expand Down
File renamed without changes.

0 comments on commit c7d0e32

Please sign in to comment.