Skip to content

Commit

Permalink
Merge pull request #505 from psarna/preemptboundary
Browse files Browse the repository at this point in the history
replication: do not preempt streams at transaction boundary
  • Loading branch information
psarna authored Oct 24, 2023
2 parents e15da4e + 3c29e80 commit af74c27
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions libsql-server/src/replication/primary/frame_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct FrameStream {
max_frames: Option<usize>,
/// a future that resolves when the logger was closed.
logger_closed_fut: BoxFuture<'static, ()>,
/// whether a stream is in-between transactions (last frame ended a transaction)
transaction_boundary: bool,
}

impl FrameStream {
Expand All @@ -47,6 +49,7 @@ impl FrameStream {
produced_frames: 0,
max_frames,
logger_closed_fut,
transaction_boundary: false,
})
}

Expand All @@ -55,10 +58,16 @@ impl FrameStream {
return;
}
if let Some(max_frames) = self.max_frames {
if self.produced_frames == max_frames {
tracing::debug!("Max number of frames reached ({max_frames}), closing stream");
self.state = FrameStreamState::Closed;
return;
if self.produced_frames >= max_frames {
tracing::trace!(
"Max number of frames reached ({} >= {max_frames})",
self.produced_frames
);
if self.transaction_boundary {
tracing::debug!("Closing stream");
self.state = FrameStreamState::Closed;
return;
}
}
}

Expand Down Expand Up @@ -117,6 +126,7 @@ impl Stream for FrameStream {
Ok(frame) => {
self.current_frame_no += 1;
self.produced_frames += 1;
self.transaction_boundary = frame.header().size_after != 0;
self.transition_state_next_frame();
tracing::trace!("sending frame_no {}", frame.header().frame_no);
Poll::Ready(Some(Ok(frame)))
Expand Down

0 comments on commit af74c27

Please sign in to comment.