Skip to content

Commit

Permalink
Merge branch 'dev' into 0xkitsune/fetch-missing-txs
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune authored Dec 14, 2023
2 parents eb6b1fc + dc962f0 commit 36a47c7
Show file tree
Hide file tree
Showing 16 changed files with 470 additions and 277 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
RUST_LOG=info,tx_sitter=debug,fake_rpc=debug,tower_http=debug
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target/
target/
.env
10 changes: 10 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ escalation_interval = "1m"
datadog_enabled = false
statsd_enabled = false

[predefined.network]
chain_id = 31337
http_url = "http://127.0.0.1:8545"
ws_url = "ws://127.0.0.1:8545"

[predefined.relayer]
id = "1b908a34-5dc1-4d2d-a146-5eb46e975830"
chain_id = 31337
key_id = "d10607662a85424f02a33fb1e6d095bd0ac7154396ff09762e41f82ff2233aaa"

[server]
host = "127.0.0.1:3000"
disable_auth = false
Expand Down
42 changes: 39 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ pub struct TxSitterConfig {

#[serde(default)]
pub statsd_enabled: bool,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub predefined: Option<Predefined>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct Predefined {
pub network: PredefinedNetwork,
pub relayer: PredefinedRelayer,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PredefinedNetwork {
pub chain_id: u64,
pub name: String,
pub http_rpc: String,
pub ws_rpc: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PredefinedRelayer {
pub id: String,
pub name: String,
pub key_id: String,
pub chain_id: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -101,10 +129,16 @@ pub enum KeysConfig {
#[serde(rename_all = "snake_case")]
pub struct KmsKeysConfig {}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct LocalKeysConfig {}

impl KeysConfig {
pub fn is_local(&self) -> bool {
matches!(self, Self::Local(_))
}
}

#[cfg(test)]
mod tests {
use indoc::indoc;
Expand Down Expand Up @@ -156,6 +190,7 @@ mod tests {
escalation_interval: Duration::from_secs(60 * 60),
datadog_enabled: false,
statsd_enabled: false,
predefined: None,
},
server: ServerConfig {
host: SocketAddr::from(([127, 0, 0, 1], 3000)),
Expand All @@ -166,7 +201,7 @@ mod tests {
"postgres://postgres:[email protected]:52804/database"
.to_string(),
),
keys: KeysConfig::Local(LocalKeysConfig {}),
keys: KeysConfig::Local(LocalKeysConfig::default()),
};

let toml = toml::to_string_pretty(&config).unwrap();
Expand All @@ -181,6 +216,7 @@ mod tests {
escalation_interval: Duration::from_secs(60 * 60),
datadog_enabled: false,
statsd_enabled: false,
predefined: None,
},
server: ServerConfig {
host: SocketAddr::from(([127, 0, 0, 1], 3000)),
Expand All @@ -194,7 +230,7 @@ mod tests {
password: "pass".to_string(),
database: "db".to_string(),
}),
keys: KeysConfig::Local(LocalKeysConfig {}),
keys: KeysConfig::Local(LocalKeysConfig::default()),
};

let toml = toml::to_string_pretty(&config).unwrap();
Expand Down
68 changes: 62 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,26 @@ impl Database {
Ok(block_number.map(|(n,)| n as u64))
}

pub async fn get_latest_block_number(
&self,
chain_id: u64,
) -> eyre::Result<Option<u64>> {
let block_number: Option<(i64,)> = sqlx::query_as(
r#"
SELECT block_number
FROM blocks
WHERE chain_id = $1
ORDER BY block_number DESC
LIMIT 1
"#,
)
.bind(chain_id as i64)
.fetch_optional(&self.pool)
.await?;

Ok(block_number.map(|(n,)| n as u64))
}

pub async fn get_latest_block_fees_by_chain_id(
&self,
chain_id: u64,
Expand Down Expand Up @@ -734,7 +754,13 @@ impl Database {
pub async fn read_txs(
&self,
relayer_id: &str,
tx_status_filter: Option<Option<TxStatus>>,
) -> eyre::Result<Vec<ReadTxData>> {
let (should_filter, status_filter) = match tx_status_filter {
Some(status) => (true, status),
None => (false, None),
};

Ok(sqlx::query_as(
r#"
SELECT t.id as tx_id, t.tx_to as to, t.data, t.value, t.gas_limit, t.nonce,
Expand All @@ -743,9 +769,12 @@ impl Database {
LEFT JOIN sent_transactions s ON t.id = s.tx_id
LEFT JOIN tx_hashes h ON s.valid_tx_hash = h.tx_hash
WHERE t.relayer_id = $1
AND ($2 = true AND s.status = $3) OR $2 = false
"#,
)
.bind(relayer_id)
.bind(should_filter)
.bind(status_filter)
.fetch_all(&self.pool)
.await?)
}
Expand Down Expand Up @@ -1074,7 +1103,7 @@ mod tests {
{
Ok(db) => return Ok((db, db_container)),
Err(_) => {
tokio::time::sleep(Duration::from_secs(2)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
Expand Down Expand Up @@ -1290,6 +1319,9 @@ mod tests {
assert_eq!(tx.nonce, 0);
assert_eq!(tx.tx_hash, None);

let unsent_txs = db.read_txs(relayer_id, None).await?;
assert_eq!(unsent_txs.len(), 1, "1 unsent tx");

let tx_hash_1 = H256::from_low_u64_be(1);
let tx_hash_2 = H256::from_low_u64_be(2);
let initial_max_fee_per_gas = U256::from(1);
Expand All @@ -1308,6 +1340,18 @@ mod tests {
assert_eq!(tx.tx_hash.unwrap().0, tx_hash_1);
assert_eq!(tx.status, Some(TxStatus::Pending));

let unsent_txs = db.read_txs(relayer_id, Some(None)).await?;
assert_eq!(unsent_txs.len(), 0, "0 unsent tx");

let pending_txs = db
.read_txs(relayer_id, Some(Some(TxStatus::Pending)))
.await?;
assert_eq!(pending_txs.len(), 1, "1 pending tx");

let all_txs = db.read_txs(relayer_id, None).await?;

assert_eq!(all_txs, pending_txs);

db.escalate_tx(
tx_id,
tx_hash_2,
Expand Down Expand Up @@ -1394,7 +1438,7 @@ mod tests {
async fn blocks() -> eyre::Result<()> {
let (db, _db_container) = setup_db().await?;

let block_number = 1;
let block_numbers = [0, 1];
let chain_id = 1;
let timestamp = ymd_hms(2023, 11, 23, 12, 32, 2);
let txs = &[
Expand All @@ -1403,7 +1447,10 @@ mod tests {
H256::from_low_u64_be(3),
];

db.save_block(block_number, chain_id, timestamp, txs)
db.save_block(block_numbers[0], chain_id, timestamp, txs)
.await?;

db.save_block(block_numbers[1], chain_id, timestamp, txs)
.await?;

let fee_estimates = FeesEstimate {
Expand All @@ -1413,13 +1460,22 @@ mod tests {

let gas_price = U256::from(1_000_000_007);

db.save_block_fees(block_number, chain_id, &fee_estimates, gas_price)
.await?;
db.save_block_fees(
block_numbers[1],
chain_id,
&fee_estimates,
gas_price,
)
.await?;

let latest_block_number =
db.get_latest_block_number(chain_id)
.await?
.context("Could not get latest block number")?;
let block_fees = db.get_latest_block_fees_by_chain_id(chain_id).await?;

let block_fees = block_fees.context("Missing fees")?;

assert_eq!(latest_block_number, block_numbers[1]);
assert_eq!(
block_fees.fee_estimates.base_fee_per_gas,
fee_estimates.base_fee_per_gas
Expand Down
2 changes: 1 addition & 1 deletion src/db/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct TxForEscalation {
pub escalation_count: usize,
}

#[derive(Debug, Clone, FromRow)]
#[derive(Debug, Clone, FromRow, PartialEq, Eq)]
pub struct ReadTxData {
pub tx_id: String,
pub to: AddressWrapper,
Expand Down
125 changes: 6 additions & 119 deletions src/keys.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use aws_config::BehaviorVersion;
use aws_sdk_kms::types::{KeySpec, KeyUsageType};
use ethers::core::k256::ecdsa::SigningKey;
use ethers::signers::Wallet;
use eyre::{Context, ContextCompat};
pub use universal_signer::UniversalSigner;

use crate::aws::ethers_signer::AwsSigner;
use crate::config::{KmsKeysConfig, LocalKeysConfig};
pub mod kms_keys;
pub mod local_keys;
pub mod universal_signer;

mod universal_signer;
pub use kms_keys::KmsKeys;
pub use local_keys::LocalKeys;
pub use universal_signer::UniversalSigner;

#[async_trait::async_trait]
pub trait KeysSource: Send + Sync + 'static {
Expand All @@ -18,112 +14,3 @@ pub trait KeysSource: Send + Sync + 'static {
/// Loads the key using the provided id
async fn load_signer(&self, id: String) -> eyre::Result<UniversalSigner>;
}

pub struct KmsKeys {
kms_client: aws_sdk_kms::Client,
}

impl KmsKeys {
pub async fn new(_config: &KmsKeysConfig) -> eyre::Result<Self> {
let aws_config =
aws_config::load_defaults(BehaviorVersion::latest()).await;

let kms_client = aws_sdk_kms::Client::new(&aws_config);

Ok(Self { kms_client })
}
}

#[async_trait::async_trait]
impl KeysSource for KmsKeys {
async fn new_signer(&self) -> eyre::Result<(String, UniversalSigner)> {
let kms_key = self
.kms_client
.create_key()
.key_spec(KeySpec::EccSecgP256K1)
.key_usage(KeyUsageType::SignVerify)
.send()
.await
.context("AWS Error")?;

let key_id =
kms_key.key_metadata.context("Missing key metadata")?.key_id;

let signer = AwsSigner::new(
self.kms_client.clone(),
key_id.clone(),
1, // TODO: get chain id from provider
)
.await?;

Ok((key_id, UniversalSigner::Aws(signer)))
}

async fn load_signer(&self, id: String) -> eyre::Result<UniversalSigner> {
let signer = AwsSigner::new(
self.kms_client.clone(),
id.clone(),
1, // TODO: get chain id from provider
)
.await?;

Ok(UniversalSigner::Aws(signer))
}
}

pub struct LocalKeys {
rng: rand::rngs::OsRng,
}

impl LocalKeys {
pub fn new(_config: &LocalKeysConfig) -> Self {
Self {
rng: rand::rngs::OsRng,
}
}
}

#[async_trait::async_trait]
impl KeysSource for LocalKeys {
async fn new_signer(&self) -> eyre::Result<(String, UniversalSigner)> {
let signing_key = SigningKey::random(&mut self.rng.clone());

let key_id = signing_key.to_bytes().to_vec();
let key_id = hex::encode(key_id);

let signer = Wallet::from(signing_key);

Ok((key_id, UniversalSigner::Local(signer)))
}

async fn load_signer(&self, id: String) -> eyre::Result<UniversalSigner> {
let key_id = hex::decode(id)?;
let signing_key = SigningKey::from_slice(key_id.as_slice())?;

let signer = Wallet::from(signing_key);

Ok(UniversalSigner::Local(signer))
}
}

#[cfg(test)]
mod tests {
use ethers::signers::Signer;

use super::*;

#[tokio::test]
async fn local_roundtrip() -> eyre::Result<()> {
let keys_source = LocalKeys::new(&LocalKeysConfig {});

let (id, signer) = keys_source.new_signer().await?;

let address = signer.address();

let signer = keys_source.load_signer(id).await?;

assert_eq!(address, signer.address());

Ok(())
}
}
Loading

0 comments on commit 36a47c7

Please sign in to comment.