Skip to content

Commit

Permalink
Fix some uniqueness issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Jun 3, 2024
1 parent 0d5b67a commit b4980cb
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 43 deletions.
20 changes: 20 additions & 0 deletions db/migrations/005_network_rpc_contraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Step 1: Identify and remove duplicate rpcs
-- Rank rows within each (chain_id, kind) group by id descending, so the
-- row with the highest id (the most recently inserted rpc) gets rank 1
-- and is the one that survives the cleanup.
WITH ranked_rpcs AS (
    SELECT
        id,
        chain_id,
        kind,
        ROW_NUMBER() OVER (PARTITION BY chain_id, kind ORDER BY id DESC) AS rank
    FROM
        rpcs
)
-- Delete every rpc except the highest-id one per (chain_id, kind).
DELETE FROM rpcs
WHERE id IN (
    SELECT id
    FROM ranked_rpcs
    WHERE rank > 1
);

-- Step 2: Add the uniqueness constraint
-- Guarantees at most one rpc per (chain_id, kind) going forward; the
-- application's ON CONFLICT (chain_id, kind) upserts rely on this.
ALTER TABLE rpcs
ADD CONSTRAINT unique_chain_id_kind UNIQUE (chain_id, kind);
45 changes: 37 additions & 8 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tracing::instrument;

use crate::broadcast_utils::gas_estimation::FeesEstimate;
use crate::config::DatabaseConfig;
use crate::new_server::types::NetworkInfo;
use crate::types::{RelayerInfo, RelayerUpdate, TransactionPriority};

pub mod data;
Expand Down Expand Up @@ -963,7 +964,7 @@ impl Database {
}

#[instrument(skip(self), level = "debug")]
pub async fn create_network(
pub async fn upsert_network(
&self,
chain_id: u64,
name: &str,
Expand All @@ -976,7 +977,8 @@ impl Database {
r#"
INSERT INTO networks (chain_id, name)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
ON CONFLICT (chain_id) DO UPDATE
SET name = EXCLUDED.name
"#,
)
.bind(chain_id as i64)
Expand All @@ -988,14 +990,27 @@ impl Database {
r#"
INSERT INTO rpcs (chain_id, url, kind)
VALUES
($1, $2, $3),
($1, $4, $5)
ON CONFLICT DO NOTHING
($1, $2, $3)
ON CONFLICT (chain_id, kind) DO UPDATE
SET url = EXCLUDED.url
"#,
)
.bind(chain_id as i64)
.bind(http_rpc)
.bind(RpcKind::Http)
.execute(tx.as_mut())
.await?;

sqlx::query(
r#"
INSERT INTO rpcs (chain_id, url, kind)
VALUES
($1, $2, $3)
ON CONFLICT (chain_id, kind) DO UPDATE
SET url = EXCLUDED.url
"#,
)
.bind(chain_id as i64)
.bind(ws_rpc)
.bind(RpcKind::Ws)
.execute(tx.as_mut())
Expand Down Expand Up @@ -1042,6 +1057,20 @@ impl Database {
Ok(items.into_iter().map(|(x,)| x as u64).collect())
}

#[instrument(skip(self), level = "debug")]
pub async fn get_networks(&self) -> eyre::Result<Vec<NetworkInfo>> {
    /// Joins each network with its HTTP and WS rpc rows. Networks that
    /// are missing either rpc kind are dropped by the inner joins.
    const QUERY: &str = r#"
        SELECT networks.chain_id, name, http.url as http_rpc, ws.url as ws_rpc
        FROM networks
        INNER JOIN rpcs http ON networks.chain_id = http.chain_id AND http.kind = 'http'
        INNER JOIN rpcs ws ON networks.chain_id = ws.chain_id AND ws.kind = 'ws'
    "#;

    let networks = sqlx::query_as(QUERY).fetch_all(&self.pool).await?;

    Ok(networks)
}

#[instrument(skip(self), level = "debug")]
pub async fn create_api_key(
&self,
Expand Down Expand Up @@ -1255,7 +1284,7 @@ mod tests {
let http_rpc = "http_rpc";
let ws_rpc = "ws_rpc";

db.create_network(chain_id, network_name, http_rpc, ws_rpc)
db.upsert_network(chain_id, network_name, http_rpc, ws_rpc)
.await?;

let relayer_id = uuid();
Expand Down Expand Up @@ -1314,7 +1343,7 @@ mod tests {
let http_rpc = "http_rpc";
let ws_rpc = "ws_rpc";

db.create_network(chain_id, network_name, http_rpc, ws_rpc)
db.upsert_network(chain_id, network_name, http_rpc, ws_rpc)
.await?;

let relayer_id = uuid();
Expand Down Expand Up @@ -1391,7 +1420,7 @@ mod tests {
let http_rpc = "http_rpc";
let ws_rpc = "ws_rpc";

db.create_network(chain_id, network_name, http_rpc, ws_rpc)
db.upsert_network(chain_id, network_name, http_rpc, ws_rpc)
.await?;

let relayer_id = uuid();
Expand Down
16 changes: 12 additions & 4 deletions src/new_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use poem::{Endpoint, EndpointExt, Result, Route};
use poem_openapi::param::Path;
use poem_openapi::payload::{Json, PlainText};
use poem_openapi::{ApiResponse, OpenApi, OpenApiService};
use types::NetworkInfo;
use url::Url;

use crate::app::App;
use crate::service::Service;
use crate::task_runner::TaskRunner;

mod types;
pub mod types;

struct AdminApi;

Expand Down Expand Up @@ -100,7 +101,7 @@ impl AdminApi {
.map_err(|err| poem::error::BadRequest(err))?;

app.db
.create_network(
.upsert_network(
chain_id,
&network.name,
http_url.as_str(),
Expand Down Expand Up @@ -129,8 +130,15 @@ impl AdminApi {
async fn list_networks(
&self,
app: Data<&Arc<App>>,
) -> Result<Json<Vec<String>>> {
Ok(Json(vec![]))
) -> Result<Json<Vec<NetworkInfo>>> {
let networks = app.db.get_networks().await.map_err(|err| {
poem::error::Error::from_string(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
})?;

Ok(Json(networks))
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/new_server/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use poem_openapi::Object;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;

#[derive(Debug, Default, Clone, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
Expand All @@ -10,8 +11,15 @@ pub struct NewNetworkInfo {
pub ws_rpc: String,
}

/// A network (chain) plus its RPC endpoints, as returned by the admin
/// API (`list_networks`) and read from the database via `query_as`.
#[derive(Debug, Default, Clone, Serialize, Deserialize, FromRow, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct NetworkInfo {
    /// Chain id. The database column is a signed 64-bit integer, so sqlx
    /// decodes it as i64 and converts to u64 via `try_from`.
    #[sqlx(try_from = "i64")]
    pub chain_id: u64,
    /// Human-readable network name.
    pub name: String,
    /// HTTP RPC endpoint URL.
    pub http_rpc: String,
    /// WebSocket RPC endpoint URL.
    pub ws_rpc: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, Object)]
Expand Down
5 changes: 3 additions & 2 deletions src/server/routes/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use axum::extract::{Json, Path, State};
use eyre::Result;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use url::Url;

use crate::app::App;
Expand All @@ -18,7 +19,7 @@ pub struct NewNetworkInfo {
pub ws_rpc: String,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct NetworkInfo {
pub chain_id: u64,
Expand All @@ -44,7 +45,7 @@ pub async fn create_network(
})?;

app.db
.create_network(
.upsert_network(
chain_id,
&network.name,
http_url.as_str(),
Expand Down
2 changes: 1 addition & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async fn initialize_predefined_values(
tracing::warn!("Running with predefined values is not recommended in a production environment");

app.db
.create_network(
.upsert_network(
predefined.network.chain_id,
&predefined.network.name,
&predefined.network.http_rpc,
Expand Down
65 changes: 38 additions & 27 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub async fn index_block(

let metric_labels: [(&str, String); 1] =
[("chain_id", chain_id.to_string())];

for tx in mined_txs {
tracing::info!(
tx_id = tx.0,
Expand All @@ -94,43 +95,53 @@ pub async fn index_block(
Ok(())
}

#[tracing::instrument(skip(app, rpc, latest_block))]
#[tracing::instrument(skip(app, rpc, latest_block), level = "info")]
pub async fn backfill_to_block(
app: Arc<App>,
chain_id: u64,
rpc: &Provider<Http>,
latest_block: Block<H256>,
) -> eyre::Result<()> {
// Get the latest block from the db
if let Some(latest_db_block_number) =
let Some(latest_db_block_number) =
app.db.get_latest_block_number(chain_id).await?
{
let next_block_number: u64 = latest_db_block_number + 1;

// Get the first block from the stream and backfill any missing blocks
let latest_block_number = latest_block
.number
.context("Missing block number")?
.as_u64();

if latest_block_number > next_block_number {
// Backfill blocks between the last synced block and the chain head, non inclusive
for block_number in next_block_number..latest_block_number {
let block = rpc
.get_block::<BlockNumber>(block_number.into())
.await?
.context(format!(
"Could not get block at height {}",
block_number
))?;

index_block(app.clone(), chain_id, rpc, block).await?;
}
else {
tracing::info!(chain_id, "No latest block");
return Ok(());
};

let next_block_number: u64 = latest_db_block_number + 1;

// Get the first block from the stream and backfill any missing blocks
let latest_block_number = latest_block
.number
.context("Missing block number")?
.as_u64();

tracing::info!(
latest_block_number,
next_block_number,
"Backfilling to block"
);

if latest_block_number > next_block_number {
// Backfill blocks between the last synced block and the chain head, non inclusive
for block_number in next_block_number..latest_block_number {
let block = rpc
.get_block::<BlockNumber>(block_number.into())
.await?
.context(format!(
"Could not get block at height {}",
block_number
))?;

index_block(app.clone(), chain_id, rpc, block).await?;
}
}

// Index the latest block after backfilling
index_block(app.clone(), chain_id, rpc, latest_block).await?;

// Index the latest block after backfilling
index_block(app.clone(), chain_id, rpc, latest_block).await?;
};
Ok(())
}

Expand Down

0 comments on commit b4980cb

Please sign in to comment.