Skip to content

Commit

Permalink
feat: send messages to channels
Browse files Browse the repository at this point in the history
Specifies two new channels meant for specific types of heads-up. This
also offloads the alerts channel in an attempt to keep it high-signal
for the whole team.
  • Loading branch information
alextes committed Nov 19, 2024
1 parent 1229fb8 commit 9e07ce1
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 72 deletions.
9 changes: 7 additions & 2 deletions src/phoenix/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod opsgenie;
pub mod telegram;

use telegram::Channel;
use tracing::{debug, error};

use crate::env::Network;
Expand All @@ -14,15 +15,17 @@ pub async fn send_opsgenie_telegram_alert(message: &str) {
let telegram_alerts = telegram::TelegramAlerts::new();

telegram_alerts
.send_alert_with_fallback(&TelegramSafeAlert::new(message))
.send_message(&TelegramSafeAlert::new(message), Channel::Alerts)
.await;

// Only send actual OpsGenie alerts on Mainnet.
if APP_CONFIG.network == Network::Mainnet {
let result_send_opsgenie_alert = opsgenie::send_opsgenie_alert(message).await;
match result_send_opsgenie_alert {
Ok(_) => {
debug!(message, "sent OpsGenie alert");
}
// If sending the OpsGenie alert fails, log the error and send a telegram message.
Err(err) => {
error!(?err, "failed to send OpsGenie alert");

Expand All @@ -31,7 +34,9 @@ pub async fn send_opsgenie_telegram_alert(message: &str) {
let message = format!("failed to send OpsGenie alert: {}", escaped_err);
TelegramSafeAlert::from_escaped_string(message)
};
telegram_alerts.send_alert_with_fallback(&message).await;
telegram_alerts
.send_message(&message, Channel::Alerts)
.await;
}
}
}
Expand Down
77 changes: 31 additions & 46 deletions src/phoenix/alerts/telegram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,21 @@ impl fmt::Display for TelegramSafeAlert {
}
}

enum NotificationType {
Warning,
Alert,
#[derive(Debug, Clone, Copy)]
pub enum Channel {
Alerts,
BlockNotFound,
Demotions,
Warnings,
}

impl fmt::Display for NotificationType {
impl fmt::Display for Channel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
NotificationType::Warning => write!(f, "warning"),
NotificationType::Alert => write!(f, "alert"),
Channel::Alerts => write!(f, "alerts"),
Channel::BlockNotFound => write!(f, "block not found"),
Channel::Demotions => write!(f, "demotions"),
Channel::Warnings => write!(f, "warnings"),
}
}
}
Expand All @@ -109,14 +114,12 @@ impl TelegramAlerts {
}
}

async fn send_telegram_message(
&self,
notification_type: NotificationType,
message: &str,
) -> Result<()> {
async fn send_message_request(&self, notification_type: Channel, message: &str) -> Result<()> {
let channel_id = match notification_type {
NotificationType::Warning => APP_CONFIG.telegram_warnings_channel_id.as_str(),
NotificationType::Alert => APP_CONFIG.telegram_alerts_channel_id.as_str(),
Channel::Alerts => APP_CONFIG.telegram_alerts_channel_id.as_str(),
Channel::BlockNotFound => APP_CONFIG.telegram_block_not_found_channel_id.as_str(),
Channel::Demotions => APP_CONFIG.telegram_demotions_channel_id.as_str(),
Channel::Warnings => APP_CONFIG.telegram_warnings_channel_id.as_str(),
};

let url = format!(
Expand Down Expand Up @@ -152,40 +155,14 @@ impl TelegramAlerts {
}
}

pub async fn send_warning(&self, message: &TelegramSafeAlert) {
let result = self
.send_telegram_message(NotificationType::Warning, &message.0)
.await;

if let Err(err) = result {
tracing::error!(?err, "failed to send telegram warning");
}
}

async fn send_alert(&self, message: &TelegramSafeAlert) -> anyhow::Result<()> {
self.send_telegram_message(NotificationType::Alert, &message.0)
.await
}

/// Allows to send a telegram alert, with retry, and a simple fallback in case the passed message
/// fails to be delivered. Telegram has very sensitive rules about escaping. We may also at times
/// be rate limited.
pub async fn send_alert_with_fallback(&self, message: &TelegramSafeAlert) {
/// Send a telegram message with various precautions.
///
/// Messages are expected to be quite important like alerts. Messages will be retried.
/// If retries fail, a simple fallback message will be sent.
pub async fn send_message(&self, message: &TelegramSafeAlert, channel: Channel) {
// Retry twice, with a delay in between.
for index in 0..3 {
let message = if index == 2 {
// Last attempt. This message intentionally does not contain *any* special
// characters as many require escaping, and is within the character limit.
TelegramSafeAlert::new("failed to send telegram alert please check logs")
} else {
message.clone()
};

// We may be timing out, if this is not our first attempt, wait a bit.
if index != 0 {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
};

let send_result = self.send_alert(&message).await;
let send_result = self.send_message_request(channel, &message.0).await;

match send_result {
Ok(_) => {
Expand All @@ -199,8 +176,16 @@ impl TelegramAlerts {
%err,
"failed to send telegram alert"
);

// We did not succeed, wait then move on to the next attempt.
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
}
}

// Last attempt. This message intentionally does not contain *any* special
// characters as many require escaping, and is within the character limit.
let message = TelegramSafeAlert::new("failed to send telegram alert please check logs");
self.send_message_request(channel, &message.0).await.ok();
}
}
10 changes: 7 additions & 3 deletions src/phoenix/demotion_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use tracing::{debug, info};
use crate::{
env::ToBeaconExplorerUrl,
phoenix::{
alerts::telegram::TelegramSafeAlert, promotion_monitor::is_promotable_error, telegram,
alerts::telegram::{Channel, TelegramSafeAlert},
promotion_monitor::is_promotable_error,
telegram,
},
};

Expand Down Expand Up @@ -168,7 +170,7 @@ async fn generate_and_send_alerts(demotions: Vec<BuilderDemotion>) -> Result<()>
};
info!(?alert_message, "sending telegram alert");
telegram_alerts
.send_alert_with_fallback(&alert_message)
.send_message(&alert_message, Channel::Demotions)
.await
}

Expand All @@ -182,7 +184,9 @@ async fn generate_and_send_alerts(demotions: Vec<BuilderDemotion>) -> Result<()>
};
info!(?warning_message, "sending telegram warning");

telegram_alerts.send_warning(&warning_message).await
telegram_alerts
.send_message(&warning_message, Channel::Warnings)
.await
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/phoenix/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct AppConfig {
pub relay_database_url: String,
pub telegram_api_key: String,
pub telegram_alerts_channel_id: String,
pub telegram_block_not_found_channel_id: String,
pub telegram_demotions_channel_id: String,
pub telegram_warnings_channel_id: String,
#[serde(deserialize_with = "deserialize_urls")]
pub validation_nodes: Vec<Url>,
Expand Down
29 changes: 12 additions & 17 deletions src/phoenix/inclusion_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
use self::{loki_client::LatePayloadStats, proposer_meta::ProposerLocation};

use super::{
alerts::telegram::{self, TelegramSafeAlert},
alerts::telegram::{self, Channel, TelegramSafeAlert},
checkpoint::{self, CheckpointId},
env::{Geo, APP_CONFIG},
};
Expand Down Expand Up @@ -297,25 +297,20 @@ async fn report_missing_payload(
message.push_str("no late call warnings found");
}

// Late call or attempted reorg, these are much less concerning.
if published_stats.is_none() && late_call_stats.is_some() || is_attempted_reorg {
message.push_str("\n\n");
message.push_str(
"'no publish attempted and late call' or 'attempted reorg' these are less concerning",
);
}

let telegram_alerts = telegram::TelegramAlerts::new();
let escaped_message = TelegramSafeAlert::from_escaped_string(message);

// Publish errors: alert
if !publish_errors.is_empty() {
telegram_alerts
.send_alert_with_fallback(&escaped_message)
.await;
}
// Late call or attempted reorg: warn
else if published_stats.is_none() && late_call_stats.is_some() || is_attempted_reorg {
telegram_alerts.send_warning(&escaped_message).await;
}
// Otherwise: alert
else {
telegram_alerts
.send_alert_with_fallback(&escaped_message)
.await;
}
telegram_alerts
.send_message(&escaped_message, Channel::BlockNotFound)
.await;

Ok(())
}
Expand Down
12 changes: 8 additions & 4 deletions src/phoenix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
sync::{Arc, Mutex},
};

use alerts::telegram::TELEGRAM_SAFE_MESSAGE_LENGTH;
use alerts::telegram::{Channel, TELEGRAM_SAFE_MESSAGE_LENGTH};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use axum::{http::StatusCode, routing::get, Router};
Expand Down Expand Up @@ -93,7 +93,7 @@ impl Alarm {
AlarmType::Opsgenie => alerts::send_opsgenie_telegram_alert(message).await,
AlarmType::Telegram => {
self.telegram_alerts
.send_warning(&TelegramSafeAlert::new(message))
.send_message(&TelegramSafeAlert::new(message), Channel::Warnings)
.await
}
}
Expand Down Expand Up @@ -334,7 +334,9 @@ pub async fn monitor_critical_services() -> Result<()> {

async fn handle_unexpected_exit(telegram_alerts: TelegramAlerts) -> Result<()> {
let message = TelegramSafeAlert::new("phoenix processes exited unexpectedly");
telegram_alerts.send_alert_with_fallback(&message).await;
telegram_alerts
.send_message(&message, Channel::Alerts)
.await;
Err(anyhow!(message))
}

Expand All @@ -357,6 +359,8 @@ async fn handle_unexpected_error(
"
);
let message = TelegramSafeAlert::from_escaped_string(formatted_message);
telegram_alerts.send_alert_with_fallback(&message).await;
telegram_alerts
.send_message(&message, Channel::Alerts)
.await;
Err(anyhow!(err))
}

0 comments on commit 9e07ce1

Please sign in to comment.