Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow disabling relayers #14

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions db/migrations/002_relayers_table_update.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE relayers
RENAME COLUMN gas_limits TO gas_price_limits;

ALTER TABLE relayers
ADD COLUMN enabled BOOL NOT NULL DEFAULT TRUE;
2 changes: 1 addition & 1 deletion manual_test_kms.nu
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ echo "Creating relayer"
let relayer = http post -t application/json $"($txSitter)/1/admin/relayer" { "name": "My Relayer", "chainId": 11155111 }

http post -t application/json $"($txSitter)/1/admin/relayer/($relayer.relayerId)" {
gasLimits: [
gasPriceLimits: [
{ chainId: 11155111, value: "0x123" }
]
}
Expand Down
15 changes: 12 additions & 3 deletions src/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use eyre::ContextCompat;

use self::gas_estimation::FeesEstimate;
use crate::app::App;
use crate::types::RelayerInfo;

pub mod gas_estimation;

Expand All @@ -21,11 +22,19 @@ pub fn calculate_gas_fees_from_estimates(

pub async fn should_send_transaction(
app: &App,
relayer_id: &str,
relayer: &RelayerInfo,
) -> eyre::Result<bool> {
let relayer = app.db.get_relayer(relayer_id).await?;
if !relayer.enabled {
tracing::warn!(
relayer_id = relayer.id,
chain_id = relayer.chain_id,
"Relayer is disabled, skipping transactions broadcast"
);

return Ok(false);
}

for gas_limit in &relayer.gas_limits.0 {
for gas_limit in &relayer.gas_price_limits.0 {
let chain_fees = app
.db
.get_latest_block_fees_by_chain_id(relayer.chain_id)
Expand Down
23 changes: 13 additions & 10 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ impl Database {
.await?;
}

if let Some(gas_limits) = &update.gas_limits {
if let Some(gas_price_limits) = &update.gas_price_limits {
sqlx::query(
r#"
UPDATE relayers
SET gas_limits = $2
SET gas_price_limits = $2
WHERE id = $1
"#,
)
.bind(id)
.bind(Json(gas_limits))
.bind(Json(gas_price_limits))
.execute(tx.as_mut())
.await?;
}
Expand All @@ -132,7 +132,8 @@ impl Database {
nonce,
current_nonce,
max_inflight_txs,
gas_limits
gas_price_limits,
enabled
FROM relayers
"#,
)
Expand All @@ -152,7 +153,8 @@ impl Database {
nonce,
current_nonce,
max_inflight_txs,
gas_limits
gas_price_limits,
enabled
FROM relayers
WHERE id = $1
"#,
Expand Down Expand Up @@ -1090,7 +1092,7 @@ mod tests {

use super::*;
use crate::db::data::U256Wrapper;
use crate::types::RelayerGasLimit;
use crate::types::RelayerGasPriceLimit;

async fn setup_db() -> eyre::Result<(Database, DockerContainerGuard)> {
let db_container = postgres_docker_utils::setup().await?;
Expand Down Expand Up @@ -1230,17 +1232,18 @@ mod tests {
assert_eq!(relayer.nonce, 0);
assert_eq!(relayer.current_nonce, 0);
assert_eq!(relayer.max_inflight_txs, 5);
assert_eq!(relayer.gas_limits.0, vec![]);
assert_eq!(relayer.gas_price_limits.0, vec![]);

db.update_relayer(
relayer_id,
&RelayerUpdate {
relayer_name: None,
max_inflight_txs: Some(10),
gas_limits: Some(vec![RelayerGasLimit {
gas_price_limits: Some(vec![RelayerGasPriceLimit {
chain_id: 1,
value: U256Wrapper(U256::from(10_123u64)),
}]),
enabled: None,
},
)
.await?;
Expand All @@ -1256,8 +1259,8 @@ mod tests {
assert_eq!(relayer.current_nonce, 0);
assert_eq!(relayer.max_inflight_txs, 10);
assert_eq!(
relayer.gas_limits.0,
vec![RelayerGasLimit {
relayer.gas_price_limits.0,
vec![RelayerGasPriceLimit {
chain_id: 1,
value: U256Wrapper(U256::from(10_123u64)),
}]
Expand Down
6 changes: 4 additions & 2 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ async fn broadcast_relayer_txs(
app: &App,
relayer_id: String,
txs: Vec<UnsentTx>,
) -> Result<(), eyre::Error> {
) -> eyre::Result<()> {
if txs.is_empty() {
return Ok(());
}

tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs");

if !should_send_transaction(app, &relayer_id).await? {
let relayer = app.db.get_relayer(&relayer_id).await?;

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer_id,
"Skipping transaction broadcasts"
Expand Down
210 changes: 130 additions & 80 deletions src/tasks/escalate.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::collections::HashMap;
use std::sync::Arc;

use ethers::providers::Middleware;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::types::transaction::eip2930::AccessList;
use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, U256};
use eyre::ContextCompat;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

use crate::app::App;
use crate::broadcast_utils::should_send_transaction;
use crate::db::TxForEscalation;

pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
Expand All @@ -16,90 +20,136 @@
.get_txs_for_escalation(app.config.service.escalation_interval)
.await?;

for tx in txs_for_escalation {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");
let txs_for_escalation = split_txs_per_relayer(txs_for_escalation);

if !should_send_transaction(&app, &tx.relayer_id).await? {
tracing::warn!(id = tx.id, "Skipping transaction broadcast");
continue;
}
let mut futures = FuturesUnordered::new();

let escalation = tx.escalation_count + 1;

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
}
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

app.db
.escalate_tx(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
for (relayer_id, txs) in txs_for_escalation {
futures.push(escalate_relayer_txs(&app, relayer_id, txs));
}

while let Some(result) = futures.next().await {
if let Err(err) = result {
tracing::error!(error = ?err, "Failed escalating txs");
}
}

tokio::time::sleep(app.config.service.escalation_interval).await;
}
}

async fn escalate_relayer_txs(
app: &App,
relayer_id: String,
txs: Vec<TxForEscalation>,
) -> eyre::Result<()> {
let relayer = app.db.get_relayer(&relayer_id).await?;

for tx in txs {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");

if !should_send_transaction(&app, &relayer).await? {

Check failure on line 51 in src/tasks/escalate.rs

View workflow job for this annotation

GitHub Actions / cargo test

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 51 in src/tasks/escalate.rs

View workflow job for this annotation

GitHub Actions / cargo test

this expression creates a reference which is immediately dereferenced by the compiler
tracing::warn!(id = tx.id, "Skipping transaction broadcast");

return Ok(());
}

let escalation = tx.escalation_count + 1;

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");
0xKitsune marked this conversation as resolved.
Show resolved Hide resolved

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
0xKitsune marked this conversation as resolved.
Show resolved Hide resolved
pending_tx
}
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
0xKitsune marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
};

let tx_hash = pending_tx.tx_hash();

app.db
.escalate_tx(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
}

Ok(())
}

fn split_txs_per_relayer(
txs: Vec<TxForEscalation>,
) -> HashMap<String, Vec<TxForEscalation>> {
let mut txs_per_relayer = HashMap::new();

for tx in txs {
let relayer_id = tx.relayer_id.clone();

let txs_for_relayer =
txs_per_relayer.entry(relayer_id).or_insert_with(Vec::new);

txs_for_relayer.push(tx);
}

for (_, txs) in txs_per_relayer.iter_mut() {
txs.sort_by_key(|tx| tx.escalation_count);
}

txs_per_relayer
}
Loading
Loading