From 5c2178cda44d3b7a130d36f08122749e2bb09adb Mon Sep 17 00:00:00 2001 From: Daniel McCartney Date: Wed, 11 Oct 2023 00:15:39 -0700 Subject: [PATCH] fix: add .subscribe / async support to wasm client --- bindings_wasm/tests/web.rs | 4 +- xmtp/src/mock_xmtp_api_client.rs | 4 +- xmtp_api_grpc/src/grpc_api_helper.rs | 5 +- xmtp_api_grpc/src/lib.rs | 4 +- xmtp_api_grpc_gateway/Cargo.lock | 50 ++++++++ xmtp_api_grpc_gateway/Cargo.toml | 9 +- xmtp_api_grpc_gateway/src/lib.rs | 157 ++++++++++++++++++----- xmtp_api_grpc_gateway/tests/wasm_test.rs | 71 +++++++++- xmtp_proto/src/api_client.rs | 4 +- 9 files changed, 267 insertions(+), 41 deletions(-) diff --git a/bindings_wasm/tests/web.rs b/bindings_wasm/tests/web.rs index 4c8918a9b..01065bb9d 100644 --- a/bindings_wasm/tests/web.rs +++ b/bindings_wasm/tests/web.rs @@ -5,9 +5,9 @@ use prost::Message; use wasm_bindgen::prelude::*; use wasm_bindgen_test::*; use xmtp_cryptography::signature::RecoverableSignature; -use xmtp_proto::api_client::XmtpApiClient; +use xmtp_proto::api_client::{XmtpApiClient, XmtpApiSubscription}; use xmtp_proto::xmtp::message_api::v1::{ - BatchQueryRequest, Envelope, PublishRequest, QueryRequest, + BatchQueryRequest, Envelope, PublishRequest, QueryRequest, SubscribeRequest, }; // Only run these tests in a browser. diff --git a/xmtp/src/mock_xmtp_api_client.rs b/xmtp/src/mock_xmtp_api_client.rs index ab6ce71c0..0e3a49b9c 100644 --- a/xmtp/src/mock_xmtp_api_client.rs +++ b/xmtp/src/mock_xmtp_api_client.rs @@ -7,12 +7,14 @@ use xmtp_proto::api_client::*; pub struct MockXmtpApiSubscription {} +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl XmtpApiSubscription for MockXmtpApiSubscription { fn is_closed(&self) -> bool { false } - fn get_messages(&self) -> Vec { + async fn get_messages(&mut self) -> Vec { vec![] } diff --git a/xmtp_api_grpc/src/grpc_api_helper.rs b/xmtp_api_grpc/src/grpc_api_helper.rs index dcc9a95ce..94c051f17 100644 --- a/xmtp_api_grpc/src/grpc_api_helper.rs +++ b/xmtp_api_grpc/src/grpc_api_helper.rs @@ -245,12 +245,13 @@ impl Subscription { } } -impl XmtpApiSubscription for Subscription { +#[async_trait] +impl<'a> XmtpApiSubscription for Subscription { fn is_closed(&self) -> bool { self.closed.load(Ordering::SeqCst) } - fn get_messages(&self) -> Vec { + async fn get_messages(&mut self) -> Vec { let mut pending = self.pending.lock().unwrap(); let items = pending.drain(..).collect::>(); items diff --git a/xmtp_api_grpc/src/lib.rs b/xmtp_api_grpc/src/lib.rs index 1ae3b43d2..f3df63e77 100644 --- a/xmtp_api_grpc/src/lib.rs +++ b/xmtp_api_grpc/src/lib.rs @@ -116,12 +116,12 @@ mod tests { tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Ensure that messages appear - let results = stream_handler.get_messages(); + let results = stream_handler.get_messages().await; println!("{}", results.len()); assert!(results.len() == 1); // Ensure that the messages array has been cleared - let second_results = stream_handler.get_messages(); + let second_results = stream_handler.get_messages().await; assert!(second_results.is_empty()); // Ensure the is_closed status is propagated diff --git a/xmtp_api_grpc_gateway/Cargo.lock b/xmtp_api_grpc_gateway/Cargo.lock index 021ac6e29..483625edf 100644 --- a/xmtp_api_grpc_gateway/Cargo.lock +++ b/xmtp_api_grpc_gateway/Cargo.lock @@ -38,6 +38,17 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "1.0.5" @@ -1438,6 +1449,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] [[package]] name = "hashbrown" @@ -2661,10 +2675,12 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.25.2", "winreg", @@ -3392,9 +3408,21 @@ dependencies = [ "num_cpus", "pin-project-lite", "socket2 0.5.4", + "tokio-macros", "windows-sys", ] +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.33", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -3439,6 +3467,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.12.3", "pin-project-lite", "tokio", "tracing", @@ -3766,6 +3796,19 @@ dependencies = [ "quote", ] +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -3952,12 +3995,19 @@ name = "xmtp_api_grpc_gateway" version = "0.1.0" dependencies = [ "async-trait", + "bytes", + "futures", + "futures-util", "getrandom", "hex", "js-sys", "prost", "prost-types", "reqwest", + "serde", + "serde_json", + "tokio", + "tokio-util", "uuid 1.4.1", "wasm-bindgen", "wasm-bindgen-futures", diff --git a/xmtp_api_grpc_gateway/Cargo.toml b/xmtp_api_grpc_gateway/Cargo.toml index 8b58fcb33..f86068cb7 100644 --- a/xmtp_api_grpc_gateway/Cargo.toml +++ b/xmtp_api_grpc_gateway/Cargo.toml @@ -8,12 +8,19 @@ crate-type = ["cdylib", "rlib"] [dependencies] async-trait = "0.1.68" +bytes = "1.4.0" +futures = "0.3" +futures-util = "0.3" getrandom = { version = "0.2", features = ["js"] } hex = "0.4" js-sys = "0.3" prost = { version = "0.11", features = ["prost-derive"] } prost-types = "0.11" -reqwest = { version = "0.11.20", features = ["json"] } +reqwest = { version = "0.11.20", features = ["json", "stream"] } +serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0" +tokio = {version="1.28.1", features=["rt", "sync", "macros", "io-util"]} +tokio-util = {version="0.7.8", features=["codec", "rt", "io-util"]} wasm-bindgen = "0.2.87" wasm-bindgen-futures = "0.4.37" xmtp_cryptography = { path = "../xmtp_cryptography", features = ["ws"] } diff --git a/xmtp_api_grpc_gateway/src/lib.rs b/xmtp_api_grpc_gateway/src/lib.rs index 9b526a4d4..a5703c43b 100644 --- a/xmtp_api_grpc_gateway/src/lib.rs +++ b/xmtp_api_grpc_gateway/src/lib.rs @@ -1,4 +1,13 @@ use async_trait::async_trait; +use bytes::Bytes; +use futures_util::stream::LocalBoxStream; +use futures_util::task::noop_waker_ref; +use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use serde::Deserialize; +use std::marker::PhantomData; +use std::task::{Context, Poll}; +use tokio_util::codec::{FramedRead, LinesCodec}; +use tokio_util::io::StreamReader; use xmtp_proto::api_client::{Error, ErrorKind, XmtpApiClient, XmtpApiSubscription}; use xmtp_proto::xmtp::message_api::v1::{ BatchQueryRequest, BatchQueryResponse, Envelope, PublishRequest, PublishResponse, QueryRequest, @@ -9,23 +18,25 @@ use xmtp_proto::xmtp::message_api::v1::{ pub const LOCALHOST_ADDRESS: &str = "http://localhost:5555"; pub const DEV_ADDRESS: &str = "https://dev.xmtp.network:5555"; -pub struct XmtpGrpcGatewayClient { +pub struct XmtpGrpcGatewayClient<'a> { url: String, http: reqwest::Client, + phantom: PhantomData<&'a XmtpGrpcGatewaySubscription<'a>>, } -impl XmtpGrpcGatewayClient { +impl<'a> XmtpGrpcGatewayClient<'a> { pub fn new(url: String) -> Self { XmtpGrpcGatewayClient { url, http: reqwest::Client::new(), + phantom: PhantomData, } } } #[async_trait(?Send)] -impl XmtpApiClient for XmtpGrpcGatewayClient { - type Subscription = XmtpGrpcGatewaySubscription; +impl<'a> XmtpApiClient for XmtpGrpcGatewayClient<'a> { + type Subscription = XmtpGrpcGatewaySubscription<'a>; fn set_app_version(&mut self, _version: String) { // TODO @@ -36,67 +47,153 @@ impl XmtpApiClient for XmtpGrpcGatewayClient { token: String, request: PublishRequest, ) -> Result { - let res: PublishResponse = self + let response = self .http .post(&format!("{}/message/v1/publish", self.url)) .bearer_auth(token) .json(&request) .send() .await - .map_err(|e| Error::new(ErrorKind::PublishError).with(e))? - .json() - .await .map_err(|e| Error::new(ErrorKind::PublishError).with(e))?; - Ok(res) + Ok(response + .json::() + .await + .map_err(|e| Error::new(ErrorKind::PublishError).with(e))?) } async fn subscribe( &self, - _request: SubscribeRequest, - ) -> Result { - // TODO - Err(Error::new(ErrorKind::SubscribeError)) + request: SubscribeRequest, + ) -> Result, Error> { + // grpc-gateway streams newline-delimited JSON bodies + let response = self + .http + .post(&format!("{}/message/v1/subscribe", self.url)) + .json(&request) + .send() + .into_stream() + .filter_map(|r| async move { r.ok() }) + .map(|r| r.bytes_stream()) + .flatten() + .boxed_local(); + + Ok(XmtpGrpcGatewaySubscription::start(response)) } async fn query(&self, request: QueryRequest) -> Result { - let res: QueryResponse = self + let response = self .http .post(&format!("{}/message/v1/query", self.url)) .json(&request) .send() .await - .map_err(|e| Error::new(ErrorKind::QueryError).with(e))? - .json() - .await .map_err(|e| Error::new(ErrorKind::QueryError).with(e))?; - Ok(res) + Ok(response + .json::() + .await + .map_err(|e| Error::new(ErrorKind::QueryError).with(e))?) } async fn batch_query(&self, request: BatchQueryRequest) -> Result { - let res: BatchQueryResponse = self + let response = self .http .post(&format!("{}/message/v1/batch-query", self.url)) .json(&request) .send() .await - .map_err(|e| Error::new(ErrorKind::BatchQueryError).with(e))? - .json() - .await .map_err(|e| Error::new(ErrorKind::BatchQueryError).with(e))?; - Ok(res) + Ok(response + .json::() + .await + .map_err(|e| Error::new(ErrorKind::BatchQueryError).with(e))?) + } +} + +pub struct XmtpGrpcGatewaySubscription<'a> { + // When this is `None`, the stream has been closed. + pub stream: Option>>, +} + +// The result of calling .subscribe() +// The grpc-gateway streams newline-delimited JSON bodies +// in this shape: +// { result: { ... Envelope ... } }\n +// { result: { ... Envelope ... } }\n +// { result: { ... Envelope ... } }\n +// So we use this to pluck the `Envelope` as the `result`. +#[derive(Deserialize, Debug)] +struct SubscribeResult { + result: Envelope, +} + +impl<'a> XmtpGrpcGatewaySubscription<'a> { + pub fn start(req: LocalBoxStream<'a, Result>) -> Self { + let bytes_reader = StreamReader::new( + req.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), + ); + let codec = LinesCodec::new_with_max_length(1024 * 1024); + let frames_reader = FramedRead::with_capacity(bytes_reader, codec, 8 * 1024 * 1024); + let stream = frames_reader + .map(|frame_res| match frame_res { + Ok(frame_str) => serde_json::from_str::(frame_str.as_str()) + .map(|v| v.result) + .map_err(|e| Error::new(ErrorKind::SubscribeError).with(e)), + Err(err) => Err(Error::new(ErrorKind::SubscribeError).with(err)), + }) + .boxed_local(); + + XmtpGrpcGatewaySubscription { + stream: Some(stream), + } } } -// TODO: implement JSON segmented streaming -pub struct XmtpGrpcGatewaySubscription {} -impl XmtpApiSubscription for XmtpGrpcGatewaySubscription { +#[async_trait(?Send)] +impl<'a> XmtpApiSubscription for XmtpGrpcGatewaySubscription<'a> { fn is_closed(&self) -> bool { - true + return self.stream.is_none(); } - fn get_messages(&self) -> Vec { - vec![] + // HACK: this consumes from the stream whatever is already ready. + // TODO: implement a JS-friendly promise/future interface instead + async fn get_messages(&mut self) -> Vec { + if self.stream.is_none() { + return vec![]; + } + let stream = self.stream.as_mut().unwrap(); + let mut items: Vec = Vec::new(); + + // TODO: consider using `size_hint` and fixing buffer size + // let (lower, upper) = self.stream.unwrap().size_hint(); + // let capacity = clamp(lower, 10, 50); + // items.reserve(capacity); + // ... and on append: if items.len() >= capacity { return items; } + + // For now we rely on the subscriber to periodically call `get_messages`. + // There is no hint to JS to tell it when to wake up and check for more. + // So we use this no-op waker as context. + // TODO: implement JS event or promise instead. + let mut cx = Context::from_waker(noop_waker_ref()); + loop { + match stream.as_mut().poll_next(&mut cx) { + Poll::Pending => { + return items; + } + Poll::Ready(Some(item)) => { + if item.is_ok() { + items.push(item.unwrap()); + } + // else item.is_err() and we discard it. + } + Poll::Ready(None) => { + self.stream = None; + return items; + } + } + } } - fn close_stream(&mut self) {} + fn close_stream(&mut self) { + self.stream = None; + } } diff --git a/xmtp_api_grpc_gateway/tests/wasm_test.rs b/xmtp_api_grpc_gateway/tests/wasm_test.rs index bd7ee1eae..724c89e5b 100644 --- a/xmtp_api_grpc_gateway/tests/wasm_test.rs +++ b/xmtp_api_grpc_gateway/tests/wasm_test.rs @@ -1,11 +1,15 @@ extern crate wasm_bindgen_test; extern crate xmtp_api_grpc_gateway; + +use futures_util::task::noop_waker_ref; +use futures_util::StreamExt; +use std::task::Context; use wasm_bindgen::prelude::*; use wasm_bindgen_test::*; use xmtp_api_grpc_gateway::XmtpGrpcGatewayClient; -use xmtp_proto::api_client::XmtpApiClient; +use xmtp_proto::api_client::{XmtpApiClient, XmtpApiSubscription}; use xmtp_proto::xmtp::message_api::v1::{ - BatchQueryRequest, Envelope, PublishRequest, QueryRequest, + BatchQueryRequest, Envelope, PublishRequest, QueryRequest, SubscribeRequest, }; // Only run these tests in a browser. @@ -21,6 +25,7 @@ extern "C" { #[wasm_bindgen_test] pub async fn test_query_publish_query() { + log("test_query_publish_query"); let xmtp_url: String = "http://localhost:5555".to_string(); let topic = uuid::Uuid::new_v4(); let auth_token = ""; // TODO @@ -59,6 +64,7 @@ pub async fn test_query_publish_query() { #[wasm_bindgen_test] pub async fn test_batch_query_publish_batch_query() { + log("test_batch_query_publish_batch_query"); let xmtp_url: String = "http://localhost:5555".to_string(); let api = XmtpGrpcGatewayClient::new(xmtp_url); let topic1 = uuid::Uuid::new_v4(); @@ -129,3 +135,64 @@ pub async fn test_batch_query_publish_batch_query() { assert_eq!(vec![1, 1, 1, 1], e1.message); assert_eq!(vec![2, 2, 2, 2], e2.message); } + +#[wasm_bindgen_test] +pub async fn test_client_subscribe() { + log("test_client_subscribe"); + let xmtp_url: String = "http://localhost:5555".to_string(); + let api = XmtpGrpcGatewayClient::new(xmtp_url); + + let topic = uuid::Uuid::new_v4(); + log(&topic.to_string()); + let auth_token = ""; // TODO + + // Subscribe to a topic and then expect to see what we publish. + let mut sub = api + .subscribe(SubscribeRequest { + content_topics: vec![topic.to_string()], + // ..SubscribeRequest::default() + }) + .await + .expect("successfully subscribed"); + assert_eq!(false, sub.is_closed()); + + // HACK: this tugs at the stream w/ poll_next() to initiate the HTTP req. + // we need to do this _before_ we publish, otherwise the + // publish will happen before the subscription is ready. + // TODO: implement a JS-friendly promise/future interface instead + let mut cx = Context::from_waker(noop_waker_ref()); + let stream = sub.stream.as_mut().unwrap(); + let init = stream.as_mut().poll_next(&mut cx); + assert_eq!(true, init.is_pending()); + + log("publishing message"); + api.publish( + auth_token.to_string(), + PublishRequest { + envelopes: vec![Envelope { + content_topic: topic.to_string(), + message: vec![1, 2, 3, 4], + timestamp_ns: 1234, + }], + }, + ) + .await + .expect("published"); + + log("waiting for the next message"); + let msg = sub + .stream + .as_mut() + .expect("a subscription stream") + .next() + .await + .expect("received subscription message"); + assert_eq!(true, msg.is_ok()); + let env = msg.ok().unwrap(); + assert_eq!(topic.to_string(), env.content_topic); + assert_eq!(vec![1, 2, 3, 4], env.message); + + sub.close_stream(); + + assert_eq!(true, sub.is_closed()); +} diff --git a/xmtp_proto/src/api_client.rs b/xmtp_proto/src/api_client.rs index b3805bfe3..055e45717 100644 --- a/xmtp_proto/src/api_client.rs +++ b/xmtp_proto/src/api_client.rs @@ -72,9 +72,11 @@ impl StdError for Error { } } +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] pub trait XmtpApiSubscription { fn is_closed(&self) -> bool; - fn get_messages(&self) -> Vec; + async fn get_messages(&mut self) -> Vec; fn close_stream(&mut self); }