diff --git a/backend/src/lib.rs b/backend/src/lib.rs index e4c2763..07921d8 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -26,7 +26,7 @@ use std::{ use actix::prelude::*; pub use boringtun::*; -use db_connector::{models::wg_keys::WgKey, Pool}; +use db_connector::Pool; use ipnetwork::IpNetwork; use lettre::SmtpTransport; use serde::{ser::SerializeStruct, Serialize}; @@ -70,7 +70,7 @@ pub struct BridgeState { pub port_discovery: Arc>>, pub charger_remote_conn_map: Mutex>, pub undiscovered_chargers: Arc>>>, - pub lost_connections: Mutex>>, + pub lost_connections: Mutex)>>>, pub socket: UdpSocket, } diff --git a/backend/src/routes/management.rs b/backend/src/routes/management.rs index 7b47973..a31b221 100644 --- a/backend/src/routes/management.rs +++ b/backend/src/routes/management.rs @@ -30,10 +30,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use crate::{ - error::Error, - routes::charger::add::get_charger_from_db, - utils::{get_connection, web_block_unpacked}, - AppState, BridgeState, + error::Error, routes::charger::add::get_charger_from_db, utils::{get_connection, web_block_unpacked}, AppState, BridgeState }; use super::charger::add::password_matches; @@ -192,34 +189,34 @@ pub async fn management( } } - let mut conn = get_connection(&state)?; - let keys_in_use: Vec = web_block_unpacked(move || { - use db_connector::schema::wg_keys::dsl::*; - - match WgKey::belonging_to(&charger) - .filter(in_use.eq(true)) - .load(&mut conn) - { - Ok(k) => Ok(k), - Err(_err) => Err(Error::InternalError), - } - }) - .await?; - - { - let mut lost_map = bridge_state.lost_connections.lock().unwrap(); - let _ = lost_map.insert(data.id, keys_in_use); - } - - { + let addresses = { let mut map = bridge_state.charger_remote_conn_map.lock().unwrap(); - map.retain(|key, _| { + let mut addresses = Vec::new(); + map.retain(|key, addr| { if key.charger_id == data.id { + addresses.push((*addr, key.conn_no)); false } else { true } }); + addresses + }; + + let losing_conns = { + let mut clients = bridge_state.web_client_map.lock().unwrap(); + let mut losing_conns = Vec::new(); + for (addr, conn_no) in addresses.into_iter() { + if let Some(recipient) = clients.remove(&addr) { + losing_conns.push((conn_no, recipient)); + } + } + losing_conns + }; + + { + let mut lost_conns = bridge_state.lost_connections.lock().unwrap(); + lost_conns.insert(data.id, losing_conns); } let (fw_version, port) = match &data.data { diff --git a/backend/src/routes/state.rs b/backend/src/routes/state.rs index 7984742..3cfd36f 100644 --- a/backend/src/routes/state.rs +++ b/backend/src/routes/state.rs @@ -4,7 +4,6 @@ use std::{ }; use actix_web::{get, web, HttpResponse, Responder}; -use db_connector::models::wg_keys::WgKey; use ipnetwork::IpNetwork; use serde::Serialize; @@ -13,16 +12,16 @@ use crate::{ BridgeState, DiscoveryCharger, }; -#[derive(Serialize, Debug)] -struct ServerState { - clients: Vec, - undiscovered_clients: Vec, - charger_management_map: Vec, - charger_management_map_with_id: Vec, - port_discovery: Vec, - charger_remote_conn_map: Vec, - undiscovered_chargers: HashMap>, - lost_connections: Vec<(i32, Vec)>, +#[derive(Serialize, Debug, PartialEq, Clone)] +pub struct ServerState { + pub clients: Vec, + pub undiscovered_clients: Vec, + pub charger_management_map: Vec, + pub charger_management_map_with_id: Vec, + pub port_discovery: Vec, + pub charger_remote_conn_map: Vec, + pub undiscovered_chargers: HashMap>, + pub lost_connections: Vec<(i32, Vec)>, } #[get("/state")] @@ -81,9 +80,9 @@ pub async fn state(brige_state: web::Data) -> actix_web::Result)> = { + let lost_connections: Vec<(i32, Vec)> = { let map = brige_state.lost_connections.lock().unwrap(); - map.iter().map(|(id, key)| (id.to_owned(), key.clone())).collect() + map.iter().map(|(id, conns)| (id.to_owned(), conns.into_iter().map(|(conn_no, _)| *conn_no).collect())).collect() }; let state = ServerState { diff --git a/backend/src/udp_server/multiplex.rs b/backend/src/udp_server/multiplex.rs index 83717ca..1633c99 100644 --- a/backend/src/udp_server/multiplex.rs +++ b/backend/src/udp_server/multiplex.rs @@ -34,7 +34,7 @@ use rand_core::OsRng; use threadpool::ThreadPool; use crate::{ - udp_server::packet::ManagementCommand, + udp_server::{management::RemoteConnMeta, packet::ManagementCommand}, ws_udp_bridge::{open_connection, Message}, BridgeState, }; @@ -233,9 +233,16 @@ pub fn run_server(state: web::Data, thread_pool: ThreadPool) { v.insert(tunn_data.clone()); let tunn = tunn_data.clone(); let mut lost_map = state.lost_connections.lock().unwrap(); - if let Some(keys) = lost_map.remove(&id) { - for key in keys.iter() { - open_connection(key.connection_no, id, tunn.clone(), state.port_discovery.clone()).ok(); + let mut undiscovered_clients = state.undiscovered_clients.lock().unwrap(); + if let Some(conns) = lost_map.remove(&id) { + for (conn_no, recipient) in conns.into_iter() { + let meta = RemoteConnMeta { + charger_id: id, + conn_no + }; + undiscovered_clients.insert(meta, recipient); + + open_connection(conn_no, id, tunn.clone(), state.port_discovery.clone()).ok(); } } log::debug!("Adding management connection from {}", addr); diff --git a/backend/src/ws_udp_bridge.rs b/backend/src/ws_udp_bridge.rs index 2d1dce2..234b127 100644 --- a/backend/src/ws_udp_bridge.rs +++ b/backend/src/ws_udp_bridge.rs @@ -27,7 +27,6 @@ use db_connector::models::wg_keys::WgKey; use diesel::prelude::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::net::SocketAddr; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -61,7 +60,6 @@ fn validate_key_id(key_id: &str) -> Result<(), ValidationError> { pub struct WebClient { pub key_id: uuid::Uuid, pub charger_id: i32, - pub peer_sock_addr: Option, pub app_state: web::Data, pub bridge_state: web::Data, pub conn_no: i32, @@ -88,9 +86,7 @@ impl StreamHandler> for WebClient { match item { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Binary(msg)) => { - let peer_sock_addr = if let Some(addr) = self.peer_sock_addr { - addr - } else { + let peer_sock_addr = { let meta = RemoteConnMeta { charger_id: self.charger_id.clone(), conn_no: self.conn_no, @@ -99,7 +95,6 @@ impl StreamHandler> for WebClient { match map.get(&meta) { Some(addr) => { let addr = addr.to_owned(); - self.peer_sock_addr = Some(addr); addr } None => { @@ -179,19 +174,10 @@ impl StreamHandler> for WebClient { }; { let mut map = self.bridge_state.charger_remote_conn_map.lock().unwrap(); - - match self.peer_sock_addr { - Some(addr) => { - let mut map = self.bridge_state.web_client_map.lock().unwrap(); - map.remove(&addr); - } - None => { - if let Some(addr) = map.get(&meta) { - let mut map = self.bridge_state.web_client_map.lock().unwrap(); - map.remove(&addr); - } - } - }; + if let Some(addr) = map.get(&meta) { + let mut map = self.bridge_state.web_client_map.lock().unwrap(); + map.remove(&addr); + } map.remove(&meta); } @@ -321,7 +307,6 @@ async fn start_ws( key_id: keys.id, charger_id: keys.charger_id, app_state: state.clone(), - peer_sock_addr: None, conn_no: keys.connection_no, bridge_state: bridge_state.clone(), };