Skip to content

Commit

Permalink
feat(advance-runner): detect invalid claim indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 authored and GMKrieger committed Aug 15, 2024
1 parent 0afc9d8 commit e0af853
Showing 1 changed file with 103 additions and 6 deletions.
109 changes: 103 additions & 6 deletions offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ pub enum BrokerFacadeError {
#[snafu(display("broker internal error"))]
BrokerInternalError { source: BrokerError },

#[snafu(display(
"expected first_index from claim to be {}, but got {}",
expected,
got
))]
InvalidIndexes { expected: u128, got: u128 },

#[snafu(display("failed to consume input event"))]
ConsumeError { source: BrokerError },

Expand Down Expand Up @@ -101,24 +108,40 @@ impl BrokerFacade {
"producing rollups claim"
);

let result = self
let last_claim_event = self
.client
.peek_latest(&self.claims_stream)
.await
.context(BrokerInternalSnafu)?;

let claim_produced = match result {
let should_enqueue_claim = match last_claim_event {
Some(event) => {
tracing::trace!(?event, "got last claim produced");
rollups_claim.epoch_index <= event.payload.epoch_index
let last_claim = event.payload;
tracing::trace!(?last_claim, "got last claim from Redis");
let should_enqueue_claim =
rollups_claim.epoch_index > last_claim.epoch_index;

// If this happens, then something is wrong with the dispatcher.
let invalid_indexes =
rollups_claim.first_index != last_claim.last_index + 1;
if should_enqueue_claim && invalid_indexes {
tracing::debug!("rollups_claim.first_index = {}, last_claim.last_index = {}",
rollups_claim.first_index, last_claim.last_index);
return Err(BrokerFacadeError::InvalidIndexes {
expected: last_claim.last_index + 1,
got: rollups_claim.first_index,
});
};

should_enqueue_claim
}
None => {
tracing::trace!("no claims in the stream");
false
true
}
};

if !claim_produced {
if should_enqueue_claim {
self.client
.produce(&self.claims_stream, rollups_claim)
.await
Expand Down Expand Up @@ -300,4 +323,78 @@ mod tests {
vec![rollups_claim0, rollups_claim1]
);
}

#[test_log::test(tokio::test)]
async fn test_invalid_indexes_overlapping() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim1 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 0,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 0,
last_index: 6,
};
let rollups_claim2 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 1,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 6,
last_index: 7,
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
expected: 7,
got: 6
}
.to_string(),
result.unwrap_err().to_string()
)
}

#[test_log::test(tokio::test)]
async fn test_invalid_indexes_nonsequential() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim1 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 0,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 0,
last_index: 6,
};
let rollups_claim2 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 1,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 11,
last_index: 14,
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
expected: 7,
got: 11
}
.to_string(),
result.unwrap_err().to_string()
)
}
}

0 comments on commit e0af853

Please sign in to comment.