Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Key Update Before Every New GroupIntent #658

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8e87ba7
queue_key_update
Apr 15, 2024
9df8308
lint
Apr 15, 2024
60e8adc
lint
Apr 15, 2024
cb88c9e
merge main
Apr 15, 2024
3438591
queue key update test
Apr 16, 2024
0e9d812
key-update
Apr 22, 2024
1624a05
Fix paths in update-schema script
richardhuaaa Apr 22, 2024
7d0dade
merge master and add rotated_at_ns item
Apr 30, 2024
e62cdfc
add get_rotated_at_ns
Apr 30, 2024
5f35add
fetch rotated_at_ns
May 1, 2024
928bca4
Merge remote-tracking branch 'origin' into dz/key-update
May 1, 2024
b319f56
add pre_intent_hook
May 6, 2024
ed6e1dd
Revert unneeded fix, and specify the correct directory in the README …
richardhuaaa May 13, 2024
cd28efd
apply pre_intent_hook
May 16, 2024
c160c14
Merge remote-tracking branch 'origin' into dz/key-update
May 16, 2024
e1b5f56
merge master and resolve comments
May 16, 2024
c6ed2c0
fix tests
May 17, 2024
79d714e
test 1
May 30, 2024
4a317b4
merge master
May 31, 2024
ceb8d34
modify test
May 31, 2024
ecc3dc0
merge main
Jun 20, 2024
48ad52a
modify
Jun 20, 2024
059df1c
solve db connection and update
Jun 21, 2024
e8a3308
more key updates and test updates
Jun 24, 2024
a4482c6
add
Jun 24, 2024
dc2ba36
test
Jun 28, 2024
b4d676e
finish key update test
Jul 2, 2024
979dcea
Merge remote-tracking branch 'origin/main' into dz/key-update
richardhuaaa Oct 1, 2024
41a7618
Small fixes
richardhuaaa Oct 1, 2024
3d143eb
Fix group initialization error - no need to rotate keys if you are th…
richardhuaaa Oct 1, 2024
6377d85
Undo unit test changes
richardhuaaa Oct 1, 2024
71478f1
Make it impossible to miss a pre_intent_hook, add missing locations
richardhuaaa Oct 2, 2024
bb75ed4
Reduce duplication of maybe_update_installations
richardhuaaa Oct 2, 2024
8750c10
Progress on key rotation for optimistic message sends
richardhuaaa Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions xmtp_mls/migrations/2024-04-22-061128_key_update/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
2 changes: 2 additions & 0 deletions xmtp_mls/migrations/2024-04-22-061128_key_update/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE groups
ADD COLUMN rotated_at_ns BIGINT NOT NULL DEFAULT 0
2 changes: 2 additions & 0 deletions xmtp_mls/migrations/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
cargo install diesel_cli --no-default-features --features sqlite
```

### Change directory to libxmtp/xmtp_mls/

### Create your migration SQL

In this example the migration is called `create_key_store`:
Expand Down
Binary file not shown.
Empty file.
104 changes: 98 additions & 6 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,7 @@ impl MlsGroup {
{
let conn = self.context.store.conn()?;

let update_interval = Some(5_000_000); // 5 seconds in nanoseconds
self.maybe_update_installations(conn.clone(), update_interval, client)
.await?;
self.pre_intent_hook(client).await?;

let now = now_ns();
let plain_envelope = Self::into_envelope(message, &now.to_string());
Expand All @@ -400,6 +398,7 @@ impl MlsGroup {
.map_err(GroupError::EncodeError)?;

let intent_data: Vec<u8> = SendMessageIntentData::new(encoded_envelope).into();

let intent =
NewGroupIntent::new(IntentKind::SendMessage, self.group_id.clone(), intent_data);
intent.store(&conn)?;
Expand Down Expand Up @@ -485,6 +484,7 @@ impl MlsGroup {
if inbox_id_map.len() != account_addresses.len() {
let found_addresses: HashSet<&String> = inbox_id_map.keys().collect();
let to_add_hashset = HashSet::from_iter(account_addresses.iter());
self.pre_intent_hook(client).await?;
richardhuaaa marked this conversation as resolved.
Show resolved Hide resolved
let missing_addresses = found_addresses.difference(&to_add_hashset);
return Err(GroupError::AddressNotFound(
missing_addresses.into_iter().cloned().cloned().collect(),
Expand All @@ -495,6 +495,7 @@ impl MlsGroup {
.await
}

// Before calling this function, please verify pre_intent_hook has been called.
pub async fn add_members_by_inbox_id<ApiClient: XmtpApi>(
&self,
client: &Client<ApiClient>,
Expand Down Expand Up @@ -569,6 +570,9 @@ impl MlsGroup {
let conn = self.context.store.conn()?;
let intent_data: Vec<u8> =
UpdateMetadataIntentData::new_update_group_name(group_name).into();

self.pre_intent_hook(client).await?;

let intent = conn.insert_group_intent(NewGroupIntent::new(
IntentKind::MetadataUpdate,
self.group_id.clone(),
Expand Down Expand Up @@ -630,6 +634,7 @@ impl MlsGroup {
};
let intent_data: Vec<u8> =
UpdateAdminListIntentData::new(intent_action_type, inbox_id).into();
self.pre_intent_hook(client).await?;
let intent = conn.insert_group_intent(NewGroupIntent::new(
IntentKind::UpdateAdminList,
self.group_id.clone(),
Expand Down Expand Up @@ -658,17 +663,39 @@ impl MlsGroup {
ApiClient: XmtpApi,
{
let conn = self.context.store.conn()?;

let intent = NewGroupIntent::new(IntentKind::KeyUpdate, self.group_id.clone(), vec![]);
intent.store(&conn)?;

self.sync_with_conn(conn, client).await
}

/// Checking the last key rotation time before rotating the key.
pub async fn pre_intent_hook<ApiClient>(
&self,
client: &Client<ApiClient>,
) -> Result<(), GroupError>
where
ApiClient: XmtpApi,
{
let conn = self.context.store.conn()?;
let last_rotated_time = conn.get_rotated_time_checked(self.group_id.clone())?;

if last_rotated_time == 0 {
self.key_update(client).await?;
conn.update_rotated_time_checked(self.group_id.clone())?;
}
let update_interval = Some(5_000_000);
self.maybe_update_installations(conn.clone(), update_interval, client)
.await?;

self.sync_with_conn(conn, client).await
}

pub fn is_active(&self) -> Result<bool, GroupError> {
let conn = self.context.store.conn()?;
let provider = XmtpOpenMlsProvider::new(conn);
let mls_group = self.load_mls_group(&provider)?;

Ok(mls_group.is_active())
}

Expand Down Expand Up @@ -915,6 +942,12 @@ fn build_group_join_config() -> MlsGroupJoinConfig {
#[cfg(test)]
mod tests {
use openmls::prelude::{tls_codec::Serialize, Member, MlsGroup as OpenMlsGroup};
use openmls::prelude::tls_codec::Deserialize;
use openmls::prelude::MlsMessageIn;
use openmls::prelude::MlsMessageBodyIn;
use openmls::prelude::Proposal;
use openmls::prelude::ProcessedMessageContent;
use crate::groups::GroupMessageVersion;
use prost::Message;
use tracing_test::traced_test;
use xmtp_cryptography::utils::generate_local_wallet;
Expand Down Expand Up @@ -1312,7 +1345,7 @@ mod tests {
.await
.expect("read topic");

assert_eq!(messages.len(), 2);
richardhuaaa marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(messages.len(), 3);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand All @@ -1333,7 +1366,7 @@ mod tests {
.query_group_messages(group.group_id.clone(), None)
.await
.unwrap();
assert_eq!(messages.len(), 2);
assert_eq!(messages.len(), 3);

let conn = &client.context.store.conn().unwrap();
let provider = super::XmtpOpenMlsProvider::new(conn.clone());
Expand All @@ -1356,6 +1389,65 @@ mod tests {
assert_eq!(bola_messages.len(), 1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_pre_intent_hook() {
let client_a = ClientBuilder::new_test_client(&generate_local_wallet()).await;
let client_b = ClientBuilder::new_test_client(&generate_local_wallet()).await;

// client A makes a group with client B.
let group = client_a.create_group(None).expect("create group");
group
.add_members_by_inbox_id(&client_a, vec![client_b.inbox_id()])
.await
.unwrap();

// client B creates it from welcome.
let client_b_group = receive_group_invite(&client_b).await;
client_b_group.sync(&client_b).await.unwrap();

// verify no new payloads on client A.
let mut messages = client_a.api_client.query_group_messages(group.group_id.clone(), None).await.unwrap();
assert_eq!(messages.len(), 0);

// call pre_intent_hook on client B.
group.pre_intent_hook(&client_b).await.unwrap();

// Verify client A receives a key rotation payload
messages = client_a.api_client.query_group_messages(group.group_id.clone(), None).await.unwrap();
assert_eq!(messages.len(), 1);

let first_message = &messages[0];

let msgv1 = match &first_message.version {
Some(GroupMessageVersion::V1(value)) => value,
_ => panic!("error msgv1"),
};

let mls_message_in = MlsMessageIn::tls_deserialize_exact(&msgv1.data).unwrap();
let deserialized_message = match mls_message_in.extract() {
MlsMessageBodyIn::PrivateMessage(deserialized_message) => deserialized_message,
_ => panic!("error decentralized message"),
};

let conn = &client_a.context.store.conn().unwrap();
let provider = super::XmtpOpenMlsProvider::new(conn.clone());
let mut openmls_group = group.load_mls_group(provider.clone()).unwrap();
let openmls_group_ref: &mut OpenMlsGroup = &mut openmls_group;
let decrypted_message = openmls_group_ref.process_message(&provider, deserialized_message).unwrap();

let staged_commit = match decrypted_message.into_content(){
ProcessedMessageContent::StagedCommitMessage(staged_commit) => *staged_commit,
_ => panic!("error staged_commit"),
};

for proposal in staged_commit.queued_proposals() {
match proposal.proposal() {
Proposal::Update(_) => {},
_ => panic!("error proposal"),
}
};
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_post_commit() {
let client = ClientBuilder::new_test_client(&generate_local_wallet()).await;
Expand Down
27 changes: 27 additions & 0 deletions xmtp_mls/src/storage/encrypted_store/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ pub struct StoredGroup {
pub installations_last_checked: i64,
/// Enum, [`Purpose`] signifies the group purpose which extends to who can access it.
pub purpose: Purpose,

/// The inbox_id of who added the user to a group.
pub added_by_inbox_id: String,
pub rotated_at_ns: i64,
}

impl_fetch!(StoredGroup, groups, Vec<u8>);
Expand All @@ -57,6 +59,7 @@ impl StoredGroup {
installations_last_checked: 0,
purpose: Purpose::Conversation,
added_by_inbox_id,
rotated_at_ns: 0,
}
}

Expand All @@ -74,6 +77,7 @@ impl StoredGroup {
installations_last_checked: 0,
purpose: Purpose::Sync,
added_by_inbox_id: "".into(),
rotated_at_ns: 0,
}
}
}
Expand Down Expand Up @@ -157,6 +161,29 @@ impl DbConnection {
last_ts.ok_or(StorageError::NotFound)
}

pub fn get_rotated_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
let last_ts = self.raw_query(|conn| {
let ts = dsl::groups
.find(&group_id)
.select(dsl::rotated_at_ns)
.first(conn)
.optional()?;
Ok(ts)
})?;

last_ts.ok_or(StorageError::NotFound)
}

/// Update the 'rotated time' once we checked in pre_intent_hook
pub fn update_rotated_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
self.raw_query(|conn| {
let now = crate::utils::time::now_ns();
diesel::update(dsl::groups.find(&group_id)).set(dsl::rotated_at_ns.eq(now)).execute(conn)
})?;

Ok(())
}

/// Updates the 'last time checked' we checked for new installations.
pub fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
self.raw_query(|conn| {
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/storage/encrypted_store/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ diesel::table! {
installations_last_checked -> BigInt,
purpose -> Integer,
added_by_inbox_id -> Text,
rotated_at_ns -> BigInt,
}
}

Expand Down
Loading