diff --git a/CHANGELOG.md b/CHANGELOG.md index 9885a4a3..edbb96fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,12 +12,19 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features -geyser: add panic config option on failed block reconstruction ([#162](https://github.com/rpcpool/yellowstone-grpc/pull/162)). - ### Fixes ### Breaking +## 2023-07-22 + +- yellowstone-grpc-geyser-1.6.0+solana.1.16.1 + +### Features + +geyser: add panic config option on failed block reconstruction ([#165](https://github.com/rpcpool/yellowstone-grpc/pull/165)). +geyser: allow to disable unary methods ([#166](https://github.com/rpcpool/yellowstone-grpc/pull/166)). + ## 2023-07-20 - @triton-one/yellowstone-grpc:0.2.0 diff --git a/Cargo.lock b/Cargo.lock index fae7b10b..f43b5ece 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4293,7 +4293,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "1.5.0+solana.1.16.1" +version = "1.6.0+solana.1.16.1" dependencies = [ "anyhow", "base64 0.21.2", diff --git a/Cargo.toml b/Cargo.toml index ccf06336..15bd06fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = [ "examples/rust", # 1.8.0+solana.1.16.1 "yellowstone-grpc-client", # 1.8.0+solana.1.16.1 - "yellowstone-grpc-geyser", # 1.5.0+solana.1.16.1 + "yellowstone-grpc-geyser", # 1.6.0+solana.1.16.1 "yellowstone-grpc-proto", # 1.8.0+solana.1.16.1 ] diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index a6b35b34..0be80b40 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -1,10 +1,14 @@ use { backoff::{future::retry, ExponentialBackoff}, clap::{Parser, Subcommand, ValueEnum}, - futures::{sink::SinkExt, stream::StreamExt}, + futures::{future::TryFutureExt, sink::SinkExt, stream::StreamExt}, log::{error, info}, solana_sdk::pubkey::Pubkey, - std::{collections::HashMap, env}, + std::{ + collections::HashMap, + env, + sync::{Arc, Mutex}, + }, yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}, yellowstone_grpc_proto::{ prelude::{ @@ -352,15 +356,22 @@ async fn main() -> anyhow::Result<()> { env_logger::init(); let args = Args::parse(); + let zero_attempts = Arc::new(Mutex::new(true)); // The default exponential backoff strategy intervals: // [500ms, 750ms, 1.125s, 1.6875s, 2.53125s, 3.796875s, 5.6953125s, // 8.5s, 12.8s, 19.2s, 28.8s, 43.2s, 64.8s, 97s, ... ] retry(ExponentialBackoff::default(), move || { let args = args.clone(); + let zero_attempts = Arc::clone(&zero_attempts); async move { - info!("Retry to connect to the server"); + let mut zero_attempts = zero_attempts.lock().unwrap(); + if *zero_attempts { + *zero_attempts = false; + } else { + info!("Retry to connect to the server"); + } let commitment = args.get_commitment(); let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None) @@ -417,6 +428,7 @@ async fn main() -> anyhow::Result<()> { Ok::<(), backoff::Error>(()) } + .inspect_err(|error| error!("failed to connect: {error}")) }) .await .map_err(Into::into) diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 0dcb154f..79011b4e 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "1.5.0+solana.1.16.1" +version = "1.6.0+solana.1.16.1" authors = ["Triton One"] edition = "2021" description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index b62ebd73..539f5ee3 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -6,6 +6,8 @@ "grpc": { "address": "0.0.0.0:10000", "channel_capacity": "100_000", + "unary_concurrency_limit": 100, + "unary_disabled": false, "filters": { "accounts": { "max": 1, diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index 4956598f..65f20560 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -5,6 +5,7 @@ use { }, solana_sdk::pubkey::Pubkey, std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path}, + tokio::sync::Semaphore, }; #[derive(Debug, Clone, Deserialize)] @@ -67,6 +68,15 @@ pub struct ConfigGrpc { deserialize_with = "deserialize_usize_str" )] pub channel_capacity: usize, + /// Concurrency limit for unary requests + #[serde( + default = "ConfigGrpc::unary_concurrency_limit_default", + deserialize_with = "deserialize_usize_str" + )] + pub unary_concurrency_limit: usize, + /// Enable/disable unary methods + #[serde(default)] + pub unary_disabled: bool, /// Limits for possible filters #[serde(default)] pub filters: ConfigGrpcFilters, @@ -76,6 +86,10 @@ impl ConfigGrpc { const fn channel_capacity_default() -> usize { 250_000 } + + const fn unary_concurrency_limit_default() -> usize { + Semaphore::MAX_PERMITS + } } #[derive(Debug, Default, Clone, Deserialize)] diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 6d05f329..55a07bd4 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -37,7 +37,7 @@ use { }, }, tokio::{ - sync::{broadcast, mpsc, oneshot, RwLock}, + sync::{broadcast, mpsc, oneshot, RwLock, Semaphore}, time::{sleep, Duration, Instant}, }, tokio_stream::wrappers::ReceiverStream, @@ -444,11 +444,12 @@ struct BlockMetaStorageInner { #[derive(Debug)] struct BlockMetaStorage { + read_sem: Semaphore, inner: Arc>, } impl BlockMetaStorage { - fn new() -> (Self, mpsc::UnboundedSender) { + fn new(unary_concurrency_limit: usize) -> (Self, mpsc::UnboundedSender) { let inner = Arc::new(RwLock::new(BlockMetaStorageInner::default())); let (tx, mut rx) = mpsc::unbounded_channel(); @@ -505,7 +506,13 @@ impl BlockMetaStorage { } }); - (Self { inner }, tx) + ( + Self { + read_sem: Semaphore::new(unary_concurrency_limit), + inner, + }, + tx, + ) } fn parse_commitment(commitment: Option) -> Result { @@ -525,6 +532,7 @@ impl BlockMetaStorage { F: FnOnce(&MessageBlockMeta) -> Option, { let commitment = Self::parse_commitment(commitment)?; + let _permit = self.read_sem.acquire().await; let storage = self.inner.read().await; let slot = match commitment { @@ -548,6 +556,7 @@ impl BlockMetaStorage { commitment: Option, ) -> Result, Status> { let commitment = Self::parse_commitment(commitment)?; + let _permit = self.read_sem.acquire().await; let storage = self.inner.read().await; if storage.blockhashes.len() < MAX_RECENT_BLOCKHASHES + 32 { @@ -578,7 +587,7 @@ impl BlockMetaStorage { #[derive(Debug)] pub struct GrpcService { config: ConfigGrpc, - blocks_meta: BlockMetaStorage, + blocks_meta: Option, subscribe_id: AtomicUsize, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, } @@ -599,7 +608,13 @@ impl GrpcService { )?; // Blocks meta storage - let (blocks_meta, blocks_meta_tx) = BlockMetaStorage::new(); + let (blocks_meta, blocks_meta_tx) = if config.unary_disabled { + (None, None) + } else { + let (blocks_meta, blocks_meta_tx) = + BlockMetaStorage::new(config.unary_concurrency_limit); + (Some(blocks_meta), Some(blocks_meta_tx)) + }; // Messages to clients combined by commitment let (broadcast_tx, _) = broadcast::channel(config.channel_capacity); @@ -645,7 +660,7 @@ impl GrpcService { async fn geyser_loop( mut messages_rx: mpsc::UnboundedReceiver, - blocks_meta_tx: mpsc::UnboundedSender, + blocks_meta_tx: Option>, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, block_fail_action: ConfigBlockFailAction, ) { @@ -773,8 +788,10 @@ impl GrpcService { Some(mut message) = messages_rx.recv() => { MESSAGE_QUEUE_SIZE.dec(); - if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { - let _ = blocks_meta_tx.send(message.clone()); + if let Some(blocks_meta_tx) = &blocks_meta_tx { + if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { + let _ = blocks_meta_tx.send(message.clone()); + } } // consctruct Block message @@ -1013,58 +1030,74 @@ impl Geyser for GrpcService { &self, request: Request, ) -> Result, Status> { - self.blocks_meta - .get_block( - |block| { - block - .block_height - .map(|last_valid_block_height| GetLatestBlockhashResponse { - slot: block.slot, - blockhash: block.blockhash.clone(), - last_valid_block_height, + if let Some(blocks_meta) = &self.blocks_meta { + blocks_meta + .get_block( + |block| { + block.block_height.map(|last_valid_block_height| { + GetLatestBlockhashResponse { + slot: block.slot, + blockhash: block.blockhash.clone(), + last_valid_block_height, + } }) - }, - request.get_ref().commitment, - ) - .await + }, + request.get_ref().commitment, + ) + .await + } else { + Err(Status::unimplemented("method disabled")) + } } async fn get_block_height( &self, request: Request, ) -> Result, Status> { - self.blocks_meta - .get_block( - |block| { - block - .block_height - .map(|block_height| GetBlockHeightResponse { block_height }) - }, - request.get_ref().commitment, - ) - .await + if let Some(blocks_meta) = &self.blocks_meta { + blocks_meta + .get_block( + |block| { + block + .block_height + .map(|block_height| GetBlockHeightResponse { block_height }) + }, + request.get_ref().commitment, + ) + .await + } else { + Err(Status::unimplemented("method disabled")) + } } async fn get_slot( &self, request: Request, ) -> Result, Status> { - self.blocks_meta - .get_block( - |block| Some(GetSlotResponse { slot: block.slot }), - request.get_ref().commitment, - ) - .await + if let Some(blocks_meta) = &self.blocks_meta { + blocks_meta + .get_block( + |block| Some(GetSlotResponse { slot: block.slot }), + request.get_ref().commitment, + ) + .await + } else { + Err(Status::unimplemented("method disabled")) + } } async fn is_blockhash_valid( &self, request: Request, ) -> Result, Status> { - let req = request.get_ref(); - self.blocks_meta - .is_blockhash_valid(&req.blockhash, req.commitment) - .await + if let Some(blocks_meta) = &self.blocks_meta { + let req = request.get_ref(); + blocks_meta + .is_blockhash_valid(&req.blockhash, req.commitment) + .await + } else { + Err(Status::unimplemented("method disabled")) + } } async fn get_version(