Skip to content

Commit

Permalink
feat: don't add substates that are already being processed (#885)
Browse files Browse the repository at this point in the history
Description
---
Don't add transaction to a proposal if any of their substates is being
processed.

Motivation and Context
---
I left one TODO in the code. Because if we don't switch to the "no
version" transaction, them we can add aborts directly to the proposal.

How Has This Been Tested?
---
I tried 9 concurrent transaction 9 accounts to 1, and 1 accounts to 9.

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
Cifko authored Jan 12, 2024
1 parent 76bf409 commit 63a1563
Showing 1 changed file with 39 additions and 3 deletions.
42 changes: 39 additions & 3 deletions dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use tari_dan_storage::{
Block,
BlockId,
Command,
Decision,
Evidence,
ForeignProposal,
ForeignReceiveCounters,
ForeignSendCounters,
Expand Down Expand Up @@ -61,7 +63,7 @@ use tari_utilities::ByteArray;

use crate::{
error::SqliteStorageError,
serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex},
serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex},
sql_models,
sqlite_transaction::SqliteTransaction,
};
Expand Down Expand Up @@ -95,7 +97,7 @@ impl<'a, TAddr> SqliteStateStoreReadTransaction<'a, TAddr> {
}

impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned> SqliteStateStoreReadTransaction<'a, TAddr> {
pub(super) fn get_transaction_atom_state_updates_between_blocks<'i, ITx>(
pub fn get_transaction_atom_state_updates_between_blocks<'i, ITx>(
&mut self,
from_block_id: &BlockId,
to_block_id: &BlockId,
Expand Down Expand Up @@ -1181,14 +1183,48 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
updates.len()
);

let mut used_substates = HashSet::<ShardId>::new();
let mut processed_substates = HashMap::<TransactionId, HashSet<ShardId>>::new();
for (tx_id, update) in &updates {
if let Some(Decision::Abort) = update
.local_decision
.as_ref()
.map(|decision| parse_from_string(decision.as_str()))
.transpose()?
{
// The aborted transaction don't lock any substates
continue;
}
let evidence = deserialize_json::<Evidence>(&update.evidence)?;
let evidence = evidence.shards_iter().copied().collect::<HashSet<_>>();
processed_substates.insert(deserialize_hex_try_from(tx_id)?, evidence);
}

ready_txs
.into_iter()
.filter_map(|rec| {
let maybe_update = updates.remove(&rec.transaction_id);
match rec.try_convert(maybe_update) {
Ok(rec) => {
if rec.is_ready() {
Some(Ok(rec))
let tx_substates: HashSet<ShardId> = rec
.transaction()
.evidence
.shards_iter()
.copied()
.collect::<HashSet<_>>();
if tx_substates.is_disjoint(&used_substates) &&
processed_substates.iter().all(|(tx_id, substates)| {
tx_id == rec.transaction_id() || tx_substates.is_disjoint(substates)
})
{
used_substates.extend(tx_substates);
Some(Ok(rec))
} else {
// TODO: If we don't switch to "no version" transaction, then we can abort these here.
// That also requires changes to the on_ready_to_vote_on_local_block
None
}
} else {
None
}
Expand Down

0 comments on commit 63a1563

Please sign in to comment.