Skip to content

Commit

Permalink
replication: do not preempt streams at transaction boundary
Browse files Browse the repository at this point in the history
After this patch, we do not cut off streams early in the middle
of a transaction. And that's because we currently cannot assume
that our clients can apply partial transactions correctly.
  • Loading branch information
psarna committed Oct 24, 2023
1 parent b375291 commit fd95b12
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions libsql-server/src/replication/primary/frame_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct FrameStream {
max_frames: Option<usize>,
/// a future that resolves when the logger was closed.
logger_closed_fut: BoxFuture<'static, ()>,
/// whether a stream is in-between transactions (last frame ended a transaction)
transaction_boundary: bool,
}

impl FrameStream {
Expand All @@ -47,6 +49,7 @@ impl FrameStream {
produced_frames: 0,
max_frames,
logger_closed_fut,
transaction_boundary: false,
})
}

Expand All @@ -55,10 +58,16 @@ impl FrameStream {
return;
}
if let Some(max_frames) = self.max_frames {
if self.produced_frames == max_frames {
tracing::debug!("Max number of frames reached ({max_frames}), closing stream");
self.state = FrameStreamState::Closed;
return;
if self.produced_frames >= max_frames {
tracing::trace!(
"Max number of frames reached ({} >= {max_frames})",
self.produced_frames
);
if self.transaction_boundary {
tracing::debug!("Closing stream");
self.state = FrameStreamState::Closed;
return;
}
}
}

Expand Down Expand Up @@ -117,6 +126,7 @@ impl Stream for FrameStream {
Ok(frame) => {
self.current_frame_no += 1;
self.produced_frames += 1;
self.transaction_boundary = frame.header().size_after != 0;
self.transition_state_next_frame();
tracing::trace!("sending frame_no {}", frame.header().frame_no);
Poll::Ready(Some(Ok(frame)))
Expand Down

0 comments on commit fd95b12

Please sign in to comment.