Remove Record enum in favor of simple tuple
pcholakov committed Dec 27, 2024
1 parent 84bab4f commit 2539aa3
Showing 2 changed files with 67 additions and 83 deletions.
crates/worker/src/partition/mod.rs (146 changes: 66 additions & 80 deletions)
@@ -60,6 +60,7 @@ use restate_types::net::partition_processor::{
     InvocationOutput, PartitionProcessorRpcError, PartitionProcessorRpcRequest,
     PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse,
 };
+use restate_types::storage::StorageDecodeError;
 use restate_types::time::MillisSinceEpoch;
 use restate_wal_protocol::control::AnnounceLeader;
 use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
@@ -256,6 +257,7 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
 pub enum ProcessorError {
     TrimGapEncountered { gap_to_lsn: Lsn },
     Storage(#[from] StorageError),
+    Decode(#[from] StorageDecodeError),
     Bifrost(#[from] restate_bifrost::Error),
     StateMachine(#[from] state_machine::Error),
     ActionEffect(#[from] leadership::Error),
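
The `#[from]` attributes on these variants follow the thiserror pattern; assuming the enum derives `thiserror::Error` (the derive line is outside this hunk), the new `Decode` variant lets `?` convert a `StorageDecodeError` into `ProcessorError::Decode` without an explicit `map_err`. A minimal sketch of that mechanism, using simplified stand-in types rather than the real restate ones:

```rust
use thiserror::Error;

// Illustrative stand-in; the real StorageDecodeError lives in restate_types::storage.
#[derive(Debug, Error)]
#[error("decode failed: {0}")]
struct StorageDecodeError(String);

#[derive(Debug, Error)]
enum ProcessorError {
    #[error("failed decoding record")]
    Decode(#[from] StorageDecodeError),
}

fn decode(bytes: &[u8]) -> Result<String, StorageDecodeError> {
    std::str::from_utf8(bytes)
        .map(|s| s.to_owned())
        .map_err(|e| StorageDecodeError(e.to_string()))
}

fn apply_record(bytes: &[u8]) -> Result<(), ProcessorError> {
    // `?` uses the generated From<StorageDecodeError> impl to build
    // ProcessorError::Decode automatically.
    let _payload = decode(bytes)?;
    Ok(())
}

fn main() {
    assert!(apply_record(b"hello").is_ok());
    assert!(apply_record(&[0xff, 0xfe]).is_err());
}
```
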
@@ -264,10 +266,7 @@ pub enum ProcessorError {
     Other(#[from] anyhow::Error),
 }
 
-enum Record {
-    Envelope(Lsn, Arc<Envelope>),
-    TrimGap(Lsn),
-}
+type LsnEnvelope = (Lsn, Arc<Envelope>);
 
 impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
 where
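
The effect of this hunk: the two-variant Record type becomes a plain tuple, and the trim-gap case moves into the error channel, so downstream code only ever handles envelopes. A compressed view of the shape change, with simplified stand-in types rather than the real definitions:

```rust
#![allow(dead_code)]
use std::sync::Arc;

type Lsn = u64;
struct Envelope; // stand-in for restate_wal_protocol::Envelope

enum ProcessorError {
    TrimGapEncountered { gap_to_lsn: Lsn },
}

// Before: every consumer had to match on both variants.
enum Record {
    Envelope(Lsn, Arc<Envelope>),
    TrimGap(Lsn),
}

// After: the happy path is a tuple; a trim gap is reported as an error by the
// log reader itself, so the command-processing loop never sees it.
type LsnEnvelope = (Lsn, Arc<Envelope>);
type ReadResult = Result<LsnEnvelope, ProcessorError>;

fn main() {}
```
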
@@ -374,7 +373,7 @@ where
 
         // Start reading after the last applied lsn
         let key_query = KeyFilter::Within(self.partition_key_range.clone());
-        let mut log_reader = self
+        let mut record_stream = self
             .bifrost
             .create_reader(
                 LogId::from(self.partition_id),
@@ -383,33 +382,32 @@
                 Lsn::MAX,
             )
             .map_err(ProcessorError::from)?
-            .map_ok(|entry| {
-                trace!(?entry, "Read entry");
-                let lsn = entry.sequence_number();
-                if entry.is_data_record() {
-                    entry
-                        .try_decode_arc::<Envelope>()
-                        .map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?)))
-                        .expect("data record is present")
-                } else {
-                    // read_commands will translate trim gaps to errors
-                    anyhow::Ok(Record::TrimGap(
-                        entry
-                            .trim_gap_to_sequence_number()
-                            .expect("trim gap has to-LSN"),
-                    ))
-                }
-            })
-            .try_take_while(|entry| {
+            .map(|entry| match entry {
+                Ok(entry) => {
+                    trace!(?entry, "Read entry");
+                    let lsn = entry.sequence_number();
+                    if entry.is_data_record() {
+                        entry
+                            .try_decode_arc::<Envelope>()
+                            .map(|envelope| Ok((lsn, envelope.map_err(ProcessorError::from)?)))
+                            .expect("data record is present")
+                    } else {
+                        Err(ProcessorError::TrimGapEncountered {
+                            gap_to_lsn: entry
+                                .trim_gap_to_sequence_number()
+                                .expect("trim gap has to-LSN"),
+                        })
+                    }
+                }
+                Err(err) => Err(ProcessorError::from(err)),
+            })
+            .try_take_while(|(_, envelope)| {
                 // a catch-all safety net if all lower layers didn't filter this record out. This
                 // could happen for old records that didn't store `Keys` in the log store.
                 //
                 // At some point, we should remove this and trust that stored records have Keys
                 // stored correctly.
-                std::future::ready(Ok(entry.as_ref().is_ok_and(|r| match r {
-                    Record::Envelope(_, e) => e.matches_key_query(&key_query),
-                    Record::TrimGap(_) => true,
-                })))
+                std::future::ready(Ok(envelope.matches_key_query(&key_query)))
             });
 
         // avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis.
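
With the Record enum gone, the reader pipeline yields Result<LsnEnvelope, ProcessorError> directly: data records decode into (lsn, envelope), trim gaps turn into ProcessorError::TrimGapEncountered at the source, and try_take_while only ever sees successful items, while an Err short-circuits the try_-pipeline. A self-contained sketch of that shape using the futures crate; the LogEntry, Envelope, and error types below are toy stand-ins, not the bifrost API:

```rust
use futures::{executor::block_on, stream, StreamExt, TryStreamExt};

type Lsn = u64;

#[derive(Debug)]
struct Envelope {
    partition_key: u64,
}

#[derive(Debug)]
enum ProcessorError {
    TrimGapEncountered { gap_to_lsn: Lsn },
}

// Toy stand-in for a log entry: either a data record or a trim gap.
enum LogEntry {
    Data { lsn: Lsn, envelope: Envelope },
    TrimGap { to_lsn: Lsn },
}

fn main() {
    let entries: Vec<Result<LogEntry, ProcessorError>> = vec![
        Ok(LogEntry::Data { lsn: 1, envelope: Envelope { partition_key: 10 } }),
        Ok(LogEntry::TrimGap { to_lsn: 7 }),
        Ok(LogEntry::Data { lsn: 8, envelope: Envelope { partition_key: 20 } }),
    ];

    let result: Result<Vec<(Lsn, Envelope)>, ProcessorError> = block_on(
        stream::iter(entries)
            // Map every entry to Result<(Lsn, Envelope), ProcessorError>;
            // a trim gap becomes an error at the source.
            .map(|entry| match entry {
                Ok(LogEntry::Data { lsn, envelope }) => Ok((lsn, envelope)),
                Ok(LogEntry::TrimGap { to_lsn }) => {
                    Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn })
                }
                Err(err) => Err(err),
            })
            // Key filtering only ever sees the Ok side; the first Err ends
            // the try_-pipeline with that error.
            .try_take_while(|(_, envelope)| {
                std::future::ready(Ok(envelope.partition_key < 50))
            })
            .try_collect(),
    );

    // Prints Err(TrimGapEncountered { gap_to_lsn: 7 }), which is the signal the
    // partition processor manager reacts to in the second changed file below.
    println!("{:?}", result);
}
```
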
@@ -446,7 +444,7 @@
                         old.updated_at = MillisSinceEpoch::now();
                     });
                 }
-                operation = Self::read_commands(&mut log_reader, self.max_command_batch_size, &mut command_buffer) => {
+                operation = Self::read_commands(&mut record_stream, self.max_command_batch_size, &mut command_buffer) => {
                     // check that reading has succeeded
                     operation?;
 
@@ -457,56 +455,49 @@
                     // clear buffers used when applying the next record
                     action_collector.clear();
 
-                    for record in command_buffer.drain(..) {
-                        match record {
-                            Record::Envelope(lsn, envelope) => {
-                                let command_start = Instant::now();
-
-                                trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);
-
-                                let leadership_change = self.apply_record(
-                                    lsn,
-                                    envelope,
-                                    &mut transaction,
-                                    &mut action_collector).await.map_err(ProcessorError::from)?;
-
-                                apply_command_latency.record(command_start.elapsed());
-
-                                if let Some((header, announce_leader)) = leadership_change {
-                                    // commit all changes so far, this is important so that the actuators see all changes
-                                    // when becoming leader.
-                                    transaction.commit().await.map_err(ProcessorError::from)?;
-
-                                    // We can ignore all actions collected so far because as a new leader we have to instruct the
-                                    // actuators afresh.
-                                    action_collector.clear();
-
-                                    self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
-                                    if header.source.is_processor_generational() {
-                                        let Source::Processor { generational_node_id, .. } = header.source else {
-                                            unreachable!("processor source must have generational_node_id");
-                                        };
-                                        // all new AnnounceLeader messages should come from a PartitionProcessor
-                                        self.status.last_observed_leader_node = generational_node_id;
-                                    } else if announce_leader.node_id.is_some() {
-                                        // older AnnounceLeader messages have the announce_leader.node_id set
-                                        self.status.last_observed_leader_node = announce_leader.node_id;
-                                    }
-
-                                    let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;
-
-                                    Span::current().record("is_leader", is_leader);
-
-                                    if is_leader {
-                                        self.status.effective_mode = RunMode::Leader;
-                                    }
-
-                                    transaction = partition_store.transaction();
-                                }
-                            }
-                            Record::TrimGap(gap_to_lsn) => {
-                                return Err(ProcessorError::TrimGapEncountered { gap_to_lsn })
-                            }
-                        }
-                    }
+                    for (lsn, envelope) in command_buffer.drain(..) {
+                        let command_start = Instant::now();
+
+                        trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);
+
+                        let leadership_change = self.apply_record(
+                            lsn,
+                            envelope,
+                            &mut transaction,
+                            &mut action_collector).await.map_err(ProcessorError::from)?;
+
+                        apply_command_latency.record(command_start.elapsed());
+
+                        if let Some((header, announce_leader)) = leadership_change {
+                            // commit all changes so far, this is important so that the actuators see all changes
+                            // when becoming leader.
+                            transaction.commit().await.map_err(ProcessorError::from)?;
+
+                            // We can ignore all actions collected so far because as a new leader we have to instruct the
+                            // actuators afresh.
+                            action_collector.clear();
+
+                            self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
+                            if header.source.is_processor_generational() {
+                                let Source::Processor { generational_node_id, .. } = header.source else {
+                                    unreachable!("processor source must have generational_node_id");
+                                };
+                                // all new AnnounceLeader messages should come from a PartitionProcessor
+                                self.status.last_observed_leader_node = generational_node_id;
+                            } else if announce_leader.node_id.is_some() {
+                                // older AnnounceLeader messages have the announce_leader.node_id set
+                                self.status.last_observed_leader_node = announce_leader.node_id;
+                            }
+
+                            let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;
+
+                            Span::current().record("is_leader", is_leader);
+
+                            if is_leader {
+                                self.status.effective_mode = RunMode::Leader;
+                            }
+
+                            transaction = partition_store.transaction();
+                        }
+                    }
 
@@ -864,10 +855,10 @@ where
     async fn read_commands<S>(
         log_reader: &mut S,
         max_batching_size: usize,
-        record_buffer: &mut Vec<Record>,
+        record_buffer: &mut Vec<LsnEnvelope>,
     ) -> Result<(), ProcessorError>
     where
-        S: Stream<Item = Result<anyhow::Result<Record>, restate_bifrost::Error>> + Unpin,
+        S: Stream<Item = Result<LsnEnvelope, ProcessorError>> + Unpin,
     {
         // beyond this point we must not await; otherwise we are no longer cancellation safe
         let first_record = log_reader.next().await;
@@ -877,20 +868,15 @@
         };
 
         record_buffer.clear();
-        record_buffer.push(first_record??);
+        record_buffer.push(first_record?);
 
         while record_buffer.len() < max_batching_size {
            // read more message from the stream but only if they are immediately available
            if let Some(record) = log_reader.next().now_or_never() {
                let Some(record) = record else {
                    return Err(ProcessorError::LogReadStreamTerminated);
                };
-                match record?? {
-                    Record::TrimGap(gap_to_lsn) => {
-                        return Err(ProcessorError::TrimGapEncountered { gap_to_lsn });
-                    }
-                    record => record_buffer.push(record),
-                }
+                record_buffer.push(record?);
            } else {
                // no more immediately available records found
                break;
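
read_commands keeps its cancellation-safety trick: only the first record is awaited, and the rest of the batch is drained with FutureExt::now_or_never(), which polls the next-item future once and returns None if it is still pending. With the enum gone, a trim gap needs no special handling here; it simply arrives as an Err item and propagates via `?`. A rough sketch of that batching pattern on a generic, infallible stream (toy types, not the actual reader):

```rust
use futures::{executor::block_on, stream, FutureExt, StreamExt};

#[derive(Debug)]
enum ReadError {
    StreamTerminated,
}

/// Read one record (awaiting if necessary), then opportunistically batch up
/// any records that are already available without awaiting again.
async fn read_batch<S, T>(
    reader: &mut S,
    max_batch_size: usize,
    buffer: &mut Vec<T>,
) -> Result<(), ReadError>
where
    S: stream::Stream<Item = T> + Unpin,
{
    // The only await: wait until at least one record is available.
    let first = reader.next().await.ok_or(ReadError::StreamTerminated)?;

    buffer.clear();
    buffer.push(first);

    while buffer.len() < max_batch_size {
        // Poll once; if the next record is not immediately ready, stop
        // batching instead of awaiting, so the caller stays responsive.
        match reader.next().now_or_never() {
            Some(Some(record)) => buffer.push(record),
            // The stream ended while batching.
            Some(None) => return Err(ReadError::StreamTerminated),
            // Nothing ready right now; return what we have.
            None => break,
        }
    }

    Ok(())
}

fn main() {
    block_on(async {
        // An in-memory stream is always "immediately ready", so the whole
        // batch is filled in one call.
        let mut reader = stream::iter(1..=10);
        let mut buffer = Vec::new();
        read_batch(&mut reader, 4, &mut buffer).await.unwrap();
        assert_eq!(buffer, vec![1, 2, 3, 4]);
        println!("{:?}", buffer);
    });
}
```
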
crates/worker/src/partition_processor_manager.rs (4 changes: 1 addition & 3 deletions)
@@ -433,9 +433,7 @@ impl PartitionProcessorManager {
             .remove(processor.as_ref().expect("must be some").key_range());
 
         match result {
-            Err(ProcessorError::TrimGapEncountered {
-                gap_to_lsn: to_lsn,
-            }) => {
+            Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn }) => {
                 if self.snapshot_repository.is_some() {
                     info!(
                         trim_gap_to_lsn = ?to_lsn,
