-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use separate Tokio runtime for gossip persistence. #73
Use separate Tokio runtime for gossip persistence. #73
Conversation
LGTM, why is this draft? |
CI says "Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context." |
Because of that CI issue that only happens in tests, though it runs fine. Just trying to get my local tests to fail the same way to then test my fix. Shouldn't take long. |
def546e
to
bf9d941
Compare
src/persistence.rs
Outdated
let mut _db_schema = None; | ||
#[cfg(test)] | ||
{ | ||
_db_schema = Some(crate::tests::db_test_schema()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why cant this be in the #[test]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean pass db_schema
as a parameter to persist_gossip
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, apologies, I missed the copy. Can we instead do this transparently as a part of the connection object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise LGTM.
src/persistence.rs
Outdated
@@ -162,6 +166,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger { | |||
let mut connections_set = connections_cache_ref.lock().await; | |||
connections_set.push(client); | |||
limiter_ref.add_permits(1); | |||
drop(connections_set); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we do this right after we build the client
? Otherwise we end up holding the lock for the duration of the insert, killing our parallelism.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I'll clean it up for real now. Thanks for the concept ack
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, minor followup first: I moved the connections_set
creation into a scope that is dropped prior to the db insertion operations. Does that seem sufficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, looks good now.
src/persistence.rs
Outdated
@@ -118,6 +122,19 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger { | |||
insert_limiter.acquire().await.unwrap().forget(); | |||
|
|||
let limiter_ref = Arc::clone(&insert_limiter); | |||
let client = { | |||
let connections_cache_ref = Arc::clone(&connections_cache); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably don't need the Arc now :)
src/persistence.rs
Outdated
@@ -162,6 +166,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger { | |||
let mut connections_set = connections_cache_ref.lock().await; | |||
connections_set.push(client); | |||
limiter_ref.add_permits(1); | |||
drop(connections_set); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, looks good now.
src/persistence.rs
Outdated
@@ -162,6 +169,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger { | |||
let mut connections_set = connections_cache_ref.lock().await; | |||
connections_set.push(client); | |||
limiter_ref.add_permits(1); | |||
drop(connections_set); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is dropping a lock taken two lines up at the end of a scope, you can just drop it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think about the femtoseconds, Matt.
2fab4ba
to
eac7646
Compare
src/persistence.rs
Outdated
@@ -118,6 +121,18 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger { | |||
insert_limiter.acquire().await.unwrap().forget(); | |||
|
|||
let limiter_ref = Arc::clone(&insert_limiter); | |||
let client = { | |||
let connections_cache_ref = &connections_cache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this.
414ec15
to
560dabd
Compare
560dabd
to
244ae25
Compare
244ae25
to
418d776
Compare
No description provided.