diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index f550edc5d..0cf722597 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -13,6 +13,13 @@ pub enum BrokerFacadeError { #[snafu(display("broker internal error"))] BrokerInternalError { source: BrokerError }, + #[snafu(display( + "expected first_index from claim to be {}, but got {}", + expected, + got + ))] + InvalidIndexes { expected: u128, got: u128 }, + #[snafu(display("failed to consume input event"))] ConsumeError { source: BrokerError }, @@ -101,24 +108,40 @@ impl BrokerFacade { "producing rollups claim" ); - let result = self + let last_claim_event = self .client .peek_latest(&self.claims_stream) .await .context(BrokerInternalSnafu)?; - let claim_produced = match result { + let should_enqueue_claim = match last_claim_event { Some(event) => { - tracing::trace!(?event, "got last claim produced"); - rollups_claim.epoch_index <= event.payload.epoch_index + let last_claim = event.payload; + tracing::trace!(?last_claim, "got last claim from Redis"); + let should_enqueue_claim = + rollups_claim.epoch_index > last_claim.epoch_index; + + // If this happens, then something is wrong with the dispatcher. + let invalid_indexes = + rollups_claim.first_index != last_claim.last_index + 1; + if should_enqueue_claim && invalid_indexes { + tracing::debug!("rollups_claim.first_index = {}, last_claim.last_index = {}", + rollups_claim.first_index, last_claim.last_index); + return Err(BrokerFacadeError::InvalidIndexes { + expected: last_claim.last_index + 1, + got: rollups_claim.first_index, + }); + }; + + should_enqueue_claim } None => { tracing::trace!("no claims in the stream"); - false + true } }; - if !claim_produced { + if should_enqueue_claim { self.client .produce(&self.claims_stream, rollups_claim) .await @@ -300,4 +323,78 @@ mod tests { vec![rollups_claim0, rollups_claim1] ); } + + #[test_log::test(tokio::test)] + async fn test_invalid_indexes_overlapping() { + let docker = Cli::default(); + let mut state = TestState::setup(&docker).await; + let rollups_claim1 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 0, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 0, + last_index: 6, + }; + let rollups_claim2 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 1, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 6, + last_index: 7, + }; + state + .fixture + .produce_rollups_claim(rollups_claim1.clone()) + .await; + let result = state + .facade + .produce_rollups_claim(rollups_claim2.clone()) + .await; + assert!(result.is_err()); + assert_eq!( + BrokerFacadeError::InvalidIndexes { + expected: 7, + got: 6 + } + .to_string(), + result.unwrap_err().to_string() + ) + } + + #[test_log::test(tokio::test)] + async fn test_invalid_indexes_nonsequential() { + let docker = Cli::default(); + let mut state = TestState::setup(&docker).await; + let rollups_claim1 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 0, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 0, + last_index: 6, + }; + let rollups_claim2 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 1, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 11, + last_index: 14, + }; + state + .fixture + .produce_rollups_claim(rollups_claim1.clone()) + .await; + let result = state + .facade + .produce_rollups_claim(rollups_claim2.clone()) + .await; + assert!(result.is_err()); + assert_eq!( + BrokerFacadeError::InvalidIndexes { + expected: 7, + got: 11 + } + .to_string(), + result.unwrap_err().to_string() + ) + } }