Skip to content

Commit

Permalink
chore: introduce retries in replication tests (paradedb#1689)
Browse files Browse the repository at this point in the history
  • Loading branch information
neilyio authored Sep 20, 2024
1 parent 975ebb2 commit ce18cd9
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 16 deletions.
45 changes: 30 additions & 15 deletions pg_search/tests/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
"SELECT description FROM mock_items.search('description:shoes')".fetch(&mut source_conn);

// Wait for the replication to complete
std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(String,)> =
"SELECT description FROM mock_items.search('description:shoes')".fetch(&mut target_conn);
"SELECT description FROM mock_items.search('description:shoes')".fetch_retry(
&mut target_conn,
5,
1000,
|result| !result.is_empty(),
);

assert_eq!(source_results.len(), 1);
assert_eq!(target_results.len(), 1);
Expand All @@ -247,10 +251,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
.fetch(&mut source_conn);

// Wait for the replication to complete
std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(String,)> =
"SELECT description FROM mock_items.search('description:\"running shoes\"')"
.fetch(&mut target_conn);
"SELECT description FROM mock_items.search('description:\"running shoes\"')".fetch_retry(
&mut target_conn,
5,
1000,
|result| !result.is_empty(),
);

assert_eq!(source_results.len(), 1);
assert_eq!(target_results.len(), 1);
Expand All @@ -264,10 +271,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
"SELECT rating FROM mock_items WHERE description = 'Red sports shoes'"
.fetch(&mut source_conn);

std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(i32,)> =
"SELECT rating FROM mock_items WHERE description = 'Red sports shoes'"
.fetch(&mut target_conn);
"SELECT rating FROM mock_items WHERE description = 'Red sports shoes'".fetch_retry(
&mut target_conn,
5,
1000,
|result| !result.is_empty(),
);

assert_eq!(source_results.len(), 1);
assert_eq!(target_results.len(), 1);
Expand All @@ -281,10 +291,13 @@ async fn test_ephemeral_postgres() -> Result<()> {
"SELECT description FROM mock_items WHERE description = 'Red sports shoes'"
.fetch(&mut source_conn);

std::thread::sleep(std::time::Duration::from_secs(1));
let target_results: Vec<(String,)> =
"SELECT description FROM mock_items WHERE description = 'Red sports shoes'"
.fetch(&mut target_conn);
"SELECT description FROM mock_items WHERE description = 'Red sports shoes'".fetch_retry(
&mut target_conn,
5,
1000,
|result| !result.is_empty(),
);

assert_eq!(source_results.len(), 0);
assert_eq!(target_results.len(), 0);
Expand Down Expand Up @@ -436,12 +449,14 @@ async fn test_replication_with_pg_search_only_on_replica() -> Result<()> {
VALUES ('Green hiking shoes', 'Footwear', true, '16:00:00', '2024-07-11', '{}', '2024-07-11 16:00:00', 3)"
.execute(&mut source_conn);

// Wait for the replication to complete
std::thread::sleep(std::time::Duration::from_secs(1));

// Verify the insert is replicated to the target database and can be searched using pg_search
let target_results: Vec<(String,)> =
"SELECT description FROM mock_items.search('description:shoes')".fetch(&mut target_conn);
"SELECT description FROM mock_items.search('description:shoes')".fetch_retry(
&mut target_conn,
5,
1000,
|result| !result.is_empty(),
);

assert_eq!(target_results.len(), 1);
assert_eq!(target_results[0].0, "Green hiking shoes");
Expand Down
38 changes: 37 additions & 1 deletion shared/src/fixtures/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use super::arrow::schema_to_batch;
use anyhow::Result;
use async_std::prelude::Stream;
use async_std::stream::StreamExt;
use async_std::task::block_on;
Expand All @@ -26,7 +27,7 @@ use sqlx::{
testing::{TestArgs, TestContext, TestSupport},
ConnectOptions, Decode, Executor, FromRow, PgConnection, Postgres, Type,
};
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub struct Db {
context: TestContext<Postgres>,
Expand Down Expand Up @@ -94,6 +95,41 @@ where
})
}

fn fetch_retry<T>(
self,
connection: &mut PgConnection,
retries: u32,
delay_ms: u64,
validate: fn(&[T]) -> bool,
) -> Vec<T>
where
T: for<'r> FromRow<'r, <Postgres as sqlx::Database>::Row> + Send + Unpin,
{
for attempt in 0..retries {
match block_on(async {
sqlx::query_as::<_, T>(self.as_ref())
.fetch_all(&mut *connection)
.await
.map_err(anyhow::Error::from)
}) {
Ok(result) => {
if validate(&result) {
return result;
} else if attempt < retries - 1 {
block_on(async_std::task::sleep(Duration::from_millis(delay_ms)));
} else {
return vec![];
}
}
Err(_) if attempt < retries - 1 => {
block_on(async_std::task::sleep(Duration::from_millis(delay_ms)));
}
Err(e) => panic!("Fetch attempt {}/{} failed: {}", attempt + 1, retries, e),
}
}
panic!("Exhausted retries for query '{}'", self.as_ref());
}

fn fetch_dynamic(self, connection: &mut PgConnection) -> Vec<PgRow> {
block_on(async {
sqlx::query(self.as_ref())
Expand Down

0 comments on commit ce18cd9

Please sign in to comment.