Skip to content

Commit

Permalink
Add transactions to block message, new block meta message (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Jan 12, 2023
1 parent 7295fb3 commit 1d0d4dc
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "solana-geyser-grpc"
version = "0.2.0"
version = "0.3.0+solana.1.14.10"
authors = ["Triton One"]
edition = "2021"

Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ If all fields are empty then all transactions are broadcasted. Otherwise fields

Currently all blocks are broadcasted.

#### Blocks meta

Same as `Blocks` but without `transactions`.

### Limit filters

It's possible to add limits for filters in config. If `filters` field is omitted then filters doesn't have any limits.
Expand Down
13 changes: 13 additions & 0 deletions proto/geyser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ message SubscribeRequest {
map<string, SubscribeRequestFilterSlots> slots = 2;
map<string, SubscribeRequestFilterTransactions> transactions = 3;
map<string, SubscribeRequestFilterBlocks> blocks = 4;
map<string, SubscribeRequestFilterBlocksMeta> blocks_meta = 5;
}

message SubscribeRequestFilterAccounts {
Expand All @@ -31,6 +32,8 @@ message SubscribeRequestFilterTransactions {

message SubscribeRequestFilterBlocks {}

message SubscribeRequestFilterBlocksMeta {}

message SubscribeUpdate {
repeated string filters = 1;
oneof update_oneof {
Expand All @@ -39,6 +42,7 @@ message SubscribeUpdate {
SubscribeUpdateTransaction transaction = 4;
SubscribeUpdateBlock block = 5;
SubscribeUpdatePing ping = 6;
SubscribeUpdateBlockMeta block_meta = 7;
}
}

Expand Down Expand Up @@ -90,6 +94,15 @@ message SubscribeUpdateBlock {
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
repeated SubscribeUpdateTransactionInfo transactions = 6;
}

message SubscribeUpdateBlockMeta {
uint64 slot = 1;
string blockhash = 2;
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
}

message SubscribeUpdatePing {}
14 changes: 12 additions & 2 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use {
futures::stream::{once, StreamExt},
solana_geyser_grpc::proto::{
geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
},
std::collections::HashMap,
tonic::transport::{channel::ClientTlsConfig, Endpoint, Uri},
Expand Down Expand Up @@ -56,6 +56,10 @@ struct Args {
#[clap(long)]
/// Subscribe on block updates
blocks: bool,

#[clap(long)]
/// Subscribe on block meta updates (without transactions)
blocks_meta: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -96,6 +100,11 @@ async fn main() -> anyhow::Result<()> {
blocks.insert("client".to_owned(), SubscribeRequestFilterBlocks {});
}

let mut blocks_meta = HashMap::new();
if args.blocks_meta {
blocks_meta.insert("client".to_owned(), SubscribeRequestFilterBlocksMeta {});
}

let mut endpoint = Endpoint::from_shared(args.endpoint.clone())?;
let uri: Uri = args.endpoint.parse()?;
if uri.scheme_str() == Some("https") {
Expand All @@ -108,6 +117,7 @@ async fn main() -> anyhow::Result<()> {
accounts,
transactions,
blocks,
blocks_meta,
};
println!("Going to send request: {:?}", request);

Expand Down
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct ConfigGrpcFilters {
pub slots: ConfigGrpcFiltersSlots,
pub transactions: ConfigGrpcFiltersTransactions,
pub blocks: ConfigGrpcFiltersBlocks,
pub blocks_meta: ConfigGrpcFiltersBlocksMeta,
}

impl ConfigGrpcFilters {
Expand Down Expand Up @@ -157,6 +158,12 @@ pub struct ConfigGrpcFiltersBlocks {
pub max: usize,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersBlocksMeta {
pub max: usize,
}

#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigPrometheus {
Expand Down
45 changes: 40 additions & 5 deletions src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use {
crate::{
config::{
ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks,
ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions,
ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions,
},
grpc::{
Message, MessageAccount, MessageBlock, MessageBlockMeta, MessageSlot,
MessageTransaction,
},
grpc::{Message, MessageAccount, MessageBlock, MessageSlot, MessageTransaction},
proto::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
},
},
solana_sdk::pubkey::Pubkey,
Expand All @@ -25,6 +29,7 @@ pub struct Filter {
slots: FilterSlots,
transactions: FilterTransactions,
blocks: FilterBlocks,
blocks_meta: FilterBlocksMeta,
}

impl Filter {
Expand All @@ -40,6 +45,7 @@ impl Filter {
limit.map(|v| &v.transactions),
)?,
blocks: FilterBlocks::new(&config.blocks, limit.map(|v| &v.blocks))?,
blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, limit.map(|v| &v.blocks_meta))?,
})
}

Expand Down Expand Up @@ -67,6 +73,7 @@ impl Filter {
Message::Slot(message) => self.slots.get_filters(message),
Message::Transaction(message) => self.transactions.get_filters(message),
Message::Block(message) => self.blocks.get_filters(message),
Message::BlockMeta(message) => self.blocks_meta.get_filters(message),
}
}
}
Expand Down Expand Up @@ -223,7 +230,7 @@ impl FilterSlots {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}

Ok(FilterSlots {
Ok(Self {
filters: configs
.iter()
// .filter_map(|(name, _filter)| Some(name.clone()))
Expand Down Expand Up @@ -356,7 +363,7 @@ impl FilterBlocks {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}

Ok(FilterBlocks {
Ok(Self {
filters: configs
.iter()
// .filter_map(|(name, _filter)| Some(name.clone()))
Expand All @@ -369,3 +376,31 @@ impl FilterBlocks {
self.filters.clone()
}
}

#[derive(Debug, Default)]
struct FilterBlocksMeta {
filters: Vec<String>,
}

impl FilterBlocksMeta {
fn new(
configs: &HashMap<String, SubscribeRequestFilterBlocksMeta>,
limit: Option<&ConfigGrpcFiltersBlocksMeta>,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}

Ok(Self {
filters: configs
.iter()
// .filter_map(|(name, _filter)| Some(name.clone()))
.map(|(name, _filter)| name.clone())
.collect(),
})
}

fn get_filters(&self, _message: &MessageBlockMeta) -> Vec<String> {
self.filters.clone()
}
}
76 changes: 64 additions & 12 deletions src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use {
geyser_server::{Geyser, GeyserServer},
subscribe_update::UpdateOneof,
SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
SubscribeUpdateBlock, SubscribeUpdatePing, SubscribeUpdateSlot,
SubscribeUpdateSlotStatus, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo,
SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdatePing,
SubscribeUpdateSlot, SubscribeUpdateSlotStatus, SubscribeUpdateTransaction,
SubscribeUpdateTransactionInfo,
},
},
log::*,
Expand Down Expand Up @@ -102,7 +103,7 @@ impl From<(u64, Option<u64>, SlotStatus)> for MessageSlot {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MessageTransactionInfo {
pub signature: Signature,
pub is_vote: bool,
Expand All @@ -111,6 +112,18 @@ pub struct MessageTransactionInfo {
pub index: usize,
}

impl From<&MessageTransactionInfo> for SubscribeUpdateTransactionInfo {
fn from(tx: &MessageTransactionInfo) -> Self {
Self {
signature: tx.signature.as_ref().into(),
is_vote: tx.is_vote,
transaction: Some((&tx.transaction).into()),
meta: Some((&tx.meta).into()),
index: tx.index as u64,
}
}
}

#[derive(Debug)]
pub struct MessageTransaction {
pub transaction: MessageTransactionInfo,
Expand Down Expand Up @@ -144,10 +157,45 @@ pub struct MessageBlock {
pub rewards: Vec<Reward>,
pub block_time: Option<UnixTimestamp>,
pub block_height: Option<u64>,
pub transactions: Vec<MessageTransactionInfo>,
}

impl<'a> From<ReplicaBlockInfoVersions<'a>> for MessageBlock {
fn from(blockinfo: ReplicaBlockInfoVersions<'a>) -> Self {
impl<'a>
From<(
&'a ReplicaBlockInfoVersions<'a>,
Vec<MessageTransactionInfo>,
)> for MessageBlock
{
fn from(
(blockinfo, transactions): (
&'a ReplicaBlockInfoVersions<'a>,
Vec<MessageTransactionInfo>,
),
) -> Self {
match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(info) => Self {
slot: info.slot,
blockhash: info.blockhash.to_string(),
rewards: info.rewards.into(),
block_time: info.block_time,
block_height: info.block_height,
transactions,
},
}
}
}

#[derive(Debug)]
pub struct MessageBlockMeta {
pub slot: u64,
pub blockhash: String,
pub rewards: Vec<Reward>,
pub block_time: Option<UnixTimestamp>,
pub block_height: Option<u64>,
}

impl<'a> From<&'a ReplicaBlockInfoVersions<'a>> for MessageBlockMeta {
fn from(blockinfo: &'a ReplicaBlockInfoVersions<'a>) -> Self {
match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(info) => Self {
slot: info.slot,
Expand All @@ -166,6 +214,7 @@ pub enum Message {
Account(MessageAccount),
Transaction(MessageTransaction),
Block(MessageBlock),
BlockMeta(MessageBlockMeta),
}

impl From<&Message> for UpdateOneof {
Expand All @@ -191,13 +240,7 @@ impl From<&Message> for UpdateOneof {
is_startup: message.is_startup,
}),
Message::Transaction(message) => UpdateOneof::Transaction(SubscribeUpdateTransaction {
transaction: Some(SubscribeUpdateTransactionInfo {
signature: message.transaction.signature.as_ref().into(),
is_vote: message.transaction.is_vote,
transaction: Some((&message.transaction.transaction).into()),
meta: Some((&message.transaction.meta).into()),
index: message.transaction.index as u64,
}),
transaction: Some((&message.transaction).into()),
slot: message.slot,
}),
Message::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock {
Expand All @@ -206,6 +249,14 @@ impl From<&Message> for UpdateOneof {
rewards: Some(message.rewards.as_slice().into()),
block_time: message.block_time.map(|v| v.into()),
block_height: message.block_height.map(|v| v.into()),
transactions: message.transactions.iter().map(Into::into).collect(),
}),
Message::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta {
slot: message.slot,
blockhash: message.blockhash.clone(),
rewards: Some(message.rewards.as_slice().into()),
block_time: message.block_time.map(|v| v.into()),
block_height: message.block_height.map(|v| v.into()),
}),
}
}
Expand Down Expand Up @@ -359,6 +410,7 @@ impl Geyser for GrpcService {
slots: HashMap::new(),
transactions: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
},
self.config.filters.as_ref(),
)
Expand Down
Loading

0 comments on commit 1d0d4dc

Please sign in to comment.