diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index daa6099e..e146b7ce 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -1052,7 +1052,7 @@ async fn follow_basic() -> eyre::Result<()> { .await; assert_eq!(status_code, StatusCode::OK); - sleep(Duration::from_secs(3)).await; + sleep(Duration::from_secs(5)).await; check_bookie_versions( follower.clone(), main.agent.actor_id(), diff --git a/crates/corro-agent/src/api/peer/follow.rs b/crates/corro-agent/src/api/peer/follow.rs index 665e547e..17fc1a57 100644 --- a/crates/corro-agent/src/api/peer/follow.rs +++ b/crates/corro-agent/src/api/peer/follow.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, io, time::Duration}; +use std::io, time::Duration; use bytes::{BufMut, BytesMut}; use corro_types::{ @@ -160,7 +160,7 @@ pub async fn serve_follow( }; debug!("sending cleared version since from - {from_ts}"); - let mut last_empty_ts: HashMap = HashMap::new(); + loop { let conn = agent.pool().read().await?; @@ -175,7 +175,7 @@ pub async fn serve_follow( let mut bk_prepped = conn.prepare_cached(&format!("SELECT actor_id, start_version, end_version, db_version, last_seq, ts FROM __corro_bookkeeping WHERE (db_version IS NOT NULL AND db_version > ?) OR (db_version IS NULL and ts > ?) {extra_where_clause} - ORDER BY db_version ASC"))?; + ORDER BY db_version ASC, ts ASC"))?; let map = |row: &Row| { Ok(( @@ -220,7 +220,7 @@ pub async fn serve_follow( let last_seq = last_seq.unwrap(); let db_version:CrsqlDbVersion = db_version.unwrap(); let mut prepped = conn.prepare_cached( - "SELECT \"table\", pk, cid, val, col_version, db_version, seq, site_id, cl FROM crsql_changes WHERE db_version = ? ORDER BY db_version ASC, seq ASC, ts ASC", + "SELECT \"table\", pk, cid, val, col_version, db_version, seq, site_id, cl FROM crsql_changes WHERE db_version = ? ORDER BY db_version ASC, seq ASC", )?; // implicit read transaction let rows = prepped.query_map([db_version], row_to_change)?;