diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3983f5e4..f1335899 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,9 +16,9 @@ jobs: with: toolchain: stable components: rustfmt, clippy - - name: Setup third party dependencies + - name: Setup third-party dependencies run: | - make fetch-thirdparty + make setup-thirdparty - name: Install Geth and solc run: | sudo add-apt-repository ppa:ethereum/ethereum diff --git a/Makefile b/Makefile index 789472f8..833fc631 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,11 @@ build: - make fetch-thirdparty - cd thirdparty/account-abstraction && yarn install && yarn compile && cd ../.. cargo build run-bundler: cargo run -- --mnemonic-file ${HOME}/.aa-bundler/0x129D197b2a989C6798601A49D89a4AEC822A17a3 --beneficiary 0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990 --gas-factor 600 --min-balance 1 --entry-points 0x0000000000000000000000000000000000000000 --chain-id 5 --helper 0x0000000000000000000000000000000000000000 run-bundler-uopool: - cargo run --bin bundler-uopool -- --entry-points 0x0000000000000000000000000000000000000000 --chain-id 5 + cargo run --bin bundler-uopool -- --entry-points 0x0000000000000000000000000000000000000000 --chain-id 5 --min-stake 1 --min-unstake-delay 0 run-bundler-rpc: cargo run --bin bundler-rpc @@ -15,20 +13,20 @@ run-bundler-rpc: run-create-wallet: cargo run --bin create-wallet -- --output-path ${HOME}/.aa-bundler -fetch-thirdparty: - git submodule update --init - -test: +setup-thirdparty: + git submodule update --init + cd thirdparty/account-abstraction && yarn install && yarn compile && cd ../.. cd thirdparty/bundler && yarn install && yarn preprocess && cd ../.. + +test: cargo test format: cargo fmt --all lint: - cd thirdparty/bundler && yarn install && yarn preprocess && cd ../.. cargo fmt --all -- --check - cargo clippy -- -D warnings -A clippy::derive_partial_eq_without_eq -D clippy::unwrap_used + cargo clippy -- -D warnings -A clippy::derive_partial_eq_without_eq -D clippy::unwrap_used -D clippy::uninlined_format_args clean: cd thirdparty/account-abstraction && yarn clean && cd ../.. diff --git a/README.md b/README.md index 90505b04..5d1338d0 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,12 @@ For more information: https://hackmd.io/@Vid201/aa-bundler-rust ## How to run? +Set up third-party dependencies (EIP-4337 smart contracts and bundler tests): + +```bash +make setup-thirdparty +``` + Create wallet for bundler: ```bash @@ -32,7 +38,7 @@ cargo run -- --mnemonic-file ${HOME}/.aa-bundler/0x129D197b2a989C6798601A49D89a4 Run only user operation pool: ```bash -cargo run --bin bundler-uopool -- --entry-points 0x0000000000000000000000000000000000000000 --chain-id 5 +cargo run --bin bundler-uopool -- --entry-points 0x0000000000000000000000000000000000000000 --chain-id 5 --min-stake 1 --min-unstake-delay 0 ``` Run only JSON-RPC API: diff --git a/bin/bundler-rpc.rs b/bin/bundler-rpc.rs index c46b4c57..940ed7ad 100644 --- a/bin/bundler-rpc.rs +++ b/bin/bundler-rpc.rs @@ -4,7 +4,10 @@ use jsonrpsee::{core::server::rpc_module::Methods, server::ServerBuilder, tracin use std::future::pending; use aa_bundler::{ - rpc::{eth::EthApiServerImpl, eth_api::EthApiServer}, + rpc::{ + debug::DebugApiServerImpl, debug_api::DebugApiServer, eth::EthApiServerImpl, + eth_api::EthApiServer, + }, uopool::server::uopool::uo_pool_client::UoPoolClient, }; @@ -34,6 +37,14 @@ async fn main() -> Result<()> { let mut api = Methods::new(); let uopool_grpc_client = UoPoolClient::connect(format!("http://{}", opt.uopool_grpc_listen_address)).await?; + + #[cfg(debug_assertions)] + api.merge( + DebugApiServerImpl { + uopool_grpc_client: uopool_grpc_client.clone(), + } + .into_rpc(), + )?; api.merge( EthApiServerImpl { call_gas_limit: 100_000_000, diff --git a/bin/bundler.rs b/bin/bundler.rs index 11939bbe..4c55d683 100644 --- a/bin/bundler.rs +++ b/bin/bundler.rs @@ -1,7 +1,10 @@ use aa_bundler::{ bundler::Bundler, models::wallet::Wallet, - rpc::{eth::EthApiServerImpl, eth_api::EthApiServer}, + rpc::{ + debug::DebugApiServerImpl, debug_api::DebugApiServer, eth::EthApiServerImpl, + eth_api::EthApiServer, + }, uopool::server::uopool::uo_pool_client::UoPoolClient, utils::{parse_address, parse_u256}, }; @@ -87,6 +90,13 @@ fn main() -> Result<()> { )) .await?; + #[cfg(debug_assertions)] + api.merge( + DebugApiServerImpl { + uopool_grpc_client: uopool_grpc_client.clone(), + } + .into_rpc(), + )?; api.merge( EthApiServerImpl { call_gas_limit: 100_000_000, diff --git a/src/proto/types/types.proto b/src/proto/types/types.proto index 6d1b2bbd..4b8a0b12 100644 --- a/src/proto/types/types.proto +++ b/src/proto/types/types.proto @@ -34,3 +34,16 @@ message UserOperation { bytes paymaster_and_data = 10; bytes signature = 11; } + +enum ReputationStatus { + OK = 0; + THROTTLED = 1; + BANNED = 2; +} + +message ReputationEntry { + types.H160 address = 1; + uint64 uo_seen = 2; + uint64 uo_included = 3; + ReputationStatus status = 4; +} \ No newline at end of file diff --git a/src/proto/uopool/uopool.proto b/src/proto/uopool/uopool.proto index 41f03d81..5bf640a5 100644 --- a/src/proto/uopool/uopool.proto +++ b/src/proto/uopool/uopool.proto @@ -30,17 +30,68 @@ enum RemoveResult { message RemoveResponse { RemoveResult result = 1; - string data = 2; } -message AllRequest {} +enum GetAllResult { + GOT_ALL = 0; + NOT_GOT_ALL = 1; +} + +message GetAllRequest { + types.H160 ep = 1; +} + +message GetAllResponse { + GetAllResult result = 1; + repeated types.UserOperation uos = 2; +} + +enum ClearResult { + CLEARED = 0; + NOT_CLEARED = 1; +} + +message ClearRequest {} -message AllResponse { - repeated types.UserOperation uos = 1; +message ClearResponse { + ClearResult result = 1; +} + +enum GetAllReputationResult { + GOT_ALL_REPUTATION = 0; + NOT_GOT_ALL_REPUTATION = 1; +} + +message GetAllReputationRequest { + types.H160 ep = 1; +} + +message GetAllReputationResponse { + GetAllReputationResult result = 1; + repeated types.ReputationEntry res = 2; +} + +enum SetReputationResult { + SET_REPUTATION = 0; + NOT_SET_REPUTATION = 1; +} + +message SetReputationRequest { + repeated types.ReputationEntry res = 1; + types.H160 ep = 2; +} + +message SetReputationResponse { + SetReputationResult result = 1; } service UoPool { rpc Add(AddRequest) returns (AddResponse); rpc Remove(RemoveRequest) returns (RemoveResponse); - rpc All(AllRequest) returns (AllResponse); + + // debug + rpc GetAll(GetAllRequest) returns (GetAllResponse); + rpc Clear(ClearRequest) returns (ClearResponse); + rpc GetAllReputation(GetAllReputationRequest) returns (GetAllReputationResponse); + rpc SetReputation(SetReputationRequest) returns (SetReputationResponse); } diff --git a/src/rpc/debug.rs b/src/rpc/debug.rs new file mode 100644 index 00000000..3b8434d6 --- /dev/null +++ b/src/rpc/debug.rs @@ -0,0 +1,113 @@ +use super::debug_api::DebugApiServer; +use crate::{ + types::{reputation::ReputationEntry, user_operation::UserOperation}, + uopool::server::uopool::{ + uo_pool_client::UoPoolClient, ClearRequest, ClearResult, GetAllReputationRequest, + GetAllReputationResult, GetAllRequest, GetAllResult, SetReputationRequest, + SetReputationResult, + }, +}; +use anyhow::format_err; +use async_trait::async_trait; +use ethers::types::Address; +use jsonrpsee::core::RpcResult; + +#[cfg(debug_assertions)] +pub struct DebugApiServerImpl { + pub uopool_grpc_client: UoPoolClient, +} + +#[cfg(debug_assertions)] +#[async_trait] +impl DebugApiServer for DebugApiServerImpl { + async fn clear_state(&self) -> RpcResult<()> { + let mut uopool_grpc_client = self.uopool_grpc_client.clone(); + + let request = tonic::Request::new(ClearRequest {}); + + let response = uopool_grpc_client + .clear(request) + .await + .map_err(|status| format_err!("GRPC error (uopool): {}", status.message()))? + .into_inner(); + + if response.result == ClearResult::Cleared as i32 { + return Ok(()); + } + + Err(jsonrpsee::core::Error::Custom( + "error clearing state".to_string(), + )) + } + + async fn dump_mempool(&self, entry_point: Address) -> RpcResult> { + let mut uopool_grpc_client = self.uopool_grpc_client.clone(); + + let request = tonic::Request::new(GetAllRequest { + ep: Some(entry_point.into()), + }); + + let response = uopool_grpc_client + .get_all(request) + .await + .map_err(|status| format_err!("GRPC error (uopool): {}", status.message()))? + .into_inner(); + + if response.result == GetAllResult::GotAll as i32 { + return Ok(response.uos.iter().map(|uo| uo.clone().into()).collect()); + } + + Err(jsonrpsee::core::Error::Custom( + "error getting mempool".to_string(), + )) + } + + async fn set_reputation( + &self, + reputation_entries: Vec, + entry_point: Address, + ) -> RpcResult<()> { + let mut uopool_grpc_client = self.uopool_grpc_client.clone(); + + let request = tonic::Request::new(SetReputationRequest { + res: reputation_entries.iter().map(|re| (*re).into()).collect(), + ep: Some(entry_point.into()), + }); + + let response = uopool_grpc_client + .set_reputation(request) + .await + .map_err(|status| format_err!("GRPC error (uopool): {}", status.message()))? + .into_inner(); + + if response.result == SetReputationResult::SetReputation as i32 { + return Ok(()); + } + + Err(jsonrpsee::core::Error::Custom( + "error setting reputation".to_string(), + )) + } + + async fn dump_reputation(&self, entry_point: Address) -> RpcResult> { + let mut uopool_grpc_client = self.uopool_grpc_client.clone(); + + let request = tonic::Request::new(GetAllReputationRequest { + ep: Some(entry_point.into()), + }); + + let response = uopool_grpc_client + .get_all_reputation(request) + .await + .map_err(|status| format_err!("GRPC error (uopool): {}", status.message()))? + .into_inner(); + + if response.result == GetAllReputationResult::GotAllReputation as i32 { + return Ok(response.res.iter().map(|re| re.clone().into()).collect()); + } + + Err(jsonrpsee::core::Error::Custom( + "error getting reputation".to_string(), + )) + } +} diff --git a/src/rpc/debug_api.rs b/src/rpc/debug_api.rs new file mode 100644 index 00000000..976c7092 --- /dev/null +++ b/src/rpc/debug_api.rs @@ -0,0 +1,23 @@ +use crate::types::{reputation::ReputationEntry, user_operation::UserOperation}; +use ethers::types::Address; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; + +#[cfg(debug_assertions)] +#[rpc(server, namespace = "debug_bundler")] +pub trait DebugApi { + #[method(name = "clearState")] + async fn clear_state(&self) -> RpcResult<()>; + + #[method(name = "dumpMempool")] + async fn dump_mempool(&self, entry_point: Address) -> RpcResult>; + + #[method(name = "setReputation")] + async fn set_reputation( + &self, + reputation_entries: Vec, + entry_point: Address, + ) -> RpcResult<()>; + + #[method(name = "dumpReputation")] + async fn dump_reputation(&self, entry_point: Address) -> RpcResult>; +} diff --git a/src/rpc/eth_api.rs b/src/rpc/eth_api.rs index 9a5d59a2..33d458c3 100644 --- a/src/rpc/eth_api.rs +++ b/src/rpc/eth_api.rs @@ -15,20 +15,24 @@ pub struct EstimateUserOperationGasResponse { pub trait EthApi { #[method(name = "chainId")] async fn chain_id(&self) -> RpcResult; + #[method(name = "supportedEntryPoints")] async fn supported_entry_points(&self) -> RpcResult>; + #[method(name = "sendUserOperation")] async fn send_user_operation( &self, user_operation: UserOperation, entry_point: Address, ) -> RpcResult; + #[method(name = "estimateUserOperationGas")] async fn estimate_user_operation_gas( &self, user_operation: UserOperation, entry_point: Address, ) -> RpcResult; + #[method(name = "getUserOperationReceipt")] async fn get_user_operation_receipt( &self, diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 4474ceff..d6489a25 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,2 +1,4 @@ +pub mod debug; +pub mod debug_api; pub mod eth; pub mod eth_api; diff --git a/src/types/mod.rs b/src/types/mod.rs index b7c34932..cc4efcc8 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1 +1,2 @@ +pub mod reputation; pub mod user_operation; diff --git a/src/types/reputation.rs b/src/types/reputation.rs new file mode 100644 index 00000000..0a94bce7 --- /dev/null +++ b/src/types/reputation.rs @@ -0,0 +1,110 @@ +use educe::Educe; +use ethers::{ + abi::AbiEncode, + types::{Address, U256}, +}; +use jsonrpsee::types::ErrorObject; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +pub const MIN_INCLUSION_RATE_DENOMINATOR: u64 = 10; +pub const THROTTLING_SLACK: u64 = 10; +pub const BAN_SLACK: u64 = 50; +const ENTITY_BANNED_ERROR_CODE: i32 = -32504; +const STAKE_TOO_LOW_ERROR_CODE: i32 = -32505; + +pub type ReputationError = ErrorObject<'static>; + +#[derive(Clone, Copy, Educe, PartialEq, Eq, Serialize, Deserialize)] +#[educe(Debug)] +pub enum ReputationStatus { + OK, + THROTTLED, + BANNED, +} + +#[derive(Clone, Copy, Educe, Eq, PartialEq, Serialize, Deserialize)] +#[educe(Debug)] +pub struct ReputationEntry { + pub address: Address, + pub uo_seen: u64, + pub uo_included: u64, + pub status: ReputationStatus, +} + +#[derive(Clone, Copy, Educe, Eq, PartialEq, Serialize, Deserialize)] +#[educe(Debug)] +pub struct StakeInfo { + pub address: Address, + pub stake: U256, + pub unstake_delay: U256, // seconds +} + +pub enum BadReputationError { + EntityBanned { + address: Address, + title: String, + }, + StakeTooLow { + address: Address, + title: String, + stake: U256, + min_stake: U256, + min_unstake_delay: U256, + }, + UnstakeDelayTooLow { + address: Address, + title: String, + unstake_delay: U256, + min_stake: U256, + min_unstake_delay: U256, + }, +} + +impl From for ReputationError { + fn from(error: BadReputationError) -> Self { + match error { + BadReputationError::EntityBanned { address, title } => ReputationError::owned( + ENTITY_BANNED_ERROR_CODE, + format!("{title} with address {address} is banned",), + Some(json!({ + title: address.to_string(), + })), + ), + BadReputationError::StakeTooLow { + address, + title, + stake, + min_stake, + min_unstake_delay, + } => ReputationError::owned( + STAKE_TOO_LOW_ERROR_CODE, + format!( + "{title} with address {address} stake {stake} is lower than {min_stake}", + ), + Some(json!({ + title: address.to_string(), + "minimumStake": AbiEncode::encode_hex(min_stake), + "minimumUnstakeDelay": AbiEncode::encode_hex(min_unstake_delay), + })), + ), + BadReputationError::UnstakeDelayTooLow { + address, + title, + unstake_delay, + min_stake, + min_unstake_delay, + } => ReputationError::owned( + STAKE_TOO_LOW_ERROR_CODE, + format!( + "{title} with address {address} unstake delay {unstake_delay} is lower than {min_unstake_delay}", + ), + Some(json!({ + title: address.to_string(), + "minimumStake": AbiEncode::encode_hex(min_stake), + "minimumUnstakeDelay": AbiEncode::encode_hex(min_unstake_delay), + })), + ), + } + } +} diff --git a/src/uopool/memory.rs b/src/uopool/memory_mempool.rs similarity index 93% rename from src/uopool/memory.rs rename to src/uopool/memory_mempool.rs index 153c74f3..28369886 100644 --- a/src/uopool/memory.rs +++ b/src/uopool/memory_mempool.rs @@ -50,10 +50,6 @@ impl Mempool for MemoryMempool { } } - async fn get_all(&self) -> anyhow::Result { - Ok(self.user_operations.read().values().cloned().collect()) - } - async fn get_all_by_sender(&self, sender: Address) -> anyhow::Result { let user_operations = self.user_operations.read(); @@ -93,14 +89,18 @@ impl Mempool for MemoryMempool { Ok(()) } - async fn clear(&mut self) -> anyhow::Result<()> { + #[cfg(debug_assertions)] + fn get_all(&self) -> Self::UserOperations { + self.user_operations.read().values().cloned().collect() + } + + #[cfg(debug_assertions)] + fn clear(&mut self) { let mut user_operations = self.user_operations.write(); let mut user_operations_by_sender = self.user_operations_by_sender.write(); user_operations.clear(); user_operations_by_sender.clear(); - - Ok(()) } } @@ -171,7 +171,7 @@ mod tests { ); } - assert_eq!(mempool.get_all().await.unwrap().len(), 7); + assert_eq!(mempool.get_all().len(), 7); assert_eq!( mempool.get_all_by_sender(senders[0]).await.unwrap().len(), 2 @@ -195,7 +195,7 @@ mod tests { anyhow::anyhow!("User operation not found").to_string() ); - assert_eq!(mempool.get_all().await.unwrap().len(), 6); + assert_eq!(mempool.get_all().len(), 6); assert_eq!( mempool.get_all_by_sender(senders[0]).await.unwrap().len(), 2 @@ -205,9 +205,9 @@ mod tests { 2 ); - assert_eq!(mempool.clear().await.unwrap(), ()); + assert_eq!(mempool.clear(), ()); - assert_eq!(mempool.get_all().await.unwrap().len(), 0); + assert_eq!(mempool.get_all().len(), 0); assert_eq!( mempool.get_all_by_sender(senders[0]).await.unwrap().len(), 0 diff --git a/src/uopool/memory_reputation.rs b/src/uopool/memory_reputation.rs new file mode 100644 index 00000000..6e7b3643 --- /dev/null +++ b/src/uopool/memory_reputation.rs @@ -0,0 +1,339 @@ +use async_trait::async_trait; +use educe::Educe; +use ethers::types::{Address, U256}; +use parking_lot::RwLock; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use crate::types::reputation::{ + BadReputationError, ReputationEntry, ReputationError, ReputationStatus, StakeInfo, +}; + +use super::Reputation; + +#[derive(Default, Educe)] +#[educe(Debug)] +pub struct MemoryReputation { + min_inclusion_denominator: u64, + throttling_slack: u64, + ban_slack: u64, + min_stake: U256, + min_unstake_delay: U256, + + entities: Arc>>, + whitelist: Arc>>, + blacklist: Arc>>, +} + +#[async_trait] +impl Reputation for MemoryReputation { + type ReputationEntries = Vec; + + fn init( + &mut self, + min_inclusion_denominator: u64, + throttling_slack: u64, + ban_slack: u64, + min_stake: U256, + min_unstake_delay: U256, + ) { + self.min_inclusion_denominator = min_inclusion_denominator; + self.throttling_slack = throttling_slack; + self.ban_slack = ban_slack; + self.min_stake = min_stake; + self.min_unstake_delay = min_unstake_delay; + } + + async fn get(&mut self, address: &Address) -> anyhow::Result { + let mut entities = self.entities.write(); + + if let Some(entity) = entities.get(address) { + return Ok(*entity); + } + + let entity = ReputationEntry { + address: *address, + uo_seen: 0, + uo_included: 0, + status: ReputationStatus::OK, + }; + + entities.insert(*address, entity); + + Ok(entity) + } + + async fn increment_seen(&mut self, address: &Address) -> anyhow::Result<()> { + let mut entities = self.entities.write(); + + if let Some(entity) = entities.get_mut(address) { + entity.uo_seen += 1; + return Ok(()); + } + + Err(anyhow::anyhow!("Entity not found")) + } + + async fn increment_included(&mut self, address: &Address) -> anyhow::Result<()> { + let mut entities = self.entities.write(); + + if let Some(entity) = entities.get_mut(address) { + entity.uo_included += 1; + return Ok(()); + } + + Err(anyhow::anyhow!("Entity not found")) + } + + fn update_hourly(&mut self) { + let mut entities = self.entities.write(); + for (_, entity) in entities.iter_mut() { + entity.uo_seen = entity.uo_seen * 23 / 24; + entity.uo_included = entity.uo_included * 23 / 24; + } + entities.retain(|_, entity| entity.uo_seen > 0 || entity.uo_included > 0); + } + + async fn add_whitelist(&mut self, address: &Address) -> anyhow::Result<()> { + self.whitelist.write().insert(*address); + Ok(()) + } + + async fn remove_whitelist(&mut self, address: &Address) -> anyhow::Result { + Ok(self.whitelist.write().remove(address)) + } + + async fn is_whitelist(&self, address: &Address) -> anyhow::Result { + Ok(self.whitelist.read().contains(address)) + } + + async fn add_blacklist(&mut self, address: &Address) -> anyhow::Result<()> { + self.blacklist.write().insert(*address); + Ok(()) + } + + async fn remove_blacklist(&mut self, address: &Address) -> anyhow::Result { + Ok(self.blacklist.write().remove(address)) + } + + async fn is_blacklist(&self, address: &Address) -> anyhow::Result { + Ok(self.blacklist.read().contains(address)) + } + + async fn get_status(&self, address: &Address) -> anyhow::Result { + if self.is_whitelist(address).await? { + return Ok(ReputationStatus::OK); + } + + if self.is_blacklist(address).await? { + return Ok(ReputationStatus::BANNED); + } + + let entities = self.entities.read(); + + match entities.get(address) { + Some(entity) => { + let min_expected_included = entity.uo_seen / self.min_inclusion_denominator; + if min_expected_included <= entity.uo_included + self.throttling_slack { + Ok(ReputationStatus::OK) + } else if min_expected_included <= entity.uo_included + self.ban_slack { + Ok(ReputationStatus::THROTTLED) + } else { + Ok(ReputationStatus::BANNED) + } + } + _ => Ok(ReputationStatus::OK), + } + } + + async fn update_handle_ops_reverted(&mut self, address: &Address) -> anyhow::Result<()> { + if let Ok(mut entity) = self.get(address).await { + entity.uo_seen = 100; + entity.uo_included = 0; + } + + Ok(()) + } + + async fn verify_stake(&self, title: &str, stake_info: Option) -> anyhow::Result<()> { + if let Some(stake_info) = stake_info { + if self.is_whitelist(&stake_info.address).await? { + return Ok(()); + } + + let entities = self.entities.read(); + + if let Some(entity) = entities.get(&stake_info.address) { + let error = if entity.status == ReputationStatus::BANNED { + BadReputationError::EntityBanned { + address: stake_info.address, + title: title.to_string(), + } + } else if stake_info.stake < self.min_stake { + BadReputationError::StakeTooLow { + address: stake_info.address, + title: title.to_string(), + stake: stake_info.stake, + min_stake: self.min_stake, + min_unstake_delay: self.min_unstake_delay, + } + } else if stake_info.unstake_delay < self.min_unstake_delay { + BadReputationError::UnstakeDelayTooLow { + address: stake_info.address, + title: title.to_string(), + unstake_delay: stake_info.unstake_delay, + min_stake: self.min_stake, + min_unstake_delay: self.min_unstake_delay, + } + } else { + return Ok(()); + }; + + return Err(anyhow::anyhow!(serde_json::to_string( + &ReputationError::from(error) + )?)); + } + } + + Ok(()) + } + + #[cfg(debug_assertions)] + fn set(&mut self, reputation_entries: Self::ReputationEntries) { + let mut entities = self.entities.write(); + + for reputation in reputation_entries { + entities.insert(reputation.address, reputation); + } + } + + #[cfg(debug_assertions)] + fn get_all(&self) -> Self::ReputationEntries { + self.entities.read().values().cloned().collect() + } + + #[cfg(debug_assertions)] + fn clear(&mut self) { + self.entities.write().clear(); + } +} + +#[cfg(test)] +mod tests { + use crate::uopool::{BAN_SLACK, MIN_INCLUSION_RATE_DENOMINATOR, THROTTLING_SLACK}; + + use super::*; + + #[tokio::test] + async fn memory_reputation() { + let mut reputation = MemoryReputation::default(); + reputation.init( + MIN_INCLUSION_RATE_DENOMINATOR, + THROTTLING_SLACK, + BAN_SLACK, + U256::from(1), + U256::from(0), + ); + + let mut addresses: Vec
= vec![]; + + for _ in 0..5 { + let address = Address::random(); + assert_eq!( + reputation.get(&address).await.unwrap(), + ReputationEntry { + address, + uo_seen: 0, + uo_included: 0, + status: ReputationStatus::OK, + } + ); + addresses.push(address); + } + + assert_eq!(reputation.add_whitelist(&addresses[2]).await.unwrap(), ()); + assert_eq!(reputation.add_blacklist(&addresses[1]).await.unwrap(), ()); + + assert_eq!(reputation.is_whitelist(&addresses[2]).await.unwrap(), true); + assert_eq!(reputation.is_whitelist(&addresses[1]).await.unwrap(), false); + assert_eq!(reputation.is_blacklist(&addresses[1]).await.unwrap(), true); + assert_eq!(reputation.is_blacklist(&addresses[2]).await.unwrap(), false); + + assert_eq!( + reputation.remove_whitelist(&addresses[2]).await.unwrap(), + true + ); + assert_eq!( + reputation.remove_whitelist(&addresses[1]).await.unwrap(), + false + ); + assert_eq!( + reputation.remove_blacklist(&addresses[1]).await.unwrap(), + true + ); + assert_eq!( + reputation.remove_blacklist(&addresses[2]).await.unwrap(), + false + ); + + assert_eq!(reputation.add_whitelist(&addresses[2]).await.unwrap(), ()); + assert_eq!(reputation.add_blacklist(&addresses[1]).await.unwrap(), ()); + + assert_eq!( + reputation.get_status(&addresses[2]).await.unwrap(), + ReputationStatus::OK + ); + assert_eq!( + reputation.get_status(&addresses[1]).await.unwrap(), + ReputationStatus::BANNED + ); + assert_eq!( + reputation.get_status(&addresses[3]).await.unwrap(), + ReputationStatus::OK + ); + + assert_eq!(reputation.increment_seen(&addresses[2]).await.unwrap(), ()); + assert_eq!(reputation.increment_seen(&addresses[2]).await.unwrap(), ()); + assert_eq!(reputation.increment_seen(&addresses[3]).await.unwrap(), ()); + assert_eq!(reputation.increment_seen(&addresses[3]).await.unwrap(), ()); + + assert_eq!( + reputation.increment_included(&addresses[2]).await.unwrap(), + () + ); + assert_eq!( + reputation.increment_included(&addresses[2]).await.unwrap(), + () + ); + assert_eq!( + reputation.increment_included(&addresses[3]).await.unwrap(), + () + ); + + assert_eq!( + reputation + .update_handle_ops_reverted(&addresses[3]) + .await + .unwrap(), + () + ); + + for _ in 0..250 { + assert_eq!(reputation.increment_seen(&addresses[3]).await.unwrap(), ()); + } + assert_eq!( + reputation.get_status(&addresses[3]).await.unwrap(), + ReputationStatus::THROTTLED + ); + + for _ in 0..500 { + assert_eq!(reputation.increment_seen(&addresses[3]).await.unwrap(), ()); + } + assert_eq!( + reputation.get_status(&addresses[3]).await.unwrap(), + ReputationStatus::BANNED + ); + } +} diff --git a/src/uopool/mod.rs b/src/uopool/mod.rs index d7000b98..b0c4ae3d 100644 --- a/src/uopool/mod.rs +++ b/src/uopool/mod.rs @@ -1,9 +1,16 @@ use crate::{ - types::user_operation::{UserOperation, UserOperationHash}, + types::{ + reputation::{ + ReputationEntry, ReputationStatus, StakeInfo, BAN_SLACK, + MIN_INCLUSION_RATE_DENOMINATOR, THROTTLING_SLACK, + }, + user_operation::{UserOperation, UserOperationHash}, + }, uopool::{ - memory::MemoryMempool, server::uopool::uo_pool_server::UoPoolServer, - services::UoPoolService, + memory_mempool::MemoryMempool, memory_reputation::MemoryReputation, + server::uopool::uo_pool_server::UoPoolServer, services::UoPoolService, }, + utils::parse_u256, }; use anyhow::Result; use async_trait::async_trait; @@ -18,18 +25,20 @@ use jsonrpsee::tracing::info; use parking_lot::RwLock; use std::{collections::HashMap, fmt::Debug, net::SocketAddr, sync::Arc, time::Duration}; -pub mod memory; +pub mod memory_mempool; +pub mod memory_reputation; pub mod server; pub mod services; pub type MempoolId = H256; +pub type MempoolBox = Box>; +pub type ReputationBox = Box>; + pub fn mempool_id(entry_point: Address, chain_id: U256) -> MempoolId { H256::from_slice(keccak256([entry_point.encode(), chain_id.encode()].concat()).as_slice()) } -pub type MempoolBox = Box>; - #[async_trait] pub trait Mempool: Debug + Send + Sync + 'static { type UserOperations: IntoIterator; @@ -41,10 +50,50 @@ pub trait Mempool: Debug + Send + Sync + 'static { chain_id: U256, ) -> anyhow::Result; async fn get(&self, user_operation_hash: UserOperationHash) -> anyhow::Result; - async fn get_all(&self) -> anyhow::Result; async fn get_all_by_sender(&self, sender: Address) -> anyhow::Result; async fn remove(&mut self, user_operation_hash: UserOperationHash) -> anyhow::Result<()>; - async fn clear(&mut self) -> anyhow::Result<()>; + + #[cfg(debug_assertions)] + fn get_all(&self) -> Self::UserOperations; + + #[cfg(debug_assertions)] + fn clear(&mut self); +} + +#[async_trait] +pub trait Reputation: Debug + Send + Sync + 'static { + type ReputationEntries: IntoIterator; + + fn init( + &mut self, + min_inclusion_denominator: u64, + throttling_slack: u64, + ban_slack: u64, + min_stake: U256, + min_unstake_delay: U256, + ); + async fn get(&mut self, address: &Address) -> anyhow::Result; + async fn increment_seen(&mut self, address: &Address) -> anyhow::Result<()>; + async fn increment_included(&mut self, address: &Address) -> anyhow::Result<()>; + fn update_hourly(&mut self); + async fn add_whitelist(&mut self, address: &Address) -> anyhow::Result<()>; + async fn remove_whitelist(&mut self, address: &Address) -> anyhow::Result; + async fn is_whitelist(&self, address: &Address) -> anyhow::Result; + async fn add_blacklist(&mut self, address: &Address) -> anyhow::Result<()>; + async fn remove_blacklist(&mut self, address: &Address) -> anyhow::Result; + async fn is_blacklist(&self, address: &Address) -> anyhow::Result; + async fn get_status(&self, address: &Address) -> anyhow::Result; + async fn update_handle_ops_reverted(&mut self, address: &Address) -> anyhow::Result<()>; + async fn verify_stake(&self, title: &str, stake_info: Option) -> anyhow::Result<()>; + + #[cfg(debug_assertions)] + fn set(&mut self, reputation_entries: Self::ReputationEntries); + + #[cfg(debug_assertions)] + fn get_all(&self) -> Self::ReputationEntries; + + #[cfg(debug_assertions)] + fn clear(&mut self); } #[derive(Educe)] @@ -53,11 +102,16 @@ pub struct UserOperationPool { pub pool: Arc, } -#[derive(Clone, Copy, Educe, Parser)] -#[educe(Debug)] +#[derive(Clone, Copy, Debug, Parser, PartialEq)] pub struct UoPoolOpts { #[clap(long, default_value = "127.0.0.1:3001")] pub uopool_grpc_listen_address: SocketAddr, + + #[clap(long, value_parser=parse_u256, default_value = "1")] + pub min_stake: U256, + + #[clap(long, value_parser=parse_u256, default_value = "0")] + pub min_unstake_delay: U256, } pub async fn run(opts: UoPoolOpts, entry_points: Vec
, chain_id: U256) -> Result<()> { @@ -65,12 +119,40 @@ pub async fn run(opts: UoPoolOpts, entry_points: Vec
, chain_id: U256) - let mut builder = tonic::transport::Server::builder(); let mut mempools = HashMap::>>::new(); + let mut reputations = HashMap::>>::new(); + for entry_point in entry_points { let id = mempool_id(entry_point, chain_id); mempools.insert(id, Box::::default()); + + reputations.insert(id, Box::::default()); + if let Some(reputation) = reputations.get_mut(&id) { + reputation.init( + MIN_INCLUSION_RATE_DENOMINATOR, + THROTTLING_SLACK, + BAN_SLACK, + opts.min_stake, + opts.min_unstake_delay, + ); + } } - let svc = UoPoolServer::new(UoPoolService::new(Arc::new(RwLock::new(mempools)))); + let reputations = Arc::new(RwLock::new(reputations)); + + let svc = UoPoolServer::new(UoPoolService::new( + Arc::new(RwLock::new(mempools)), + reputations.clone(), + chain_id, + )); + + tokio::spawn(async move { + loop { + for reputation in reputations.write().values_mut() { + reputation.update_hourly(); + } + tokio::time::sleep(Duration::from_secs(60 * 60)).await; + } + }); info!( "UoPool gRPC server starting on {}", diff --git a/src/uopool/server.rs b/src/uopool/server.rs index bfb30b80..5ba22b90 100644 --- a/src/uopool/server.rs +++ b/src/uopool/server.rs @@ -112,6 +112,51 @@ pub mod types { } } } + + impl From for ReputationEntry { + fn from(reputation_entry: crate::types::reputation::ReputationEntry) -> Self { + Self { + address: Some(reputation_entry.address.into()), + uo_seen: reputation_entry.uo_seen, + uo_included: reputation_entry.uo_included, + status: match reputation_entry.status { + crate::types::reputation::ReputationStatus::OK => ReputationStatus::Ok, + crate::types::reputation::ReputationStatus::THROTTLED => { + ReputationStatus::Throttled + } + crate::types::reputation::ReputationStatus::BANNED => ReputationStatus::Banned, + } as i32, + } + } + } + + impl From for crate::types::reputation::ReputationEntry { + fn from(reputation_entry: ReputationEntry) -> Self { + Self { + address: { + if let Some(address) = reputation_entry.address { + address.into() + } else { + Address::zero() + } + }, + uo_seen: reputation_entry.uo_seen, + uo_included: reputation_entry.uo_included, + status: match reputation_entry.status { + _ if reputation_entry.status == ReputationStatus::Ok as i32 => { + crate::types::reputation::ReputationStatus::OK + } + _ if reputation_entry.status == ReputationStatus::Throttled as i32 => { + crate::types::reputation::ReputationStatus::THROTTLED + } + _ if reputation_entry.status == ReputationStatus::Banned as i32 => { + crate::types::reputation::ReputationStatus::BANNED + } + _ => crate::types::reputation::ReputationStatus::OK, + }, + } + } + } } pub mod uopool { diff --git a/src/uopool/services/uopool.rs b/src/uopool/services/uopool.rs index d4c77679..7ceb99d1 100644 --- a/src/uopool/services/uopool.rs +++ b/src/uopool/services/uopool.rs @@ -1,14 +1,18 @@ use crate::{ - types::user_operation::UserOperation, + types::{reputation::ReputationEntry, user_operation::UserOperation}, uopool::{ + mempool_id, server::uopool::{ - uo_pool_server::UoPool, AddRequest, AddResponse, AddResult, AllRequest, AllResponse, - RemoveRequest, RemoveResponse, + uo_pool_server::UoPool, AddRequest, AddResponse, AddResult, ClearRequest, + ClearResponse, ClearResult, GetAllReputationRequest, GetAllReputationResponse, + GetAllReputationResult, GetAllRequest, GetAllResponse, GetAllResult, RemoveRequest, + RemoveResponse, SetReputationRequest, SetReputationResponse, SetReputationResult, }, - MempoolBox, MempoolId, + MempoolBox, MempoolId, ReputationBox, }, }; use async_trait::async_trait; +use ethers::types::{Address, U256}; use jsonrpsee::{tracing::info, types::ErrorObject}; use parking_lot::RwLock; use serde_json::json; @@ -18,13 +22,21 @@ use tonic::Response; pub type UoPoolError = ErrorObject<'static>; pub struct UoPoolService { - _mempools: Arc>>>>, + pub mempools: Arc>>>>, + pub reputations: Arc>>>>, + pub chain_id: U256, } impl UoPoolService { - pub fn new(mempools: Arc>>>>) -> Self { + pub fn new( + mempools: Arc>>>>, + reputations: Arc>>>>, + chain_id: U256, + ) -> Self { Self { - _mempools: mempools, + mempools, + reputations, + chain_id, } } } @@ -49,6 +61,8 @@ impl UoPool for UoPoolService { // TODO: sanity checks // TODO: simulation + // TODO: make something with reputation + let uo_pool_error = UoPoolError::owned( -32602, "user operation was not added", @@ -74,10 +88,115 @@ impl UoPool for UoPoolService { Err(tonic::Status::unimplemented("todo")) } - async fn all( + #[cfg(debug_assertions)] + async fn clear( &self, - _request: tonic::Request, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("todo")) + _request: tonic::Request, + ) -> Result, tonic::Status> { + for mempool in self.mempools.write().values_mut() { + mempool.clear(); + } + + for reputation in self.reputations.write().values_mut() { + reputation.clear(); + } + + Ok(tonic::Response::new(ClearResponse { + result: ClearResult::Cleared as i32, + })) + } + + #[cfg(debug_assertions)] + async fn get_all( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let req = request.into_inner(); + let mut res = GetAllResponse::default(); + + if let Some(entry_point) = req.ep { + let entry_point: Address = entry_point + .try_into() + .map_err(|_| tonic::Status::invalid_argument("invalid entry point"))?; + + if let Some(mempool) = self + .mempools + .read() + .get(&mempool_id(entry_point, self.chain_id)) + { + res.result = GetAllResult::GotAll as i32; + res.uos = mempool + .get_all() + .iter() + .map(|uo| uo.clone().into()) + .collect(); + } else { + res.result = GetAllResult::NotGotAll as i32; + } + + return Ok(tonic::Response::new(res)); + } + + Err(tonic::Status::invalid_argument("missing entry point")) + } + + #[cfg(debug_assertions)] + async fn set_reputation( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let req = request.into_inner(); + let mut res = SetReputationResponse::default(); + + if let Some(entry_point) = req.ep { + let entry_point: Address = entry_point + .try_into() + .map_err(|_| tonic::Status::invalid_argument("invalid entry point"))?; + + if let Some(reputation) = self + .reputations + .write() + .get_mut(&mempool_id(entry_point, self.chain_id)) + { + reputation.set(req.res.iter().map(|re| re.clone().into()).collect()); + res.result = SetReputationResult::SetReputation as i32; + } else { + res.result = SetReputationResult::NotSetReputation as i32; + } + + return Ok(tonic::Response::new(res)); + } + + Err(tonic::Status::invalid_argument("missing entry point")) + } + + #[cfg(debug_assertions)] + async fn get_all_reputation( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let req = request.into_inner(); + let mut res = GetAllReputationResponse::default(); + + if let Some(entry_point) = req.ep { + let entry_point: Address = entry_point + .try_into() + .map_err(|_| tonic::Status::invalid_argument("invalid entry point"))?; + + if let Some(reputation) = self + .reputations + .read() + .get(&mempool_id(entry_point, self.chain_id)) + { + res.result = GetAllReputationResult::GotAllReputation as i32; + res.res = reputation.get_all().iter().map(|re| (*re).into()).collect(); + } else { + res.result = GetAllReputationResult::NotGotAllReputation as i32; + } + + return Ok(tonic::Response::new(res)); + }; + + Err(tonic::Status::invalid_argument("missing entry point")) } }