Skip to content

Commit

Permalink
Start the sync worker in the builder (#1365)
Browse files Browse the repository at this point in the history
* Start the sync worker in the builder

* wait for identity

* tweak test

* keep life simple rn

* helpers

* stop, web time

* test fix

* Some WASM help from Andrew

* test fix

* Add sync and send constraints

* lint

* Add send, sync to wasm sync worker

* propagate sync, send a bit more

* propagate

* propagate

* propagate

* wait for identity to be ready

* cleanup

* cleanup
  • Loading branch information
codabrink authored Dec 5, 2024
1 parent 07c8470 commit 5849947
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 84 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 28 additions & 25 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ members = [
"bindings_node",
"bindings_ffi",
"xtask",
"xmtp_debug"
"xmtp_debug",
]

# Make the feature resolver explicit.
# See https://doc.rust-lang.org/edition-guide/rust-2021/default-cargo-resolver.html#details
resolver = "2"

[workspace.package]
version = "0.1.0"
license = "MIT"
version = "0.1.0"

[workspace.dependencies]
anyhow = "1.0"
async-stream = "0.3"
async-trait = "0.1.77"
base64 = "0.22"
chrono = "0.4.38"
wasm-timer = "0.2"
ctor = "0.2"
ed25519 = "2.2.3"
ed25519-dalek = { version = "2.1.1", features = ["zeroize"] }
Expand All @@ -48,8 +48,6 @@ pbjson-types = "0.7.0"
prost = "^0.13"
prost-types = "^0.13"
rand = "0.8.5"
uuid = "1.10"
base64 = "0.22"
regex = "1.10.4"
rustc-hex = "2.1.0"
serde = { version = "1.0", default-features = false }
Expand All @@ -59,6 +57,9 @@ sha3 = "0.10.8"
thiserror = "2.0"
tls_codec = "0.4.1"
tokio = { version = "1.35.1", default-features = false }
uuid = "1.10"
wasm-timer = "0.2"
web-time = "1.1"
# Changing this version and rustls may potentially break the android build. Use Caution.
# Test with Android and Swift first.
# Its probably preferable to one day use https://github.com/rustls/rustls-platform-verifier
Expand All @@ -67,34 +68,36 @@ tokio = { version = "1.35.1", default-features = false }
# - https://github.com/seanmonstar/reqwest/issues/2159
# - https://github.com/hyperium/tonic/pull/1974
# - https://github.com/rustls/rustls-platform-verifier/issues/58
tonic = { version = "0.12", default-features = false }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false }
bincode = "1.3"
console_error_panic_hook = "0.1"
const_format = "0.2"
diesel = { version = "2.2", default-features = false }
diesel-wasm-sqlite = "0.0.1"
diesel_migrations = { version = "2.2", default-features = false }
parking_lot = "0.12.3"
wasm-bindgen-futures = "0.4"
wasm-bindgen = "=0.2.97"
wasm-bindgen-test = "0.3.47"
dyn-clone = "1"
fdlimit = "0.3"
gloo-timers = "0.3"
web-sys = "0.3"
js-sys = "0.3"
openssl-sys = { version = "0.9", features = ["vendored"] }
libsqlite3-sys = { version = "0.29", features = [
"bundled-sqlcipher-vendored-openssl",
] }
openssl = { version = "0.10", features = ["vendored"] }
libsqlite3-sys = { version = "0.29", features = ["bundled-sqlcipher-vendored-openssl" ] }
dyn-clone = "1"
openssl-sys = { version = "0.9", features = ["vendored"] }
parking_lot = "0.12.3"
tonic = { version = "0.12", default-features = false }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false }
trait-variant = "0.1.2"
url = "2.5.0"
wasm-bindgen = "=0.2.97"
wasm-bindgen-futures = "0.4"
wasm-bindgen-test = "0.3.47"
web-sys = "0.3"
zeroize = "1.8"
bincode = "1.3"
console_error_panic_hook = "0.1"
fdlimit = "0.3"
const_format = "0.2"

# Internal Crate Dependencies
xmtp_cryptography = { path = "xmtp_cryptography" }
xmtp_api_grpc = { path = "xmtp_api_grpc" }
xmtp_cryptography = { path = "xmtp_cryptography" }
xmtp_id = { path = "xmtp_id" }
xmtp_mls = { path = "xmtp_mls" }
xmtp_proto = { path = "xmtp_proto" }
Expand Down Expand Up @@ -124,10 +127,10 @@ strip = "symbols"
# NOTE: The release profile reduces bundle size from 230M to 41M - may have performance impliciations
# https://stackoverflow.com/a/54842093
[profile.release.package.xmtpv3]
codegen-units = 1 # Reduce number of codegen units to increase optimizations
inherits = "release-with-lto"
codegen-units = 1 # Reduce number of codegen units to increase optimizations
opt-level = 'z' # Optimize for size + loop vectorization
strip = true # Strip symbols from binary*
opt-level = 'z' # Optimize for size + loop vectorization
strip = true # Strip symbols from binary*

[profile.release.package.bindings_wasm]
inherits = "release-with-lto"
Expand All @@ -138,7 +141,7 @@ opt-level = "s"
# are made public for third-party dependencies: https://github.com/diesel-rs/diesel/pull/4236
# (cfg-specific patche support does not exist)
[patch.crates-io]
diesel-wasm-sqlite = { git = "https://github.com/xmtp/diesel-wasm-sqlite", branch = "main" }
diesel = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
diesel-wasm-sqlite = { git = "https://github.com/xmtp/diesel-wasm-sqlite", branch = "main" }
diesel_derives = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
diesel_migrations = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
49 changes: 33 additions & 16 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,18 +440,9 @@ impl FfiXmtpClient {
.register_identity(signature_request.clone())
.await?;

self.maybe_start_sync_worker();

Ok(())
}

/// Starts the sync worker if the history sync url is present.
fn maybe_start_sync_worker(&self) {
if self.inner_client.history_sync_url().is_some() {
self.inner_client.start_sync_worker();
}
}

pub async fn send_sync_request(&self, kind: FfiDeviceSyncKind) -> Result<(), GenericError> {
let provider = self.inner_client.mls_provider()?;
self.inner_client
Expand Down Expand Up @@ -1811,6 +1802,7 @@ mod tests {
unverified::{UnverifiedRecoverableEcdsaSignature, UnverifiedSignature},
};
use xmtp_mls::{
api::test_utils::{wait_for_eq, wait_for_ok},
groups::{scoped_client::LocalScopedGroupClient, GroupError},
storage::EncryptionKey,
InboxOwner,
Expand Down Expand Up @@ -2585,7 +2577,7 @@ mod tests {
}
bo.conversations().sync().await.unwrap();
let num_groups_synced_1: u32 = bo.conversations().sync_all_conversations().await.unwrap();
assert!(num_groups_synced_1 == 30);
assert_eq!(num_groups_synced_1, 30);

// Remove bo from all groups and sync
for group in alix
Expand All @@ -2602,11 +2594,11 @@ mod tests {

// First sync after removal needs to process all groups and set them to inactive
let num_groups_synced_2: u32 = bo.conversations().sync_all_conversations().await.unwrap();
assert!(num_groups_synced_2 == 30);
assert_eq!(num_groups_synced_2, 30);

// Second sync after removal will not process inactive groups
let num_groups_synced_3: u32 = bo.conversations().sync_all_conversations().await.unwrap();
assert!(num_groups_synced_3 == 0);
assert_eq!(num_groups_synced_3, 0);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
Expand Down Expand Up @@ -4061,15 +4053,40 @@ mod tests {
async fn test_stream_consent() {
let wallet = generate_local_wallet();
let alix_a = new_test_client_with_wallet_and_history(wallet.clone()).await;
let alix_a_conn = alix_a.inner_client.store().conn().unwrap();
// wait for alix_a's sync worker to create a sync group
let _ = wait_for_ok(|| async { alix_a.inner_client.get_sync_group(&alix_a_conn) }).await;

let alix_b = new_test_client_with_wallet_and_history(wallet).await;
wait_for_eq(|| async { alix_b.inner_client.identity().is_ready() }, true).await;

let bo = new_test_client_with_history().await;

// have alix_a pull down the new sync group created by alix_b
assert!(alix_a.conversations().sync().await.is_ok());
// wait for the first installation to get invited to the new sync group
wait_for_eq(
|| async {
assert!(alix_a.conversations().sync().await.is_ok());
alix_a
.inner_client
.store()
.conn()
.unwrap()
.all_sync_groups()
.unwrap()
.len()
},
2,
)
.await;

// check that they have the same sync group
let sync_group_a = alix_a.conversations().get_sync_group().unwrap();
let sync_group_b = alix_b.conversations().get_sync_group().unwrap();
let sync_group_a = wait_for_ok(|| async { alix_a.conversations().get_sync_group() })
.await
.unwrap();
let sync_group_b = wait_for_ok(|| async { alix_b.conversations().get_sync_group() })
.await
.unwrap();

assert_eq!(sync_group_a.id(), sync_group_b.id());

// create a stream from both installations
Expand Down
19 changes: 11 additions & 8 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bench = [
"criterion",
"dep:fdlimit",
"dep:ethers",
"dep:const_format"
"dep:const_format",
]
default = ["grpc-api"]
grpc-api = ["dep:xmtp_api_grpc"]
Expand All @@ -38,7 +38,8 @@ test-utils = [
"xmtp_proto/test-utils",
"xmtp_api_http/test-utils",
"xmtp_api_grpc/test-utils",
"dep:const_format"
"dep:const_format",
"mockall",
]
update-schema = ["toml"]

Expand Down Expand Up @@ -66,6 +67,7 @@ tokio-stream = { version = "0.1", default-features = false, features = [
tracing.workspace = true
trait-variant.workspace = true
wasm-timer.workspace = true
web-time.workspace = true
zeroize.workspace = true

# XMTP/Local
Expand All @@ -75,12 +77,12 @@ xmtp_proto = { workspace = true, features = ["convert"] }

# Optional/Features
console_error_panic_hook = { workspace = true, optional = true }
const_format = { workspace = true, optional = true }
ethers = { workspace = true, features = ["openssl"], optional = true }
fdlimit = { workspace = true, optional = true }
toml = { version = "0.8.4", optional = true }
tracing-wasm = { version = "0.2", optional = true }
xmtp_api_http = { path = "../xmtp_api_http", optional = true }
fdlimit = { workspace = true, optional = true }
ethers = { workspace = true, features = ["openssl"], optional = true }
const_format = { workspace = true, optional = true }

# Test/Bench Utils
anyhow = { workspace = true, optional = true }
Expand All @@ -89,14 +91,15 @@ criterion = { version = "0.5", features = [
"async_tokio",
], optional = true }
indicatif = { version = "0.17", optional = true }
mockall = { version = "0.13.1", optional = true }
once_cell = { version = "1.19", optional = true }
tracing-flame = { version = "0.2", optional = true }
tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"ansi",
"json",
"registry"
"registry",
], optional = true }


Expand Down Expand Up @@ -137,11 +140,11 @@ web-sys.workspace = true

[dev-dependencies]
anyhow.workspace = true
const_format.workspace = true
mockall = "0.13.1"
openmls_basic_credential.workspace = true
xmtp_id = { path = "../xmtp_id", features = ["test-utils"] }
xmtp_proto = { workspace = true, features = ["test-utils"] }
const_format.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
ctor.workspace = true
Expand Down Expand Up @@ -192,8 +195,8 @@ harness = false
name = "identity"
required-features = ["bench"]


#[[bench]]
#harness = false
#name = "sync"
#required-features = ["bench"]

2 changes: 1 addition & 1 deletion xmtp_mls/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod identity;
pub mod mls;
#[cfg(test)]
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

use std::sync::Arc;
Expand Down
55 changes: 55 additions & 0 deletions xmtp_mls/src/api/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::{future::Future, time::Duration};
use web_time::Instant;

use mockall::mock;
use xmtp_proto::{
api_client::{ClientWithMetadata, XmtpIdentityClient, XmtpMlsClient, XmtpMlsStreams},
Expand Down Expand Up @@ -176,3 +179,55 @@ mod wasm {
}
}
}

pub async fn wait_for_some<F, Fut, T>(f: F) -> Option<T>
where
F: Fn() -> Fut,
Fut: Future<Output = Option<T>>,
{
let start = Instant::now();
while start.elapsed() < Duration::from_secs(3) {
let result = f().await;
if result.is_some() {
return result;
}
crate::sleep(Duration::from_millis(100)).await;
}
None
}

pub async fn wait_for_ok<F, Fut, T, E>(f: F) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
{
let start = Instant::now();
let mut result = f().await;
while start.elapsed() < Duration::from_secs(3) {
if result.is_ok() {
return result;
}
crate::sleep(Duration::from_millis(100)).await;
result = f().await;
}
result
}

pub async fn wait_for_eq<F, Fut, T>(f: F, expected: T)
where
F: Fn() -> Fut,
Fut: Future<Output = T>,
T: std::fmt::Debug + PartialEq,
{
let start = Instant::now();
let mut result = f().await;
while start.elapsed() < Duration::from_secs(3) {
if result == expected {
break;
}
crate::sleep(Duration::from_millis(100)).await;
result = f().await;
}

assert_eq!(expected, result);
}
Loading

0 comments on commit 5849947

Please sign in to comment.