diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 4d78c6b25..5046b1dd9 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -60,6 +60,7 @@ use restate_types::net::partition_processor::{
     InvocationOutput, PartitionProcessorRpcError, PartitionProcessorRpcRequest,
     PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse,
 };
+use restate_types::storage::StorageDecodeError;
 use restate_types::time::MillisSinceEpoch;
 use restate_wal_protocol::control::AnnounceLeader;
 use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
@@ -256,6 +257,7 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
 pub enum ProcessorError {
     TrimGapEncountered { gap_to_lsn: Lsn },
     Storage(#[from] StorageError),
+    Decode(#[from] StorageDecodeError),
     Bifrost(#[from] restate_bifrost::Error),
     StateMachine(#[from] state_machine::Error),
     ActionEffect(#[from] leadership::Error),
@@ -264,10 +266,7 @@ pub enum ProcessorError {
     Other(#[from] anyhow::Error),
 }
 
-enum Record {
-    Envelope(Lsn, Arc<Envelope>),
-    TrimGap(Lsn),
-}
+type LsnEnvelope = (Lsn, Arc<Envelope>);
 
 impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
 where
@@ -374,40 +373,39 @@ where
         // Start reading after the last applied lsn
         let key_query = KeyFilter::Within(self.partition_key_range.clone());
-        let mut log_reader = self
+        let mut record_stream = self
             .bifrost
             .create_reader(
                 LogId::from(self.partition_id),
                 key_query.clone(),
                 Lsn::MAX,
             )
             .map_err(ProcessorError::from)?
-            .map_ok(|entry| {
-                trace!(?entry, "Read entry");
-                let lsn = entry.sequence_number();
-                if entry.is_data_record() {
-                    entry
-                        .try_decode_arc::<Envelope>()
-                        .map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?)))
-                        .expect("data record is present")
-                } else {
-                    // read_commands will translate trim gaps to errors
-                    anyhow::Ok(Record::TrimGap(
+            .map(|entry| match entry {
+                Ok(entry) => {
+                    trace!(?entry, "Read entry");
+                    let lsn = entry.sequence_number();
+                    if entry.is_data_record() {
                         entry
-                            .trim_gap_to_sequence_number()
-                            .expect("trim gap has to-LSN"),
-                    ))
+                            .try_decode_arc::<Envelope>()
+                            .map(|envelope| Ok((lsn, envelope.map_err(ProcessorError::from)?)))
+                            .expect("data record is present")
+                    } else {
+                        Err(ProcessorError::TrimGapEncountered {
+                            gap_to_lsn: entry
+                                .trim_gap_to_sequence_number()
+                                .expect("trim gap has to-LSN"),
+                        })
+                    }
                 }
+                Err(err) => Err(ProcessorError::from(err)),
             })
-            .try_take_while(|entry| {
+            .try_take_while(|(_, envelope)| {
                 // a catch-all safety net if all lower layers didn't filter this record out. This
                 // could happen for old records that didn't store `Keys` in the log store.
                 //
                 // At some point, we should remove this and trust that stored records have Keys
                 // stored correctly.
-                std::future::ready(Ok(entry.as_ref().is_ok_and(|r| match r {
-                    Record::Envelope(_, e) => e.matches_key_query(&key_query),
-                    Record::TrimGap(_) => true,
-                })))
+                std::future::ready(Ok(envelope.matches_key_query(&key_query)))
             });
 
         // avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis.
@@ -446,7 +444,7 @@ where
                         old.updated_at = MillisSinceEpoch::now();
                     });
                 }
-                operation = Self::read_commands(&mut log_reader, self.max_command_batch_size, &mut command_buffer) => {
+                operation = Self::read_commands(&mut record_stream, self.max_command_batch_size, &mut command_buffer) => {
                     // check that reading has succeeded
                     operation?;
 
@@ -457,56 +455,49 @@ where
                     // clear buffers used when applying the next record
                     action_collector.clear();
 
-                    for record in command_buffer.drain(..) {
-                        match record {
-                            Record::Envelope(lsn, envelope) => {
-                                let command_start = Instant::now();
-
-                                trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);
-
-                                let leadership_change = self.apply_record(
-                                    lsn,
-                                    envelope,
-                                    &mut transaction,
-                                    &mut action_collector).await.map_err(ProcessorError::from)?;
-
-                                apply_command_latency.record(command_start.elapsed());
-
-                                if let Some((header, announce_leader)) = leadership_change {
-                                    // commit all changes so far, this is important so that the actuators see all changes
-                                    // when becoming leader.
-                                    transaction.commit().await.map_err(ProcessorError::from)?;
-
-                                    // We can ignore all actions collected so far because as a new leader we have to instruct the
-                                    // actuators afresh.
-                                    action_collector.clear();
-
-                                    self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
-                                    if header.source.is_processor_generational() {
-                                        let Source::Processor { generational_node_id, .. } = header.source else {
-                                            unreachable!("processor source must have generational_node_id");
-                                        };
-                                        // all new AnnounceLeader messages should come from a PartitionProcessor
-                                        self.status.last_observed_leader_node = generational_node_id;
-                                    } else if announce_leader.node_id.is_some() {
-                                        // older AnnounceLeader messages have the announce_leader.node_id set
-                                        self.status.last_observed_leader_node = announce_leader.node_id;
-                                    }
-
-                                    let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;
+                    for (lsn, envelope) in command_buffer.drain(..) {
+                        let command_start = Instant::now();
+
+                        trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);
+
+                        let leadership_change = self.apply_record(
+                            lsn,
+                            envelope,
+                            &mut transaction,
+                            &mut action_collector).await.map_err(ProcessorError::from)?;
+
+                        apply_command_latency.record(command_start.elapsed());
+
+                        if let Some((header, announce_leader)) = leadership_change {
+                            // commit all changes so far, this is important so that the actuators see all changes
+                            // when becoming leader.
+                            transaction.commit().await.map_err(ProcessorError::from)?;
+
+                            // We can ignore all actions collected so far because as a new leader we have to instruct the
+                            // actuators afresh.
+                            action_collector.clear();
+
+                            self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
+                            if header.source.is_processor_generational() {
+                                let Source::Processor { generational_node_id, .. } = header.source else {
+                                    unreachable!("processor source must have generational_node_id");
+                                };
+                                // all new AnnounceLeader messages should come from a PartitionProcessor
+                                self.status.last_observed_leader_node = generational_node_id;
+                            } else if announce_leader.node_id.is_some() {
+                                // older AnnounceLeader messages have the announce_leader.node_id set
+                                self.status.last_observed_leader_node = announce_leader.node_id;
+                            }
 
-                                    Span::current().record("is_leader", is_leader);
+                            let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;
 
-                                    if is_leader {
-                                        self.status.effective_mode = RunMode::Leader;
-                                    }
+                            Span::current().record("is_leader", is_leader);
 
-                                    transaction = partition_store.transaction();
-                                }
-                            }
-                            Record::TrimGap(gap_to_lsn) => {
-                                return Err(ProcessorError::TrimGapEncountered { gap_to_lsn })
+                            if is_leader {
+                                self.status.effective_mode = RunMode::Leader;
                             }
+
+                            transaction = partition_store.transaction();
                         }
                     }
 
@@ -864,10 +855,10 @@ where
     async fn read_commands<S>(
         log_reader: &mut S,
         max_batching_size: usize,
-        record_buffer: &mut Vec<Record>,
+        record_buffer: &mut Vec<LsnEnvelope>,
     ) -> Result<(), ProcessorError>
     where
-        S: Stream<Item = Result<anyhow::Result<Record>, restate_bifrost::Error>> + Unpin,
+        S: Stream<Item = Result<LsnEnvelope, ProcessorError>> + Unpin,
     {
         // beyond this point we must not await; otherwise we are no longer cancellation safe
         let first_record = log_reader.next().await;
@@ -877,7 +868,7 @@ where
         };
 
         record_buffer.clear();
-        record_buffer.push(first_record??);
+        record_buffer.push(first_record?);
 
         while record_buffer.len() < max_batching_size {
             // read more message from the stream but only if they are immediately available
@@ -885,12 +876,7 @@ where
             let Some(record) = record else {
                 return Err(ProcessorError::LogReadStreamTerminated);
             };
-            match record?? {
-                Record::TrimGap(gap_to_lsn) => {
-                    return Err(ProcessorError::TrimGapEncountered { gap_to_lsn });
-                }
-                record => record_buffer.push(record),
-            }
+            record_buffer.push(record?);
         } else {
             // no more immediately available records found
             break;
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index bd4177a73..f81e055e6 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -433,9 +433,7 @@ impl PartitionProcessorManager {
             .remove(processor.as_ref().expect("must be some").key_range());
 
         match result {
-            Err(ProcessorError::TrimGapEncountered {
-                gap_to_lsn: to_lsn,
-            }) => {
+            Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn }) => {
                 if self.snapshot_repository.is_some() {
                     info!(
                         trim_gap_to_lsn = ?to_lsn,