From 2414f2f7a07a401ca562b56735dbe11a2ed30b52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sosth=C3=A8ne=20Gu=C3=A9don?= Date: Wed, 11 Sep 2024 15:03:04 +0200 Subject: [PATCH] Clear all connections pool on IO errors --- Cargo.lock | 2 +- Cargo.toml | 2 +- pkcs11/Cargo.toml | 2 +- pkcs11/src/backend/login.rs | 5 ++- pkcs11/src/config/device.rs | 6 +++ pkcs11/tests/basic.rs | 74 +++++++++++++++++++++++++++++++++++++ pkcs11/tests/tools/mod.rs | 74 ++++++++++++++++++++++++++----------- 7 files changed, 138 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2db0c73b..84361b5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1466,7 +1466,7 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "ureq" version = "2.10.1" -source = "git+https://github.com/Nitrokey/ureq.git?rev=dc716a0eb412db14ca75694ab7b99fb6f5d179f0#dc716a0eb412db14ca75694ab7b99fb6f5d179f0" +source = "git+https://github.com/Nitrokey/ureq.git?rev=9ee324596cad8132d488721652dad7c37ed1987c#9ee324596cad8132d488721652dad7c37ed1987c" dependencies = [ "base64 0.22.1", "log", diff --git a/Cargo.toml b/Cargo.toml index c1872fef..76436cbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,4 @@ panic = 'abort' # Abort on panic strip = true # Strip symbols from binary [patch.crates-io] -ureq = { git = "https://github.com/Nitrokey/ureq.git", rev = "dc716a0eb412db14ca75694ab7b99fb6f5d179f0" } +ureq = { git = "https://github.com/Nitrokey/ureq.git", rev = "9ee324596cad8132d488721652dad7c37ed1987c" } diff --git a/pkcs11/Cargo.toml b/pkcs11/Cargo.toml index 0862b7c1..1ccd5876 100644 --- a/pkcs11/Cargo.toml +++ b/pkcs11/Cargo.toml @@ -50,7 +50,7 @@ pkcs11 = "0.5.0" tempfile = "3.12.0" test-log = "0.2.16" time = "0.3.36" -tokio = {version = "1", default-features = false, features = ["net", "sync", "rt", "io-util"] } +tokio = {version = "1", default-features = false, features = ["net", "sync", "rt", "io-util", "time"] } [features] pkcs11-full-tests = [] diff --git a/pkcs11/src/backend/login.rs b/pkcs11/src/backend/login.rs index 9ba29fd3..72fad128 100644 --- a/pkcs11/src/backend/login.rs +++ b/pkcs11/src/backend/login.rs @@ -249,14 +249,14 @@ impl LoginCtx { count: retry_limit, delay_seconds, } = self.slot.retries.unwrap_or(RetryConfig { - count: 1, + count: 0, delay_seconds: 0, }); let delay = Duration::from_secs(delay_seconds); loop { - if retry_count == retry_limit { + if retry_count > retry_limit { error!( "Retry count exceeded after {retry_limit} attempts, instance is unreachable" ); @@ -292,6 +292,7 @@ impl LoginCtx { ureq::ErrorKind::Io | ureq::ErrorKind::ConnectionFailed ) => { + self.slot.clear_all_pools(); instance.bump_failed(); warn!("Connection attempt {retry_count} failed: IO error connecting to the instance, {err}, retrying in {delay_seconds}s"); thread::sleep(delay); diff --git a/pkcs11/src/config/device.rs b/pkcs11/src/config/device.rs index 34ccd993..324612bb 100644 --- a/pkcs11/src/config/device.rs +++ b/pkcs11/src/config/device.rs @@ -208,4 +208,10 @@ impl Slot { !pwd.is_empty() } + + pub fn clear_all_pools(&self) { + for instance in &self.instances { + instance.config.client.clear_pool(); + } + } } diff --git a/pkcs11/tests/basic.rs b/pkcs11/tests/basic.rs index cd8db3ea..57d77076 100644 --- a/pkcs11/tests/basic.rs +++ b/pkcs11/tests/basic.rs @@ -372,3 +372,77 @@ fn multi_instance_retries() { }, ) } + +#[test_log::test] +fn pool_not_reused() { + tools::run_tests( + &[(8444, 8443), (8445, 8443)], + P11Config { + slots: vec![SlotConfig { + label: "Test slot".into(), + operator: Some(UserConfig { + username: "operator".into(), + password: Some("opPassphrase".into()), + }), + administrator: Some(UserConfig { + username: "admin".into(), + password: Some("Administrator".into()), + }), + description: Some("Test slot".into()), + instances: vec![ + InstanceConfig { + url: format!("https://{NETHSM_DOCKER_HOSTNAME}:8444/api/v1"), + danger_insecure_cert: true, + sha256_fingerprints: Vec::new(), + max_idle_connections: None, + }, + InstanceConfig { + url: format!("https://{NETHSM_DOCKER_HOSTNAME}:8445/api/v1"), + danger_insecure_cert: true, + sha256_fingerprints: Vec::new(), + max_idle_connections: None, + }, + ], + retries: None, + timeout_seconds: Some(5), + connections_max_idle_duration: None, + tcp_keepalive: None, + }], + ..Default::default() + }, + |test_ctx, ctx| { + let slot = 0; + let session = ctx.open_session(slot, 0x04, None, None).unwrap(); + let (public_key, private_key) = ctx + .generate_key_pair( + session, + &RSA_MECHANISM, + RSA_PUBLIC_KEY_ATTRIBUTES, + RSA_PRIVATE_KEY_ATTRIBUTES, + ) + .unwrap(); + let data = [0x42; 32]; + + for _ in 0..2 { + ctx.sign_init(session, &RSA_MECHANISM, private_key).unwrap(); + // Verifying signatures is not supported + let _signature = ctx.sign(session, &data).unwrap(); + } + + test_ctx.stall_active_connections(); + let start_at = Instant::now(); + ctx.sign_init(session, &RSA_MECHANISM, private_key).unwrap(); + ctx.sign(session, &data).unwrap_err(); + assert!(start_at.elapsed() > Duration::from_secs(5)); + assert!(start_at.elapsed() < Duration::from_secs(6)); + + let start_at = Instant::now(); + ctx.sign_init(session, &RSA_MECHANISM, private_key).unwrap(); + ctx.sign(session, &data).unwrap(); + assert!(start_at.elapsed() < Duration::from_secs(1)); + + ctx.destroy_object(session, public_key).unwrap(); + ctx.destroy_object(session, private_key).unwrap(); + }, + ) +} diff --git a/pkcs11/tests/tools/mod.rs b/pkcs11/tests/tools/mod.rs index 41f058ac..09b8febc 100644 --- a/pkcs11/tests/tools/mod.rs +++ b/pkcs11/tests/tools/mod.rs @@ -25,6 +25,7 @@ use tempfile::NamedTempFile; use time::format_description; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime::{self}; +use tokio::sync::broadcast; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::task::AbortHandle; use ureq::AgentBuilder; @@ -100,6 +101,7 @@ fn tls_conf() -> rustls::ClientConfig { pub struct TestContext { blocked_ports: HashSet, + stall_connections: broadcast::Sender<()>, } pub struct TestDropper { @@ -188,6 +190,11 @@ impl TestContext { .unwrap(); assert!(out_in.status.success()); } + + /// Make all active connections wait before killing the connection + pub fn stall_active_connections(&self) { + self.stall_connections.send(()).unwrap(); + } } impl Drop for TestDropper { @@ -199,27 +206,28 @@ impl Drop for TestDropper { } } -static PROXY_SENDER: LazyLock> = LazyLock::new(|| { - let (tx, mut rx) = unbounded_channel(); - std::thread::spawn(move || { - runtime::Builder::new_current_thread() - .enable_io() - .build() - .unwrap() - .block_on(async move { - let mut tasks = Vec::new(); - while let Some((from_port, to_port)) = rx.recv().await { - tasks.push(tokio::spawn(proxy(from_port, to_port))); - } - for task in tasks { - task.abort(); - } - }) +static PROXY_SENDER: LazyLock)>> = + LazyLock::new(|| { + let (tx, mut rx) = unbounded_channel(); + std::thread::spawn(move || { + runtime::Builder::new_current_thread() + .enable_io() + .build() + .unwrap() + .block_on(async move { + let mut tasks = Vec::new(); + while let Some((from_port, to_port, sender)) = rx.recv().await { + tasks.push(tokio::spawn(proxy(from_port, to_port, sender))); + } + for task in tasks { + task.abort(); + } + }) + }); + tx }); - tx -}); -async fn proxy(from_port: u16, to_port: u16) { +async fn proxy(from_port: u16, to_port: u16, stall_sender: broadcast::Sender<()>) { let listener = TcpListener::bind(((Ipv4Addr::from([127, 0, 0, 1])), from_port)) .await .unwrap(); @@ -245,28 +253,43 @@ async fn proxy(from_port: u16, to_port: u16) { async fn handle_stream( mut rx: tokio::net::tcp::OwnedReadHalf, mut tx: tokio::net::tcp::OwnedWriteHalf, + mut stall_receiver: broadcast::Receiver<()>, ) { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let mut buffer = vec![0; 12 * 1024]; + let mut should_stall = false; loop { let n = rx.read(&mut buffer).await.unwrap(); + match stall_receiver.try_recv() { + Ok(()) | Err(broadcast::error::TryRecvError::Lagged(_)) => should_stall = true, + Err(broadcast::error::TryRecvError::Empty) => {} + Err(broadcast::error::TryRecvError::Closed) => {} + } + if n == 0 { return; } + if should_stall { + tokio::time::sleep(Duration::from_secs(50)).await; + return; + } + tx.write_all(&buffer[..n]).await.unwrap(); } } let (rx1, tx1) = socket1.into_split(); let (rx2, tx2) = socket2.into_split(); + let stall_rx = stall_sender.subscribe(); + let stall_tx = stall_sender.subscribe(); dropper .0 - .push(tokio::spawn(handle_stream(rx1, tx2)).abort_handle()); + .push(tokio::spawn(handle_stream(rx1, tx2, stall_tx)).abort_handle()); dropper .0 - .push(tokio::spawn(handle_stream(rx2, tx1)).abort_handle()); + .push(tokio::spawn(handle_stream(rx2, tx1, stall_rx)).abort_handle()); } } @@ -286,6 +309,7 @@ pub fn run_tests( serialize_test, context: TestContext { blocked_ports: HashSet::new(), + stall_connections: broadcast::channel(1).0, }, }; @@ -343,7 +367,13 @@ pub fn run_tests( } for (in_port, out_port) in proxies { - PROXY_SENDER.send((*in_port, *out_port)).unwrap(); + PROXY_SENDER + .send(( + *in_port, + *out_port, + test_dropper.context.stall_connections.clone(), + )) + .unwrap(); } let mut tmpfile: NamedTempFile = NamedTempFile::new().unwrap();