diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index f5d81d51d1..05f41b3872 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -297,19 +297,20 @@ async fn test_cluster() -> anyhow::Result<()> { .expect("failed to get samples from client1"); // We still have a quorum (2 of 3 keepers), so we should be able to insert + // Removing a node may require a new leader election which would require + // some polling, so we go ahead and do that. let samples = oximeter_test_utils::generate_test_samples( input.n_projects, input.n_instances, input.n_cpus, input.n_samples, ); - client3 - .insert_samples(&samples) + wait_for_insert(log, &client3, &samples) .await .expect("failed to insert samples at server3"); wait_for_num_points(&log, &client2, samples.len() * 3) .await - .expect("failed to get samples from client1"); + .expect("failed to get samples from client2"); // Stop another keeper deployment.stop_keeper(1.into()).expect("failed to stop keeper 1"); @@ -355,7 +356,9 @@ async fn test_cluster() -> anyhow::Result<()> { input.n_cpus, input.n_samples, ); - client1.insert_samples(&samples).await.expect("failed to insert samples"); + wait_for_insert(log, &client1, &samples) + .await + .expect("failed to insert samples at server1"); wait_for_num_points(&log, &client2, samples.len() * 4) .await .expect("failed to get samples from client1"); @@ -375,7 +378,9 @@ async fn test_cluster() -> anyhow::Result<()> { input.n_cpus, input.n_samples, ); - client1.insert_samples(&samples).await.expect("failed to insert samples"); + wait_for_insert(log, &client1, &samples) + .await + .expect("failed to insert samples at server1"); wait_for_num_points(&log, &client2, samples.len() * 5) .await .expect("failed to get samples from client1"); @@ -519,3 +524,30 @@ async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> { info!(log, "Clickhouse server ready: {}", client.url()); Ok(()) } + +/// Wait for insert to succeed after messing with keeper configuration +async fn wait_for_insert( + log: &Logger, + client: &Client, + samples: &[Sample], +) -> anyhow::Result<()> { + poll::wait_for_condition( + || async { + client + .insert_samples(&samples) + .await + .map_err(|_| poll::CondCheckError::::NotYet) + }, + &Duration::from_millis(1000), + &Duration::from_secs(60), + ) + .await + .with_context(|| { + format!( + "failed to insert samples at clickhouse server: {}", + client.url() + ) + })?; + info!(log, "inserted samples at clickhouse server: {}", client.url()); + Ok(()) +}