Skip to content

Commit

Permalink
geyser: allow to disable unary methods (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Jul 22, 2023
1 parent 9dfe79f commit 5ae7536
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 49 deletions.
11 changes: 9 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

geyser: add panic config option on failed block reconstruction ([#162](https://github.com/rpcpool/yellowstone-grpc/pull/162)).

### Fixes

### Breaking

## 2023-07-22

- yellowstone-grpc-geyser-1.6.0+solana.1.16.1

### Features

geyser: add panic config option on failed block reconstruction ([#165](https://github.com/rpcpool/yellowstone-grpc/pull/165)).
geyser: allow to disable unary methods ([#166](https://github.com/rpcpool/yellowstone-grpc/pull/166)).

## 2023-07-20

- @triton-one/yellowstone-grpc:0.2.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
members = [
"examples/rust", # 1.8.0+solana.1.16.1
"yellowstone-grpc-client", # 1.8.0+solana.1.16.1
"yellowstone-grpc-geyser", # 1.5.0+solana.1.16.1
"yellowstone-grpc-geyser", # 1.6.0+solana.1.16.1
"yellowstone-grpc-proto", # 1.8.0+solana.1.16.1
]

Expand Down
18 changes: 15 additions & 3 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use {
backoff::{future::retry, ExponentialBackoff},
clap::{Parser, Subcommand, ValueEnum},
futures::{sink::SinkExt, stream::StreamExt},
futures::{future::TryFutureExt, sink::SinkExt, stream::StreamExt},
log::{error, info},
solana_sdk::pubkey::Pubkey,
std::{collections::HashMap, env},
std::{
collections::HashMap,
env,
sync::{Arc, Mutex},
},
yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError},
yellowstone_grpc_proto::{
prelude::{
Expand Down Expand Up @@ -352,15 +356,22 @@ async fn main() -> anyhow::Result<()> {
env_logger::init();

let args = Args::parse();
let zero_attempts = Arc::new(Mutex::new(true));

// The default exponential backoff strategy intervals:
// [500ms, 750ms, 1.125s, 1.6875s, 2.53125s, 3.796875s, 5.6953125s,
// 8.5s, 12.8s, 19.2s, 28.8s, 43.2s, 64.8s, 97s, ... ]
retry(ExponentialBackoff::default(), move || {
let args = args.clone();
let zero_attempts = Arc::clone(&zero_attempts);

async move {
info!("Retry to connect to the server");
let mut zero_attempts = zero_attempts.lock().unwrap();
if *zero_attempts {
*zero_attempts = false;
} else {
info!("Retry to connect to the server");
}

let commitment = args.get_commitment();
let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)
Expand Down Expand Up @@ -417,6 +428,7 @@ async fn main() -> anyhow::Result<()> {

Ok::<(), backoff::Error<anyhow::Error>>(())
}
.inspect_err(|error| error!("failed to connect: {error}"))
})
.await
.map_err(Into::into)
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "1.5.0+solana.1.16.1"
version = "1.6.0+solana.1.16.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Plugin"
Expand Down
2 changes: 2 additions & 0 deletions yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"grpc": {
"address": "0.0.0.0:10000",
"channel_capacity": "100_000",
"unary_concurrency_limit": 100,
"unary_disabled": false,
"filters": {
"accounts": {
"max": 1,
Expand Down
14 changes: 14 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
},
solana_sdk::pubkey::Pubkey,
std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path},
tokio::sync::Semaphore,
};

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -67,6 +68,15 @@ pub struct ConfigGrpc {
deserialize_with = "deserialize_usize_str"
)]
pub channel_capacity: usize,
/// Concurrency limit for unary requests
#[serde(
default = "ConfigGrpc::unary_concurrency_limit_default",
deserialize_with = "deserialize_usize_str"
)]
pub unary_concurrency_limit: usize,
/// Enable/disable unary methods
#[serde(default)]
pub unary_disabled: bool,
/// Limits for possible filters
#[serde(default)]
pub filters: ConfigGrpcFilters,
Expand All @@ -76,6 +86,10 @@ impl ConfigGrpc {
const fn channel_capacity_default() -> usize {
250_000
}

const fn unary_concurrency_limit_default() -> usize {
Semaphore::MAX_PERMITS
}
}

#[derive(Debug, Default, Clone, Deserialize)]
Expand Down
115 changes: 74 additions & 41 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use {
},
},
tokio::{
sync::{broadcast, mpsc, oneshot, RwLock},
sync::{broadcast, mpsc, oneshot, RwLock, Semaphore},
time::{sleep, Duration, Instant},
},
tokio_stream::wrappers::ReceiverStream,
Expand Down Expand Up @@ -444,11 +444,12 @@ struct BlockMetaStorageInner {

#[derive(Debug)]
struct BlockMetaStorage {
read_sem: Semaphore,
inner: Arc<RwLock<BlockMetaStorageInner>>,
}

impl BlockMetaStorage {
fn new() -> (Self, mpsc::UnboundedSender<Message>) {
fn new(unary_concurrency_limit: usize) -> (Self, mpsc::UnboundedSender<Message>) {
let inner = Arc::new(RwLock::new(BlockMetaStorageInner::default()));
let (tx, mut rx) = mpsc::unbounded_channel();

Expand Down Expand Up @@ -505,7 +506,13 @@ impl BlockMetaStorage {
}
});

(Self { inner }, tx)
(
Self {
read_sem: Semaphore::new(unary_concurrency_limit),
inner,
},
tx,
)
}

fn parse_commitment(commitment: Option<i32>) -> Result<CommitmentLevel, Status> {
Expand All @@ -525,6 +532,7 @@ impl BlockMetaStorage {
F: FnOnce(&MessageBlockMeta) -> Option<T>,
{
let commitment = Self::parse_commitment(commitment)?;
let _permit = self.read_sem.acquire().await;
let storage = self.inner.read().await;

let slot = match commitment {
Expand All @@ -548,6 +556,7 @@ impl BlockMetaStorage {
commitment: Option<i32>,
) -> Result<Response<IsBlockhashValidResponse>, Status> {
let commitment = Self::parse_commitment(commitment)?;
let _permit = self.read_sem.acquire().await;
let storage = self.inner.read().await;

if storage.blockhashes.len() < MAX_RECENT_BLOCKHASHES + 32 {
Expand Down Expand Up @@ -578,7 +587,7 @@ impl BlockMetaStorage {
#[derive(Debug)]
pub struct GrpcService {
config: ConfigGrpc,
blocks_meta: BlockMetaStorage,
blocks_meta: Option<BlockMetaStorage>,
subscribe_id: AtomicUsize,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
}
Expand All @@ -599,7 +608,13 @@ impl GrpcService {
)?;

// Blocks meta storage
let (blocks_meta, blocks_meta_tx) = BlockMetaStorage::new();
let (blocks_meta, blocks_meta_tx) = if config.unary_disabled {
(None, None)
} else {
let (blocks_meta, blocks_meta_tx) =
BlockMetaStorage::new(config.unary_concurrency_limit);
(Some(blocks_meta), Some(blocks_meta_tx))
};

// Messages to clients combined by commitment
let (broadcast_tx, _) = broadcast::channel(config.channel_capacity);
Expand Down Expand Up @@ -645,7 +660,7 @@ impl GrpcService {

async fn geyser_loop(
mut messages_rx: mpsc::UnboundedReceiver<Message>,
blocks_meta_tx: mpsc::UnboundedSender<Message>,
blocks_meta_tx: Option<mpsc::UnboundedSender<Message>>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
block_fail_action: ConfigBlockFailAction,
) {
Expand Down Expand Up @@ -773,8 +788,10 @@ impl GrpcService {
Some(mut message) = messages_rx.recv() => {
MESSAGE_QUEUE_SIZE.dec();

if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
if let Some(blocks_meta_tx) = &blocks_meta_tx {
if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
}
}

// consctruct Block message
Expand Down Expand Up @@ -1013,58 +1030,74 @@ impl Geyser for GrpcService {
&self,
request: Request<GetLatestBlockhashRequest>,
) -> Result<Response<GetLatestBlockhashResponse>, Status> {
self.blocks_meta
.get_block(
|block| {
block
.block_height
.map(|last_valid_block_height| GetLatestBlockhashResponse {
slot: block.slot,
blockhash: block.blockhash.clone(),
last_valid_block_height,
if let Some(blocks_meta) = &self.blocks_meta {
blocks_meta
.get_block(
|block| {
block.block_height.map(|last_valid_block_height| {
GetLatestBlockhashResponse {
slot: block.slot,
blockhash: block.blockhash.clone(),
last_valid_block_height,
}
})
},
request.get_ref().commitment,
)
.await
},
request.get_ref().commitment,
)
.await
} else {
Err(Status::unimplemented("method disabled"))
}
}

async fn get_block_height(
&self,
request: Request<GetBlockHeightRequest>,
) -> Result<Response<GetBlockHeightResponse>, Status> {
self.blocks_meta
.get_block(
|block| {
block
.block_height
.map(|block_height| GetBlockHeightResponse { block_height })
},
request.get_ref().commitment,
)
.await
if let Some(blocks_meta) = &self.blocks_meta {
blocks_meta
.get_block(
|block| {
block
.block_height
.map(|block_height| GetBlockHeightResponse { block_height })
},
request.get_ref().commitment,
)
.await
} else {
Err(Status::unimplemented("method disabled"))
}
}

async fn get_slot(
&self,
request: Request<GetSlotRequest>,
) -> Result<Response<GetSlotResponse>, Status> {
self.blocks_meta
.get_block(
|block| Some(GetSlotResponse { slot: block.slot }),
request.get_ref().commitment,
)
.await
if let Some(blocks_meta) = &self.blocks_meta {
blocks_meta
.get_block(
|block| Some(GetSlotResponse { slot: block.slot }),
request.get_ref().commitment,
)
.await
} else {
Err(Status::unimplemented("method disabled"))
}
}

async fn is_blockhash_valid(
&self,
request: Request<IsBlockhashValidRequest>,
) -> Result<Response<IsBlockhashValidResponse>, Status> {
let req = request.get_ref();
self.blocks_meta
.is_blockhash_valid(&req.blockhash, req.commitment)
.await
if let Some(blocks_meta) = &self.blocks_meta {
let req = request.get_ref();
blocks_meta
.is_blockhash_valid(&req.blockhash, req.commitment)
.await
} else {
Err(Status::unimplemented("method disabled"))
}
}

async fn get_version(
Expand Down

0 comments on commit 5ae7536

Please sign in to comment.