diff --git a/Cargo.lock b/Cargo.lock index 94c9f552c..946d8f16d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3396,6 +3396,7 @@ dependencies = [ "fuel-indexer-postgres", "sqlx", "thiserror", + "tracing", "url", ] diff --git a/packages/fuel-indexer-api-server/src/commands/run.rs b/packages/fuel-indexer-api-server/src/commands/run.rs index 836001ef5..27810fc45 100644 --- a/packages/fuel-indexer-api-server/src/commands/run.rs +++ b/packages/fuel-indexer-api-server/src/commands/run.rs @@ -18,7 +18,11 @@ pub async fn exec(args: ApiServerArgs) -> anyhow::Result<()> { let (tx, _) = channel::(SERVICE_REQUEST_CHANNEL_SIZE); - let pool = IndexerConnectionPool::connect(&config.database.to_string()).await?; + let pool = IndexerConnectionPool::connect( + &config.database.to_string(), + config.max_db_connections, + ) + .await?; if config.run_migrations { let mut c = pool.acquire().await?; diff --git a/packages/fuel-indexer-benchmarks/src/bin/qa.rs b/packages/fuel-indexer-benchmarks/src/bin/qa.rs index 673e889a8..aae354e15 100644 --- a/packages/fuel-indexer-benchmarks/src/bin/qa.rs +++ b/packages/fuel-indexer-benchmarks/src/bin/qa.rs @@ -115,6 +115,8 @@ impl StatManager { / 1000.; let avg_cpu = runs.iter().map(|run| run.avg_cpu()).sum::() / runs.len() as f64; + let db_connections = runs.iter().map(|run| run.max_conn()).max().unwrap_or(0); + let max_db_connections = max_db_connections().unwrap_or(0); let avg_blocks_per_sec = runs.iter().map(|run| run.blocks_per_sec).sum::() / runs.len() as f64; let index_size = runs.iter().map(|run| run.index_size).sum::() as f64 @@ -148,6 +150,7 @@ runtime: {runtime:.1} minutes missing blocks: {missing_blocks} avg memory: {avg_memory:.1}kB avg cpu: {avg_cpu:.1}% +db connections: {db_connections}/{max_db_connections} avg blocks/sec: {avg_blocks_per_sec:.1} index size: {index_size:.1}kB per block @@ -173,6 +176,7 @@ struct RunStat { pub end_block: u32, pub mem: Vec, pub cpu: Vec, + pub conn: Vec, pub blocks_per_sec: f64, pub index_size: u64, pub missing_blocks: u64, @@ -187,6 +191,7 @@ impl RunStat { end_block, mem: Vec::new(), cpu: Vec::new(), + conn: Vec::new(), index_size: 0, blocks_per_sec: 0.0, missing_blocks: 0, @@ -209,12 +214,17 @@ impl RunStat { }; self.mem.push(mem); self.cpu.push(record_cpu_usage()); + self.conn.push(record_active_connections().unwrap_or(0)); } fn avg_mem(&self) -> u64 { self.mem.iter().sum::() / self.mem.len() as u64 } + fn max_conn(&self) -> usize { + *self.conn.iter().max().unwrap_or(&0) + } + fn stdv_mem(&self) -> f64 { let avg = self.avg_mem() as f64; let sum = self @@ -321,6 +331,8 @@ WHERE schema_name = 'fuellabs_explorer'; let stdv_cpu = self.stdv_cpu(); let avg_cpu = self.avg_cpu(); let stdv_mem = self.stdv_mem() / 1000.; + let db_connections = self.conn.iter().max().unwrap_or(&0usize); + let max_db_connections = max_db_connections().unwrap_or(0); let runtime = *runtime as f64 / 60.; let block_size = self.index_size as f64 / 1000.; @@ -334,6 +346,7 @@ run: {id} stdv memory: {stdv_mem:.1}kB avg cpu: {avg_cpu:.1}% stdv cpu: {stdv_cpu:.1}% + db connections: {db_connections}/{max_db_connections} missing blocks: {missing_blocks} blocks/sec: {blocks_per_sec:.1} index size: {block_size}kB per block"# @@ -505,6 +518,44 @@ fn record_mem_usage() -> Result { .read() } +fn record_active_connections() -> Result { + let proc = Command::new("psql") + .arg("-U") + .arg("postgres") + .arg("-c") + .arg("SELECT count(*) used FROM pg_stat_activity") + .arg("--no-align") + .arg("--tuples-only") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + let output = proc.wait_with_output().unwrap(); + let output = String::from_utf8(output.stdout).unwrap(); + let output = output.trim(); + let output = output.parse::().unwrap_or(0); + Ok(output) +} + +fn max_db_connections() -> Result { + let proc = Command::new("psql") + .arg("-U") + .arg("postgres") + .arg("-c") + .arg("SELECT setting::int max_conn FROM pg_settings WHERE name=$$max_connections$$") + .arg("--no-align") + .arg("--tuples-only") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + let output = proc.wait_with_output().unwrap(); + let output = String::from_utf8(output.stdout).unwrap(); + let output = output.trim(); + let output = output.parse::().unwrap_or(0); + Ok(output) +} + #[tokio::main] async fn main() { let opts = Args::from_args(); diff --git a/packages/fuel-indexer-database/Cargo.toml b/packages/fuel-indexer-database/Cargo.toml index 73cc34ca8..a20c03b6d 100644 --- a/packages/fuel-indexer-database/Cargo.toml +++ b/packages/fuel-indexer-database/Cargo.toml @@ -15,4 +15,5 @@ fuel-indexer-lib = { workspace = true } fuel-indexer-postgres = { workspace = true } sqlx = { version = "0.6" } thiserror = { workspace = true } +tracing = { workspace = true } url = "2.2" diff --git a/packages/fuel-indexer-database/src/lib.rs b/packages/fuel-indexer-database/src/lib.rs index 2a351b2e4..62a842450 100644 --- a/packages/fuel-indexer-database/src/lib.rs +++ b/packages/fuel-indexer-database/src/lib.rs @@ -59,6 +59,7 @@ impl IndexerConnectionPool { pub async fn connect( database_url: &str, + max_db_connections: u32, ) -> Result { let url = url::Url::parse(database_url); if url.is_err() { @@ -83,11 +84,18 @@ impl IndexerConnectionPool { opts.disable_statement_logging(); let pool = attempt_database_connection(|| { - sqlx::postgres::PgPoolOptions::new().connect_with(opts.clone()) + sqlx::postgres::PgPoolOptions::new() + .max_connections(max_db_connections) + .connect_with(opts.clone()) }) .await; - Ok(IndexerConnectionPool::Postgres(pool)) + let result = IndexerConnectionPool::Postgres(pool); + let backend_max_connections = result.max_connections().await?; + if backend_max_connections < max_db_connections { + tracing::warn!("Indexer --max-db-connections `{max_db_connections}` exceeds `{backend_max_connections}` value set by db backend") + }; + Ok(result) } err => Err(IndexerDatabaseError::BackendNotSupported(err.into())), } @@ -116,4 +124,17 @@ impl IndexerConnectionPool { } } } + + pub async fn max_connections(&self) -> sqlx::Result { + match self { + IndexerConnectionPool::Postgres(pool) => { + let max_connections: i32 = sqlx::query_scalar( + "SELECT setting::int FROM pg_settings WHERE name = 'max_connections'", + ) + .fetch_one(pool) + .await?; + Ok(max_connections as u32) + } + } + } } diff --git a/packages/fuel-indexer-lib/src/config/cli.rs b/packages/fuel-indexer-lib/src/config/cli.rs index e08611c5f..2f832ee46 100644 --- a/packages/fuel-indexer-lib/src/config/cli.rs +++ b/packages/fuel-indexer-lib/src/config/cli.rs @@ -64,6 +64,10 @@ pub struct IndexerArgs { #[clap(long, help = "Database type.", default_value = defaults::DATABASE, value_parser(["postgres"]))] pub database: String, + /// The maximum number of database connections. + #[clap(long, help = "The maximum number of database connections.", default_value_t = defaults::MAX_DB_CONNECTIONS)] + pub max_db_connections: u32, + /// Max body size for web server requests. #[clap(long, help = "Max body size for web server requests.", default_value_t = defaults::MAX_BODY_SIZE )] pub max_body_size: usize, @@ -262,6 +266,10 @@ pub struct ApiServerArgs { #[clap(long, help = "Database type.", default_value = defaults::DATABASE, value_parser(["postgres"]))] pub database: String, + /// The maximum number of database connections. + #[clap(long, help = "The maximum number of database connections.", default_value_t = defaults::MAX_DB_CONNECTIONS)] + pub max_db_connections: u32, + /// Max body size for web server requests. #[clap(long, help = "Max body size for web requests.", default_value_t = defaults::MAX_BODY_SIZE )] pub max_body_size: usize, diff --git a/packages/fuel-indexer-lib/src/config/mod.rs b/packages/fuel-indexer-lib/src/config/mod.rs index 6b76a678a..815cdd3f8 100644 --- a/packages/fuel-indexer-lib/src/config/mod.rs +++ b/packages/fuel-indexer-lib/src/config/mod.rs @@ -82,6 +82,7 @@ impl Default for IndexerArgs { web_api_host: defaults::WEB_API_HOST.to_string(), web_api_port: defaults::WEB_API_PORT.to_string(), database: defaults::DATABASE.to_string(), + max_db_connections: defaults::MAX_DB_CONNECTIONS, max_body_size: defaults::MAX_BODY_SIZE, postgres_user: Some(defaults::POSTGRES_USER.to_string()), postgres_database: Some(defaults::POSTGRES_DATABASE.to_string()), @@ -132,6 +133,7 @@ pub struct IndexerConfig { pub web_api: WebApiConfig, #[serde(default)] pub database: DatabaseConfig, + pub max_db_connections: u32, pub metrics: bool, pub stop_idle_indexers: bool, pub run_migrations: bool, @@ -157,6 +159,7 @@ impl Default for IndexerConfig { fuel_node: FuelClientConfig::default(), web_api: WebApiConfig::default(), database: DatabaseConfig::default(), + max_db_connections: defaults::MAX_DB_CONNECTIONS, metrics: defaults::USE_METRICS, stop_idle_indexers: defaults::STOP_IDLE_INDEXERS, run_migrations: defaults::RUN_MIGRATIONS, @@ -221,6 +224,7 @@ impl From for IndexerConfig { local_fuel_node: args.local_fuel_node, indexer_net_config: args.indexer_net_config, database, + max_db_connections: args.max_db_connections, fuel_node: FuelClientConfig { host: args.fuel_node_host, port: args.fuel_node_port, @@ -320,6 +324,7 @@ impl From for IndexerConfig { local_fuel_node: defaults::LOCAL_FUEL_NODE, indexer_net_config: defaults::INDEXER_NET_CONFIG, database, + max_db_connections: args.max_db_connections, fuel_node: FuelClientConfig { host: args.fuel_node_host, port: args.fuel_node_port, @@ -452,6 +457,7 @@ impl IndexerConfig { let fuel_config_key = serde_yaml::Value::String("fuel_node".into()); let web_config_key = serde_yaml::Value::String("web_api".into()); let database_config_key = serde_yaml::Value::String("database".into()); + let max_db_connections = serde_yaml::Value::String("max_db_connections".into()); let auth_config_key = serde_yaml::Value::String("authentication".into()); let rate_limit_config_key = serde_yaml::Value::String("rate_limit".into()); @@ -549,6 +555,10 @@ impl IndexerConfig { } } + if let Some(max_db_connections) = content.get(max_db_connections) { + config.max_db_connections = max_db_connections.as_u64().unwrap() as u32; + } + if let Some(section) = content.get(auth_config_key) { let auth_enabled = section.get(&serde_yaml::Value::String("enabled".into())); if let Some(auth_enabled) = auth_enabled { diff --git a/packages/fuel-indexer-lib/src/defaults.rs b/packages/fuel-indexer-lib/src/defaults.rs index 599fc3e6a..d3c19b8bc 100644 --- a/packages/fuel-indexer-lib/src/defaults.rs +++ b/packages/fuel-indexer-lib/src/defaults.rs @@ -143,3 +143,6 @@ pub const DISABLE_TOOLCHAIN_VERSION_CHECK: bool = false; /// Default Fuel network to use. pub const NETWORK: &str = "beta-4"; + +/// Maximum number of database connections. It the number exceeds the database backend setting, a warning will be issued. +pub const MAX_DB_CONNECTIONS: u32 = 100; diff --git a/packages/fuel-indexer-tests/src/fixtures.rs b/packages/fuel-indexer-tests/src/fixtures.rs index c7925edb4..686d74861 100644 --- a/packages/fuel-indexer-tests/src/fixtures.rs +++ b/packages/fuel-indexer-tests/src/fixtures.rs @@ -9,7 +9,7 @@ use fuel_indexer_api_server::api::WebApi; use fuel_indexer_database::IndexerConnectionPool; use fuel_indexer_lib::{ config::{DatabaseConfig, IndexerConfig, WebApiConfig}, - defaults::SERVICE_REQUEST_CHANNEL_SIZE, + defaults::{MAX_DB_CONNECTIONS, SERVICE_REQUEST_CHANNEL_SIZE}, manifest::Manifest, utils::{derive_socket_addr, ServiceRequest}, }; @@ -193,20 +193,22 @@ impl TestPostgresDb { .await?; // Instantiate a pool so that it can be stored in the struct for use in the tests - let pool = - match IndexerConnectionPool::connect(&test_db_config.clone().to_string()) - .await - { - Ok(pool) => match pool { - IndexerConnectionPool::Postgres(p) => { - let mut conn = p.acquire().await?; - - fuel_indexer_postgres::run_migration(&mut conn).await?; - p - } - }, - Err(e) => return Err(TestError::PoolCreationError(e)), - }; + let pool = match IndexerConnectionPool::connect( + &test_db_config.clone().to_string(), + MAX_DB_CONNECTIONS, + ) + .await + { + Ok(pool) => match pool { + IndexerConnectionPool::Postgres(p) => { + let mut conn = p.acquire().await?; + + fuel_indexer_postgres::run_migration(&mut conn).await?; + p + } + }, + Err(e) => return Err(TestError::PoolCreationError(e)), + }; Ok(Self { db_name, @@ -382,9 +384,12 @@ pub async fn api_server_app_postgres( config.database = DatabaseConfig::from_str(url).unwrap(); } - let pool = IndexerConnectionPool::connect(&config.database.to_string()) - .await - .unwrap(); + let pool = IndexerConnectionPool::connect( + &config.database.to_string(), + config.max_db_connections, + ) + .await + .unwrap(); let (tx, rx) = channel::(SERVICE_REQUEST_CHANNEL_SIZE); @@ -405,9 +410,12 @@ pub async fn indexer_service_postgres( let (_tx, rx) = channel::(SERVICE_REQUEST_CHANNEL_SIZE); - let pool = IndexerConnectionPool::connect(&config.database.to_string()) - .await - .unwrap(); + let pool = IndexerConnectionPool::connect( + &config.database.to_string(), + config.max_db_connections, + ) + .await + .unwrap(); IndexerService::new(config, pool, rx).await.unwrap() } diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap index 1c911339a..3cd5f6937 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap @@ -23,6 +23,7 @@ database: port: "5432" database: postgres verbose: "false" +max_db_connections: 100 metrics: true stop_idle_indexers: false run_migrations: true diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap index dd80e7000..e34a3018e 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap @@ -73,6 +73,9 @@ OPTIONS: --max-body-size Max body size for web server requests. [default: 5242880] + --max-db-connections + The maximum number of database connections. [default: 100] + --metering-points The number of WASM opcodes after which the indexer's event handler will stop execution. [default: 30000000000] diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_api_server_run_help_output.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_api_server_run_help_output.snap index 68f7b8f39..02c7b3de5 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_api_server_run_help_output.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_api_server_run_help_output.snap @@ -52,6 +52,9 @@ OPTIONS: --max-body-size Max body size for web requests. [default: 5242880] + --max-db-connections + The maximum number of database connections. [default: 100] + --metrics Use Prometheus metrics reporting. diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap index 45f36786e..08df71656 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap @@ -73,6 +73,9 @@ OPTIONS: --max-body-size Max body size for web server requests. [default: 5242880] + --max-db-connections + The maximum number of database connections. [default: 100] + --metering-points The number of WASM opcodes after which the indexer's event handler will stop execution. [default: 30000000000] diff --git a/packages/fuel-indexer/src/commands/run.rs b/packages/fuel-indexer/src/commands/run.rs index 79288caee..bb8648a1a 100644 --- a/packages/fuel-indexer/src/commands/run.rs +++ b/packages/fuel-indexer/src/commands/run.rs @@ -117,7 +117,11 @@ If the --embedded-database flag demonstrates flaky behavior on your machine, or #[allow(unused)] let (tx, rx) = channel::(defaults::SERVICE_REQUEST_CHANNEL_SIZE); - let pool = IndexerConnectionPool::connect(&config.database.to_string()).await?; + let pool = IndexerConnectionPool::connect( + &config.database.to_string(), + config.max_db_connections, + ) + .await?; if config.run_migrations { let mut c = pool.acquire().await?; diff --git a/plugins/forc-index/src/ops/forc_index_start.rs b/plugins/forc-index/src/ops/forc_index_start.rs index f216c09f8..5dc346b1f 100644 --- a/plugins/forc-index/src/ops/forc_index_start.rs +++ b/plugins/forc-index/src/ops/forc_index_start.rs @@ -32,6 +32,7 @@ pub async fn init(command: StartCommand) -> anyhow::Result<()> { verbose, local_fuel_node, max_body_size, + max_db_connections, stop_idle_indexers, indexer_net_config, rate_limit, @@ -83,6 +84,8 @@ pub async fn init(command: StartCommand) -> anyhow::Result<()> { .arg(OsStr::new(&metering_points.to_string())); cmd.arg("--block-page-size") .arg(OsStr::new(&block_page_size.to_string())); + cmd.arg("--max-db-connections") + .arg(OsStr::new(&max_db_connections.to_string())); // Bool options let options = [ diff --git a/scripts/utils/build_modules.bash b/scripts/utils/build_modules.bash old mode 100644 new mode 100755