diff --git a/libsql/Cargo.toml b/libsql/Cargo.toml index c336f54a4b..9661ef2566 100644 --- a/libsql/Cargo.toml +++ b/libsql/Cargo.toml @@ -42,7 +42,7 @@ fallible-iterator = { version = "0.3", optional = true } libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true } async-stream = { version = "0.3.5", optional = true } -reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls" ], optional = true } +reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls", "json" ], optional = true } [dev-dependencies] criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] } diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 7f22914520..f70debf3b7 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -397,8 +397,16 @@ impl Database { let start_frame_no = sync_ctx.durable_frame_num + 1; let end_frame_no = max_frame_no; - for frame_no in start_frame_no..end_frame_no+1 { - self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?; + let mut frame_no = start_frame_no; + while frame_no <= end_frame_no { + // The server returns its maximum frame number. To avoid resending + // frames the server already knows about, we need to update the + // frame number to the one returned by the server. + let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?; + if max_frame_no > frame_no { + frame_no = max_frame_no; + } + frame_no += 1; } let frame_count = end_frame_no - start_frame_no + 1; @@ -409,7 +417,7 @@ impl Database { } #[cfg(feature = "sync")] - async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<()> { + async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result { let frame_size: usize = 24+page_size as usize; let frame = vec![0; frame_size]; let rc = unsafe { @@ -419,12 +427,12 @@ impl Database { return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no))); } let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1); - self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?; - Ok(()) + let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?; + Ok(max_frame_no) } #[cfg(feature = "sync")] - async fn push_with_retry(&self, uri: String, auth_token: &Option, frame: Vec, max_retries: usize) -> Result<()> { + async fn push_with_retry(&self, uri: String, auth_token: &Option, frame: Vec, max_retries: usize) -> Result { let mut nr_retries = 0; loop { let client = reqwest::Client::new(); @@ -437,7 +445,9 @@ impl Database { } let res = builder.body(frame.to_vec()).send().await.unwrap(); if res.status().is_success() { - return Ok(()); + let resp = res.json::().await.unwrap(); + let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap(); + return Ok(max_frame_no as u32); } if nr_retries > max_retries { return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));