From e431248de14b73f8c625088e6873a6d795182793 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 17 Oct 2023 16:43:52 +0400 Subject: [PATCH] v1.16: kafka: rename to tools (backport of #203) (#209) (cherry picked from commit f5f0c3cd8dac580248d2b308a300a69e5067f5a0) Co-authored-by: Kirill Fomichev --- .github/workflows/release.yml | 2 +- .github/workflows/test.yml | 4 +- CHANGELOG.md | 2 + Cargo.lock | 34 +-- Cargo.toml | 6 +- README.md | 11 +- yellowstone-grpc-kafka/src/prom.rs | 242 ------------------ .../Cargo.toml | 6 +- .../build.rs | 0 .../config-kafka.json | 0 .../docker-kafka.yml | 0 .../src/bin/grpc-kafka.rs | 47 ++-- .../src/kafka}/config.rs | 2 +- .../src/kafka}/dedup.rs | 0 .../src/kafka}/grpc.rs | 0 yellowstone-grpc-tools/src/kafka/mod.rs | 4 + yellowstone-grpc-tools/src/kafka/prom.rs | 155 +++++++++++ .../src/lib.rs | 4 +- yellowstone-grpc-tools/src/prom.rs | 89 +++++++ .../src/version.rs | 0 20 files changed, 307 insertions(+), 301 deletions(-) delete mode 100644 yellowstone-grpc-kafka/src/prom.rs rename {yellowstone-grpc-kafka => yellowstone-grpc-tools}/Cargo.toml (89%) rename {yellowstone-grpc-kafka => yellowstone-grpc-tools}/build.rs (100%) rename yellowstone-grpc-kafka/config.json => yellowstone-grpc-tools/config-kafka.json (100%) rename {yellowstone-grpc-kafka => yellowstone-grpc-tools}/docker-kafka.yml (100%) rename {yellowstone-grpc-kafka => yellowstone-grpc-tools}/src/bin/grpc-kafka.rs (91%) rename {yellowstone-grpc-kafka/src => yellowstone-grpc-tools/src/kafka}/config.rs (99%) rename {yellowstone-grpc-kafka/src => yellowstone-grpc-tools/src/kafka}/dedup.rs (100%) rename {yellowstone-grpc-kafka/src => yellowstone-grpc-tools/src/kafka}/grpc.rs (100%) create mode 100644 yellowstone-grpc-tools/src/kafka/mod.rs create mode 100644 yellowstone-grpc-tools/src/kafka/prom.rs rename {yellowstone-grpc-kafka => yellowstone-grpc-tools}/src/lib.rs (77%) create mode 100644 yellowstone-grpc-tools/src/prom.rs rename {yellowstone-grpc-kafka => yellowstone-grpc-tools}/src/version.rs (100%) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b9947160..a96adc82 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,7 +10,7 @@ on: - 'v1.16' tags: - 'v*' - - 'kafka-v*' + - 'tools-v*' workflow_dispatch: env: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 65ed0db2..e7b90886 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -74,10 +74,10 @@ jobs: run: cargo check -p yellowstone-grpc-client-simple --all-targets --tests - name: check features in `geyser` run: cargo check -p yellowstone-grpc-geyser --all-targets --tests - - name: check features in `kafka` - run: cargo check -p yellowstone-grpc-kafka --all-targets --tests - name: check features in `proto` run: cargo check -p yellowstone-grpc-proto --all-targets --tests + - name: check features in `tools` + run: cargo check -p yellowstone-grpc-tools --all-targets --tests - name: Build run: ./ci/cargo-build-test.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 943f54b9..3df932dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +- kafka: rename to tools ([#209](https://github.com/rpcpool/yellowstone-grpc/pull/209)). + ## 2023-10-14 - yellowstone-grpc-client-1.11.1+solana.1.16.17 diff --git a/Cargo.lock b/Cargo.lock index a74b7c71..f87e251c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4665,8 +4665,23 @@ dependencies = [ ] [[package]] -name = "yellowstone-grpc-kafka" -version = "1.0.0-rc.3+solana.1.16.17" +name = "yellowstone-grpc-proto" +version = "1.10.0+solana.1.16.17" +dependencies = [ + "anyhow", + "bincode", + "prost", + "protobuf-src", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic", + "tonic-build", +] + +[[package]] +name = "yellowstone-grpc-tools" +version = "1.0.0-rc.4+solana.1.16.17" dependencies = [ "anyhow", "async-trait", @@ -4696,21 +4711,6 @@ dependencies = [ "yellowstone-grpc-proto", ] -[[package]] -name = "yellowstone-grpc-proto" -version = "1.10.0+solana.1.16.17" -dependencies = [ - "anyhow", - "bincode", - "prost", - "protobuf-src", - "solana-account-decoder", - "solana-sdk", - "solana-transaction-status", - "tonic", - "tonic-build", -] - [[package]] name = "zeroize" version = "1.3.0" diff --git a/Cargo.toml b/Cargo.toml index bbda1618..2bca263f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ "examples/rust", # 1.10.0+solana.1.16.17 - "yellowstone-grpc-client", # 1.11.1+solana.1.16.17 - "yellowstone-grpc-geyser", # 1.10.0+solana.1.16.17 - "yellowstone-grpc-kafka", # 1.0.0-rc.3+solana.1.16.17 + "yellowstone-grpc-client", # 1.11.1+solana.1.6.17 + "yellowstone-grpc-geyser", # 1.10.0+solana.1.6.17 "yellowstone-grpc-proto", # 1.10.0+solana.1.16.17 + "yellowstone-grpc-tools", # 1.0.0-rc.4+solana.1.16.17 ] [profile.release] diff --git a/README.md b/README.md index 453c0839..c8dd0094 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,9 @@ It's possible to add limits for filters in config. If `filters` field is omitted - [Rust](examples/rust) - [TypeScript](examples/typescript) -### Kafka producer / consumer +### gRPC Tools + +#### Kafka In addition to gRPC Geyser Plugin we provide Kafka tool. This tool can works in 3 modes: @@ -114,7 +116,7 @@ In addition to gRPC Geyser Plugin we provide Kafka tool. This tool can works in ```bash $ cargo run --bin grpc-kafka -- --help -Yellowstone gRPC Kafka Producer/Dedup/Consumer +Yellowstone gRPC Kafka Tool Usage: grpc-kafka [OPTIONS] --config @@ -126,7 +128,6 @@ Commands: Options: -c, --config Path to config file - --prometheus Prometheus listen address -h, --help Print help -V, --version Print version ``` @@ -135,11 +136,11 @@ Options: ```bash # run kafka locally -docker-compose -f ./yellowstone-grpc-kafka/docker-kafka.yml up +docker-compose -f ./yellowstone-grpc-tools/docker-kafka.yml up # create topic kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server localhost:29092 --create --topic grpc1 # send messages from gRPC to Kafka -cargo run --bin grpc-kafka -- --config yellowstone-grpc-kafka/config.json --prometheus 127.0.0.1:8873 grpc2kafka +cargo run --bin grpc-kafka -- --config yellowstone-grpc-tools/config-kafka.json grpc2kafka # read messages from Kafka kafka_2.13-3.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic grpc1 ``` diff --git a/yellowstone-grpc-kafka/src/prom.rs b/yellowstone-grpc-kafka/src/prom.rs deleted file mode 100644 index f066036e..00000000 --- a/yellowstone-grpc-kafka/src/prom.rs +++ /dev/null @@ -1,242 +0,0 @@ -use { - crate::version::VERSION as VERSION_INFO, - hyper::{ - server::conn::AddrStream, - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, - }, - prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts, Registry, TextEncoder}, - std::{net::SocketAddr, sync::Once}, - tracing::{error, info}, -}; - -lazy_static::lazy_static! { - static ref REGISTRY: Registry = Registry::new(); - - static ref VERSION: IntCounterVec = IntCounterVec::new( - Opts::new("version", "Plugin version info"), - &["buildts", "git", "package", "proto", "rustc", "solana", "version"] - ).unwrap(); - - static ref KAFKA_STATS: GaugeVec = GaugeVec::new( - Opts::new("kafka_stats", "librdkafka metrics"), - &["broker", "metric"] - ).unwrap(); - - static ref KAFKA_DEDUP_TOTAL: IntCounter = IntCounter::new( - "kafka_dedup_total", "Total number of deduplicated messages" - ).unwrap(); - - static ref KAFKA_RECV_TOTAL: IntCounter = IntCounter::new( - "kafka_recv_total", "Total number of received messages" - ).unwrap(); - - static ref KAFKA_SENT_TOTAL: IntCounterVec = IntCounterVec::new( - Opts::new("kafka_sent_total", "Total number of uploaded messages by type"), - &["kind"] - ).unwrap(); -} - -pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { - static REGISTER: Once = Once::new(); - REGISTER.call_once(|| { - macro_rules! register { - ($collector:ident) => { - REGISTRY - .register(Box::new($collector.clone())) - .expect("collector can't be registered"); - }; - } - register!(VERSION); - register!(KAFKA_STATS); - register!(KAFKA_DEDUP_TOTAL); - register!(KAFKA_RECV_TOTAL); - register!(KAFKA_SENT_TOTAL); - - VERSION - .with_label_values(&[ - VERSION_INFO.buildts, - VERSION_INFO.git, - VERSION_INFO.package, - VERSION_INFO.proto, - VERSION_INFO.rustc, - VERSION_INFO.solana, - VERSION_INFO.version, - ]) - .inc(); - }); - - let make_service = make_service_fn(move |_: &AddrStream| async move { - Ok::<_, hyper::Error>(service_fn(move |req: Request| async move { - let response = match req.uri().path() { - "/metrics" => metrics_handler(), - _ => not_found_handler(), - }; - Ok::<_, hyper::Error>(response) - })) - }); - let server = Server::try_bind(&address)?.serve(make_service); - info!("prometheus server started: {address:?}"); - tokio::spawn(async move { - if let Err(error) = server.await { - error!("prometheus server failed: {error:?}"); - } - }); - - Ok(()) -} - -fn metrics_handler() -> Response { - let metrics = TextEncoder::new() - .encode_to_string(®ISTRY.gather()) - .unwrap_or_else(|error| { - error!("could not encode custom metrics: {}", error); - String::new() - }); - Response::builder().body(Body::from(metrics)).unwrap() -} - -fn not_found_handler() -> Response { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap() -} - -pub mod kafka { - use { - super::{KAFKA_DEDUP_TOTAL, KAFKA_RECV_TOTAL, KAFKA_SENT_TOTAL, KAFKA_STATS}, - rdkafka::{ - client::ClientContext, - config::{ClientConfig, FromClientConfigAndContext}, - consumer::{ConsumerContext, StreamConsumer}, - error::KafkaResult, - producer::FutureProducer, - statistics::Statistics, - }, - yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof, - }; - - #[derive(Debug, Default, Clone, Copy)] - pub struct StatsContext; - - impl ClientContext for StatsContext { - fn stats(&self, statistics: Statistics) { - for (name, broker) in statistics.brokers { - macro_rules! set_value { - ($name:expr, $value:expr) => { - KAFKA_STATS - .with_label_values(&[&name, $name]) - .set($value as f64); - }; - } - - set_value!("outbuf_cnt", broker.outbuf_cnt); - set_value!("outbuf_msg_cnt", broker.outbuf_msg_cnt); - set_value!("waitresp_cnt", broker.waitresp_cnt); - set_value!("waitresp_msg_cnt", broker.waitresp_msg_cnt); - set_value!("tx", broker.tx); - set_value!("txerrs", broker.txerrs); - set_value!("txretries", broker.txretries); - set_value!("req_timeouts", broker.req_timeouts); - - if let Some(window) = broker.int_latency { - set_value!("int_latency.min", window.min); - set_value!("int_latency.max", window.max); - set_value!("int_latency.avg", window.avg); - set_value!("int_latency.sum", window.sum); - set_value!("int_latency.cnt", window.cnt); - set_value!("int_latency.stddev", window.stddev); - set_value!("int_latency.hdrsize", window.hdrsize); - set_value!("int_latency.p50", window.p50); - set_value!("int_latency.p75", window.p75); - set_value!("int_latency.p90", window.p90); - set_value!("int_latency.p95", window.p95); - set_value!("int_latency.p99", window.p99); - set_value!("int_latency.p99_99", window.p99_99); - set_value!("int_latency.outofrange", window.outofrange); - } - - if let Some(window) = broker.outbuf_latency { - set_value!("outbuf_latency.min", window.min); - set_value!("outbuf_latency.max", window.max); - set_value!("outbuf_latency.avg", window.avg); - set_value!("outbuf_latency.sum", window.sum); - set_value!("outbuf_latency.cnt", window.cnt); - set_value!("outbuf_latency.stddev", window.stddev); - set_value!("outbuf_latency.hdrsize", window.hdrsize); - set_value!("outbuf_latency.p50", window.p50); - set_value!("outbuf_latency.p75", window.p75); - set_value!("outbuf_latency.p90", window.p90); - set_value!("outbuf_latency.p95", window.p95); - set_value!("outbuf_latency.p99", window.p99); - set_value!("outbuf_latency.p99_99", window.p99_99); - set_value!("outbuf_latency.outofrange", window.outofrange); - } - } - } - } - - impl ConsumerContext for StatsContext {} - - impl StatsContext { - pub fn create_future_producer(config: &ClientConfig) -> KafkaResult> { - FutureProducer::from_config_and_context(config, Self) - } - - pub fn create_stream_consumer(config: &ClientConfig) -> KafkaResult> { - StreamConsumer::from_config_and_context(config, Self) - } - } - - #[derive(Debug, Clone, Copy)] - pub enum GprcMessageKind { - Account, - Slot, - Transaction, - Block, - BlockMeta, - Entry, - Unknown, - } - - impl From<&UpdateOneof> for GprcMessageKind { - fn from(msg: &UpdateOneof) -> Self { - match msg { - UpdateOneof::Account(_) => Self::Account, - UpdateOneof::Slot(_) => Self::Slot, - UpdateOneof::Transaction(_) => Self::Transaction, - UpdateOneof::Block(_) => Self::Block, - UpdateOneof::Ping(_) => unreachable!(), - UpdateOneof::BlockMeta(_) => Self::BlockMeta, - UpdateOneof::Entry(_) => Self::Entry, - } - } - } - - impl GprcMessageKind { - const fn as_str(self) -> &'static str { - match self { - GprcMessageKind::Account => "account", - GprcMessageKind::Slot => "slot", - GprcMessageKind::Transaction => "transaction", - GprcMessageKind::Block => "block", - GprcMessageKind::BlockMeta => "blockmeta", - GprcMessageKind::Entry => "entry", - GprcMessageKind::Unknown => "unknown", - } - } - } - - pub fn sent_inc(kind: GprcMessageKind) { - KAFKA_SENT_TOTAL.with_label_values(&[kind.as_str()]).inc() - } - - pub fn recv_inc() { - KAFKA_RECV_TOTAL.inc(); - } - - pub fn dedup_inc() { - KAFKA_DEDUP_TOTAL.inc(); - } -} diff --git a/yellowstone-grpc-kafka/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml similarity index 89% rename from yellowstone-grpc-kafka/Cargo.toml rename to yellowstone-grpc-tools/Cargo.toml index 2c624fbe..5fee045f 100644 --- a/yellowstone-grpc-kafka/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "yellowstone-grpc-kafka" -version = "1.0.0-rc.3+solana.1.16.17" +name = "yellowstone-grpc-tools" +version = "1.0.0-rc.4+solana.1.16.17" authors = ["Triton One"] edition = "2021" -description = "Yellowstone gRPC Kafka Producer/Dedup/Consumer" +description = "Yellowstone gRPC Tools" publish = false [dependencies] diff --git a/yellowstone-grpc-kafka/build.rs b/yellowstone-grpc-tools/build.rs similarity index 100% rename from yellowstone-grpc-kafka/build.rs rename to yellowstone-grpc-tools/build.rs diff --git a/yellowstone-grpc-kafka/config.json b/yellowstone-grpc-tools/config-kafka.json similarity index 100% rename from yellowstone-grpc-kafka/config.json rename to yellowstone-grpc-tools/config-kafka.json diff --git a/yellowstone-grpc-kafka/docker-kafka.yml b/yellowstone-grpc-tools/docker-kafka.yml similarity index 100% rename from yellowstone-grpc-kafka/docker-kafka.yml rename to yellowstone-grpc-tools/docker-kafka.yml diff --git a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs similarity index 91% rename from yellowstone-grpc-kafka/src/bin/grpc-kafka.rs rename to yellowstone-grpc-tools/src/bin/grpc-kafka.rs index 726e65d0..1a8e4af2 100644 --- a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs @@ -7,7 +7,7 @@ use { }, rdkafka::{config::ClientConfig, consumer::Consumer, message::Message, producer::FutureRecord}, sha2::{Digest, Sha256}, - std::{net::SocketAddr, sync::Arc, time::Duration}, + std::{sync::Arc, time::Duration}, tokio::{ signal::unix::{signal, SignalKind}, task::JoinSet, @@ -19,29 +19,28 @@ use { util::SubscriberInitExt, }, yellowstone_grpc_client::GeyserGrpcClient, - yellowstone_grpc_kafka::{ - config::{Config, ConfigDedup, ConfigGrpc2Kafka, ConfigKafka2Grpc, GrpcRequestToProto}, - dedup::KafkaDedup, - grpc::GrpcService, - prom, - }, yellowstone_grpc_proto::{ prelude::{subscribe_update::UpdateOneof, SubscribeUpdate}, prost::Message as _, }, + yellowstone_grpc_tools::{ + kafka::{ + config::{Config, ConfigDedup, ConfigGrpc2Kafka, ConfigKafka2Grpc, GrpcRequestToProto}, + dedup::KafkaDedup, + grpc::GrpcService, + prom, + }, + prom::run_server as prometheus_run_server, + }, }; #[derive(Debug, Clone, Parser)] -#[clap(author, version, about)] +#[clap(author, version, about = "Yellowstone gRPC Kafka Tool")] struct Args { /// Path to config file #[clap(short, long)] config: String, - /// [DEPRECATED: use config] Prometheus listen address - #[clap(long)] - prometheus: Option, - #[command(subcommand)] action: ArgsAction, } @@ -97,12 +96,12 @@ impl ArgsAction { } // input - let consumer = prom::kafka::StatsContext::create_stream_consumer(&kafka_config) + let consumer = prom::StatsContext::create_stream_consumer(&kafka_config) .context("failed to create kafka consumer")?; consumer.subscribe(&[&config.kafka_input])?; // output - let kafka = prom::kafka::StatsContext::create_future_producer(&kafka_config) + let kafka = prom::StatsContext::create_future_producer(&kafka_config) .context("failed to create kafka producer")?; // dedup @@ -126,7 +125,7 @@ impl ArgsAction { }, message = consumer.recv() => message, }?; - prom::kafka::recv_inc(); + prom::recv_inc(); trace!( "received message with key: {:?}", message.key().and_then(|k| std::str::from_utf8(k).ok()) @@ -167,13 +166,13 @@ impl ArgsAction { debug!("kafka send message with key: {key}, result: {result:?}"); result?.map_err(|(error, _message)| error)?; - prom::kafka::sent_inc(prom::kafka::GprcMessageKind::Unknown); + prom::sent_inc(prom::GprcMessageKind::Unknown); Ok::<(), anyhow::Error>(()) } Err(error) => Err(error.0.into()), } } else { - prom::kafka::dedup_inc(); + prom::dedup_inc(); Ok(()) } }); @@ -205,7 +204,7 @@ impl ArgsAction { } // Connect to kafka - let kafka = prom::kafka::StatsContext::create_future_producer(&kafka_config) + let kafka = prom::StatsContext::create_future_producer(&kafka_config) .context("failed to create kafka producer")?; // Create gRPC client & subscribe @@ -257,7 +256,7 @@ impl ArgsAction { }; let hash = Sha256::digest(&payload); let key = format!("{slot}_{}", const_hex::encode(hash)); - let prom_kind = prom::kafka::GprcMessageKind::from(message); + let prom_kind = prom::GprcMessageKind::from(message); let record = FutureRecord::to(&config.kafka_topic) .key(&key) @@ -270,7 +269,7 @@ impl ArgsAction { debug!("kafka send message with key: {key}, result: {result:?}"); let result = result?.map_err(|(error, _message)| error)?; - prom::kafka::sent_inc(prom_kind); + prom::sent_inc(prom_kind); Ok::<(i32, i64), anyhow::Error>(result) }); if send_tasks.len() >= config.kafka_queue_size { @@ -308,7 +307,7 @@ impl ArgsAction { let (grpc_tx, grpc_shutdown) = GrpcService::run(config.listen, config.channel_capacity)?; - let consumer = prom::kafka::StatsContext::create_stream_consumer(&kafka_config) + let consumer = prom::StatsContext::create_stream_consumer(&kafka_config) .context("failed to create kafka consumer")?; consumer.subscribe(&[&config.kafka_topic])?; @@ -317,7 +316,7 @@ impl ArgsAction { _ = &mut shutdown => break, message = consumer.recv() => message?, }; - prom::kafka::recv_inc(); + prom::recv_inc(); debug!( "received message with key: {:?}", message.key().and_then(|k| std::str::from_utf8(k).ok()) @@ -358,8 +357,8 @@ async fn main() -> anyhow::Result<()> { let config = Config::load(&args.config).await?; // Run prometheus server - if let Some(address) = config.prometheus.or(args.prometheus) { - prom::run_server(address)?; + if let Some(address) = config.prometheus { + prometheus_run_server(address)?; } // Create kafka config diff --git a/yellowstone-grpc-kafka/src/config.rs b/yellowstone-grpc-tools/src/kafka/config.rs similarity index 99% rename from yellowstone-grpc-kafka/src/config.rs rename to yellowstone-grpc-tools/src/kafka/config.rs index f4d2b87d..f01d754d 100644 --- a/yellowstone-grpc-kafka/src/config.rs +++ b/yellowstone-grpc-tools/src/kafka/config.rs @@ -1,5 +1,5 @@ use { - crate::dedup::{KafkaDedup, KafkaDedupMemory}, + super::dedup::{KafkaDedup, KafkaDedupMemory}, anyhow::Context, serde::{ de::{self, Deserializer}, diff --git a/yellowstone-grpc-kafka/src/dedup.rs b/yellowstone-grpc-tools/src/kafka/dedup.rs similarity index 100% rename from yellowstone-grpc-kafka/src/dedup.rs rename to yellowstone-grpc-tools/src/kafka/dedup.rs diff --git a/yellowstone-grpc-kafka/src/grpc.rs b/yellowstone-grpc-tools/src/kafka/grpc.rs similarity index 100% rename from yellowstone-grpc-kafka/src/grpc.rs rename to yellowstone-grpc-tools/src/kafka/grpc.rs diff --git a/yellowstone-grpc-tools/src/kafka/mod.rs b/yellowstone-grpc-tools/src/kafka/mod.rs new file mode 100644 index 00000000..f6d97398 --- /dev/null +++ b/yellowstone-grpc-tools/src/kafka/mod.rs @@ -0,0 +1,4 @@ +pub mod config; +pub mod dedup; +pub mod grpc; +pub mod prom; diff --git a/yellowstone-grpc-tools/src/kafka/prom.rs b/yellowstone-grpc-tools/src/kafka/prom.rs new file mode 100644 index 00000000..939d98b5 --- /dev/null +++ b/yellowstone-grpc-tools/src/kafka/prom.rs @@ -0,0 +1,155 @@ +use { + prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts}, + rdkafka::{ + client::ClientContext, + config::{ClientConfig, FromClientConfigAndContext}, + consumer::{ConsumerContext, StreamConsumer}, + error::KafkaResult, + producer::FutureProducer, + statistics::Statistics, + }, + yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof, +}; + +lazy_static::lazy_static! { + pub(crate) static ref KAFKA_STATS: GaugeVec = GaugeVec::new( + Opts::new("kafka_stats", "librdkafka metrics"), + &["broker", "metric"] + ).unwrap(); + + pub(crate) static ref KAFKA_DEDUP_TOTAL: IntCounter = IntCounter::new( + "kafka_dedup_total", "Total number of deduplicated messages" + ).unwrap(); + + pub(crate) static ref KAFKA_RECV_TOTAL: IntCounter = IntCounter::new( + "kafka_recv_total", "Total number of received messages" + ).unwrap(); + + pub(crate) static ref KAFKA_SENT_TOTAL: IntCounterVec = IntCounterVec::new( + Opts::new("kafka_sent_total", "Total number of uploaded messages by type"), + &["kind"] + ).unwrap(); +} + +#[derive(Debug, Default, Clone, Copy)] +pub struct StatsContext; + +impl ClientContext for StatsContext { + fn stats(&self, statistics: Statistics) { + for (name, broker) in statistics.brokers { + macro_rules! set_value { + ($name:expr, $value:expr) => { + KAFKA_STATS + .with_label_values(&[&name, $name]) + .set($value as f64); + }; + } + + set_value!("outbuf_cnt", broker.outbuf_cnt); + set_value!("outbuf_msg_cnt", broker.outbuf_msg_cnt); + set_value!("waitresp_cnt", broker.waitresp_cnt); + set_value!("waitresp_msg_cnt", broker.waitresp_msg_cnt); + set_value!("tx", broker.tx); + set_value!("txerrs", broker.txerrs); + set_value!("txretries", broker.txretries); + set_value!("req_timeouts", broker.req_timeouts); + + if let Some(window) = broker.int_latency { + set_value!("int_latency.min", window.min); + set_value!("int_latency.max", window.max); + set_value!("int_latency.avg", window.avg); + set_value!("int_latency.sum", window.sum); + set_value!("int_latency.cnt", window.cnt); + set_value!("int_latency.stddev", window.stddev); + set_value!("int_latency.hdrsize", window.hdrsize); + set_value!("int_latency.p50", window.p50); + set_value!("int_latency.p75", window.p75); + set_value!("int_latency.p90", window.p90); + set_value!("int_latency.p95", window.p95); + set_value!("int_latency.p99", window.p99); + set_value!("int_latency.p99_99", window.p99_99); + set_value!("int_latency.outofrange", window.outofrange); + } + + if let Some(window) = broker.outbuf_latency { + set_value!("outbuf_latency.min", window.min); + set_value!("outbuf_latency.max", window.max); + set_value!("outbuf_latency.avg", window.avg); + set_value!("outbuf_latency.sum", window.sum); + set_value!("outbuf_latency.cnt", window.cnt); + set_value!("outbuf_latency.stddev", window.stddev); + set_value!("outbuf_latency.hdrsize", window.hdrsize); + set_value!("outbuf_latency.p50", window.p50); + set_value!("outbuf_latency.p75", window.p75); + set_value!("outbuf_latency.p90", window.p90); + set_value!("outbuf_latency.p95", window.p95); + set_value!("outbuf_latency.p99", window.p99); + set_value!("outbuf_latency.p99_99", window.p99_99); + set_value!("outbuf_latency.outofrange", window.outofrange); + } + } + } +} + +impl ConsumerContext for StatsContext {} + +impl StatsContext { + pub fn create_future_producer(config: &ClientConfig) -> KafkaResult> { + FutureProducer::from_config_and_context(config, Self) + } + + pub fn create_stream_consumer(config: &ClientConfig) -> KafkaResult> { + StreamConsumer::from_config_and_context(config, Self) + } +} + +#[derive(Debug, Clone, Copy)] +pub enum GprcMessageKind { + Account, + Slot, + Transaction, + Block, + BlockMeta, + Entry, + Unknown, +} + +impl From<&UpdateOneof> for GprcMessageKind { + fn from(msg: &UpdateOneof) -> Self { + match msg { + UpdateOneof::Account(_) => Self::Account, + UpdateOneof::Slot(_) => Self::Slot, + UpdateOneof::Transaction(_) => Self::Transaction, + UpdateOneof::Block(_) => Self::Block, + UpdateOneof::Ping(_) => unreachable!(), + UpdateOneof::BlockMeta(_) => Self::BlockMeta, + UpdateOneof::Entry(_) => Self::Entry, + } + } +} + +impl GprcMessageKind { + const fn as_str(self) -> &'static str { + match self { + GprcMessageKind::Account => "account", + GprcMessageKind::Slot => "slot", + GprcMessageKind::Transaction => "transaction", + GprcMessageKind::Block => "block", + GprcMessageKind::BlockMeta => "blockmeta", + GprcMessageKind::Entry => "entry", + GprcMessageKind::Unknown => "unknown", + } + } +} + +pub fn dedup_inc() { + KAFKA_DEDUP_TOTAL.inc(); +} + +pub fn recv_inc() { + KAFKA_RECV_TOTAL.inc(); +} + +pub fn sent_inc(kind: GprcMessageKind) { + KAFKA_SENT_TOTAL.with_label_values(&[kind.as_str()]).inc() +} diff --git a/yellowstone-grpc-kafka/src/lib.rs b/yellowstone-grpc-tools/src/lib.rs similarity index 77% rename from yellowstone-grpc-kafka/src/lib.rs rename to yellowstone-grpc-tools/src/lib.rs index fd288ea1..2d1092c3 100644 --- a/yellowstone-grpc-kafka/src/lib.rs +++ b/yellowstone-grpc-tools/src/lib.rs @@ -2,8 +2,6 @@ #![deny(clippy::missing_const_for_fn)] #![deny(clippy::trivially_copy_pass_by_ref)] -pub mod config; -pub mod dedup; -pub mod grpc; +pub mod kafka; pub mod prom; pub mod version; diff --git a/yellowstone-grpc-tools/src/prom.rs b/yellowstone-grpc-tools/src/prom.rs new file mode 100644 index 00000000..af93f782 --- /dev/null +++ b/yellowstone-grpc-tools/src/prom.rs @@ -0,0 +1,89 @@ +use { + crate::{ + kafka::prom::{KAFKA_DEDUP_TOTAL, KAFKA_RECV_TOTAL, KAFKA_SENT_TOTAL, KAFKA_STATS}, + version::VERSION as VERSION_INFO, + }, + hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, StatusCode, + }, + prometheus::{IntCounterVec, Opts, Registry, TextEncoder}, + std::{net::SocketAddr, sync::Once}, + tracing::{error, info}, +}; + +lazy_static::lazy_static! { + static ref REGISTRY: Registry = Registry::new(); + + static ref VERSION: IntCounterVec = IntCounterVec::new( + Opts::new("version", "Plugin version info"), + &["buildts", "git", "package", "proto", "rustc", "solana", "version"] + ).unwrap(); +} + +pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { + static REGISTER: Once = Once::new(); + REGISTER.call_once(|| { + macro_rules! register { + ($collector:ident) => { + REGISTRY + .register(Box::new($collector.clone())) + .expect("collector can't be registered"); + }; + } + register!(VERSION); + register!(KAFKA_STATS); + register!(KAFKA_DEDUP_TOTAL); + register!(KAFKA_RECV_TOTAL); + register!(KAFKA_SENT_TOTAL); + + VERSION + .with_label_values(&[ + VERSION_INFO.buildts, + VERSION_INFO.git, + VERSION_INFO.package, + VERSION_INFO.proto, + VERSION_INFO.rustc, + VERSION_INFO.solana, + VERSION_INFO.version, + ]) + .inc(); + }); + + let make_service = make_service_fn(move |_: &AddrStream| async move { + Ok::<_, hyper::Error>(service_fn(move |req: Request| async move { + let response = match req.uri().path() { + "/metrics" => metrics_handler(), + _ => not_found_handler(), + }; + Ok::<_, hyper::Error>(response) + })) + }); + let server = Server::try_bind(&address)?.serve(make_service); + info!("prometheus server started: {address:?}"); + tokio::spawn(async move { + if let Err(error) = server.await { + error!("prometheus server failed: {error:?}"); + } + }); + + Ok(()) +} + +fn metrics_handler() -> Response { + let metrics = TextEncoder::new() + .encode_to_string(®ISTRY.gather()) + .unwrap_or_else(|error| { + error!("could not encode custom metrics: {}", error); + String::new() + }); + Response::builder().body(Body::from(metrics)).unwrap() +} + +fn not_found_handler() -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap() +} diff --git a/yellowstone-grpc-kafka/src/version.rs b/yellowstone-grpc-tools/src/version.rs similarity index 100% rename from yellowstone-grpc-kafka/src/version.rs rename to yellowstone-grpc-tools/src/version.rs