Skip to content

Commit

Permalink
Clear all connection pools on IO errors
Browse files Browse the repository at this point in the history
  • Loading branch information
sosthene-nitrokey committed Sep 13, 2024
1 parent 94b8838 commit 69abbac
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ panic = 'abort' # Abort on panic
strip = true # Strip symbols from binary

[patch.crates-io]
ureq = { git = "https://github.com/Nitrokey/ureq.git", rev = "dc716a0eb412db14ca75694ab7b99fb6f5d179f0" }
ureq = { git = "https://github.com/Nitrokey/ureq.git", rev = "9ee324596cad8132d488721652dad7c37ed1987c" }
2 changes: 1 addition & 1 deletion pkcs11/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pkcs11 = "0.5.0"
tempfile = "3.12.0"
test-log = "0.2.16"
time = "0.3.36"
tokio = {version = "1", default-features = false, features = ["net", "sync", "rt", "io-util"] }
tokio = {version = "1", default-features = false, features = ["net", "sync", "rt", "io-util", "time"] }

[features]
pkcs11-full-tests = []
Expand Down
5 changes: 3 additions & 2 deletions pkcs11/src/backend/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,14 @@ impl LoginCtx {
count: retry_limit,
delay_seconds,
} = self.slot.retries.unwrap_or(RetryConfig {
count: 1,
count: 0,
delay_seconds: 0,
});

let delay = Duration::from_secs(delay_seconds);

loop {
if retry_count == retry_limit {
if retry_count > retry_limit {
error!(
"Retry count exceeded after {retry_limit} attempts, instance is unreachable"
);
Expand Down Expand Up @@ -292,6 +292,7 @@ impl LoginCtx {
ureq::ErrorKind::Io | ureq::ErrorKind::ConnectionFailed
) =>
{
self.slot.clear_all_pools();
instance.bump_failed();
warn!("Connection attempt {retry_count} failed: IO error connecting to the instance, {err}, retrying in {delay_seconds}s");
thread::sleep(delay);
Expand Down
6 changes: 6 additions & 0 deletions pkcs11/src/config/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,10 @@ impl Slot {

!pwd.is_empty()
}

/// Drop every pooled idle HTTP connection held by this slot, across all of
/// its configured instances.
pub fn clear_all_pools(&self) {
    self.instances
        .iter()
        .for_each(|instance| instance.config.client.clear_pool());
}
}
74 changes: 74 additions & 0 deletions pkcs11/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,77 @@ fn multi_instance_retries() {
},
)
}

// Regression test: once every pooled connection has gone bad (here: stalled by
// the test proxies), the stale sockets must not be reused — the slot clears its
// connection pools on the IO error, so the next request opens a fresh
// connection and completes quickly.
#[test_log::test]
fn pool_not_reused() {
    tools::run_tests(
        // Two local proxies (8444 and 8445) both forwarding to the backend on
        // 8443, giving the slot two instances.
        &[(8444, 8443), (8445, 8443)],
        P11Config {
            slots: vec![SlotConfig {
                label: "Test slot".into(),
                operator: Some(UserConfig {
                    username: "operator".into(),
                    password: Some("opPassphrase".into()),
                }),
                administrator: Some(UserConfig {
                    username: "admin".into(),
                    password: Some("Administrator".into()),
                }),
                description: Some("Test slot".into()),
                instances: vec![
                    InstanceConfig {
                        url: format!("https://{NETHSM_DOCKER_HOSTNAME}:8444/api/v1"),
                        danger_insecure_cert: true,
                        sha256_fingerprints: Vec::new(),
                        max_idle_connections: None,
                    },
                    InstanceConfig {
                        url: format!("https://{NETHSM_DOCKER_HOSTNAME}:8445/api/v1"),
                        danger_insecure_cert: true,
                        sha256_fingerprints: Vec::new(),
                        max_idle_connections: None,
                    },
                ],
                retries: None,
                // 5 s request timeout — the elapsed-time assertions below
                // depend on this exact value.
                timeout_seconds: Some(5),
                connections_max_idle_duration: None,
                tcp_keepalive: None,
            }],
            ..Default::default()
        },
        |test_ctx, ctx| {
            let slot = 0;
            let session = ctx.open_session(slot, 0x04, None, None).unwrap();
            let (public_key, private_key) = ctx
                .generate_key_pair(
                    session,
                    &RSA_MECHANISM,
                    RSA_PUBLIC_KEY_ATTRIBUTES,
                    RSA_PRIVATE_KEY_ATTRIBUTES,
                )
                .unwrap();
            let data = [0x42; 32];

            // Sign a couple of times so connections get established and
            // parked in the agent's pool.
            for _ in 0..2 {
                ctx.sign_init(session, &RSA_MECHANISM, private_key).unwrap();
                // Verifying signatures is not supported
                let _signature = ctx.sign(session, &data).unwrap();
            }

            // Stall every currently-open proxied connection: a request sent on
            // a pooled socket now hangs until the 5 s timeout fires.
            test_ctx.stall_active_connections();
            let start_at = Instant::now();
            ctx.sign_init(session, &RSA_MECHANISM, private_key).unwrap();
            ctx.sign(session, &data).unwrap_err();
            // The failed request must have waited out the full 5 s timeout
            // (but not much longer).
            assert!(start_at.elapsed() > Duration::from_secs(5));
            assert!(start_at.elapsed() < Duration::from_secs(6));

            // The IO error should have cleared the pools, so this request runs
            // on a fresh connection and succeeds well under a second — if a
            // stalled pooled socket were reused it would time out again.
            let start_at = Instant::now();
            ctx.sign_init(session, &RSA_MECHANISM, private_key).unwrap();
            ctx.sign(session, &data).unwrap();
            assert!(start_at.elapsed() < Duration::from_secs(1));

            ctx.destroy_object(session, public_key).unwrap();
            ctx.destroy_object(session, private_key).unwrap();
        },
    )
}
74 changes: 52 additions & 22 deletions pkcs11/tests/tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tempfile::NamedTempFile;
use time::format_description;
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::{self};
use tokio::sync::broadcast;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::task::AbortHandle;
use ureq::AgentBuilder;
Expand Down Expand Up @@ -100,6 +101,7 @@ fn tls_conf() -> rustls::ClientConfig {

pub struct TestContext {
blocked_ports: HashSet<u16>,
stall_connections: broadcast::Sender<()>,
}

pub struct TestDropper {
Expand Down Expand Up @@ -188,6 +190,11 @@ impl TestContext {
.unwrap();
assert!(out_in.status.success());
}

/// Make all active connections wait before killing the connection
pub fn stall_active_connections(&self) {
self.stall_connections.send(()).unwrap();
}
}

impl Drop for TestDropper {
Expand All @@ -199,27 +206,28 @@ impl Drop for TestDropper {
}
}

static PROXY_SENDER: LazyLock<UnboundedSender<(u16, u16)>> = LazyLock::new(|| {
let (tx, mut rx) = unbounded_channel();
std::thread::spawn(move || {
runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap()
.block_on(async move {
let mut tasks = Vec::new();
while let Some((from_port, to_port)) = rx.recv().await {
tasks.push(tokio::spawn(proxy(from_port, to_port)));
}
for task in tasks {
task.abort();
}
})
// Global registration handle for test TCP proxies. Sending
// `(from_port, to_port, stall_sender)` spawns a proxy task that forwards
// 127.0.0.1:from_port to to_port; the stall sender lets tests freeze the
// proxied streams. All proxies run on one dedicated current-thread tokio
// runtime living on its own OS thread for the duration of the test binary.
static PROXY_SENDER: LazyLock<UnboundedSender<(u16, u16, broadcast::Sender<()>)>> =
    LazyLock::new(|| {
        let (tx, mut rx) = unbounded_channel();
        std::thread::spawn(move || {
            runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap()
                .block_on(async move {
                    let mut tasks = Vec::new();
                    // Keep accepting proxy registrations until every sender
                    // clone is dropped.
                    while let Some((from_port, to_port, sender)) = rx.recv().await {
                        tasks.push(tokio::spawn(proxy(from_port, to_port, sender)));
                    }
                    // Channel closed: tear down all proxy tasks.
                    for task in tasks {
                        task.abort();
                    }
                })
        });
        tx
    });

async fn proxy(from_port: u16, to_port: u16) {
async fn proxy(from_port: u16, to_port: u16, stall_sender: broadcast::Sender<()>) {
let listener = TcpListener::bind(((Ipv4Addr::from([127, 0, 0, 1])), from_port))
.await
.unwrap();
Expand All @@ -245,28 +253,43 @@ async fn proxy(from_port: u16, to_port: u16) {
/// Forward bytes from `rx` to `tx` until EOF. If a stall signal arrives on
/// `stall_receiver`, stop forwarding: hold the pending read for 50 s and then
/// return, dropping the write half so the peer's connection dies instead of
/// completing.
async fn handle_stream(
    mut rx: tokio::net::tcp::OwnedReadHalf,
    mut tx: tokio::net::tcp::OwnedWriteHalf,
    mut stall_receiver: broadcast::Receiver<()>,
) {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    let mut buffer = vec![0; 12 * 1024];
    // Once set, the stall is permanent for this stream.
    let mut should_stall = false;
    loop {
        let n = rx.read(&mut buffer).await.unwrap();

        // Non-blocking poll of the stall broadcast. `Lagged` means a signal
        // was sent (we just missed some), so it also triggers the stall;
        // `Closed` (sender gone) is treated like no signal.
        match stall_receiver.try_recv() {
            Ok(()) | Err(broadcast::error::TryRecvError::Lagged(_)) => should_stall = true,
            Err(broadcast::error::TryRecvError::Empty) => {}
            Err(broadcast::error::TryRecvError::Closed) => {}
        }

        // EOF: the peer closed its write side.
        if n == 0 {
            return;
        }

        if should_stall {
            // Keep the connection open without forwarding, longer than any
            // client timeout used in the tests, then drop it.
            tokio::time::sleep(Duration::from_secs(50)).await;
            return;
        }

        tx.write_all(&buffer[..n]).await.unwrap();
    }
}

let (rx1, tx1) = socket1.into_split();
let (rx2, tx2) = socket2.into_split();
let stall_rx = stall_sender.subscribe();
let stall_tx = stall_sender.subscribe();
dropper
.0
.push(tokio::spawn(handle_stream(rx1, tx2)).abort_handle());
.push(tokio::spawn(handle_stream(rx1, tx2, stall_tx)).abort_handle());
dropper
.0
.push(tokio::spawn(handle_stream(rx2, tx1)).abort_handle());
.push(tokio::spawn(handle_stream(rx2, tx1, stall_rx)).abort_handle());
}
}

Expand All @@ -286,6 +309,7 @@ pub fn run_tests(
serialize_test,
context: TestContext {
blocked_ports: HashSet::new(),
stall_connections: broadcast::channel(1).0,
},
};

Expand Down Expand Up @@ -343,7 +367,13 @@ pub fn run_tests(
}

for (in_port, out_port) in proxies {
PROXY_SENDER.send((*in_port, *out_port)).unwrap();
PROXY_SENDER
.send((
*in_port,
*out_port,
test_dropper.context.stall_connections.clone(),
))
.unwrap();
}

let mut tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
Expand Down

0 comments on commit 69abbac

Please sign in to comment.