Skip to content

Commit

Permalink
add tx submitter
Browse files Browse the repository at this point in the history
  • Loading branch information
tkporter committed Dec 11, 2024
1 parent ba3efa1 commit e0d8da2
Show file tree
Hide file tree
Showing 6 changed files with 420 additions and 219 deletions.
1 change: 1 addition & 0 deletions rust/main/chains/hyperlane-sealevel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ mod priority_fee;
mod provider;
mod rpc;
mod trait_builder;
mod tx_submitter;
mod utils;
mod validator_announce;
295 changes: 139 additions & 156 deletions rust/main/chains/hyperlane-sealevel/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ use hyperlane_core::{
TxCostEstimate, TxOutcome, H256, H512, U256,
};

use crate::error::HyperlaneSealevelError;
use crate::log_meta_composer::{
is_interchain_payment_instruction, is_message_delivery_instruction,
is_message_dispatch_instruction, LogMetaComposer,
Expand All @@ -72,6 +71,7 @@ use crate::{
priority_fee::PriorityFeeOracle,
PriorityFeeOracleConfig,
};
use crate::{error::HyperlaneSealevelError, tx_submitter::TransactionSubmitter};
use crate::{ConnectionConf, SealevelProvider, SealevelRpcClient};

const SYSTEM_PROGRAM: &str = "11111111111111111111111111111111";
Expand Down Expand Up @@ -111,6 +111,7 @@ pub struct SealevelMailbox {
payer: Option<Keypair>,
priority_fee_oracle: Box<dyn PriorityFeeOracle>,
priority_fee_oracle_config: PriorityFeeOracleConfig,
tx_submitter: Box<dyn TransactionSubmitter>,
}

impl SealevelMailbox {
Expand All @@ -135,10 +136,13 @@ impl SealevelMailbox {
program_id,
inbox,
outbox,
provider,
payer,
priority_fee_oracle: conf.priority_fee_oracle.create_oracle(),
priority_fee_oracle_config: conf.priority_fee_oracle.clone(),
tx_submitter: conf
.transaction_submitter
.create_submitter(provider.rpc().url()),
provider,
})
}

Expand Down Expand Up @@ -368,38 +372,17 @@ impl SealevelMailbox {
) -> ChainResult<Transaction> {
let payer = self.get_payer()?;

let mut instructions = Vec::with_capacity(3);
// Set the compute unit limit.
instructions.push(ComputeBudgetInstruction::set_compute_unit_limit(
compute_unit_limit,
));

let prospective_tip: u64 = compute_unit_price_micro_lamports * compute_unit_limit as u64;
tracing::warn!(prospective_tip, "Prospective tip");

// If we're using Jito, we need to send a tip to the Jito fee account.
// Otherwise, we need to set the compute unit price.
if self.is_solana() {
// Tip in lamports
let tip: u64 =
(compute_unit_price_micro_lamports * compute_unit_limit as u64) / 1e6 as u64;

// The tip is a standalone transfer to a Jito fee account.
// See https://github.com/jito-labs/mev-protos/blob/master/json_rpc/http.md#sendbundle.
instructions.push(solana_sdk::system_instruction::transfer(
&payer.pubkey(),
// A random Jito fee account, taken from the getFeeAccount RPC response:
// https://github.com/jito-labs/mev-protos/blob/master/json_rpc/http.md#gettipaccounts
&solana_sdk::pubkey!("DfXygSm4jCyNCybVYYK6DwvWqjKee8pbDmJGcLWNDXjh"),
tip,
));
} else {
instructions.push(ComputeBudgetInstruction::set_compute_unit_price(
let instructions = vec![
// Set the compute unit limit.
ComputeBudgetInstruction::set_compute_unit_limit(compute_unit_limit),
// Set the priority fee / tip
self.tx_submitter.get_priority_fee_instruction(
compute_unit_price_micro_lamports,
));
}

instructions.push(instruction);
compute_unit_limit.into(),
&payer.pubkey(),
),
instruction,
];

let tx = if sign {
let recent_blockhash = self
Expand Down Expand Up @@ -506,128 +489,128 @@ impl SealevelMailbox {
Ok(process_instruction)
}

async fn send_and_confirm_transaction(
&self,
transaction: &Transaction,
) -> ChainResult<Signature> {
if self.is_solana() {
self.send_and_confirm_transaction_with_jito(transaction)
.await
} else {
self.provider
.rpc()
.send_transaction(transaction, true)
.await
}

// if self.is_solana() {
// if let PriorityFeeOracleConfig::Helius(helius) = &self.priority_fee_oracle_config {
// let rpc = SealevelRpcClient::new(helius.url.clone().into());
// return rpc.send_transaction(transaction, true).await;
// } else {
// tracing::warn!("Priority fee oracle is not Helius, falling back to normal RPC");
// }
// }
// self.provider
// .rpc()
// .send_transaction(transaction, true)
// .await
}

// Stolen from Solana's non-blocking client, but with Jito!
pub async fn send_and_confirm_transaction_with_jito(
&self,
transaction: &impl SerializableTransaction,
) -> ChainResult<Signature> {
let signature = transaction.get_signature();

let base58_txn = bs58::encode(
bincode::serialize(&transaction).map_err(ChainCommunicationError::from_other)?,
)
.into_string();

const SEND_RETRIES: usize = 1;
const GET_STATUS_RETRIES: usize = usize::MAX;

'sending: for _ in 0..SEND_RETRIES {
let jito_request_body = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "sendBundle",
"params": [
[base58_txn]
],
});

tracing::info!(
?jito_request_body,
?signature,
"Sending sealevel transaction to Jito as bundle"
);

let jito_response = reqwest::Client::new()
.post("https://mainnet.block-engine.jito.wtf:443/api/v1/bundles")
.json(&jito_request_body)
.send()
.await
.map_err(ChainCommunicationError::from_other)?;
let jito_response_text = jito_response.text().await;

tracing::info!(
?signature,
?jito_response_text,
"Got Jito response for sealevel transaction bundle"
);

// let recent_blockhash = if transaction.uses_durable_nonce() {
// self.provider
// .rpc()
// .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
// .await?
// } else {
// *transaction.get_recent_blockhash()
// };

// for status_retry in 0..GET_STATUS_RETRIES {
// let signature_statuses: Response<Vec<Option<TransactionStatus>>> = self
// .provider
// .rpc()
// .get_signature_statuses(&[*signature])
// .await?;
// let signature_status = signature_statuses.value.first().cloned().flatten();
// match signature_status {
// Some(_) => return Ok(*signature),
// None => {
// if !self
// .provider
// .rpc()
// .is_blockhash_valid(&recent_blockhash)
// .await?
// {
// // Block hash is not found by some reason
// break 'sending;
// } else if cfg!(not(test))
// // Ignore sleep at last step.
// && status_retry < GET_STATUS_RETRIES
// {
// // Retry twice a second
// tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// continue;
// }
// }
// }
// }
}

Err(ChainCommunicationError::from_other(
solana_client::rpc_request::RpcError::ForUser(
"unable to confirm transaction. \
This can happen in situations such as transaction expiration \
and insufficient fee-payer funds"
.to_string(),
),
))
}
// async fn send_and_confirm_transaction(
// &self,
// transaction: &Transaction,
// ) -> ChainResult<Signature> {
// if self.is_solana() {
// self.send_and_confirm_transaction_with_jito(transaction)
// .await
// } else {
// self.provider
// .rpc()
// .send_transaction(transaction, true)
// .await
// }

// // if self.is_solana() {
// // if let PriorityFeeOracleConfig::Helius(helius) = &self.priority_fee_oracle_config {
// // let rpc = SealevelRpcClient::new(helius.url.clone().into());
// // return rpc.send_transaction(transaction, true).await;
// // } else {
// // tracing::warn!("Priority fee oracle is not Helius, falling back to normal RPC");
// // }
// // }
// // self.provider
// // .rpc()
// // .send_transaction(transaction, true)
// // .await
// }

// // Stolen from Solana's non-blocking client, but with Jito!
// pub async fn send_and_confirm_transaction_with_jito(
// &self,
// transaction: &impl SerializableTransaction,
// ) -> ChainResult<Signature> {
// let signature = transaction.get_signature();

// let base58_txn = bs58::encode(
// bincode::serialize(&transaction).map_err(ChainCommunicationError::from_other)?,
// )
// .into_string();

// const SEND_RETRIES: usize = 1;
// const GET_STATUS_RETRIES: usize = usize::MAX;

// 'sending: for _ in 0..SEND_RETRIES {
// let jito_request_body = serde_json::json!({
// "jsonrpc": "2.0",
// "id": 1,
// "method": "sendBundle",
// "params": [
// [base58_txn]
// ],
// });

// tracing::info!(
// ?jito_request_body,
// ?signature,
// "Sending sealevel transaction to Jito as bundle"
// );

// let jito_response = reqwest::Client::new()
// .post("https://mainnet.block-engine.jito.wtf:443/api/v1/bundles")
// .json(&jito_request_body)
// .send()
// .await
// .map_err(ChainCommunicationError::from_other)?;
// let jito_response_text = jito_response.text().await;

// tracing::info!(
// ?signature,
// ?jito_response_text,
// "Got Jito response for sealevel transaction bundle"
// );

// // let recent_blockhash = if transaction.uses_durable_nonce() {
// // self.provider
// // .rpc()
// // .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
// // .await?
// // } else {
// // *transaction.get_recent_blockhash()
// // };

// // for status_retry in 0..GET_STATUS_RETRIES {
// // let signature_statuses: Response<Vec<Option<TransactionStatus>>> = self
// // .provider
// // .rpc()
// // .get_signature_statuses(&[*signature])
// // .await?;
// // let signature_status = signature_statuses.value.first().cloned().flatten();
// // match signature_status {
// // Some(_) => return Ok(*signature),
// // None => {
// // if !self
// // .provider
// // .rpc()
// // .is_blockhash_valid(&recent_blockhash)
// // .await?
// // {
// // // Block hash is not found by some reason
// // break 'sending;
// // } else if cfg!(not(test))
// // // Ignore sleep at last step.
// // && status_retry < GET_STATUS_RETRIES
// // {
// // // Retry twice a second
// // tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// // continue;
// // }
// // }
// // }
// // }
// }

// Err(ChainCommunicationError::from_other(
// solana_client::rpc_request::RpcError::ForUser(
// "unable to confirm transaction. \
// This can happen in situations such as transaction expiration \
// and insufficient fee-payer funds"
// .to_string(),
// ),
// ))
// }

async fn get_inbox(&self) -> ChainResult<Box<Inbox>> {
let account = self
Expand Down Expand Up @@ -740,7 +723,7 @@ impl Mailbox for SealevelMailbox {

tracing::info!(?tx, "Created sealevel transaction to process message");

let signature = self.send_and_confirm_transaction(&tx).await?;
let signature = self.tx_submitter.send_transaction(&tx, true).await?;

tracing::info!(?tx, ?signature, "Sealevel transaction sent");

Expand Down
4 changes: 4 additions & 0 deletions rust/main/chains/hyperlane-sealevel/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ impl SealevelRpcClient {

Ok(result)
}

pub fn url(&self) -> String {
self.0.url()
}
}

impl std::fmt::Debug for SealevelRpcClient {
Expand Down
Loading

0 comments on commit e0d8da2

Please sign in to comment.