From ce18cd960889f0f171b39517996fbb4169926b8f Mon Sep 17 00:00:00 2001
From: Neil Hansen <51208969+neilyio@users.noreply.github.com>
Date: Fri, 20 Sep 2024 13:33:31 -0700
Subject: [PATCH] chore: introduce retries in replication tests (#1689)
---
pg_search/tests/replication.rs | 45 ++++++++++++++++++++++------------
shared/src/fixtures/db.rs | 38 +++++++++++++++++++++++++++-
2 files changed, 67 insertions(+), 16 deletions(-)
diff --git a/pg_search/tests/replication.rs b/pg_search/tests/replication.rs
index 7897f2ff09..20af4e3543 100644
--- a/pg_search/tests/replication.rs
+++ b/pg_search/tests/replication.rs
@@ -230,9 +230,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
"SELECT description FROM mock_items.search('description:shoes')".fetch(&mut source_conn);
// Wait for the replication to complete
- std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(String,)> =
- "SELECT description FROM mock_items.search('description:shoes')".fetch(&mut target_conn);
+ "SELECT description FROM mock_items.search('description:shoes')".fetch_retry(
+ &mut target_conn,
+ 5,
+ 1000,
+ |result| !result.is_empty(),
+ );
assert_eq!(source_results.len(), 1);
assert_eq!(target_results.len(), 1);
@@ -247,10 +251,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
.fetch(&mut source_conn);
// Wait for the replication to complete
- std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(String,)> =
- "SELECT description FROM mock_items.search('description:\"running shoes\"')"
- .fetch(&mut target_conn);
+ "SELECT description FROM mock_items.search('description:\"running shoes\"')".fetch_retry(
+ &mut target_conn,
+ 5,
+ 1000,
+ |result| !result.is_empty(),
+ );
assert_eq!(source_results.len(), 1);
assert_eq!(target_results.len(), 1);
@@ -264,10 +271,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
"SELECT rating FROM mock_items WHERE description = 'Red sports shoes'"
.fetch(&mut source_conn);
- std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(i32,)> =
- "SELECT rating FROM mock_items WHERE description = 'Red sports shoes'"
- .fetch(&mut target_conn);
+ "SELECT rating FROM mock_items WHERE description = 'Red sports shoes'".fetch_retry(
+ &mut target_conn,
+ 5,
+ 1000,
+ |result| !result.is_empty(),
+ );
assert_eq!(source_results.len(), 1);
assert_eq!(target_results.len(), 1);
@@ -281,10 +291,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
"SELECT description FROM mock_items WHERE description = 'Red sports shoes'"
.fetch(&mut source_conn);
- std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(String,)> =
- "SELECT description FROM mock_items WHERE description = 'Red sports shoes'"
- .fetch(&mut target_conn);
+ "SELECT description FROM mock_items WHERE description = 'Red sports shoes'".fetch_retry(
+ &mut target_conn,
+ 5,
+ 1000,
+ |result| !result.is_empty(),
+ );
assert_eq!(source_results.len(), 0);
assert_eq!(target_results.len(), 0);
@@ -436,12 +449,14 @@ async fn test_replication_with_pg_search_only_on_replica() -> Result<()> {
VALUES ('Green hiking shoes', 'Footwear', true, '16:00:00', '2024-07-11', '{}', '2024-07-11 16:00:00', 3)"
.execute(&mut source_conn);
- // Wait for the replication to complete
- std::thread::sleep(std::time::Duration::from_secs(1));
-
// Verify the insert is replicated to the target database and can be searched using pg_search
let target_results: Vec<(String,)> =
- "SELECT description FROM mock_items.search('description:shoes')".fetch(&mut target_conn);
+ "SELECT description FROM mock_items.search('description:shoes')".fetch_retry(
+ &mut target_conn,
+ 5,
+ 1000,
+ |result| !result.is_empty(),
+ );
assert_eq!(target_results.len(), 1);
assert_eq!(target_results[0].0, "Green hiking shoes");
diff --git a/shared/src/fixtures/db.rs b/shared/src/fixtures/db.rs
index abf3733120..2a41849690 100644
--- a/shared/src/fixtures/db.rs
+++ b/shared/src/fixtures/db.rs
@@ -16,6 +16,7 @@
// along with this program. If not, see .
use super::arrow::schema_to_batch;
+use anyhow::Result;
use async_std::prelude::Stream;
use async_std::stream::StreamExt;
use async_std::task::block_on;
@@ -26,7 +27,7 @@ use sqlx::{
testing::{TestArgs, TestContext, TestSupport},
ConnectOptions, Decode, Executor, FromRow, PgConnection, Postgres, Type,
};
-use std::time::{SystemTime, UNIX_EPOCH};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub struct Db {
context: TestContext,
@@ -94,6 +95,41 @@ where
})
}
+ fn fetch_retry(
+ self,
+ connection: &mut PgConnection,
+ retries: u32,
+ delay_ms: u64,
+ validate: fn(&[T]) -> bool,
+ ) -> Vec
+ where
+ T: for<'r> FromRow<'r, ::Row> + Send + Unpin,
+ {
+ for attempt in 0..retries {
+ match block_on(async {
+ sqlx::query_as::<_, T>(self.as_ref())
+ .fetch_all(&mut *connection)
+ .await
+ .map_err(anyhow::Error::from)
+ }) {
+ Ok(result) => {
+ if validate(&result) {
+ return result;
+ } else if attempt < retries - 1 {
+ block_on(async_std::task::sleep(Duration::from_millis(delay_ms)));
+ } else {
+ return vec![];
+ }
+ }
+ Err(_) if attempt < retries - 1 => {
+ block_on(async_std::task::sleep(Duration::from_millis(delay_ms)));
+ }
+ Err(e) => panic!("Fetch attempt {}/{} failed: {}", attempt + 1, retries, e),
+ }
+ }
+ panic!("Exhausted retries for query '{}'", self.as_ref());
+ }
+
fn fetch_dynamic(self, connection: &mut PgConnection) -> Vec {
block_on(async {
sqlx::query(self.as_ref())