Skip to content

Commit

Permalink
tools: correctly handle SIGINT in kafka (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Oct 26, 2023
1 parent 024f510 commit c604138
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 27 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Fixes

- Trigger end of startup when parent slot 0 seen in `update_slot_status` notification because `notify_end_of_startup` is not triggered when cluster started from genesis ([#207](https://github.com/rpcpool/yellowstone-grpc/pull/207))
- geyser: trigger end of startup when parent slot 0 seen in `update_slot_status` notification because `notify_end_of_startup` is not triggered when cluster started from genesis ([#207](https://github.com/rpcpool/yellowstone-grpc/pull/207))
- tools: correctly handle SIGINT in kafka ([#219](https://github.com/rpcpool/yellowstone-grpc/pull/219))

### Features

Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion yellowstone-grpc-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ hyper = { version = "0.14.27", features = ["server"] }
json5 = "0.4.1"
lazy_static = "1.4.0"
prometheus = "0.13.2"
rdkafka = { version = "0.33.2", features = ["ssl", "sasl"] }
rdkafka = { version = "0.34.0", features = ["ssl", "sasl"] }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.86"
serde_yaml = "0.9.25"
Expand Down
82 changes: 68 additions & 14 deletions yellowstone-grpc-tools/src/bin/grpc-kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,18 @@ impl ArgsAction {
}

// input
let consumer = prom::StatsContext::create_stream_consumer(&kafka_config)
let (consumer, kafka_error_rx1) = prom::StatsContext::create_stream_consumer(&kafka_config)
.context("failed to create kafka consumer")?;
consumer.subscribe(&[&config.kafka_input])?;

// output
let kafka = prom::StatsContext::create_future_producer(&kafka_config)
let (kafka, kafka_error_rx2) = prom::StatsContext::create_future_producer(&kafka_config)
.context("failed to create kafka producer")?;

let mut kafka_error = false;
let kafka_error_rx = futures::future::join(kafka_error_rx1, kafka_error_rx2);
tokio::pin!(kafka_error_rx);

// dedup
let dedup = config.backend.create().await?;

Expand All @@ -109,13 +113,21 @@ impl ArgsAction {
loop {
let message = tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
maybe_result = send_tasks.join_next() => match maybe_result {
Some(result) => {
result??;
continue;
}
None => tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
message = consumer.recv() => message,
}
},
Expand Down Expand Up @@ -175,6 +187,10 @@ impl ArgsAction {
if send_tasks.len() >= config.kafka_queue_size {
tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
result = send_tasks.join_next() => {
if let Some(result) = result {
result??;
Expand All @@ -183,9 +199,17 @@ impl ArgsAction {
}
}
}
warn!("shutdown received...");
while let Some(result) = send_tasks.join_next().await {
result??;
if !kafka_error {
warn!("shutdown received...");
loop {
tokio::select! {
_ = &mut kafka_error_rx => break,
result = send_tasks.join_next() => match result {
Some(result) => result??,
None => break
}
}
}
}
Ok(())
}
Expand All @@ -200,8 +224,10 @@ impl ArgsAction {
}

// Connect to kafka
let kafka = prom::StatsContext::create_future_producer(&kafka_config)
let (kafka, kafka_error_rx) = prom::StatsContext::create_future_producer(&kafka_config)
.context("failed to create kafka producer")?;
let mut kafka_error = false;
tokio::pin!(kafka_error_rx);

// Create gRPC client & subscribe
let mut client = GeyserGrpcClient::connect_with_timeout(
Expand All @@ -220,13 +246,21 @@ impl ArgsAction {
loop {
let message = tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
maybe_result = send_tasks.join_next() => match maybe_result {
Some(result) => {
let _ = result??;
result??;
continue;
}
None => tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
message = geyser.next() => message,
}
},
Expand Down Expand Up @@ -264,13 +298,17 @@ impl ArgsAction {
let result = future.await;
debug!("kafka send message with key: {key}, result: {result:?}");

let result = result?.map_err(|(error, _message)| error)?;
let _ = result?.map_err(|(error, _message)| error)?;
prom::sent_inc(prom_kind);
Ok::<(i32, i64), anyhow::Error>(result)
Ok::<(), anyhow::Error>(())
});
if send_tasks.len() >= config.kafka_queue_size {
tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
result = send_tasks.join_next() => {
if let Some(result) = result {
result??;
Expand All @@ -285,9 +323,17 @@ impl ArgsAction {
None => break,
}
}
warn!("shutdown received...");
while let Some(result) = send_tasks.join_next().await {
let _ = result??;
if !kafka_error {
warn!("shutdown received...");
loop {
tokio::select! {
_ = &mut kafka_error_rx => break,
result = send_tasks.join_next() => match result {
Some(result) => result??,
None => break
}
}
}
}
Ok(())
}
Expand All @@ -303,13 +349,19 @@ impl ArgsAction {

let (grpc_tx, grpc_shutdown) = GrpcService::run(config.listen, config.channel_capacity)?;

let consumer = prom::StatsContext::create_stream_consumer(&kafka_config)
let (consumer, kafka_error_rx) = prom::StatsContext::create_stream_consumer(&kafka_config)
.context("failed to create kafka consumer")?;
let mut kafka_error = false;
tokio::pin!(kafka_error_rx);
consumer.subscribe(&[&config.kafka_topic])?;

loop {
let message = tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break
},
message = consumer.recv() => message?,
};
prom::recv_inc();
Expand All @@ -330,7 +382,9 @@ impl ArgsAction {
}
}

warn!("shutdown received...");
if !kafka_error {
warn!("shutdown received...");
}
Ok(grpc_shutdown.await??)
}
}
Expand Down
68 changes: 59 additions & 9 deletions yellowstone-grpc-tools/src/kafka/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use {
crate::prom::GprcMessageKind,
prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts},
rdkafka::{
client::ClientContext,
config::{ClientConfig, FromClientConfigAndContext},
client::{ClientContext, DefaultClientContext},
config::{ClientConfig, FromClientConfigAndContext, RDKafkaLogLevel},
consumer::{ConsumerContext, StreamConsumer},
error::KafkaResult,
error::{KafkaError, KafkaResult},
producer::FutureProducer,
statistics::Statistics,
},
std::sync::Mutex,
tokio::sync::oneshot,
};

lazy_static::lazy_static! {
Expand All @@ -31,8 +33,30 @@ lazy_static::lazy_static! {
).unwrap();
}

#[derive(Debug, Default, Clone, Copy)]
pub struct StatsContext;
#[derive(Debug)]
pub struct StatsContext {
default: DefaultClientContext,
error_tx: Mutex<Option<oneshot::Sender<()>>>,
}

impl StatsContext {
fn new() -> (Self, oneshot::Receiver<()>) {
let (error_tx, error_rx) = oneshot::channel();
(
Self {
default: DefaultClientContext,
error_tx: Mutex::new(Some(error_tx)),
},
error_rx,
)
}

fn send_error(&self) {
if let Some(error_tx) = self.error_tx.lock().expect("alive mutex").take() {
let _ = error_tx.send(());
}
}
}

impl ClientContext for StatsContext {
fn stats(&self, statistics: Statistics) {
Expand Down Expand Up @@ -89,17 +113,43 @@ impl ClientContext for StatsContext {
}
}
}

fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
self.default.log(level, fac, log_message);
if matches!(
level,
RDKafkaLogLevel::Emerg
| RDKafkaLogLevel::Alert
| RDKafkaLogLevel::Critical
| RDKafkaLogLevel::Error
) {
self.send_error()
}
}

fn error(&self, error: KafkaError, reason: &str) {
self.default.error(error, reason);
self.send_error()
}
}

impl ConsumerContext for StatsContext {}

impl StatsContext {
pub fn create_future_producer(config: &ClientConfig) -> KafkaResult<FutureProducer<Self>> {
FutureProducer::from_config_and_context(config, Self)
pub fn create_future_producer(
config: &ClientConfig,
) -> KafkaResult<(FutureProducer<Self>, oneshot::Receiver<()>)> {
let (context, error_rx) = Self::new();
FutureProducer::from_config_and_context(config, context)
.map(|producer| (producer, error_rx))
}

pub fn create_stream_consumer(config: &ClientConfig) -> KafkaResult<StreamConsumer<Self>> {
StreamConsumer::from_config_and_context(config, Self)
pub fn create_stream_consumer(
config: &ClientConfig,
) -> KafkaResult<(StreamConsumer<Self>, oneshot::Receiver<()>)> {
let (context, error_rx) = Self::new();
StreamConsumer::from_config_and_context(config, context)
.map(|consumer| (consumer, error_rx))
}
}

Expand Down

0 comments on commit c604138

Please sign in to comment.