Skip to content

Commit

Permalink
geyser: add panic config option on failed block reconstruction (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Jul 22, 2023
1 parent b482e67 commit 9dfe79f
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

geyser: add panic config option on failed block reconstruction ([#162](https://github.com/rpcpool/yellowstone-grpc/pull/162)).

### Fixes

### Breaking
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ $ solana-validator --geyser-plugin-config yellowstone-grpc-geyser/config.json
cargo-fmt && cargo run --bin config-check -- --config yellowstone-grpc-geyser/config.json
```

### Block reconstruction

Geyser interface on block update do not provide detailed information about transactions and accounts updates. To provide this information with block message we need to collect all messages and expect specified order. By default if we failed to reconstruct full block we log error message and increase `invalid_full_blocks_total` counter in prometheus metrics. If you want to panic on invalid reconstruction you can change option `block_fail_action` in config to `panic` (default value is `log`).

### Filters

See [yellowstone-grpc-proto/proto/geyser.proto](yellowstone-grpc-proto/proto/geyser.proto).
Expand Down
3 changes: 2 additions & 1 deletion yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@
},
"prometheus": {
"address": "0.0.0.0:8999"
}
},
"block_fail_action": "log"
}
16 changes: 16 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub struct Config {
pub grpc: ConfigGrpc,
#[serde(default)]
pub prometheus: Option<ConfigPrometheus>,
/// Action on block re-construction error
#[serde(default)]
pub block_fail_action: ConfigBlockFailAction,
}

impl Config {
Expand Down Expand Up @@ -251,6 +254,19 @@ pub struct ConfigPrometheus {
pub address: SocketAddr,
}

#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ConfigBlockFailAction {
Log,
Panic,
}

impl Default for ConfigBlockFailAction {
fn default() -> Self {
Self::Log
}
}

fn deserialize_usize_str<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
Expand Down
41 changes: 32 additions & 9 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
config::ConfigGrpc,
config::{ConfigBlockFailAction, ConfigGrpc},
filters::{Filter, FilterAccountsDataSlice},
prom::{CONNECTIONS_TOTAL, INVALID_FULL_BLOCKS, MESSAGE_QUEUE_SIZE},
proto::{
Expand Down Expand Up @@ -586,6 +586,7 @@ pub struct GrpcService {
impl GrpcService {
pub fn create(
config: ConfigGrpc,
block_fail_action: ConfigBlockFailAction,
) -> Result<
(mpsc::UnboundedSender<Message>, oneshot::Sender<()>),
Box<dyn std::error::Error + Send + Sync>,
Expand Down Expand Up @@ -615,7 +616,12 @@ impl GrpcService {

// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
tokio::spawn(Self::geyser_loop(messages_rx, blocks_meta_tx, broadcast_tx));
tokio::spawn(Self::geyser_loop(
messages_rx,
blocks_meta_tx,
broadcast_tx,
block_fail_action,
));

// Run Server
let (shutdown_tx, shutdown_rx) = oneshot::channel();
Expand All @@ -641,6 +647,7 @@ impl GrpcService {
mut messages_rx: mpsc::UnboundedReceiver<Message>,
blocks_meta_tx: mpsc::UnboundedSender<Message>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
block_fail_action: ConfigBlockFailAction,
) {
const PROCESSED_MESSAGES_MAX: usize = 31;
const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
Expand Down Expand Up @@ -720,7 +727,15 @@ impl GrpcService {
let processed_message = $message.clone();
let (vec, map, collected) = messages.entry($message.get_slot()).or_default();
if *collected && !matches!(&$message, Message::Block(_) | Message::BlockMeta(_)) {
error!("unexpected message order for slot {}", $message.get_slot());
match block_fail_action {
ConfigBlockFailAction::Log => {
INVALID_FULL_BLOCKS.inc();
error!("unexpected message order for slot {}", $message.get_slot());
}
ConfigBlockFailAction::Panic => {
panic!("unexpected message order for slot {}", $message.get_slot());
}
}
}
if let Message::Account(message) = &$message {
let write_version = message.account.write_version;
Expand Down Expand Up @@ -780,10 +795,11 @@ impl GrpcService {
transactions.get(&slot),
Some((Some(block_meta), transactions)) if block_meta.executed_transaction_count as usize == transactions.len()
) {
let (block_meta, mut transactions) = transactions.remove(&slot).expect("checked");
transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index));
let mut message = Message::Block((block_meta.expect("checked"), transactions).into());
process_message!(message);
if let Some((Some(block_meta), mut transactions)) = transactions.remove(&slot) {
transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index));
let mut message = Message::Block((block_meta, transactions).into());
process_message!(message);
}
}

// remove outdated transactions
Expand All @@ -797,8 +813,15 @@ impl GrpcService {
// Maybe log error
Some(kslot) if kslot == slot => {
if let Some((Some(_), vec)) = transactions.remove(&kslot) {
INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
match block_fail_action {
ConfigBlockFailAction::Log => {
INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
}
ConfigBlockFailAction::Panic => {
panic!("{} transactions left for block {kslot}", vec.len());
}
}
}
}
_ => break,
Expand Down
5 changes: 3 additions & 2 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ impl GeyserPlugin for Plugin {
let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;

let (grpc_channel, grpc_shutdown_tx, prometheus) = runtime.block_on(async move {
let (grpc_channel, grpc_shutdown_tx) = GrpcService::create(config.grpc)
.map_err(|error| GeyserPluginError::Custom(error))?;
let (grpc_channel, grpc_shutdown_tx) =
GrpcService::create(config.grpc, config.block_fail_action)
.map_err(|error| GeyserPluginError::Custom(error))?;
let prometheus = PrometheusService::new(config.prometheus)
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
Ok::<_, GeyserPluginError>((grpc_channel, grpc_shutdown_tx, prometheus))
Expand Down

0 comments on commit 9dfe79f

Please sign in to comment.