Skip to content

Commit

Permalink
fix: add .subscribe / async support to wasm client
Browse files Browse the repository at this point in the history
  • Loading branch information
dmccartney committed Oct 12, 2023
1 parent 1b05872 commit 5c2178c
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 41 deletions.
4 changes: 2 additions & 2 deletions bindings_wasm/tests/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use prost::Message;
use wasm_bindgen::prelude::*;
use wasm_bindgen_test::*;
use xmtp_cryptography::signature::RecoverableSignature;
use xmtp_proto::api_client::XmtpApiClient;
use xmtp_proto::api_client::{XmtpApiClient, XmtpApiSubscription};
use xmtp_proto::xmtp::message_api::v1::{
BatchQueryRequest, Envelope, PublishRequest, QueryRequest,
BatchQueryRequest, Envelope, PublishRequest, QueryRequest, SubscribeRequest,
};

// Only run these tests in a browser.
Expand Down
4 changes: 3 additions & 1 deletion xmtp/src/mock_xmtp_api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use xmtp_proto::api_client::*;

pub struct MockXmtpApiSubscription {}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl XmtpApiSubscription for MockXmtpApiSubscription {
fn is_closed(&self) -> bool {
false
}

fn get_messages(&self) -> Vec<Envelope> {
async fn get_messages(&mut self) -> Vec<Envelope> {
vec![]
}

Expand Down
5 changes: 3 additions & 2 deletions xmtp_api_grpc/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,13 @@ impl Subscription {
}
}

impl XmtpApiSubscription for Subscription {
#[async_trait]
impl<'a> XmtpApiSubscription for Subscription {
fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}

fn get_messages(&self) -> Vec<Envelope> {
async fn get_messages(&mut self) -> Vec<Envelope> {
let mut pending = self.pending.lock().unwrap();
let items = pending.drain(..).collect::<Vec<Envelope>>();
items
Expand Down
4 changes: 2 additions & 2 deletions xmtp_api_grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ mod tests {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// Ensure that messages appear
let results = stream_handler.get_messages();
let results = stream_handler.get_messages().await;
println!("{}", results.len());
assert!(results.len() == 1);

// Ensure that the messages array has been cleared
let second_results = stream_handler.get_messages();
let second_results = stream_handler.get_messages().await;
assert!(second_results.is_empty());

// Ensure the is_closed status is propagated
Expand Down
50 changes: 50 additions & 0 deletions xmtp_api_grpc_gateway/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion xmtp_api_grpc_gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ crate-type = ["cdylib", "rlib"]

[dependencies]
async-trait = "0.1.68"
bytes = "1.4.0"
futures = "0.3"
futures-util = "0.3"
getrandom = { version = "0.2", features = ["js"] }
hex = "0.4"
js-sys = "0.3"
prost = { version = "0.11", features = ["prost-derive"] }
prost-types = "0.11"
reqwest = { version = "0.11.20", features = ["json"] }
reqwest = { version = "0.11.20", features = ["json", "stream"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0"
tokio = {version="1.28.1", features=["rt", "sync", "macros", "io-util"]}
tokio-util = {version="0.7.8", features=["codec", "rt", "io-util"]}
wasm-bindgen = "0.2.87"
wasm-bindgen-futures = "0.4.37"
xmtp_cryptography = { path = "../xmtp_cryptography", features = ["ws"] }
Expand Down
Loading

0 comments on commit 5c2178c

Please sign in to comment.