Skip to content

Commit

Permalink
backend: stop caching the SocketAddr for a specific connection in the…
Browse files Browse the repository at this point in the history
… websocket actor. fix #76
  • Loading branch information
ffreddow committed Nov 6, 2024
1 parent 4d4db40 commit f7d20ce
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 64 deletions.
4 changes: 2 additions & 2 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{

use actix::prelude::*;
pub use boringtun::*;
use db_connector::{models::wg_keys::WgKey, Pool};
use db_connector::Pool;
use ipnetwork::IpNetwork;
use lettre::SmtpTransport;
use serde::{ser::SerializeStruct, Serialize};
Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct BridgeState {
pub port_discovery: Arc<Mutex<HashMap<ManagementResponse, Instant>>>,
pub charger_remote_conn_map: Mutex<HashMap<RemoteConnMeta, SocketAddr>>,
pub undiscovered_chargers: Arc<Mutex<HashMap<IpNetwork, HashSet<DiscoveryCharger>>>>,
pub lost_connections: Mutex<HashMap<i32, Vec<WgKey>>>,
pub lost_connections: Mutex<HashMap<i32, Vec<(i32, Recipient<Message>)>>>,
pub socket: UdpSocket,
}

Expand Down
47 changes: 22 additions & 25 deletions backend/src/routes/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::{
error::Error,
routes::charger::add::get_charger_from_db,
utils::{get_connection, web_block_unpacked},
AppState, BridgeState,
error::Error, routes::charger::add::get_charger_from_db, utils::{get_connection, web_block_unpacked}, AppState, BridgeState
};

use super::charger::add::password_matches;
Expand Down Expand Up @@ -192,34 +189,34 @@ pub async fn management(
}
}

let mut conn = get_connection(&state)?;
let keys_in_use: Vec<WgKey> = web_block_unpacked(move || {
use db_connector::schema::wg_keys::dsl::*;

match WgKey::belonging_to(&charger)
.filter(in_use.eq(true))
.load(&mut conn)
{
Ok(k) => Ok(k),
Err(_err) => Err(Error::InternalError),
}
})
.await?;

{
let mut lost_map = bridge_state.lost_connections.lock().unwrap();
let _ = lost_map.insert(data.id, keys_in_use);
}

{
let addresses = {
let mut map = bridge_state.charger_remote_conn_map.lock().unwrap();
map.retain(|key, _| {
let mut addresses = Vec::new();
map.retain(|key, addr| {
if key.charger_id == data.id {
addresses.push((*addr, key.conn_no));
false
} else {
true
}
});
addresses
};

let losing_conns = {
let mut clients = bridge_state.web_client_map.lock().unwrap();
let mut losing_conns = Vec::new();
for (addr, conn_no) in addresses.into_iter() {
if let Some(recipient) = clients.remove(&addr) {
losing_conns.push((conn_no, recipient));
}
}
losing_conns
};

{
let mut lost_conns = bridge_state.lost_connections.lock().unwrap();
lost_conns.insert(data.id, losing_conns);
}

let (fw_version, port) = match &data.data {
Expand Down
25 changes: 12 additions & 13 deletions backend/src/routes/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
};

use actix_web::{get, web, HttpResponse, Responder};
use db_connector::models::wg_keys::WgKey;
use ipnetwork::IpNetwork;
use serde::Serialize;

Expand All @@ -13,16 +12,16 @@ use crate::{
BridgeState, DiscoveryCharger,
};

#[derive(Serialize, Debug)]
struct ServerState {
clients: Vec<SocketAddr>,
undiscovered_clients: Vec<RemoteConnMeta>,
charger_management_map: Vec<SocketAddr>,
charger_management_map_with_id: Vec<i32>,
port_discovery: Vec<ManagementResponse>,
charger_remote_conn_map: Vec<RemoteConnMeta>,
undiscovered_chargers: HashMap<IpNetwork, HashSet<DiscoveryCharger>>,
lost_connections: Vec<(i32, Vec<WgKey>)>,
#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct ServerState {
pub clients: Vec<SocketAddr>,
pub undiscovered_clients: Vec<RemoteConnMeta>,
pub charger_management_map: Vec<SocketAddr>,
pub charger_management_map_with_id: Vec<i32>,
pub port_discovery: Vec<ManagementResponse>,
pub charger_remote_conn_map: Vec<RemoteConnMeta>,
pub undiscovered_chargers: HashMap<IpNetwork, HashSet<DiscoveryCharger>>,
pub lost_connections: Vec<(i32, Vec<i32>)>,
}

#[get("/state")]
Expand Down Expand Up @@ -81,9 +80,9 @@ pub async fn state(brige_state: web::Data<BridgeState>) -> actix_web::Result<imp
map.clone()
};

let lost_connections: Vec<(i32, Vec<WgKey>)> = {
let lost_connections: Vec<(i32, Vec<i32>)> = {
let map = brige_state.lost_connections.lock().unwrap();
map.iter().map(|(id, key)| (id.to_owned(), key.clone())).collect()
map.iter().map(|(id, conns)| (id.to_owned(), conns.into_iter().map(|(conn_no, _)| *conn_no).collect())).collect()
};

let state = ServerState {
Expand Down
15 changes: 11 additions & 4 deletions backend/src/udp_server/multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use rand_core::OsRng;
use threadpool::ThreadPool;

use crate::{
udp_server::packet::ManagementCommand,
udp_server::{management::RemoteConnMeta, packet::ManagementCommand},
ws_udp_bridge::{open_connection, Message},
BridgeState,
};
Expand Down Expand Up @@ -233,9 +233,16 @@ pub fn run_server(state: web::Data<BridgeState>, thread_pool: ThreadPool) {
v.insert(tunn_data.clone());
let tunn = tunn_data.clone();
let mut lost_map = state.lost_connections.lock().unwrap();
if let Some(keys) = lost_map.remove(&id) {
for key in keys.iter() {
open_connection(key.connection_no, id, tunn.clone(), state.port_discovery.clone()).ok();
let mut undiscovered_clients = state.undiscovered_clients.lock().unwrap();
if let Some(conns) = lost_map.remove(&id) {
for (conn_no, recipient) in conns.into_iter() {
let meta = RemoteConnMeta {
charger_id: id,
conn_no
};
undiscovered_clients.insert(meta, recipient);

open_connection(conn_no, id, tunn.clone(), state.port_discovery.clone()).ok();
}
}
log::debug!("Adding management connection from {}", addr);
Expand Down
25 changes: 5 additions & 20 deletions backend/src/ws_udp_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use db_connector::models::wg_keys::WgKey;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Instant;
Expand Down Expand Up @@ -61,7 +60,6 @@ fn validate_key_id(key_id: &str) -> Result<(), ValidationError> {
pub struct WebClient {
pub key_id: uuid::Uuid,
pub charger_id: i32,
pub peer_sock_addr: Option<SocketAddr>,
pub app_state: web::Data<AppState>,
pub bridge_state: web::Data<BridgeState>,
pub conn_no: i32,
Expand All @@ -88,9 +86,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebClient {
match item {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Binary(msg)) => {
let peer_sock_addr = if let Some(addr) = self.peer_sock_addr {
addr
} else {
let peer_sock_addr = {
let meta = RemoteConnMeta {
charger_id: self.charger_id.clone(),
conn_no: self.conn_no,
Expand All @@ -99,7 +95,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebClient {
match map.get(&meta) {
Some(addr) => {
let addr = addr.to_owned();
self.peer_sock_addr = Some(addr);
addr
}
None => {
Expand Down Expand Up @@ -179,19 +174,10 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebClient {
};
{
let mut map = self.bridge_state.charger_remote_conn_map.lock().unwrap();

match self.peer_sock_addr {
Some(addr) => {
let mut map = self.bridge_state.web_client_map.lock().unwrap();
map.remove(&addr);
}
None => {
if let Some(addr) = map.get(&meta) {
let mut map = self.bridge_state.web_client_map.lock().unwrap();
map.remove(&addr);
}
}
};
if let Some(addr) = map.get(&meta) {
let mut map = self.bridge_state.web_client_map.lock().unwrap();
map.remove(&addr);
}

map.remove(&meta);
}
Expand Down Expand Up @@ -321,7 +307,6 @@ async fn start_ws(
key_id: keys.id,
charger_id: keys.charger_id,
app_state: state.clone(),
peer_sock_addr: None,
conn_no: keys.connection_no,
bridge_state: bridge_state.clone(),
};
Expand Down

0 comments on commit f7d20ce

Please sign in to comment.