Skip to content

Commit

Permalink
Merge pull request #9 from worldcoin/0xkitsune/fetch-missing-txs
Browse files Browse the repository at this point in the history
Feat: Fetch missing txs
  • Loading branch information
0xKitsune authored Dec 14, 2023
2 parents dc962f0 + 9cb1e3e commit 59ea3c4
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
target/
.env
.env
92 changes: 46 additions & 46 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,50 @@ default-run = "tx-sitter"
members = ["crates/*"]

[dependencies]
# Third Party
async-trait = "0.1.74"

## AWS
aws-config = { version = "1.0.1" }
aws-sdk-kms = "1.3.0"
aws-smithy-types = "1.0.2"
aws-smithy-runtime-api = "1.0.2"
aws-types = "1.0.1"
aws-credential-types = { version = "1.0.1", features = [
"hardcoded-credentials",
] }

## Other
serde = "1.0.136"
aws-sdk-kms = "1.3.0"
aws-smithy-runtime-api = "1.0.2"
aws-smithy-types = "1.0.2"
aws-types = "1.0.1"
axum = { version = "0.6.20", features = ["headers"] }
thiserror = "1.0.50"
headers = "0.3.9"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14.27"
dotenv = "0.15.0"
base64 = "0.21.5"
bigdecimal = "0.4.2"
chrono = "0.4"
clap = { version = "4.3.0", features = ["env", "derive"] }
config = "0.13.3"
dotenv = "0.15.0"
ethers = { version = "2.0.11", features = ["ws"] }
eyre = "0.6.5"
futures = "0.3"
headers = "0.3.9"
hex = "0.4.3"
hex-literal = "0.4.1"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14.27"
itertools = "0.12.0"
metrics = "0.21.1"
num-bigint = "0.4.4"
# telemetry-batteries = { path = "../telemetry-batteries" }

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }
rand = "0.8.5"
reqwest = { version = "0.11.13", default-features = false, features = [
"rustls-tls",
] }

## Other
serde = "1.0.136"
serde_json = "1.0.91"
strum = { version = "0.25.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"std",
"fmt",
"json",
"ansi",
] }
tower-http = { version = "0.4.4", features = [ "trace", "auth" ] }
uuid = { version = "0.8", features = ["v4"] }
futures = "0.3"
chrono = "0.4"
rand = "0.8.5"
sha3 = "0.10.8"
config = "0.13.3"
toml = "0.8.8"
url = "2.4.1"
spki = "0.7.2"
sqlx = { version = "0.7.2", features = [
"time",
"chrono",
Expand All @@ -65,26 +61,30 @@ sqlx = { version = "0.7.2", features = [
"migrate",
"bigdecimal",
] }
metrics = "0.21.1"
num-bigint = "0.4.4"
bigdecimal = "0.4.2"
spki = "0.7.2"
async-trait = "0.1.74"
itertools = "0.12.0"
base64 = "0.21.5"
strum = { version = "0.25.0", features = ["derive"] }

# Company
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", branch = "dzejkop/unnest-fields" }
# telemetry-batteries = { path = "../telemetry-batteries" }

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }
thiserror = "1.0.50"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
toml = "0.8.8"
tower-http = { version = "0.4.4", features = ["trace", "auth"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"std",
"fmt",
"json",
"ansi",
] }
url = "2.4.1"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
test-case = "3.1.0"
indoc = "2.0.3"
fake-rpc = { path = "crates/fake-rpc" }
indoc = "2.0.3"
test-case = "3.1.0"

[features]
default = [ "default-config" ]
default = ["default-config"]
default-config = []
15 changes: 15 additions & 0 deletions src/keys/universal_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ethers::core::types::transaction::eip2718::TypedTransaction;
use ethers::core::types::transaction::eip712::Eip712;
use ethers::core::types::{Address, Signature as EthSig};
use ethers::signers::{Signer, Wallet, WalletError};
use ethers::types::Bytes;
use thiserror::Error;

use crate::aws::ethers_signer::AwsSigner;
Expand All @@ -13,6 +14,20 @@ pub enum UniversalSigner {
Local(Wallet<SigningKey>),
}

impl UniversalSigner {
pub async fn raw_signed_tx(
&self,
tx: &TypedTransaction,
) -> eyre::Result<Bytes> {
let signature = match self {
Self::Aws(signer) => signer.sign_transaction(tx).await?,
Self::Local(signer) => signer.sign_transaction(tx).await?,
};

Ok(tx.rlp_signed(&signature))
}
}

#[derive(Debug, Error)]
pub enum UniversalError {
#[error("AWS Signer Error: {0}")]
Expand Down
115 changes: 70 additions & 45 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;
use ethers::providers::Middleware;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::types::transaction::eip2930::AccessList;
use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress};
use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, H256};
use eyre::ContextCompat;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand All @@ -19,23 +19,9 @@ use crate::db::UnsentTx;

pub async fn broadcast_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
let mut txs = app.db.get_unsent_txs().await?;

txs.sort_unstable_by_key(|tx| tx.relayer_id.clone());

let txs_by_relayer =
txs.into_iter().group_by(|tx| tx.relayer_id.clone());

let txs_by_relayer: HashMap<_, _> = txs_by_relayer
.into_iter()
.map(|(relayer_id, txs)| {
let mut txs = txs.collect_vec();

txs.sort_unstable_by_key(|tx| tx.nonce);

(relayer_id, txs)
})
.collect();
// Get all unsent txs and broadcast
let txs = app.db.get_unsent_txs().await?;
let txs_by_relayer = sort_txs_by_relayer(txs);

let mut futures = FuturesUnordered::new();

Expand Down Expand Up @@ -96,41 +82,45 @@ async fn broadcast_relayer_txs(
max_base_fee_per_gas,
);

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};
let mut typed_transaction =
TypedTransaction::Eip1559(Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
});

// Fill and simulate the transaction
middleware
.fill_transaction(&mut typed_transaction, None)
.await?;

tracing::debug!(?eip1559_tx, "Sending tx");
tracing::debug!(?tx.id, "Simulating tx");

// TODO: Is it possible that we send a tx but don't store it in the DB?
// TODO: Be smarter about error handling - a tx can fail to be sent
// e.g. because the relayer is out of funds
// but we don't want to retry it forever
let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
// Simulate the transaction
match middleware.call(&typed_transaction, None).await {
Ok(_) => {
tracing::info!(?tx.id, "Tx simulated successfully");
}
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
tracing::error!(?tx.id, error = ?err, "Failed to simulate tx");
continue;
}
};

let tx_hash = pending_tx.tx_hash();
// Get the raw signed tx and derive the tx hash
let raw_signed_tx = middleware
.signer()
.raw_signed_tx(&typed_transaction)
.await?;

let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx));

app.db
.insert_tx_broadcast(
Expand All @@ -141,8 +131,43 @@ async fn broadcast_relayer_txs(
)
.await?;

tracing::debug!(?tx.id, "Sending tx");

// TODO: Be smarter about error handling - a tx can fail to be sent
// e.g. because the relayer is out of funds
// but we don't want to retry it forever
let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await;

match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
}
Err(err) => {
tracing::error!(?tx.id, error = ?err, "Failed to send tx");
continue;
}
};

tracing::info!(id = tx.id, hash = ?tx_hash, "Tx broadcast");
}

Ok(())
}

fn sort_txs_by_relayer(
mut txs: Vec<UnsentTx>,
) -> HashMap<String, Vec<UnsentTx>> {
txs.sort_unstable_by_key(|tx| tx.relayer_id.clone());
let txs_by_relayer = txs.into_iter().group_by(|tx| tx.relayer_id.clone());

txs_by_relayer
.into_iter()
.map(|(relayer_id, txs)| {
let mut txs = txs.collect_vec();

txs.sort_unstable_by_key(|tx| tx.nonce);

(relayer_id, txs)
})
.collect()
}
6 changes: 3 additions & 3 deletions src/tasks/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ const TIME_BETWEEN_FINALIZATIONS_SECONDS: i64 = 60;

pub async fn finalize_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
let finalization_timestmap =
let finalization_timestamp =
chrono::Utc::now() - chrono::Duration::seconds(60 * 60);

tracing::info!(
"Finalizing txs mined before {}",
finalization_timestmap
finalization_timestamp
);

app.db.finalize_txs(finalization_timestmap).await?;
app.db.finalize_txs(finalization_timestamp).await?;

tokio::time::sleep(Duration::from_secs(
TIME_BETWEEN_FINALIZATIONS_SECONDS as u64,
Expand Down

0 comments on commit 59ea3c4

Please sign in to comment.