Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: validate transactions in parallel (no trie) #12654

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 44 additions & 27 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,17 @@ impl RuntimeAdapter for NightshadeRuntime {
}
}

let cost = match validate_transaction(
runtime_config,
gas_price,
transaction,
verify_signature,
current_protocol_version,
) {
Ok(cost) => cost,
Err(e) => return Ok(Some(e)),
};

if let Some(state_root) = state_root {
let shard_uid =
self.account_id_to_shard_uid(transaction.transaction.signer_id(), epoch_id)?;
Expand All @@ -576,7 +587,7 @@ impl RuntimeAdapter for NightshadeRuntime {
&mut state_update,
gas_price,
transaction,
verify_signature,
&cost,
// here we do not know which block the transaction will be included
// and therefore skip the check on the nonce upper bound.
None,
Expand All @@ -586,17 +597,8 @@ impl RuntimeAdapter for NightshadeRuntime {
Err(e) => Ok(Some(e)),
}
} else {
// Doing basic validation without a state root
match validate_transaction(
runtime_config,
gas_price,
transaction,
verify_signature,
current_protocol_version,
) {
Ok(_) => Ok(None),
Err(e) => Ok(Some(e)),
}
// Without a state root, verification is skipped
Ok(None)
}
}

Expand Down Expand Up @@ -772,27 +774,42 @@ impl RuntimeAdapter for NightshadeRuntime {
continue;
}

// Verifying the validity of the transaction based on the current state.
match verify_and_charge_transaction(
let res = validate_transaction(
runtime_config,
&mut state_update,
prev_block.next_gas_price,
&tx,
false,
Some(next_block_height),
true,
protocol_version,
) {
Ok(verification_result) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "including transaction that passed validation");
state_update.commit(StateChangeCause::NotWritableToDisk);
total_gas_burnt += verification_result.gas_burnt;
total_size += tx.get_size();
result.transactions.push(tx);
// Take one transaction from this group, no more.
break;
);
match res {
Ok(cost) => {
match verify_and_charge_transaction(
runtime_config,
&mut state_update,
prev_block.next_gas_price,
&tx,
&cost,
Some(next_block_height),
protocol_version,
) {
Ok(verification_result) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "including transaction that passed validation");
state_update.commit(StateChangeCause::NotWritableToDisk);
total_gas_burnt += verification_result.gas_burnt;
total_size += tx.get_size();
result.transactions.push(tx);
// Take one transaction from this group, no more.
break;
}
Err(err) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction that failed verification");
rejected_invalid_tx += 1;
state_update.rollback();
}
}
}
Err(err) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction that is invalid");
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction that failed initial validation");
rejected_invalid_tx += 1;
state_update.rollback();
}
Expand Down
10 changes: 9 additions & 1 deletion runtime/runtime-params-estimator/src/estimator_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,20 @@ impl Testbed<'_> {
let verify_signature = true;

let clock = GasCost::measure(metric);
let cost = node_runtime::validate_transaction(
&self.apply_state.config,
gas_price,
tx,
verify_signature,
PROTOCOL_VERSION,
)
.expect("expected no validation error");
node_runtime::verify_and_charge_transaction(
&self.apply_state.config,
&mut state_update,
gas_price,
tx,
verify_signature,
&cost,
block_height,
PROTOCOL_VERSION,
)
Expand Down
56 changes: 54 additions & 2 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use crate::verifier::{
validate_transaction, verify_and_charge_transaction, ZERO_BALANCE_ACCOUNT_STORAGE_LIMIT,
};
use bandwidth_scheduler::{run_bandwidth_scheduler, BandwidthSchedulerOutput};
use config::total_prepaid_send_fees;
use config::{total_prepaid_send_fees, TransactionCost};
pub use congestion_control::bootstrap_congestion_info;
use congestion_control::ReceiptSink;
use itertools::Itertools;
Expand Down Expand Up @@ -70,6 +70,7 @@ use near_vm_runner::ContractCode;
use near_vm_runner::ContractRuntimeCache;
use near_vm_runner::ProfileDataV3;
use pipelining::ReceiptPreparationPipeline;
use rayon::prelude::*;
use std::cmp::max;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
Expand Down Expand Up @@ -288,6 +289,45 @@ impl Runtime {
debug!(target: "runtime", "{}", log_str);
}

/// Parallel validation for all transactions with early short-circuit on first error.
///
/// If all validations pass, returns a HashMap of tx_hash -> TransactionCost.
/// If any validation fails, returns the first InvalidTxError encountered.
fn parallel_validate_transactions(
config: &RuntimeConfig,
gas_price: Balance,
transactions: &[SignedTransaction],
current_protocol_version: ProtocolVersion,
) -> Result<HashMap<CryptoHash, TransactionCost>, InvalidTxError> {
tracing::debug!(target: "runtime", "parallel validation: starting threads");

let results = transactions
.par_iter()
.try_fold(
|| Vec::new(),
|mut acc, tx| {
let cost = validate_transaction(
config,
gas_price,
tx,
true,
current_protocol_version,
)?;
acc.push((tx.get_hash(), cost));
Ok::<_, InvalidTxError>(acc)
},
)
.try_reduce(
|| Vec::new(),
|mut acc1, mut acc2| {
acc1.append(&mut acc2);
Ok::<_, InvalidTxError>(acc1)
},
)?;

Ok(results.into_iter().collect())
}

/// Takes one signed transaction, verifies it and converts it to a receipt.
///
/// Add the produced receipt receipt either to the new local receipts if the signer is the same
Expand All @@ -311,6 +351,7 @@ impl Runtime {
state_update: &mut TrieUpdate,
apply_state: &ApplyState,
signed_transaction: &SignedTransaction,
transaction_cost: &TransactionCost,
stats: &mut ApplyStats,
) -> Result<(Receipt, ExecutionOutcomeWithId), InvalidTxError> {
let span = tracing::Span::current();
Expand All @@ -321,7 +362,7 @@ impl Runtime {
state_update,
apply_state.gas_price,
signed_transaction,
true,
transaction_cost,
Some(apply_state.block_height),
apply_state.current_protocol_version,
) {
Expand Down Expand Up @@ -1586,11 +1627,22 @@ impl Runtime {
let total = &mut processing_state.total;
let apply_state = &mut processing_state.apply_state;
let state_update = &mut processing_state.state_update;

let tx_costs = Self::parallel_validate_transactions(
&apply_state.config,
apply_state.gas_price,
processing_state.transactions,
apply_state.current_protocol_version,
)
.map_err(RuntimeError::from)?;

for signed_transaction in processing_state.transactions {
let cost = tx_costs.get(&signed_transaction.get_hash()).expect("Must have cost for tx");
let tx_result = self.process_transaction(
state_update,
apply_state,
signed_transaction,
cost,
&mut processing_state.stats,
);
let (receipt, outcome_with_id) = match tx_result {
Expand Down
Loading
Loading