Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic syncing and concurrency #335

Merged
merged 17 commits into from
Nov 17, 2023
Merged

Topic syncing and concurrency #335

merged 17 commits into from
Nov 17, 2023

Conversation

richardhuaaa
Copy link
Contributor

@richardhuaaa richardhuaaa commented Nov 14, 2023

  1. Only pull down payloads we haven't already processed (via pull_from_topic)
  2. Do all payload processing within an atomic transaction that also updates the last_synced_payload_time (via process_for_topic)

These methods are agnostic to which topic, so can be used for both welcomes and group message processing, as well as any other case in the future. Unsure if they belong on Client or somewhere else.

Some missing pieces:

  1. More complicated concurrency test cases
  2. Sophisticated error handling (when an error happens, do we update last_synced_payload_time or do we not?)
  3. Will add a follow-up PR to improve the safety of borrowing the DB connection in a transaction

@richardhuaaa richardhuaaa changed the base branch from main to nm/handle-own-commits November 15, 2023 00:43
Base automatically changed from nm/handle-own-commits to main November 15, 2023 18:17
// TODO: We can handle errors in the transaction() function to make error handling
// cleaner. Retryable errors can possibly be part of their own enum
XmtpOpenMlsProvider::transaction(&mut self.store.conn()?, |provider| {
let is_updated = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping this in a block, because the borrow needs to be ended before it's borrowed again somewhere else (e.g. when OpenMLS writes to the key store), or else there will be a runtime error. Don't love this solution but it works

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion, will implement a WrappedConnection struct in the next PR that can solve this problem

@richardhuaaa richardhuaaa marked this pull request as ready for review November 16, 2023 23:27
@richardhuaaa richardhuaaa requested a review from a team November 16, 2023 23:27
Copy link
Contributor

@insipx insipx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good

@@ -387,38 +377,36 @@ where
}
}

pub fn process_messages(&self, envelopes: Vec<Envelope>) -> Result<(), GroupError> {
pub(crate) fn process_messages(&self, envelopes: Vec<Envelope>) -> Result<(), GroupError> {
let mut conn = self.client.store.conn()?;
let provider = self.client.mls_provider(&mut conn);
let mut openmls_group = self.load_mls_group(&provider)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could merge this with the two lines above now that we don't need access to the provider or conn below this point

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean by merging - like put it all in the same line? I think that just makes it less readable :/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is less readable, but it does return the connection to the pool sooner.

Not a big deal one way or the other.


let envelopes = self
.api_client
.read_topic(topic, last_synced_timestamp_ns as u64)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The network query API is "greater than or equals", so I think we actually want this to be (last_synced_timestamp_ns + 1) to avoid getting duplicate messages we will end up skipping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this, but wouldn't it make more sense to have the network query API just be "greater than"? That seems like a much cleaner system overall

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely wish I had done it that way when I first wrote the query API last year. The rationale was compatibility with Waku, which worked that way. Since we are using the generic query interface, we'd have to make the change for everyone (maybe with a flag). But yes, that would be better.

Copy link
Contributor

@neekolas neekolas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was using this branch in the demo this morning and everything was working.

@richardhuaaa richardhuaaa enabled auto-merge (squash) November 17, 2023 20:01
@richardhuaaa richardhuaaa merged commit 6c05ffa into main Nov 17, 2023
4 checks passed
@richardhuaaa richardhuaaa deleted the rich/topic-state branch November 17, 2023 20:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants