Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Aug 21, 2024
1 parent 9bb4b91 commit 18affcc
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ impl InstanceManager {

let mut allocated_ports = ports.unwrap_or_else(|| self.port_allocator.create());

let base_path = self
.base_path
.join("processes")
.join(format!("{instance_id}-{instance_type}"));
let base_path = self.base_path.join("processes").join(&instance_name);
fs::create_dir_all(&base_path).await?;

let context = ProcessContext::new(
Expand Down
3 changes: 1 addition & 2 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ pub async fn spawn_services(
handles.push(join_handle);

info!(target: LOG_TARGET, "Message logging initializing");
// Spawn messaging
let message_logger = SqliteMessageLogger::new(config.validator_node.data_dir.join("message_log.sqlite"));

info!(target: LOG_TARGET, "State store initializing");
// Connect to shard db
Expand Down Expand Up @@ -230,6 +228,7 @@ pub async fn spawn_services(
};

// Messaging
let message_logger = SqliteMessageLogger::new(config.validator_node.data_dir.join("message_log.sqlite"));
let local_address = PeerAddress::from(keypair.public_key().clone());
let (loopback_sender, loopback_receiver) = mpsc::unbounded_channel();
let inbound_messaging = ConsensusInboundMessaging::new(
Expand Down
27 changes: 17 additions & 10 deletions dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
let has_local_only_rules = existing.is_local_only() && is_local_only;
let same_transaction = existing.transaction_id() == transaction_id;

// Duplicate lock requests on the same transaction are idempotent
if same_transaction {
return Ok(());
}

match existing.substate_lock() {
// If a substate is already locked as READ:
// - it MAY be locked as READ
Expand All @@ -214,10 +219,10 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
// - it MAY be locked as requested.
SubstateLockType::Read => {
// Cannot write to or create an output for a substate that is already read locked
if !same_transaction && !has_local_only_rules && !requested_lock_type.is_read() {
if !has_local_only_rules && !requested_lock_type.is_read() {
warn!(
target: LOG_TARGET,
"⚠️ Lock conflict: [{}] Read lock(local={}) is present. Requested lock is {}(local={})",
"⚠️ Lock conflict: [{}] Read lock(local_only={}) is present. Requested lock is {}(local_only={})",
requested_lock.versioned_substate_id(),
existing.is_local_only(),
requested_lock_type,
Expand Down Expand Up @@ -246,11 +251,11 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
// - if Same-Transaction OR Local-Only-Rules:
// - it MAY be locked as OUTPUT
SubstateLockType::Write => {
// Cannot lock a non-local WRITE locked substate
if !has_local_only_rules && !same_transaction {
// Cannot lock a non-local_only WRITE locked substate
if !has_local_only_rules {
warn!(
target: LOG_TARGET,
"⚠️ Lock conflict: [{}] Write lock(local={}) is present. Requested lock is {}(local={})",
"⚠️ Lock conflict: [{}] Write lock(local_only={}) is present. Requested lock is {}(local_only={})",
requested_lock.versioned_substate_id(),
existing.is_local_only(),
requested_lock_type,
Expand All @@ -266,7 +271,7 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
if !requested_lock_type.is_output() {
warn!(
target: LOG_TARGET,
"⚠️ Lock conflict: [{}] Write lock(local={}) is present. Requested lock is {}(local={})",
"⚠️ Lock conflict: [{}] Write lock(local_only={}) is present. Requested lock is {}(local_only={})",
requested_lock.versioned_substate_id(),
existing.is_local_only(),
requested_lock_type,
Expand Down Expand Up @@ -295,10 +300,11 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
// - it MAY be locked as WRITE or READ
// - it MUST NOT be locked as OUTPUT
SubstateLockType::Output => {
if !same_transaction && !has_local_only_rules {
if !has_local_only_rules {
warn!(
target: LOG_TARGET,
"⚠️ Lock conflict: [{}] Output lock(local={}) is present. Requested lock is {}(local={})",
"⚠️ Lock conflict: [{}, {}] Output lock(local_only={}) is present. Requested lock is {}(local_only={})",
transaction_id,
requested_lock.versioned_substate_id(),
existing.is_local_only(),
requested_lock_type,
Expand All @@ -311,10 +317,11 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
});
}

if !same_transaction && requested_lock_type.is_output() {
if requested_lock_type.is_output() {
warn!(
target: LOG_TARGET,
"⚠️ Lock conflict: [{}] Output lock(local={}) is present. Requested lock is Output(local={})",
"⚠️ Lock conflict: [{}, {}] Output lock(local_only={}) is present. Requested lock is Output(local_only={})",
transaction_id,
requested_lock.versioned_substate_id(),
existing.is_local_only(),
is_local_only
Expand Down
10 changes: 9 additions & 1 deletion dan_layer/consensus_tests/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,21 @@ async fn multishard_local_inputs_and_outputs_foreign_outputs() {
async fn multishard_output_conflict_abort() {
setup_logger();
let mut test = Test::builder()
.debug_sql("/tmp/test{}.db")
.add_committee(0, vec!["1", "2"])
.add_committee(1, vec!["3", "4"])
.start()
.await;

let tx1 = test.build_transaction(Decision::Commit, 1, 5, 2);
let resulting_outputs = tx1.resulting_outputs().unwrap().to_vec();
let resulting_outputs = tx1
.resulting_outputs()
.unwrap()
.to_vec()
.into_iter()
// Dont use the transaction receipt as an output in tx2
.filter(|s| !s.substate_id().is_transaction_receipt())
.collect();
test.send_transaction_to_destination(TestVnDestination::All, tx1.clone())
.await;

Expand Down
4 changes: 2 additions & 2 deletions dan_layer/consensus_tests/src/substate_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fn it_disallows_more_than_one_write_lock_non_local_only() {
}

#[test]
fn it_allows_locks_within_one_transaction() {
fn it_allows_requesting_the_same_lock_within_one_transaction() {
let store = create_store();

let id = add_substate(&store, 0, 0);
Expand Down Expand Up @@ -192,7 +192,7 @@ fn it_allows_locks_within_one_transaction() {
.unwrap();

let n = store.new_locks().get(id.substate_id()).unwrap().len();
assert_eq!(n, 2);
assert_eq!(n, 1);
}

fn add_substate(store: &TestStore, seed: u8, version: u32) -> VersionedSubstateId {
Expand Down
8 changes: 4 additions & 4 deletions dan_layer/consensus_tests/src/support/executions_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::{
sync::{Arc, RwLock},
};

use tari_dan_storage::consensus_models::BlockTransactionExecution;
use tari_dan_storage::consensus_models::TransactionExecution;
use tari_transaction::TransactionId;

type TestExecutionStore = HashMap<TransactionId, BlockTransactionExecution>;
type TestExecutionStore = HashMap<TransactionId, TransactionExecution>;

#[derive(Debug, Clone, Default)]
pub struct TestTransactionExecutionsStore {
Expand All @@ -23,15 +23,15 @@ impl TestTransactionExecutionsStore {
}
}

pub fn insert(&self, execution: BlockTransactionExecution) -> &Self {
pub fn insert(&self, execution: TransactionExecution) -> &Self {
self.transactions
.write()
.unwrap()
.insert(execution.transaction_id, execution);
self
}

pub fn get(&self, transaction_id: &TransactionId) -> Option<BlockTransactionExecution> {
pub fn get(&self, transaction_id: &TransactionId) -> Option<TransactionExecution> {
self.transactions.read().unwrap().get(transaction_id).cloned()
}
}
1 change: 1 addition & 0 deletions dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor
use crate::schema::transaction_pool;

let ready_txs = transaction_pool::table
// Important: Order by transaction_id to ensure deterministic ordering
.order_by(transaction_pool::transaction_id.asc())
.get_results::<sql_models::TransactionPoolRecord>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
Expand Down

0 comments on commit 18affcc

Please sign in to comment.