From 474be6224e31e92137927d7efcc44b1e7dd07526 Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Sat, 2 Sep 2023 17:57:35 +0000 Subject: [PATCH] stream replication frames --- crates/replication/src/client.rs | 13 ++++++++++ crates/replication/src/lib.rs | 42 +++++++++++++++++++++++++++++--- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/crates/replication/src/client.rs b/crates/replication/src/client.rs index edc2245e68..51fa61ecb8 100644 --- a/crates/replication/src/client.rs +++ b/crates/replication/src/client.rs @@ -105,6 +105,19 @@ impl Client { Ok(frames) } + pub async fn log_entries( + &self, + next_offset: u64, + ) -> anyhow::Result + Unpin> { + let mut client = self.replication.clone(); + let frames = client + .log_entries(pb::LogOffset { next_offset }) + .await? + .into_inner(); + + Ok(frames) + } + pub async fn execute(&self, sql: &str) -> anyhow::Result<()> { let mut proxy = self.proxy.clone(); diff --git a/crates/replication/src/lib.rs b/crates/replication/src/lib.rs index 019bb1434a..4b6f7ff4bb 100644 --- a/crates/replication/src/lib.rs +++ b/crates/replication/src/lib.rs @@ -9,6 +9,7 @@ pub const WAL_MAGIC: u64 = u64::from_le_bytes(*b"SQLDWAL\0"); pub type FrameNo = u64; use anyhow::Context; pub use frame::{Frame, FrameHeader}; +use futures::{Stream, StreamExt}; pub use replica::hook::{Frames, InjectorHookCtx}; use replica::snapshot::SnapshotFileHeader; pub use replica::snapshot::TempSnapshot; @@ -206,7 +207,7 @@ impl Replicator { pub async fn sync_from_http(&self) -> anyhow::Result { tracing::trace!("Syncing frames from HTTP"); - let frames = match self.fetch_log_entries(false).await { + let mut frames = match self.fetch_log_entries(false).await { Ok(frames) => Ok(frames), Err(e) => { if let Some(status) = e.downcast_ref::() { @@ -221,6 +222,38 @@ impl Replicator { } }?; + // Based on tonic's default message limit + const FLUSH_BUFFER_THRESHOLD: usize = 8 * 1024 * 1024; + + const FRAME_BUFFER_INITIAL_CAPACITY: usize = 64; + + let mut frame_buffer = Vec::with_capacity(FRAME_BUFFER_INITIAL_CAPACITY); + let mut buffer_size = 0; + let mut len = 0; + + while let Some(frame) = frames.next().await { + buffer_size += frame.as_slice().len(); + frame_buffer.push(frame); + + if buffer_size >= FLUSH_BUFFER_THRESHOLD { + let old_frame_buffer = core::mem::replace( + &mut frame_buffer, + Vec::with_capacity(FRAME_BUFFER_INITIAL_CAPACITY), + ); + buffer_size = 0; + + len += self.send_frames(old_frame_buffer).await?; + } + } + + if !frame_buffer.is_empty() { + len += self.send_frames(frame_buffer).await?; + } + + Ok(len) + } + + async fn send_frames(&self, frames: Vec) -> anyhow::Result { let len = frames.len(); self.next_offset.fetch_add(len as u64, Ordering::Relaxed); self.frames_sender.send(Frames::Vec(frames)).await?; @@ -228,7 +261,10 @@ impl Replicator { Ok(len) } - async fn fetch_log_entries(&self, send_hello: bool) -> anyhow::Result> { + async fn fetch_log_entries( + &self, + send_hello: bool, + ) -> anyhow::Result + Unpin> { let client = self .client .clone() @@ -240,7 +276,7 @@ impl Replicator { } client - .batch_log_entries(self.next_offset.load(Ordering::Relaxed)) + .log_entries(self.next_offset.load(Ordering::Relaxed)) .await } }