Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
bugfix: set max_connections sqlx parameter (#1508)
Browse files Browse the repository at this point in the history
* set max_connections sqlx parameter

* track db connections in qa script

* update insta snapshots

* remove debug print statement

* restore manifests

* remove print statement
  • Loading branch information
lostman authored Dec 8, 2023
1 parent 3644a04 commit 58cdb47
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion packages/fuel-indexer-api-server/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ pub async fn exec(args: ApiServerArgs) -> anyhow::Result<()> {

let (tx, _) = channel::<ServiceRequest>(SERVICE_REQUEST_CHANNEL_SIZE);

let pool = IndexerConnectionPool::connect(&config.database.to_string()).await?;
let pool = IndexerConnectionPool::connect(
&config.database.to_string(),
config.max_db_connections,
)
.await?;

if config.run_migrations {
let mut c = pool.acquire().await?;
Expand Down
51 changes: 51 additions & 0 deletions packages/fuel-indexer-benchmarks/src/bin/qa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ impl StatManager {
/ 1000.;
let avg_cpu =
runs.iter().map(|run| run.avg_cpu()).sum::<f64>() / runs.len() as f64;
let db_connections = runs.iter().map(|run| run.max_conn()).max().unwrap_or(0);
let max_db_connections = max_db_connections().unwrap_or(0);
let avg_blocks_per_sec =
runs.iter().map(|run| run.blocks_per_sec).sum::<f64>() / runs.len() as f64;
let index_size = runs.iter().map(|run| run.index_size).sum::<u64>() as f64
Expand Down Expand Up @@ -148,6 +150,7 @@ runtime: {runtime:.1} minutes
missing blocks: {missing_blocks}
avg memory: {avg_memory:.1}kB
avg cpu: {avg_cpu:.1}%
db connections: {db_connections}/{max_db_connections}
avg blocks/sec: {avg_blocks_per_sec:.1}
index size: {index_size:.1}kB per block
Expand All @@ -173,6 +176,7 @@ struct RunStat {
pub end_block: u32,
pub mem: Vec<u64>,
pub cpu: Vec<f64>,
pub conn: Vec<usize>,
pub blocks_per_sec: f64,
pub index_size: u64,
pub missing_blocks: u64,
Expand All @@ -187,6 +191,7 @@ impl RunStat {
end_block,
mem: Vec::new(),
cpu: Vec::new(),
conn: Vec::new(),
index_size: 0,
blocks_per_sec: 0.0,
missing_blocks: 0,
Expand All @@ -209,12 +214,17 @@ impl RunStat {
};
self.mem.push(mem);
self.cpu.push(record_cpu_usage());
self.conn.push(record_active_connections().unwrap_or(0));
}

fn avg_mem(&self) -> u64 {
self.mem.iter().sum::<u64>() / self.mem.len() as u64
}

fn max_conn(&self) -> usize {
*self.conn.iter().max().unwrap_or(&0)
}

fn stdv_mem(&self) -> f64 {
let avg = self.avg_mem() as f64;
let sum = self
Expand Down Expand Up @@ -321,6 +331,8 @@ WHERE schema_name = 'fuellabs_explorer';
let stdv_cpu = self.stdv_cpu();
let avg_cpu = self.avg_cpu();
let stdv_mem = self.stdv_mem() / 1000.;
let db_connections = self.conn.iter().max().unwrap_or(&0usize);
let max_db_connections = max_db_connections().unwrap_or(0);
let runtime = *runtime as f64 / 60.;
let block_size = self.index_size as f64 / 1000.;

Expand All @@ -334,6 +346,7 @@ run: {id}
stdv memory: {stdv_mem:.1}kB
avg cpu: {avg_cpu:.1}%
stdv cpu: {stdv_cpu:.1}%
db connections: {db_connections}/{max_db_connections}
missing blocks: {missing_blocks}
blocks/sec: {blocks_per_sec:.1}
index size: {block_size}kB per block"#
Expand Down Expand Up @@ -505,6 +518,44 @@ fn record_mem_usage() -> Result<String, std::io::Error> {
.read()
}

fn record_active_connections() -> Result<usize, std::io::Error> {
let proc = Command::new("psql")
.arg("-U")
.arg("postgres")
.arg("-c")
.arg("SELECT count(*) used FROM pg_stat_activity")
.arg("--no-align")
.arg("--tuples-only")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let output = proc.wait_with_output().unwrap();
let output = String::from_utf8(output.stdout).unwrap();
let output = output.trim();
let output = output.parse::<usize>().unwrap_or(0);
Ok(output)
}

fn max_db_connections() -> Result<usize, std::io::Error> {
let proc = Command::new("psql")
.arg("-U")
.arg("postgres")
.arg("-c")
.arg("SELECT setting::int max_conn FROM pg_settings WHERE name=$$max_connections$$")
.arg("--no-align")
.arg("--tuples-only")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let output = proc.wait_with_output().unwrap();
let output = String::from_utf8(output.stdout).unwrap();
let output = output.trim();
let output = output.parse::<usize>().unwrap_or(0);
Ok(output)
}

#[tokio::main]
async fn main() {
let opts = Args::from_args();
Expand Down
1 change: 1 addition & 0 deletions packages/fuel-indexer-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ fuel-indexer-lib = { workspace = true }
fuel-indexer-postgres = { workspace = true }
sqlx = { version = "0.6" }
thiserror = { workspace = true }
tracing = { workspace = true }
url = "2.2"
25 changes: 23 additions & 2 deletions packages/fuel-indexer-database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl IndexerConnectionPool {

pub async fn connect(
database_url: &str,
max_db_connections: u32,
) -> Result<IndexerConnectionPool, IndexerDatabaseError> {
let url = url::Url::parse(database_url);
if url.is_err() {
Expand All @@ -83,11 +84,18 @@ impl IndexerConnectionPool {
opts.disable_statement_logging();

let pool = attempt_database_connection(|| {
sqlx::postgres::PgPoolOptions::new().connect_with(opts.clone())
sqlx::postgres::PgPoolOptions::new()
.max_connections(max_db_connections)
.connect_with(opts.clone())
})
.await;

Ok(IndexerConnectionPool::Postgres(pool))
let result = IndexerConnectionPool::Postgres(pool);
let backend_max_connections = result.max_connections().await?;
if backend_max_connections < max_db_connections {
tracing::warn!("Indexer --max-db-connections `{max_db_connections}` exceeds `{backend_max_connections}` value set by db backend")
};
Ok(result)
}
err => Err(IndexerDatabaseError::BackendNotSupported(err.into())),
}
Expand Down Expand Up @@ -116,4 +124,17 @@ impl IndexerConnectionPool {
}
}
}

pub async fn max_connections(&self) -> sqlx::Result<u32> {
match self {
IndexerConnectionPool::Postgres(pool) => {
let max_connections: i32 = sqlx::query_scalar(
"SELECT setting::int FROM pg_settings WHERE name = 'max_connections'",
)
.fetch_one(pool)
.await?;
Ok(max_connections as u32)
}
}
}
}
8 changes: 8 additions & 0 deletions packages/fuel-indexer-lib/src/config/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub struct IndexerArgs {
#[clap(long, help = "Database type.", default_value = defaults::DATABASE, value_parser(["postgres"]))]
pub database: String,

/// The maximum number of database connections.
#[clap(long, help = "The maximum number of database connections.", default_value_t = defaults::MAX_DB_CONNECTIONS)]
pub max_db_connections: u32,

/// Max body size for web server requests.
#[clap(long, help = "Max body size for web server requests.", default_value_t = defaults::MAX_BODY_SIZE )]
pub max_body_size: usize,
Expand Down Expand Up @@ -262,6 +266,10 @@ pub struct ApiServerArgs {
#[clap(long, help = "Database type.", default_value = defaults::DATABASE, value_parser(["postgres"]))]
pub database: String,

/// The maximum number of database connections.
#[clap(long, help = "The maximum number of database connections.", default_value_t = defaults::MAX_DB_CONNECTIONS)]
pub max_db_connections: u32,

/// Max body size for web server requests.
#[clap(long, help = "Max body size for web requests.", default_value_t = defaults::MAX_BODY_SIZE )]
pub max_body_size: usize,
Expand Down
10 changes: 10 additions & 0 deletions packages/fuel-indexer-lib/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl Default for IndexerArgs {
web_api_host: defaults::WEB_API_HOST.to_string(),
web_api_port: defaults::WEB_API_PORT.to_string(),
database: defaults::DATABASE.to_string(),
max_db_connections: defaults::MAX_DB_CONNECTIONS,
max_body_size: defaults::MAX_BODY_SIZE,
postgres_user: Some(defaults::POSTGRES_USER.to_string()),
postgres_database: Some(defaults::POSTGRES_DATABASE.to_string()),
Expand Down Expand Up @@ -132,6 +133,7 @@ pub struct IndexerConfig {
pub web_api: WebApiConfig,
#[serde(default)]
pub database: DatabaseConfig,
pub max_db_connections: u32,
pub metrics: bool,
pub stop_idle_indexers: bool,
pub run_migrations: bool,
Expand All @@ -157,6 +159,7 @@ impl Default for IndexerConfig {
fuel_node: FuelClientConfig::default(),
web_api: WebApiConfig::default(),
database: DatabaseConfig::default(),
max_db_connections: defaults::MAX_DB_CONNECTIONS,
metrics: defaults::USE_METRICS,
stop_idle_indexers: defaults::STOP_IDLE_INDEXERS,
run_migrations: defaults::RUN_MIGRATIONS,
Expand Down Expand Up @@ -221,6 +224,7 @@ impl From<IndexerArgs> for IndexerConfig {
local_fuel_node: args.local_fuel_node,
indexer_net_config: args.indexer_net_config,
database,
max_db_connections: args.max_db_connections,
fuel_node: FuelClientConfig {
host: args.fuel_node_host,
port: args.fuel_node_port,
Expand Down Expand Up @@ -320,6 +324,7 @@ impl From<ApiServerArgs> for IndexerConfig {
local_fuel_node: defaults::LOCAL_FUEL_NODE,
indexer_net_config: defaults::INDEXER_NET_CONFIG,
database,
max_db_connections: args.max_db_connections,
fuel_node: FuelClientConfig {
host: args.fuel_node_host,
port: args.fuel_node_port,
Expand Down Expand Up @@ -452,6 +457,7 @@ impl IndexerConfig {
let fuel_config_key = serde_yaml::Value::String("fuel_node".into());
let web_config_key = serde_yaml::Value::String("web_api".into());
let database_config_key = serde_yaml::Value::String("database".into());
let max_db_connections = serde_yaml::Value::String("max_db_connections".into());
let auth_config_key = serde_yaml::Value::String("authentication".into());
let rate_limit_config_key = serde_yaml::Value::String("rate_limit".into());

Expand Down Expand Up @@ -549,6 +555,10 @@ impl IndexerConfig {
}
}

if let Some(max_db_connections) = content.get(max_db_connections) {
config.max_db_connections = max_db_connections.as_u64().unwrap() as u32;
}

if let Some(section) = content.get(auth_config_key) {
let auth_enabled = section.get(&serde_yaml::Value::String("enabled".into()));
if let Some(auth_enabled) = auth_enabled {
Expand Down
3 changes: 3 additions & 0 deletions packages/fuel-indexer-lib/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ pub const DISABLE_TOOLCHAIN_VERSION_CHECK: bool = false;

/// Default Fuel network to use.
pub const NETWORK: &str = "beta-4";

/// Maximum number of database connections. It the number exceeds the database backend setting, a warning will be issued.
pub const MAX_DB_CONNECTIONS: u32 = 100;
50 changes: 29 additions & 21 deletions packages/fuel-indexer-tests/src/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use fuel_indexer_api_server::api::WebApi;
use fuel_indexer_database::IndexerConnectionPool;
use fuel_indexer_lib::{
config::{DatabaseConfig, IndexerConfig, WebApiConfig},
defaults::SERVICE_REQUEST_CHANNEL_SIZE,
defaults::{MAX_DB_CONNECTIONS, SERVICE_REQUEST_CHANNEL_SIZE},
manifest::Manifest,
utils::{derive_socket_addr, ServiceRequest},
};
Expand Down Expand Up @@ -193,20 +193,22 @@ impl TestPostgresDb {
.await?;

// Instantiate a pool so that it can be stored in the struct for use in the tests
let pool =
match IndexerConnectionPool::connect(&test_db_config.clone().to_string())
.await
{
Ok(pool) => match pool {
IndexerConnectionPool::Postgres(p) => {
let mut conn = p.acquire().await?;

fuel_indexer_postgres::run_migration(&mut conn).await?;
p
}
},
Err(e) => return Err(TestError::PoolCreationError(e)),
};
let pool = match IndexerConnectionPool::connect(
&test_db_config.clone().to_string(),
MAX_DB_CONNECTIONS,
)
.await
{
Ok(pool) => match pool {
IndexerConnectionPool::Postgres(p) => {
let mut conn = p.acquire().await?;

fuel_indexer_postgres::run_migration(&mut conn).await?;
p
}
},
Err(e) => return Err(TestError::PoolCreationError(e)),
};

Ok(Self {
db_name,
Expand Down Expand Up @@ -382,9 +384,12 @@ pub async fn api_server_app_postgres(
config.database = DatabaseConfig::from_str(url).unwrap();
}

let pool = IndexerConnectionPool::connect(&config.database.to_string())
.await
.unwrap();
let pool = IndexerConnectionPool::connect(
&config.database.to_string(),
config.max_db_connections,
)
.await
.unwrap();

let (tx, rx) = channel::<ServiceRequest>(SERVICE_REQUEST_CHANNEL_SIZE);

Expand All @@ -405,9 +410,12 @@ pub async fn indexer_service_postgres(

let (_tx, rx) = channel::<ServiceRequest>(SERVICE_REQUEST_CHANNEL_SIZE);

let pool = IndexerConnectionPool::connect(&config.database.to_string())
.await
.unwrap();
let pool = IndexerConnectionPool::connect(
&config.database.to_string(),
config.max_db_connections,
)
.await
.unwrap();

IndexerService::new(config, pool, rx).await.unwrap()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ database:
port: "5432"
database: postgres
verbose: "false"
max_db_connections: 100
metrics: true
stop_idle_indexers: false
run_migrations: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ OPTIONS:
--max-body-size <MAX_BODY_SIZE>
Max body size for web server requests. [default: 5242880]

--max-db-connections <MAX_DB_CONNECTIONS>
The maximum number of database connections. [default: 100]

--metering-points <METERING_POINTS>
The number of WASM opcodes after which the indexer's event handler will stop execution.
[default: 30000000000]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ OPTIONS:
--max-body-size <MAX_BODY_SIZE>
Max body size for web requests. [default: 5242880]

--max-db-connections <MAX_DB_CONNECTIONS>
The maximum number of database connections. [default: 100]

--metrics
Use Prometheus metrics reporting.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ OPTIONS:
--max-body-size <MAX_BODY_SIZE>
Max body size for web server requests. [default: 5242880]

--max-db-connections <MAX_DB_CONNECTIONS>
The maximum number of database connections. [default: 100]

--metering-points <METERING_POINTS>
The number of WASM opcodes after which the indexer's event handler will stop execution.
[default: 30000000000]
Expand Down
Loading

0 comments on commit 58cdb47

Please sign in to comment.