Skip to content

Commit

Permalink
node: Set tx timestamp in add_tx
Browse files Browse the repository at this point in the history
  • Loading branch information
Goshawk committed Aug 20, 2024
1 parent 9688224 commit 57dc406
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 24 deletions.
9 changes: 2 additions & 7 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use node_data::message::payload::Vote;
use node_data::{Serializable, StepName};
use std::collections::BTreeMap;
use std::sync::{Arc, LazyLock};
use std::time::{self, Duration, UNIX_EPOCH};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -845,16 +845,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// Delete any rocksdb record related to this block
t.delete_block(&b)?;

let now = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|n| n.as_secs())
.expect("valid timestamp");

// Attempt to resubmit transactions back to mempool.
// An error here is not considered critical.
// Txs timestamp is reset here
for tx in b.txs().iter() {
if let Err(e) = Mempool::add_tx(t, tx, now) {
if let Err(e) = Mempool::add_tx(t, tx) {
warn!("failed to resubmit transactions: {e}")
};
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub trait Candidate {

pub trait Mempool {
/// Adds a transaction to the mempool with a timestamp.
fn add_tx(&self, tx: &ledger::Transaction, timestamp: u64) -> Result<()>;
fn add_tx(&self, tx: &ledger::Transaction) -> Result<()>;

/// Gets a transaction from the mempool.
fn get_tx(&self, tx_id: [u8; 32]) -> Result<Option<ledger::Transaction>>;
Expand Down
23 changes: 14 additions & 9 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::cell::RefCell;

use node_data::ledger::{self, Fault, Header, Label, SpentTransaction};
use node_data::Serializable;
use std::time::{self, UNIX_EPOCH};

use crate::database::Mempool;

Expand Down Expand Up @@ -679,7 +680,7 @@ impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> {
}

impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
fn add_tx(&self, tx: &ledger::Transaction, timestamp: u64) -> Result<()> {
fn add_tx(&self, tx: &ledger::Transaction) -> Result<()> {
// Map Hash to serialized transaction
let mut tx_data = vec![];
tx.write(&mut tx_data)?;
Expand All @@ -694,6 +695,11 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
self.put_cf(self.nullifiers_cf, key, hash)?;
}

let timestamp = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|n| n.as_secs())
.expect("valid timestamp");

let timestamp = timestamp.to_be_bytes();

// Map Fee_Hash to Timestamp
Expand Down Expand Up @@ -1207,7 +1213,7 @@ mod tests {
Backend::create_or_open(path, DatabaseOptions::default());
let t: ledger::Transaction = Faker.fake();

assert!(db.update(|txn| { txn.add_tx(&t, 0) }).is_ok());
assert!(db.update(|txn| { txn.add_tx(&t) }).is_ok());

db.view(|vq| {
assert!(Mempool::get_tx_exists(&vq, t.id()).unwrap());
Expand Down Expand Up @@ -1241,7 +1247,7 @@ mod tests {
db.update(|txn| {
for _i in 0..10u32 {
let t: ledger::Transaction = Faker.fake();
txn.add_tx(&t, 0)?;
txn.add_tx(&t)?;
}
Ok(())
})
Expand Down Expand Up @@ -1280,9 +1286,8 @@ mod tests {

db.update(|db| {
assert_eq!(db.txs_count(), 0);
txs.iter().for_each(|t| {
db.add_tx(&t, 0).expect("tx should be added")
});
txs.iter()
.for_each(|t| db.add_tx(&t).expect("tx should be added"));
Ok(())
})
.unwrap();
Expand Down Expand Up @@ -1319,7 +1324,7 @@ mod tests {
db.update(|txn| {
for i in 0..10u32 {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t, 0)?;
txn.add_tx(&t)?;
}
Ok(())
})
Expand Down Expand Up @@ -1348,13 +1353,13 @@ mod tests {
let _ = db.update(|txn| {
(1..101).for_each(|i| {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t, i).expect("tx should be added");
txn.add_tx(&t).expect("tx should be added");
expiry_list.insert(t.id());
});

(1000..1100).for_each(|i| {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t, i).expect("tx should be added");
txn.add_tx(&t).expect("tx should be added");
});

Ok(())
Expand Down
10 changes: 3 additions & 7 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
_ = on_idle_event.tick() => {
info!(event = "mempool_idle", interval = ?idle_interval);

let dur = time::SystemTime::now()
let expiration_time = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("valid timestamp")
.checked_sub(mempool_expiry)
.expect("valid duration");

// Remove expired transactions from the mempool
db.read().await.update(|db| {
let expired_txs = db.get_expired_txs(dur.as_secs())?;
let expired_txs = db.get_expired_txs(expiration_time.as_secs())?;
for tx_id in expired_txs {
info!(event = "expired_tx", hash = hex::encode(tx_id));
if db.delete_tx(tx_id)? {
Expand Down Expand Up @@ -216,11 +216,7 @@ impl MempoolSrv {
events.push(TransactionEvent::Included(tx));
// Persist transaction in mempool storage

let now = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("valid timestamp");

db.add_tx(tx, now.as_secs())
db.add_tx(tx)
})?;

tracing::info!(
Expand Down

0 comments on commit 57dc406

Please sign in to comment.