Skip to content

Commit

Permalink
Simplify client creation.
Browse files Browse the repository at this point in the history
  • Loading branch information
arik-so committed Mar 21, 2024
1 parent 81cb3b3 commit cd8767f
Showing 1 changed file with 16 additions and 24 deletions.
40 changes: 16 additions & 24 deletions src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::ops::Deref;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lightning::log_info;
Expand Down Expand Up @@ -38,10 +37,10 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
}

pub(crate) async fn persist_gossip(&mut self) {
let mut client = crate::connect_to_db().await;
{ // initialize the database
// this client instance is only used once
let mut client = crate::connect_to_db().await;

{
// initialize the database
let initialization = client
.execute(config::db_config_table_creation_query(), &[])
.await;
Expand Down Expand Up @@ -122,6 +121,18 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
insert_limiter.acquire().await.unwrap().forget();

let limiter_ref = Arc::clone(&insert_limiter);
let client = {
let connections_cache_ref = Arc::clone(&connections_cache);

let mut connections_set = connections_cache_ref.lock().await;
let client = if connections_set.is_empty() {
crate::connect_to_db().await
} else {
connections_set.pop().unwrap()
};
client
};

let connections_cache_ref = Arc::clone(&connections_cache);
match gossip_message {
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
Expand All @@ -132,16 +143,6 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
announcement.write(&mut announcement_signed).unwrap();

let _task = self.tokio_runtime.spawn(async move {
let client;
{
let mut connections_set = connections_cache_ref.lock().await;
if connections_set.is_empty() {
mem::drop(connections_set);
client = crate::connect_to_db().await;
} else {
client = connections_set.pop().unwrap();
}
}
if cfg!(test) && seen_override.is_some() {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
Expand All @@ -166,6 +167,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut connections_set = connections_cache_ref.lock().await;
connections_set.push(client);
limiter_ref.add_permits(1);
drop(connections_set);
});
#[cfg(test)]
tasks_spawned.push(_task);
Expand Down Expand Up @@ -224,16 +226,6 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;

let _task = self.tokio_runtime.spawn(async move {
let client;
{
let mut connections_set = connections_cache_ref.lock().await;
if connections_set.is_empty() {
mem::drop(connections_set);
client = crate::connect_to_db().await;
} else {
client = connections_set.pop().unwrap();
}
}
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute(insertion_statement, &[
&scid,
Expand Down

0 comments on commit cd8767f

Please sign in to comment.