From 1d0d4dcbf487cfc40f193d63d6cad0fb02783c40 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 12 Jan 2023 11:37:58 -0300 Subject: [PATCH] Add transactions to block message, new block meta message (#27) --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 4 +++ proto/geyser.proto | 13 ++++++++ src/bin/client.rs | 14 +++++++-- src/config.rs | 7 +++++ src/filters.rs | 45 ++++++++++++++++++++++++--- src/grpc.rs | 76 ++++++++++++++++++++++++++++++++++++++-------- src/plugin.rs | 51 ++++++++++++++++++++++++++++--- 9 files changed, 188 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1b90e9f..f89a4f0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2554,7 +2554,7 @@ dependencies = [ [[package]] name = "solana-geyser-grpc" -version = "0.2.0" +version = "0.3.0+solana.1.14.10" dependencies = [ "anyhow", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 633b1863..3af55407 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "solana-geyser-grpc" -version = "0.2.0" +version = "0.3.0+solana.1.14.10" authors = ["Triton One"] edition = "2021" diff --git a/README.md b/README.md index 4ea4ee1a..93646109 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,10 @@ If all fields are empty then all transactions are broadcasted. Otherwise fields Currently all blocks are broadcasted. +#### Blocks meta + +Same as `Blocks` but without `transactions`. + ### Limit filters It's possible to add limits for filters in config. If `filters` field is omitted then filters doesn't have any limits. diff --git a/proto/geyser.proto b/proto/geyser.proto index e2ce9632..672e7139 100644 --- a/proto/geyser.proto +++ b/proto/geyser.proto @@ -13,6 +13,7 @@ message SubscribeRequest { map slots = 2; map transactions = 3; map blocks = 4; + map blocks_meta = 5; } message SubscribeRequestFilterAccounts { @@ -31,6 +32,8 @@ message SubscribeRequestFilterTransactions { message SubscribeRequestFilterBlocks {} +message SubscribeRequestFilterBlocksMeta {} + message SubscribeUpdate { repeated string filters = 1; oneof update_oneof { @@ -39,6 +42,7 @@ message SubscribeUpdate { SubscribeUpdateTransaction transaction = 4; SubscribeUpdateBlock block = 5; SubscribeUpdatePing ping = 6; + SubscribeUpdateBlockMeta block_meta = 7; } } @@ -90,6 +94,15 @@ message SubscribeUpdateBlock { solana.storage.ConfirmedBlock.Rewards rewards = 3; solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4; solana.storage.ConfirmedBlock.BlockHeight block_height = 5; + repeated SubscribeUpdateTransactionInfo transactions = 6; +} + +message SubscribeUpdateBlockMeta { + uint64 slot = 1; + string blockhash = 2; + solana.storage.ConfirmedBlock.Rewards rewards = 3; + solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4; + solana.storage.ConfirmedBlock.BlockHeight block_height = 5; } message SubscribeUpdatePing {} diff --git a/src/bin/client.rs b/src/bin/client.rs index 802b56b1..d2d00078 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -3,8 +3,8 @@ use { futures::stream::{once, StreamExt}, solana_geyser_grpc::proto::{ geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestFilterAccounts, - SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, + SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, + SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, }, std::collections::HashMap, tonic::transport::{channel::ClientTlsConfig, Endpoint, Uri}, @@ -56,6 +56,10 @@ struct Args { #[clap(long)] /// Subscribe on block updates blocks: bool, + + #[clap(long)] + /// Subscribe on block meta updates (without transactions) + blocks_meta: bool, } #[tokio::main] @@ -96,6 +100,11 @@ async fn main() -> anyhow::Result<()> { blocks.insert("client".to_owned(), SubscribeRequestFilterBlocks {}); } + let mut blocks_meta = HashMap::new(); + if args.blocks_meta { + blocks_meta.insert("client".to_owned(), SubscribeRequestFilterBlocksMeta {}); + } + let mut endpoint = Endpoint::from_shared(args.endpoint.clone())?; let uri: Uri = args.endpoint.parse()?; if uri.scheme_str() == Some("https") { @@ -108,6 +117,7 @@ async fn main() -> anyhow::Result<()> { accounts, transactions, blocks, + blocks_meta, }; println!("Going to send request: {:?}", request); diff --git a/src/config.rs b/src/config.rs index fcb8ea91..6932a7d3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -82,6 +82,7 @@ pub struct ConfigGrpcFilters { pub slots: ConfigGrpcFiltersSlots, pub transactions: ConfigGrpcFiltersTransactions, pub blocks: ConfigGrpcFiltersBlocks, + pub blocks_meta: ConfigGrpcFiltersBlocksMeta, } impl ConfigGrpcFilters { @@ -157,6 +158,12 @@ pub struct ConfigGrpcFiltersBlocks { pub max: usize, } +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ConfigGrpcFiltersBlocksMeta { + pub max: usize, +} + #[derive(Debug, Clone, Copy, Deserialize)] #[serde(deny_unknown_fields)] pub struct ConfigPrometheus { diff --git a/src/filters.rs b/src/filters.rs index a2d58aaa..cc39d7b4 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -2,12 +2,16 @@ use { crate::{ config::{ ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks, - ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions, + ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions, + }, + grpc::{ + Message, MessageAccount, MessageBlock, MessageBlockMeta, MessageSlot, + MessageTransaction, }, - grpc::{Message, MessageAccount, MessageBlock, MessageSlot, MessageTransaction}, proto::{ SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, - SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, }, }, solana_sdk::pubkey::Pubkey, @@ -25,6 +29,7 @@ pub struct Filter { slots: FilterSlots, transactions: FilterTransactions, blocks: FilterBlocks, + blocks_meta: FilterBlocksMeta, } impl Filter { @@ -40,6 +45,7 @@ impl Filter { limit.map(|v| &v.transactions), )?, blocks: FilterBlocks::new(&config.blocks, limit.map(|v| &v.blocks))?, + blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, limit.map(|v| &v.blocks_meta))?, }) } @@ -67,6 +73,7 @@ impl Filter { Message::Slot(message) => self.slots.get_filters(message), Message::Transaction(message) => self.transactions.get_filters(message), Message::Block(message) => self.blocks.get_filters(message), + Message::BlockMeta(message) => self.blocks_meta.get_filters(message), } } } @@ -223,7 +230,7 @@ impl FilterSlots { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; } - Ok(FilterSlots { + Ok(Self { filters: configs .iter() // .filter_map(|(name, _filter)| Some(name.clone())) @@ -356,7 +363,7 @@ impl FilterBlocks { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; } - Ok(FilterBlocks { + Ok(Self { filters: configs .iter() // .filter_map(|(name, _filter)| Some(name.clone())) @@ -369,3 +376,31 @@ impl FilterBlocks { self.filters.clone() } } + +#[derive(Debug, Default)] +struct FilterBlocksMeta { + filters: Vec, +} + +impl FilterBlocksMeta { + fn new( + configs: &HashMap, + limit: Option<&ConfigGrpcFiltersBlocksMeta>, + ) -> anyhow::Result { + if let Some(limit) = limit { + ConfigGrpcFilters::check_max(configs.len(), limit.max)?; + } + + Ok(Self { + filters: configs + .iter() + // .filter_map(|(name, _filter)| Some(name.clone())) + .map(|(name, _filter)| name.clone()) + .collect(), + }) + } + + fn get_filters(&self, _message: &MessageBlockMeta) -> Vec { + self.filters.clone() + } +} diff --git a/src/grpc.rs b/src/grpc.rs index b06f8578..27681522 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -7,8 +7,9 @@ use { geyser_server::{Geyser, GeyserServer}, subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, - SubscribeUpdateBlock, SubscribeUpdatePing, SubscribeUpdateSlot, - SubscribeUpdateSlotStatus, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdatePing, + SubscribeUpdateSlot, SubscribeUpdateSlotStatus, SubscribeUpdateTransaction, + SubscribeUpdateTransactionInfo, }, }, log::*, @@ -102,7 +103,7 @@ impl From<(u64, Option, SlotStatus)> for MessageSlot { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessageTransactionInfo { pub signature: Signature, pub is_vote: bool, @@ -111,6 +112,18 @@ pub struct MessageTransactionInfo { pub index: usize, } +impl From<&MessageTransactionInfo> for SubscribeUpdateTransactionInfo { + fn from(tx: &MessageTransactionInfo) -> Self { + Self { + signature: tx.signature.as_ref().into(), + is_vote: tx.is_vote, + transaction: Some((&tx.transaction).into()), + meta: Some((&tx.meta).into()), + index: tx.index as u64, + } + } +} + #[derive(Debug)] pub struct MessageTransaction { pub transaction: MessageTransactionInfo, @@ -144,10 +157,45 @@ pub struct MessageBlock { pub rewards: Vec, pub block_time: Option, pub block_height: Option, + pub transactions: Vec, } -impl<'a> From> for MessageBlock { - fn from(blockinfo: ReplicaBlockInfoVersions<'a>) -> Self { +impl<'a> + From<( + &'a ReplicaBlockInfoVersions<'a>, + Vec, + )> for MessageBlock +{ + fn from( + (blockinfo, transactions): ( + &'a ReplicaBlockInfoVersions<'a>, + Vec, + ), + ) -> Self { + match blockinfo { + ReplicaBlockInfoVersions::V0_0_1(info) => Self { + slot: info.slot, + blockhash: info.blockhash.to_string(), + rewards: info.rewards.into(), + block_time: info.block_time, + block_height: info.block_height, + transactions, + }, + } + } +} + +#[derive(Debug)] +pub struct MessageBlockMeta { + pub slot: u64, + pub blockhash: String, + pub rewards: Vec, + pub block_time: Option, + pub block_height: Option, +} + +impl<'a> From<&'a ReplicaBlockInfoVersions<'a>> for MessageBlockMeta { + fn from(blockinfo: &'a ReplicaBlockInfoVersions<'a>) -> Self { match blockinfo { ReplicaBlockInfoVersions::V0_0_1(info) => Self { slot: info.slot, @@ -166,6 +214,7 @@ pub enum Message { Account(MessageAccount), Transaction(MessageTransaction), Block(MessageBlock), + BlockMeta(MessageBlockMeta), } impl From<&Message> for UpdateOneof { @@ -191,13 +240,7 @@ impl From<&Message> for UpdateOneof { is_startup: message.is_startup, }), Message::Transaction(message) => UpdateOneof::Transaction(SubscribeUpdateTransaction { - transaction: Some(SubscribeUpdateTransactionInfo { - signature: message.transaction.signature.as_ref().into(), - is_vote: message.transaction.is_vote, - transaction: Some((&message.transaction.transaction).into()), - meta: Some((&message.transaction.meta).into()), - index: message.transaction.index as u64, - }), + transaction: Some((&message.transaction).into()), slot: message.slot, }), Message::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock { @@ -206,6 +249,14 @@ impl From<&Message> for UpdateOneof { rewards: Some(message.rewards.as_slice().into()), block_time: message.block_time.map(|v| v.into()), block_height: message.block_height.map(|v| v.into()), + transactions: message.transactions.iter().map(Into::into).collect(), + }), + Message::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta { + slot: message.slot, + blockhash: message.blockhash.clone(), + rewards: Some(message.rewards.as_slice().into()), + block_time: message.block_time.map(|v| v.into()), + block_height: message.block_height.map(|v| v.into()), }), } } @@ -359,6 +410,7 @@ impl Geyser for GrpcService { slots: HashMap::new(), transactions: HashMap::new(), blocks: HashMap::new(), + blocks_meta: HashMap::new(), }, self.config.filters.as_ref(), ) diff --git a/src/plugin.rs b/src/plugin.rs index ca9c9aee..e013a86a 100644 --- a/src/plugin.rs +++ b/src/plugin.rs @@ -1,7 +1,7 @@ use { crate::{ config::Config, - grpc::{GrpcService, Message}, + grpc::{GrpcService, Message, MessageTransaction, MessageTransactionInfo}, prom::{PrometheusService, SLOT_STATUS}, }, solana_geyser_plugin_interface::geyser_plugin_interface::{ @@ -21,6 +21,7 @@ pub struct PluginInner { grpc_channel: mpsc::UnboundedSender, grpc_shutdown_tx: oneshot::Sender<()>, prometheus: PrometheusService, + transactions: Option<(u64, Vec)>, } #[derive(Debug, Default)] @@ -54,6 +55,7 @@ impl GeyserPlugin for Plugin { grpc_channel, grpc_shutdown_tx, prometheus, + transactions: None, }); Ok(()) @@ -107,8 +109,26 @@ impl GeyserPlugin for Plugin { transaction: ReplicaTransactionInfoVersions<'_>, slot: u64, ) -> PluginResult<()> { - let inner = self.inner.as_ref().expect("initialized"); - let message = Message::Transaction((transaction, slot).into()); + let inner = self.inner.as_mut().expect("initialized"); + + let msg_tx: MessageTransaction = (transaction, slot).into(); + match &mut inner.transactions { + Some((current_slot, transactions)) if *current_slot == slot => { + transactions.push(msg_tx.transaction.clone()); + } + Some((current_slot, _)) => { + log::error!( + "got tx from block {}, while current block is {}", + slot, + current_slot + ); + } + None => { + inner.transactions = Some((slot, vec![msg_tx.transaction.clone()])); + } + } + + let message = Message::Transaction(msg_tx); let _ = inner.grpc_channel.send(message); Ok(()) @@ -118,8 +138,29 @@ impl GeyserPlugin for Plugin { &mut self, blockinfo: ReplicaBlockInfoVersions<'_>, ) -> PluginResult<()> { - let inner = self.inner.as_ref().expect("initialized"); - let message = Message::Block(blockinfo.into()); + let inner = self.inner.as_mut().expect("initialized"); + + let ReplicaBlockInfoVersions::V0_0_1(block) = &blockinfo; + let transactions = match inner.transactions.take() { + Some((slot, transactions)) if slot == block.slot => transactions, + Some((slot, _)) => { + let msg = format!( + "invalid transactions for block {}, found {}", + block.slot, slot + ); + log::error!("{}", msg); + return Err(GeyserPluginError::Custom(msg.into())); + } + None => { + let msg = format!("no transactions for block {}", block.slot); + log::error!("{}", msg); + return Err(GeyserPluginError::Custom(msg.into())); + } + }; + + let message = Message::Block((&blockinfo, transactions).into()); + let _ = inner.grpc_channel.send(message); + let message = Message::BlockMeta((&blockinfo).into()); let _ = inner.grpc_channel.send(message); Ok(())