Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

bugfix: set max_connections sqlx parameter #1508

Merged
merged 6 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion packages/fuel-indexer-api-server/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ pub async fn exec(args: ApiServerArgs) -> anyhow::Result<()> {

let (tx, _) = channel::<ServiceRequest>(SERVICE_REQUEST_CHANNEL_SIZE);

let pool = IndexerConnectionPool::connect(&config.database.to_string()).await?;
let pool = IndexerConnectionPool::connect(
&config.database.to_string(),
config.max_db_connections,
)
.await?;

if config.run_migrations {
let mut c = pool.acquire().await?;
Expand Down
51 changes: 51 additions & 0 deletions packages/fuel-indexer-benchmarks/src/bin/qa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ impl StatManager {
/ 1000.;
let avg_cpu =
runs.iter().map(|run| run.avg_cpu()).sum::<f64>() / runs.len() as f64;
let db_connections = runs.iter().map(|run| run.max_conn()).max().unwrap_or(0);
let max_db_connections = max_db_connections().unwrap_or(0);
let avg_blocks_per_sec =
runs.iter().map(|run| run.blocks_per_sec).sum::<f64>() / runs.len() as f64;
let index_size = runs.iter().map(|run| run.index_size).sum::<u64>() as f64
Expand Down Expand Up @@ -148,6 +150,7 @@ runtime: {runtime:.1} minutes
missing blocks: {missing_blocks}
avg memory: {avg_memory:.1}kB
avg cpu: {avg_cpu:.1}%
db connections: {db_connections}/{max_db_connections}
avg blocks/sec: {avg_blocks_per_sec:.1}
index size: {index_size:.1}kB per block

Expand All @@ -173,6 +176,7 @@ struct RunStat {
pub end_block: u32,
pub mem: Vec<u64>,
pub cpu: Vec<f64>,
pub conn: Vec<usize>,
pub blocks_per_sec: f64,
pub index_size: u64,
pub missing_blocks: u64,
Expand All @@ -187,6 +191,7 @@ impl RunStat {
end_block,
mem: Vec::new(),
cpu: Vec::new(),
conn: Vec::new(),
index_size: 0,
blocks_per_sec: 0.0,
missing_blocks: 0,
Expand All @@ -209,12 +214,17 @@ impl RunStat {
};
self.mem.push(mem);
self.cpu.push(record_cpu_usage());
self.conn.push(record_active_connections().unwrap_or(0));
}

fn avg_mem(&self) -> u64 {
self.mem.iter().sum::<u64>() / self.mem.len() as u64
}

fn max_conn(&self) -> usize {
*self.conn.iter().max().unwrap_or(&0)
}

fn stdv_mem(&self) -> f64 {
let avg = self.avg_mem() as f64;
let sum = self
Expand Down Expand Up @@ -321,6 +331,8 @@ WHERE schema_name = 'fuellabs_explorer';
let stdv_cpu = self.stdv_cpu();
let avg_cpu = self.avg_cpu();
let stdv_mem = self.stdv_mem() / 1000.;
let db_connections = self.conn.iter().max().unwrap_or(&0usize);
let max_db_connections = max_db_connections().unwrap_or(0);
let runtime = *runtime as f64 / 60.;
let block_size = self.index_size as f64 / 1000.;

Expand All @@ -334,6 +346,7 @@ run: {id}
stdv memory: {stdv_mem:.1}kB
avg cpu: {avg_cpu:.1}%
stdv cpu: {stdv_cpu:.1}%
db connections: {db_connections}/{max_db_connections}
missing blocks: {missing_blocks}
blocks/sec: {blocks_per_sec:.1}
index size: {block_size}kB per block"#
Expand Down Expand Up @@ -505,6 +518,44 @@ fn record_mem_usage() -> Result<String, std::io::Error> {
.read()
}

fn record_active_connections() -> Result<usize, std::io::Error> {
let proc = Command::new("psql")
.arg("-U")
.arg("postgres")
.arg("-c")
.arg("SELECT count(*) used FROM pg_stat_activity")
.arg("--no-align")
.arg("--tuples-only")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let output = proc.wait_with_output().unwrap();
let output = String::from_utf8(output.stdout).unwrap();
let output = output.trim();
let output = output.parse::<usize>().unwrap_or(0);
Ok(output)
}

fn max_db_connections() -> Result<usize, std::io::Error> {
let proc = Command::new("psql")
.arg("-U")
.arg("postgres")
.arg("-c")
.arg("SELECT setting::int max_conn FROM pg_settings WHERE name=$$max_connections$$")
.arg("--no-align")
.arg("--tuples-only")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let output = proc.wait_with_output().unwrap();
let output = String::from_utf8(output.stdout).unwrap();
let output = output.trim();
let output = output.parse::<usize>().unwrap_or(0);
Ok(output)
}

#[tokio::main]
async fn main() {
let opts = Args::from_args();
Expand Down
1 change: 1 addition & 0 deletions packages/fuel-indexer-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ fuel-indexer-lib = { workspace = true }
fuel-indexer-postgres = { workspace = true }
sqlx = { version = "0.6" }
thiserror = { workspace = true }
tracing = { workspace = true }
url = "2.2"
25 changes: 23 additions & 2 deletions packages/fuel-indexer-database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl IndexerConnectionPool {

pub async fn connect(
database_url: &str,
max_db_connections: u32,
) -> Result<IndexerConnectionPool, IndexerDatabaseError> {
let url = url::Url::parse(database_url);
if url.is_err() {
Expand All @@ -83,11 +84,18 @@ impl IndexerConnectionPool {
opts.disable_statement_logging();

let pool = attempt_database_connection(|| {
sqlx::postgres::PgPoolOptions::new().connect_with(opts.clone())
sqlx::postgres::PgPoolOptions::new()
.max_connections(max_db_connections)
.connect_with(opts.clone())
})
.await;

Ok(IndexerConnectionPool::Postgres(pool))
let result = IndexerConnectionPool::Postgres(pool);
let backend_max_connections = result.max_connections().await?;
if backend_max_connections < max_db_connections {
tracing::warn!("Indexer --max-db-connections `{max_db_connections}` exceeds `{backend_max_connections}` value set by db backend")
};
Ok(result)
}
err => Err(IndexerDatabaseError::BackendNotSupported(err.into())),
}
Expand Down Expand Up @@ -116,4 +124,17 @@ impl IndexerConnectionPool {
}
}
}

pub async fn max_connections(&self) -> sqlx::Result<u32> {
match self {
IndexerConnectionPool::Postgres(pool) => {
let max_connections: i32 = sqlx::query_scalar(
"SELECT setting::int FROM pg_settings WHERE name = 'max_connections'",
)
.fetch_one(pool)
.await?;
Ok(max_connections as u32)
}
}
}
}
8 changes: 8 additions & 0 deletions packages/fuel-indexer-lib/src/config/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub struct IndexerArgs {
#[clap(long, help = "Database type.", default_value = defaults::DATABASE, value_parser(["postgres"]))]
pub database: String,

/// The maximum number of database connections.
#[clap(long, help = "The maximum number of database connections.", default_value_t = defaults::MAX_DB_CONNECTIONS)]
pub max_db_connections: u32,

/// Max body size for web server requests.
#[clap(long, help = "Max body size for web server requests.", default_value_t = defaults::MAX_BODY_SIZE )]
pub max_body_size: usize,
Expand Down Expand Up @@ -262,6 +266,10 @@ pub struct ApiServerArgs {
#[clap(long, help = "Database type.", default_value = defaults::DATABASE, value_parser(["postgres"]))]
pub database: String,

/// The maximum number of database connections.
#[clap(long, help = "The maximum number of database connections.", default_value_t = defaults::MAX_DB_CONNECTIONS)]
pub max_db_connections: u32,

/// Max body size for web server requests.
#[clap(long, help = "Max body size for web requests.", default_value_t = defaults::MAX_BODY_SIZE )]
pub max_body_size: usize,
Expand Down
Loading