diff --git a/Cargo.lock b/Cargo.lock index 84e714ce..cbb113d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,7 @@ name = "aa-bundler" version = "0.1.0" dependencies = [ "anyhow", + "arrayref", "async-trait", "clap", "dirs", diff --git a/Cargo.toml b/Cargo.toml index 6893c3c6..8a62cabb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ rust-version = "1.66.1" [dependencies] anyhow = "1" +arrayref = "0.3" async-trait = "0.1" clap = { version = "4", features = ["derive"] } dirs = "4.0" diff --git a/bin/bundler-rpc.rs b/bin/bundler-rpc.rs index 9847cbb1..b30718f8 100644 --- a/bin/bundler-rpc.rs +++ b/bin/bundler-rpc.rs @@ -3,7 +3,10 @@ use clap::Parser; use jsonrpsee::{core::server::rpc_module::Methods, server::ServerBuilder, tracing::info}; use std::future::pending; -use aa_bundler::rpc::{eth::EthApiServerImpl, eth_api::EthApiServer}; +use aa_bundler::{ + rpc::{eth::EthApiServerImpl, eth_api::EthApiServer}, + uopool::server::uopool::uo_pool_client::UoPoolClient, +}; #[derive(Parser)] #[clap( @@ -13,6 +16,9 @@ use aa_bundler::rpc::{eth::EthApiServerImpl, eth_api::EthApiServer}; pub struct Opt { #[clap(long, default_value = "127.0.0.1:4337")] pub rpc_listen_address: String, + + #[clap(long, default_value = "127.0.0.1:3001")] + pub uopool_grpc_listen_address: String, } #[tokio::main] @@ -26,9 +32,12 @@ async fn main() -> Result<()> { .await?; let mut api = Methods::new(); + let uopool_grpc_client = + UoPoolClient::connect(format!("http://{}", opt.uopool_grpc_listen_address)).await?; api.merge( EthApiServerImpl { call_gas_limit: 100_000_000, + uopool_grpc_client, } .into_rpc(), ) diff --git a/bin/bundler.rs b/bin/bundler.rs index 7a987616..6f56b01d 100644 --- a/bin/bundler.rs +++ b/bin/bundler.rs @@ -2,6 +2,7 @@ use aa_bundler::{ bundler::Bundler, models::wallet::Wallet, rpc::{eth::EthApiServerImpl, eth_api::EthApiServer}, + uopool::server::uopool::uo_pool_client::UoPoolClient, utils::{parse_address, parse_u256}, }; use anyhow::Result; @@ -81,9 +82,16 @@ fn main() -> Result<()> { .unwrap(); let mut api = Methods::new(); + let uopool_grpc_client = UoPoolClient::connect(format!( + "http://{}", + opt.uopool_opts.uopool_grpc_listen_address + )) + .await + .unwrap(); api.merge( EthApiServerImpl { call_gas_limit: 100_000_000, + uopool_grpc_client, } .into_rpc(), ) diff --git a/src/proto/uopool/uopool.proto b/src/proto/uopool/uopool.proto index 9b96b79c..41f03d81 100644 --- a/src/proto/uopool/uopool.proto +++ b/src/proto/uopool/uopool.proto @@ -7,7 +7,6 @@ package uopool; message AddRequest { types.UserOperation uo = 1; types.H160 ep = 2; - uint64 cid = 3; } enum AddResult { @@ -17,7 +16,7 @@ enum AddResult { message AddResponse { AddResult result = 1; - string error = 2; + string data = 2; } message RemoveRequest { @@ -31,7 +30,7 @@ enum RemoveResult { message RemoveResponse { RemoveResult result = 1; - string error = 2; + string data = 2; } message AllRequest {} diff --git a/src/rpc/eth.rs b/src/rpc/eth.rs index b35c3b54..756e0c3b 100644 --- a/src/rpc/eth.rs +++ b/src/rpc/eth.rs @@ -1,20 +1,21 @@ use crate::{ rpc::eth_api::{EstimateUserOperationGasResponse, EthApiServer}, types::user_operation::{UserOperation, UserOperationHash, UserOperationReceipt}, + uopool::server::uopool::{uo_pool_client::UoPoolClient, AddRequest, AddResult}, }; +use anyhow::format_err; use async_trait::async_trait; use ethers::types::{Address, U256, U64}; use jsonrpsee::{ core::RpcResult, tracing::info, - types::{ - error::{CallError, ErrorCode}, - ErrorObject, - }, + types::{error::CallError, ErrorObject}, }; +use std::str::FromStr; pub struct EthApiServerImpl { pub call_gas_limit: u64, + pub uopool_grpc_client: UoPoolClient, } #[async_trait] @@ -32,16 +33,31 @@ impl EthApiServer for EthApiServerImpl { user_operation: UserOperation, entry_point: Address, ) -> RpcResult { - info!("{:?}", user_operation); info!("{:?}", entry_point); - // Ok(SendUserOperationResponse::Success(H256::default())) - let data = serde_json::value::to_raw_value(&"{\"a\": 100, \"b\": 200}").unwrap(); + info!("{:?}", user_operation); + + let mut uopool_grpc_client = self.uopool_grpc_client.clone(); + + let request = tonic::Request::new(AddRequest { + uo: Some(user_operation.into()), + ep: Some(entry_point.into()), + }); + + let response = uopool_grpc_client + .add(request) + .await + .map_err(|status| format_err!("GRPC error (uopool): {}", status.message()))? + .into_inner(); + + if response.result == AddResult::Added as i32 { + let user_operation_hash = UserOperationHash::from_str(&response.data) + .map_err(|err| format_err!("error parsing user operation hash: {}", err))?; + return Ok(user_operation_hash); + } + Err(jsonrpsee::core::Error::Call(CallError::Custom( - ErrorObject::owned( - ErrorCode::ServerError(-32000).code(), - "Not implemented", - Some(data), - ), + serde_json::from_str::(&response.data) + .map_err(|err| format_err!("error parsing error object: {}", err))?, ))) } diff --git a/src/uopool/mod.rs b/src/uopool/mod.rs index a05d1d7a..2d0c44f2 100644 --- a/src/uopool/mod.rs +++ b/src/uopool/mod.rs @@ -1,7 +1,7 @@ use crate::{ types::user_operation::{UserOperation, UserOperationHash}, uopool::{ - memory::MemoryMempool, server::uopool_server::uo_pool_server::UoPoolServer, + memory::MemoryMempool, server::uopool::uo_pool_server::UoPoolServer, services::UoPoolService, }, }; @@ -53,7 +53,7 @@ pub struct UserOperationPool { pub pool: Arc, } -#[derive(Educe, Parser)] +#[derive(Clone, Copy, Educe, Parser)] #[educe(Debug)] pub struct UoPoolOpts { #[clap(long, default_value = "127.0.0.1:3001")] diff --git a/src/uopool/server.rs b/src/uopool/server.rs index 823dd49a..bfb30b80 100644 --- a/src/uopool/server.rs +++ b/src/uopool/server.rs @@ -1,7 +1,119 @@ -mod types { +// Code adapted from: https://github.com/ledgerwatch/interfaces/blob/master/src/lib.rs#L1 +pub mod types { + use arrayref::array_ref; + use ethers::types::{Address, Bytes, U256}; + tonic::include_proto!("types"); + + impl From for H128 { + fn from(value: ethers::types::H128) -> Self { + Self { + hi: u64::from_be_bytes(*array_ref!(value, 0, 8)), + lo: u64::from_be_bytes(*array_ref!(value, 8, 8)), + } + } + } + + impl From for H160 { + fn from(value: ethers::types::H160) -> Self { + Self { + hi: Some(ethers::types::H128::from_slice(&value[..16]).into()), + lo: u32::from_be_bytes(*array_ref!(value, 16, 4)), + } + } + } + + impl From for H256 { + fn from(value: ethers::types::H256) -> Self { + Self { + hi: Some(ethers::types::H128::from_slice(&value[..16]).into()), + lo: Some(ethers::types::H128::from_slice(&value[16..]).into()), + } + } + } + + impl From for ethers::types::H128 { + fn from(value: H128) -> Self { + let mut v = [0; Self::len_bytes()]; + v[..8].copy_from_slice(&value.hi.to_be_bytes()); + v[8..].copy_from_slice(&value.lo.to_be_bytes()); + + v.into() + } + } + + impl From for ethers::types::H160 { + fn from(value: H160) -> Self { + type H = ethers::types::H128; + + let mut v = [0; Self::len_bytes()]; + v[..H::len_bytes()] + .copy_from_slice(H::from(value.hi.unwrap_or_default()).as_fixed_bytes()); + v[H::len_bytes()..].copy_from_slice(&value.lo.to_be_bytes()); + + v.into() + } + } + + impl From for ethers::types::H256 { + fn from(value: H256) -> Self { + type H = ethers::types::H128; + + let mut v = [0; Self::len_bytes()]; + v[..H::len_bytes()] + .copy_from_slice(H::from(value.hi.unwrap_or_default()).as_fixed_bytes()); + v[H::len_bytes()..] + .copy_from_slice(H::from(value.lo.unwrap_or_default()).as_fixed_bytes()); + + v.into() + } + } + + impl From for UserOperation { + fn from(user_operation: crate::types::user_operation::UserOperation) -> Self { + Self { + sender: Some(user_operation.sender.into()), + nonce: user_operation.nonce.as_u64(), + init_code: prost::bytes::Bytes::copy_from_slice(user_operation.init_code.as_ref()), + call_data: prost::bytes::Bytes::copy_from_slice(user_operation.call_data.as_ref()), + call_gas_limit: user_operation.call_gas_limit.as_u64(), + verification_gas_limit: user_operation.verification_gas_limit.as_u64(), + pre_verification_gas: user_operation.pre_verification_gas.as_u64(), + max_fee_per_gas: user_operation.max_fee_per_gas.as_u64(), + max_priority_fee_per_gas: user_operation.max_priority_fee_per_gas.as_u64(), + paymaster_and_data: prost::bytes::Bytes::copy_from_slice( + user_operation.paymaster_and_data.as_ref(), + ), + signature: prost::bytes::Bytes::copy_from_slice(user_operation.signature.as_ref()), + } + } + } + + impl From for crate::types::user_operation::UserOperation { + fn from(user_operation: UserOperation) -> Self { + Self { + sender: { + if let Some(sender) = user_operation.sender { + sender.into() + } else { + Address::zero() + } + }, + nonce: U256::from(user_operation.nonce), + init_code: Bytes::from(user_operation.init_code), + call_data: Bytes::from(user_operation.call_data), + call_gas_limit: U256::from(user_operation.call_gas_limit), + verification_gas_limit: U256::from(user_operation.verification_gas_limit), + pre_verification_gas: U256::from(user_operation.pre_verification_gas), + max_fee_per_gas: U256::from(user_operation.max_fee_per_gas), + max_priority_fee_per_gas: U256::from(user_operation.max_priority_fee_per_gas), + paymaster_and_data: Bytes::from(user_operation.paymaster_and_data), + signature: Bytes::from(user_operation.signature), + } + } + } } -pub mod uopool_server { +pub mod uopool { tonic::include_proto!("uopool"); } diff --git a/src/uopool/services/uopool.rs b/src/uopool/services/uopool.rs index 0c5cd8a2..d4c77679 100644 --- a/src/uopool/services/uopool.rs +++ b/src/uopool/services/uopool.rs @@ -1,19 +1,22 @@ -use std::{collections::HashMap, sync::Arc}; - use crate::{ types::user_operation::UserOperation, uopool::{ - server::uopool_server::{ - uo_pool_server::UoPool, AddRequest, AddResponse, AllRequest, AllResponse, + server::uopool::{ + uo_pool_server::UoPool, AddRequest, AddResponse, AddResult, AllRequest, AllResponse, RemoveRequest, RemoveResponse, }, MempoolBox, MempoolId, }, }; use async_trait::async_trait; +use jsonrpsee::{tracing::info, types::ErrorObject}; use parking_lot::RwLock; +use serde_json::json; +use std::{collections::HashMap, sync::Arc}; use tonic::Response; +pub type UoPoolError = ErrorObject<'static>; + pub struct UoPoolService { _mempools: Arc>>>>, } @@ -30,12 +33,38 @@ impl UoPoolService { impl UoPool for UoPoolService { async fn add( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - // let req = request.into_inner(); - // TODO: sanity checks - // TODO: simulation - Err(tonic::Status::unimplemented("todo")) + let req = request.into_inner(); + let mut res = AddResponse::default(); + + if let Some(user_operation) = req.uo { + let user_operation: UserOperation = user_operation + .try_into() + .map_err(|_| tonic::Status::invalid_argument("invalid user operation"))?; + + info!("{:?}", user_operation); + + // TODO: validate user operation + // TODO: sanity checks + // TODO: simulation + + let uo_pool_error = UoPoolError::owned( + -32602, + "user operation was not added", + Some(json!({ + "reason": "this is error", + })), + ); + + res.set_result(AddResult::NotAdded); + res.data = serde_json::to_string(&uo_pool_error) + .map_err(|_| tonic::Status::internal("error adding user operation"))?; + + return Ok(tonic::Response::new(res)); + } + + Err(tonic::Status::invalid_argument("missing user operation")) } async fn remove(