Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.x] feat: add invalid-indexes error checking to the advance-runner #523

Merged
merged 3 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 106 additions & 6 deletions offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ pub enum BrokerFacadeError {
#[snafu(display("broker internal error"))]
BrokerInternalError { source: BrokerError },

#[snafu(display(
"expected first_index from claim to be {}, but got {}",
expected,
got
))]
InvalidIndexes { expected: u128, got: u128 },

#[snafu(display("failed to consume input event"))]
ConsumeError { source: BrokerError },

Expand Down Expand Up @@ -101,24 +108,43 @@ impl BrokerFacade {
"producing rollups claim"
);

let result = self
let last_claim_event = self
.client
.peek_latest(&self.claims_stream)
.await
.context(BrokerInternalSnafu)?;

let claim_produced = match result {
let should_enqueue_claim = match last_claim_event {
Some(event) => {
tracing::trace!(?event, "got last claim produced");
rollups_claim.epoch_index <= event.payload.epoch_index
let last_claim = event.payload;
tracing::trace!(
?last_claim,
"got last claim from broker stream"
);
let should_enqueue_claim =
rollups_claim.epoch_index > last_claim.epoch_index;

// If this happens, then something is wrong with the dispatcher.
let invalid_indexes =
rollups_claim.first_index != last_claim.last_index + 1;
if should_enqueue_claim && invalid_indexes {
tracing::debug!("rollups_claim.first_index = {}, last_claim.last_index = {}",
rollups_claim.first_index, last_claim.last_index);
return Err(BrokerFacadeError::InvalidIndexes {
expected: last_claim.last_index + 1,
got: rollups_claim.first_index,
});
};

should_enqueue_claim
}
None => {
tracing::trace!("no claims in the stream");
false
true
}
};

if !claim_produced {
if should_enqueue_claim {
self.client
.produce(&self.claims_stream, rollups_claim)
.await
Expand Down Expand Up @@ -300,4 +326,78 @@ mod tests {
vec![rollups_claim0, rollups_claim1]
);
}

#[test_log::test(tokio::test)]
async fn test_invalid_indexes_overlapping() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim1 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 0,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 0,
last_index: 6,
};
let rollups_claim2 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 1,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 6,
last_index: 7,
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
expected: 7,
got: 6
}
.to_string(),
result.unwrap_err().to_string()
)
}

#[test_log::test(tokio::test)]
async fn test_invalid_indexes_nonsequential() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim1 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 0,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 0,
last_index: 6,
};
let rollups_claim2 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 1,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 11,
last_index: 14,
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
expected: 7,
got: 11
}
.to_string(),
result.unwrap_err().to_string()
)
}
}
4 changes: 2 additions & 2 deletions offchain/advance-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ impl Runner {
.produce_outputs(proofs)
.await
.context(ProduceOutputsSnafu)?;
tracing::trace!("produced outputs in broker");
tracing::trace!("produced outputs in broker stream");

self.broker
.produce_rollups_claim(rollups_claim)
.await
.context(ProduceClaimSnafu)?;
tracing::info!("produced epoch claim");
tracing::info!("produced epoch claim in broker stream");
}
Err(source) => {
if let ServerManagerError::EmptyEpochError { .. } = source {
Expand Down
6 changes: 4 additions & 2 deletions offchain/authority-claimer/src/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,16 @@ impl DuplicateChecker for DefaultDuplicateChecker {
.flatten() // Back to only one Option
.map(|claim| claim.last_index + 1) // Maps to a number
.unwrap_or(0); // If None, unwrap to 0
tracing::debug!("checking duplicate claim: expected_first_index={}, rollups_claim={:?}",
expected_first_index, rollups_claim);
if rollups_claim.first_index == expected_first_index {
// This claim is the one the blockchain expects, so it is not considered duplicate.
// This claim is the one the blockchain expects, so it is not considered a duplicate.
Ok(false)
} else if rollups_claim.last_index < expected_first_index {
// This claim is already on the blockchain.
Ok(true)
} else {
// This claim is not on blockchain, but it isn't the one blockchain expects.
// This claim is not on the blockchain, but it isn't the one the blockchain expects.
// If this happens, there is a bug on the dispatcher.
Err(DuplicateCheckerError::ClaimMismatch {
expected_first_index,
Expand Down
14 changes: 7 additions & 7 deletions offchain/dispatcher/src/drivers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ impl Context {
broker: &impl BrokerSend,
) -> Result<(), BrokerFacadeError> {
let input_block_number = input.block_added.number.as_u64();
let input_epoch = self.calculate_epoch(input_block_number);
self.last_finished_epoch.map(|last_finished_epoch| {
// Asserting that the calculated epoch comes after the last finished epoch.
// (If last_finished_epoch == None then we don't need the assertion.)
assert!(input_epoch > last_finished_epoch)
});

GMKrieger marked this conversation as resolved.
Show resolved Hide resolved
self.finish_epoch_if_needed(input_block_number, broker)
.await?;

Expand All @@ -76,13 +83,6 @@ impl Context {
.inc();

self.inputs_sent += 1;

let input_epoch = self.calculate_epoch(input_block_number);
self.last_finished_epoch.map(|last_finished_epoch| {
// Asserting that the calculated epoch comes after the last finished epoch.
// (If last_finished_epoch == None then we don't need the assertion.)
assert!(input_epoch > last_finished_epoch)
});
self.last_input_epoch = Some(input_epoch);

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions offchain/dispatcher/src/drivers/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ impl MachineDriver {
}
};

let block = block.number.as_u64();
context.finish_epoch_if_needed(block, broker).await?;
let block_number = block.number.as_u64();
tracing::debug!("reacting to standalone block {}", block_number);
context.finish_epoch_if_needed(block_number, broker).await?;

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions offchain/dispatcher/src/machine/rollups_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ impl BrokerSend for BrokerFacade {
tracing::info!(?inputs_sent_count, "finishing epoch");

let mut broker = self.broker.lock().await;
let status = self.broker_status(&mut broker).await?;
let status = self.broker_status(&mut broker).await?; // Epoch number gets incremented here!

let event = build_next_finish_epoch(&status);
tracing::trace!(?event, "producing finish epoch event");
tracing::info!(?event, "producing finish epoch event");

epoch_sanity_check!(event, inputs_sent_count);

Expand Down
Loading