Skip to content

Commit

Permalink
feat(inclusion_monitor): add ip, city, country
Browse files Browse the repository at this point in the history
  • Loading branch information
alextes committed Dec 14, 2023
1 parent e34efc9 commit d0b68e6
Showing 1 changed file with 101 additions and 3 deletions.
104 changes: 101 additions & 3 deletions src/phoenix/inclusion_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub struct ProposerLabelMeta {
pub lido_operator: Option<String>,
}

pub async fn proposer_label_meta(
async fn proposer_label_meta(
pg_pool: &PgPool,
proposer_pubkey: &str,
) -> anyhow::Result<ProposerLabelMeta> {
Expand All @@ -136,9 +136,78 @@ pub async fn proposer_label_meta(
.context("failed to get proposer label meta")
}

async fn proposer_registration_ip(
pg_pool: &PgPool,
proposer_pubkey: &str,
) -> anyhow::Result<Option<String>> {
sqlx::query(
"
SELECT last_registration_ip_address
FROM validators
WHERE pubkey = $1
",
)
.bind(proposer_pubkey)
.fetch_optional(pg_pool)
.await
.map(|row| row.and_then(|row| row.get(0)))
.context("failed to get proposer ip")
}

async fn proposer_payload_request_ip(
pg_pool: &PgPool,
proposer_pubkey: &str,
) -> anyhow::Result<Option<String>> {
sqlx::query(
"
SELECT ip
FROM payload_requests
WHERE pubkey = $1
",
)
.bind(proposer_pubkey)
.fetch_optional(pg_pool)
.await
.map(|row| row.and_then(|row| row.get(0)))
.context("failed to get proposer ip")
}

async fn get_proposer_ip(
pg_pool: &PgPool,
proposer_pubkey: &str,
) -> anyhow::Result<Option<String>> {
let registration_ip = proposer_registration_ip(pg_pool, proposer_pubkey).await?;
let payload_request_ip = proposer_payload_request_ip(pg_pool, proposer_pubkey).await?;

Ok(registration_ip.or(payload_request_ip))
}

#[derive(Default, sqlx::FromRow)]
struct ProposerLocation {
pub country: Option<String>,
pub city: Option<String>,
}

async fn proposer_location(pg_pool: &PgPool, ip_address: &str) -> anyhow::Result<ProposerLocation> {
sqlx::query_as::<_, ProposerLocation>(
"
SELECT country, city
FROM ip_meta
WHERE ip_address = $1
",
)
.bind(ip_address)
.fetch_optional(pg_pool)
.await
.map(|row| row.unwrap_or_default())
.context("failed to get proposer location")
}

fn format_delivered_not_found_message(
log_stats: Option<PayloadLogStats>,
proposer_meta: ProposerLabelMeta,
proposer_ip: Option<String>,
proposer_location: ProposerLocation,
slot: &i64,
) -> String {
let explorer_url = APP_CONFIG.env.to_beacon_explorer_url();
Expand Down Expand Up @@ -171,6 +240,21 @@ fn format_delivered_not_found_message(
telegram_escape(grafitti)
};

let proposer_ip = {
let ip = proposer_ip.unwrap_or("-".to_string());
telegram_escape(&ip)
};

let proposer_country = {
let country = proposer_location.country.unwrap_or("-".to_string());
telegram_escape(&country)
};

let proposer_city = {
let city = proposer_location.city.unwrap_or("-".to_string());
telegram_escape(&city)
};

formatdoc!(
"
delivered block not found for slot
Expand All @@ -180,7 +264,10 @@ fn format_delivered_not_found_message(
```
decoded_at_slot_age_ms: {decoded_at_slot_age_ms}
pre_publish_duration_ms: {pre_publish_duration_ms}
proposer_city: {proposer_city}
proposer_country: {proposer_country}
proposer_grafitti: {grafitti}
proposer_ip: {proposer_ip}
proposer_label: {operator}
proposer_lido_operator: {lido_operator}
publish_duration_ms: {publish_duration_ms}
Expand Down Expand Up @@ -273,8 +360,19 @@ pub async fn run_inclusion_monitor(
let log_stats = log_client.payload_logs(payload.slot).await?;
let proposer_meta =
proposer_label_meta(mev_pool, &payload.proposer_pubkey).await?;
let msg =
format_delivered_not_found_message(log_stats, proposer_meta, &payload.slot);
let proposer_ip = get_proposer_ip(mev_pool, &payload.proposer_pubkey).await?;
let proposer_location = if let Some(proposer_ip) = &proposer_ip {
proposer_location(mev_pool, proposer_ip).await?
} else {
ProposerLocation::default()
};
let msg = format_delivered_not_found_message(
log_stats,
proposer_meta,
proposer_ip,
proposer_location,
&payload.slot,
);

alert::send_telegram_alert(&msg).await?;
} else {
Expand Down

0 comments on commit d0b68e6

Please sign in to comment.