Skip to content

Commit

Permalink
remove spawn in stream_all_messages by using async_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Aug 8, 2024
1 parent d6cb8f3 commit b2d7b0b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-http-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ jobs:
- name: Start Docker containers
run: dev/up
- name: Run cargo test on main workspace
run: cargo test --workspace --exclude xmtp_api_grpc -- --test-threads=2
run: cargo test --workspace --exclude xmtp_api_grpc --features http-api -- --test-threads=4
1 change: 1 addition & 0 deletions bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ xmtp_cryptography = { workspace = true }
xmtp_id = { path = "../xmtp_id" }
xmtp_proto = { workspace = true, features = ["proto_full", "convert"] }
xmtp_v2 = { path = "../xmtp_v2" }
async-stream = "0.3"

# Test/Bench Utils
xmtp_api_grpc = { path = "../xmtp_api_grpc", optional = true }
Expand Down
37 changes: 18 additions & 19 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ where
)?;
let mls_group =
MlsGroup::new(context, group_id, stream_info.convo_created_at_ns);
log::info!("Processing stream entry ....");
mls_group
.process_stream_entry(envelope.clone(), client.clone())
.await
Expand Down Expand Up @@ -279,9 +278,7 @@ where

pub async fn stream_all_messages(
client: Arc<Client<ApiClient>>,
) -> Result<impl Stream<Item = StoredGroupMessage>, ClientError> {
let (tx, rx) = mpsc::unbounded_channel();

) -> Result<impl Stream<Item = Result<StoredGroupMessage, ClientError>>, ClientError> {
client.sync_welcomes().await?;

let mut group_id_to_info = client
Expand All @@ -292,7 +289,7 @@ where
.map(Into::into)
.collect::<HashMap<Vec<u8>, MessagesStreamInfo>>();

tokio::spawn(async move {
let stream = async_stream::stream! {
let client = client.clone();
let mut messages_stream = client
.clone()
Expand All @@ -310,22 +307,15 @@ where

messages = futures::future::ready(&mut extra_messages), if !extra_messages.is_empty() => {
for message in messages.drain(0..) {
if tx.send(message).is_err() {
break;
}
yield Ok(message);
}
},
Some(message) = messages_stream.next() => {
// an error can only mean the receiver has been dropped or closed so we're
// safe to end the stream
if tx.send(message).is_err() {
break;
}
yield Ok(message);
}
Some(new_group) = convo_stream.next() => {
if tx.is_closed() {
break;
}
if group_id_to_info.contains_key(&new_group.group_id) {
continue;
}
Expand All @@ -340,7 +330,14 @@ where
cursor: 1, // For the new group, stream all messages since the group was created
},
);
let new_messages_stream = client.clone().stream_messages(group_id_to_info.clone()).await?;

let new_messages_stream = match client.clone().stream_messages(group_id_to_info.clone()).await {
Ok(stream) => stream,
Err(e) => {
log::error!("{}", e);
break;
}
};

// attempt to drain all ready messages from existing stream
while let Some(Some(message)) = messages_stream.next().now_or_never() {
Expand All @@ -350,10 +347,9 @@ where
},
}
}
Ok::<_, ClientError>(())
});
};

Ok(UnboundedReceiverStream::new(rx))
Ok(Box::pin(stream))
}

pub fn stream_all_messages_with_callback(
Expand All @@ -366,7 +362,10 @@ where
let mut stream = Self::stream_all_messages(client).await?;
let _ = tx.send(());
while let Some(message) = stream.next().await {
callback(message)
match message {
Ok(m) => callback(m),
Err(m) => log::error!("error during stream all messages {}", m),
}
}
Ok(())
});
Expand Down

0 comments on commit b2d7b0b

Please sign in to comment.