Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add .subscribe / async support to wasm client #268

Closed
wants to merge 2 commits into from

Conversation

dmccartney
Copy link
Collaborator

This picks up where #222 left off implementing Subscribe in the wasm/grpc-gateway client.

@dmccartney dmccartney force-pushed the daniel-subscribe-async branch 3 times, most recently from 5c2178c to 734b314 Compare October 12, 2023 02:17
vec![]
// HACK: this consumes from the stream whatever is already ready.
// TODO: implement a JS-friendly promise/future interface instead
async fn get_messages(&mut self) -> Vec<Envelope> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After much-mashing to get this working, I think we should scrap this and change the abstraction for XmtpApiSubscription across the library. I suspect it will be cleaner either as a proper Stream (i.e. ✂️ the get_messages() stuff and use .collect() and other stream tools) and/or by designing outside-in to use JS promises (qua rust Futures) and callbacks.

Copy link
Contributor

@neekolas neekolas Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no objections to this. The stream polling stuff was a huge hack to make things work quickly on iOS and clearly there is a better way.

Comment on lines 159 to 166
// HACK: this tugs at the stream w/ poll_next() to initiate the HTTP req.
// we need to do this _before_ we publish, otherwise the
// publish will happen before the subscription is ready.
// TODO: implement a JS-friendly promise/future interface instead
let mut cx = Context::from_waker(noop_waker_ref());
let stream = sub.stream.as_mut().unwrap();
let init = stream.as_mut().poll_next(&mut cx);
assert_eq!(true, init.is_pending());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:(

@dmccartney dmccartney force-pushed the daniel-subscribe-async branch from 734b314 to 82855eb Compare October 18, 2023 03:58
@dmccartney dmccartney force-pushed the daniel-subscribe-async branch from 82855eb to 0e740b7 Compare October 19, 2023 00:04
@dmccartney dmccartney marked this pull request as ready for review October 19, 2023 00:34
@dmccartney dmccartney requested a review from a team October 19, 2023 00:34
Comment on lines -75 to +78
pub trait XmtpApiSubscription {
fn is_closed(&self) -> bool;
fn get_messages(&self) -> Vec<Envelope>;
fn close_stream(&mut self);
}
pub trait XmtpApiSubscription {}

impl<T: Stream<Item = Envelope>> XmtpApiSubscription for T {}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beyond the WASM implementation, this is conceptually the big change to subscribe() across both grpc and grpc-gateway clients. Now the result is just an ordinary Stream of Envelopes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the XmtpApiSubscription trait at all, seeing as it's empty? Can we just mandate that subscribe() returns a Stream instead of an XmtpApiSubscription? I think that lets us call the Stream methods on consumers even if the concrete type for XmtpApiClient is not known.

Comment on lines -60 to +59
pub struct Client {
pub struct XmtpGrpcClient {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed to be a little easier to understand alongside the other XmtpApiClient:
xmtp_api_grpc -> XmtpGrpcClient
xmtp_api_grpc_gateway -> XmtpGrpcGatewayClient

impl XmtpApiClient for Client {
type Subscription = Subscription;
impl XmtpApiClient for XmtpGrpcClient {
type Subscription = BoxStream<'static, Envelope>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this has a static lifetime, are we sure the memory for these envelopes is ever going to get released?

// grpc-gateway streams newline-delimited JSON bodies
let response = self
.http
.post(&format!("{}/message/v1/subscribe", self.url))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look like it needs to be a reference, does it?

.await
.expect("published");

if let Ok(env) = rx.await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nooby question, but what is the point of the channel here? If we are awaiting anyway, what difference does it make if we do rx.await rather than stream.next().await directly?

// Discard any messages that we can't parse.
// TODO: consider surfacing these in a log somewhere
.filter_map(|r| async move { r.ok() })
.boxed();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another nooby question - why do we need boxed() here? Can we use tonic::Streaming<Envelope> instead of BoxStream<'static, Envelope>?

@richardhuaaa
Copy link
Contributor

This looks much better, thanks for cleaning it up! Just left some nooby questions about Streams

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants