Skip to content

Commit

Permalink
Read welcomes from the network (#332)
Browse files Browse the repository at this point in the history
* Implement message decryption and storage - still need to tidy and write
tests

* Remove unnecessary apiclientwrapper methods

* Add TODOs

* Split into smaller methods

* Handle errors

* Fix test and lint

* Add test and tidy

* Tweak test

* Remove unnecessary change

* Use partialeq

* Use test-log

* Remove timestamp validation comment

* Add error instead of panicking on unexpected message type

* Revert "Use test-log"

This reverts commit 8829ac8.

* More lints

* Welcome reading

* Tidy up code

* Clean up implementation

* Update devcontainer.json

* Remove unused error type

* Use host network in container

---------

Co-authored-by: Richard Hua <[email protected]>
  • Loading branch information
neekolas and richardhuaaa authored Nov 14, 2023
1 parent a9dd41d commit 09a2442
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 20 deletions.
40 changes: 23 additions & 17 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/docker-existing-dockerfile
{
"name": "Existing Dockerfile",
"build": {
// Sets the run context to one level up instead of the .devcontainer folder.
"context": "..",
// Update the 'dockerFile' property if you aren't using the standard 'Dockerfile' filename.
"dockerfile": "../Dockerfile"
}
"name": "Existing Dockerfile",
"build": {
// Sets the run context to one level up instead of the .devcontainer folder.
"context": "..",
// Update the 'dockerFile' property if you aren't using the standard 'Dockerfile' filename.
"dockerfile": "../Dockerfile"
},

// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},
// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'forwardPorts' to make a list of ports inside the container available locally.
"forwardPorts": [],
"customizations": {
"vscode": {
"extensions": ["tamasfe.even-better-toml", "rust-lang.rust-analyzer"]
}
},
"runArgs": ["--network=host"]

// Uncomment the next line to run commands after the container is created.
// "postCreateCommand": "cat /etc/os-release",
// Uncomment the next line to run commands after the container is created.
// "postCreateCommand": "cat /etc/os-release",

// Configure tool-specific properties.
// "customizations": {},
// Configure tool-specific properties.
// "customizations": {},

// Uncomment to connect as an existing user other than the container default. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "devcontainer"
// Uncomment to connect as an existing user other than the container default. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "devcontainer"
}
96 changes: 94 additions & 2 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::collections::HashSet;

use openmls::prelude::TlsSerializeTrait;
use openmls::{
framing::{MlsMessageIn, MlsMessageInBody},
messages::Welcome,
prelude::TlsSerializeTrait,
};
use thiserror::Error;
use tls_codec::Error as TlsSerializationError;
use tls_codec::{Deserialize, Error as TlsSerializationError};
use xmtp_proto::api_client::{XmtpApiClient, XmtpMlsClient};

use crate::{
Expand All @@ -11,6 +15,7 @@ use crate::{
identity::Identity,
storage::{group::GroupMembershipState, EncryptedMessageStore, StorageError},
types::Address,
utils::topic::get_welcome_topic,
verified_key_package::{KeyPackageVerificationError, VerifiedKeyPackage},
xmtp_openmls_provider::XmtpOpenMlsProvider,
};
Expand Down Expand Up @@ -39,6 +44,8 @@ pub enum ClientError {
Serialization(#[from] TlsSerializationError),
#[error("key package verification: {0}")]
KeyPackageVerification(#[from] KeyPackageVerificationError),
#[error("message processing: {0}")]
MessageProcessing(#[from] crate::groups::MessageProcessingError),
#[error("generic:{0}")]
Generic(String),
}
Expand Down Expand Up @@ -188,6 +195,43 @@ where
.collect::<Result<_, _>>()?)
}

// Download all unread welcome messages and convert to groups.
// Returns any new groups created in the operation
pub async fn sync_welcomes(&self) -> Result<Vec<MlsGroup<ApiClient>>, ClientError> {
let welcome_topic = get_welcome_topic(&self.installation_public_key());
let mut conn = self.store.conn()?;
let provider = self.mls_provider();
// TODO: Use the last_message_timestamp_ns field on the TopicRefreshState to only fetch new messages
// Waiting for more atomic update methods
let envelopes = self.api_client.read_topic(&welcome_topic, 0).await?;

let groups: Vec<MlsGroup<ApiClient>> = envelopes
.into_iter()
.filter_map(|envelope| {
// TODO: Wrap in a transaction
let welcome = match extract_welcome(&envelope.message) {
Ok(welcome) => welcome,
Err(err) => {
log::error!("failed to extract welcome: {}", err);
return None;
}
};

// TODO: Update last_message_timestamp_ns on success or non-retryable error
// TODO: Abort if error is retryable
match MlsGroup::create_from_welcome(self, &mut conn, &provider, welcome) {
Ok(mls_group) => Some(mls_group),
Err(err) => {
log::error!("failed to create group from welcome: {}", err);
None
}
}
})
.collect();

Ok(groups)
}

pub fn account_address(&self) -> Address {
self.identity.account_address.clone()
}
Expand All @@ -197,6 +241,17 @@ where
}
}

fn extract_welcome(welcome_bytes: &Vec<u8>) -> Result<Welcome, ClientError> {
// let welcome_proto = WelcomeMessageProto::decode(&mut welcome_bytes.as_slice())?;
let welcome = MlsMessageIn::tls_deserialize(&mut welcome_bytes.as_slice())?;
match welcome.extract() {
MlsMessageInBody::Welcome(welcome) => Ok(welcome),
_ => Err(ClientError::Generic(
"unexpected message type in welcome".to_string(),
)),
}
}

#[cfg(test)]
mod tests {
use xmtp_cryptography::utils::generate_local_wallet;
Expand Down Expand Up @@ -238,4 +293,41 @@ mod tests {
assert_eq!(groups[0].group_id, group_1.group_id);
assert_eq!(groups[1].group_id, group_2.group_id);
}

#[tokio::test]
async fn test_sync_welcomes() {
let alice = ClientBuilder::new_test_client(generate_local_wallet().into()).await;
alice.register_identity().await.unwrap();
let bob = ClientBuilder::new_test_client(generate_local_wallet().into()).await;
bob.register_identity().await.unwrap();

let conn = &mut alice.store.conn().unwrap();
let alice_bob_group = alice.create_group().unwrap();
alice_bob_group
.add_members_by_installation_id(vec![bob.installation_public_key()])
.await
.unwrap();

// Manually mark as committed
// TODO: Replace with working synchronization once we can add members end to end
let intents = alice
.store
.find_group_intents(conn, alice_bob_group.group_id.clone(), None, None)
.unwrap();
let intent = intents.first().unwrap();
// Set the intent to committed manually
alice
.store
.set_group_intent_committed(conn, intent.id)
.unwrap();

alice_bob_group.post_commit(conn).await.unwrap();

let bob_received_groups = bob.sync_welcomes().await.unwrap();
assert_eq!(bob_received_groups.len(), 1);
assert_eq!(
bob_received_groups.first().unwrap().group_id,
alice_bob_group.group_id
);
}
}
23 changes: 22 additions & 1 deletion xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use openmls::{
prelude::{
CredentialWithKey, CryptoConfig, GroupId, LeafNodeIndex, MlsGroup as OpenMlsGroup,
MlsGroupConfig, MlsMessageIn, MlsMessageInBody, PrivateMessageIn, ProcessedMessage,
ProcessedMessageContent, Sender, WireFormatPolicy,
ProcessedMessageContent, Sender, Welcome as MlsWelcome, WireFormatPolicy,
},
prelude_test::KeyPackage,
};
Expand Down Expand Up @@ -59,6 +59,8 @@ pub enum GroupError {
GroupCreate(#[from] openmls::prelude::NewGroupError<StorageError>),
#[error("self update: {0}")]
SelfUpdate(#[from] openmls::group::SelfUpdateError<StorageError>),
#[error("welcome error: {0}")]
WelcomeError(#[from] openmls::prelude::WelcomeError<StorageError>),
#[error("client: {0}")]
Client(#[from] ClientError),
#[error("receive errors: {0:?}")]
Expand Down Expand Up @@ -138,6 +140,24 @@ where
Ok(Self::new(client, group_id, stored_group.created_at_ns))
}

pub fn create_from_welcome(
client: &'c Client<ApiClient>,
conn: &mut DbConnection,
provider: &XmtpOpenMlsProvider,
welcome: MlsWelcome,
) -> Result<Self, GroupError> {
let mut mls_group =
OpenMlsGroup::new_from_welcome(provider, &build_group_config(), welcome, None)?;
mls_group.save(provider.key_store())?;

let group_id = mls_group.group_id().to_vec();
let stored_group =
StoredGroup::new(group_id.clone(), now_ns(), GroupMembershipState::Pending);
stored_group.store(conn)?;

Ok(Self::new(client, group_id, stored_group.created_at_ns))
}

pub fn find_messages(
&self,
kind: Option<GroupMessageKind>,
Expand Down Expand Up @@ -690,6 +710,7 @@ mod tests {
.unwrap();
let intent = intents.first().unwrap();
// Set the intent to committed manually
// TODO: Replace with working synchronization once we can add members end to end
client
.store
.set_group_intent_committed(conn, intent.id)
Expand Down

0 comments on commit 09a2442

Please sign in to comment.