diff --git a/libsql-server/src/replication/primary/frame_stream.rs b/libsql-server/src/replication/primary/frame_stream.rs index 5bdf9af1ff..336eb1114b 100644 --- a/libsql-server/src/replication/primary/frame_stream.rs +++ b/libsql-server/src/replication/primary/frame_stream.rs @@ -23,6 +23,8 @@ pub struct FrameStream { max_frames: Option, /// a future that resolves when the logger was closed. logger_closed_fut: BoxFuture<'static, ()>, + /// whether a stream is in-between transactions (last frame ended a transaction) + transaction_boundary: bool, } impl FrameStream { @@ -47,6 +49,7 @@ impl FrameStream { produced_frames: 0, max_frames, logger_closed_fut, + transaction_boundary: false, }) } @@ -55,10 +58,16 @@ impl FrameStream { return; } if let Some(max_frames) = self.max_frames { - if self.produced_frames == max_frames { - tracing::debug!("Max number of frames reached ({max_frames}), closing stream"); - self.state = FrameStreamState::Closed; - return; + if self.produced_frames >= max_frames { + tracing::trace!( + "Max number of frames reached ({} >= {max_frames})", + self.produced_frames + ); + if self.transaction_boundary { + tracing::debug!("Closing stream"); + self.state = FrameStreamState::Closed; + return; + } } } @@ -117,6 +126,7 @@ impl Stream for FrameStream { Ok(frame) => { self.current_frame_no += 1; self.produced_frames += 1; + self.transaction_boundary = frame.header().size_after != 0; self.transition_state_next_frame(); tracing::trace!("sending frame_no {}", frame.header().frame_no); Poll::Ready(Some(Ok(frame)))