Skip to content

Commit

Permalink
Fix conflicting streams issue (#348)
Browse files Browse the repository at this point in the history
* write a test to try and repro it

* fix iOS conflicting streams

* bump the pod

* make the error clearer
  • Loading branch information
nplasterer authored Jun 13, 2024
1 parent f219749 commit c890044
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 11 deletions.
25 changes: 17 additions & 8 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public enum ConversationError: Error, CustomStringConvertible {
}

public enum GroupError: Error, CustomStringConvertible {
case alphaMLSNotEnabled, memberCannotBeSelf, memberNotRegistered([String]), groupsRequireMessagePassed, notSupportedByGroups
case alphaMLSNotEnabled, memberCannotBeSelf, memberNotRegistered([String]), groupsRequireMessagePassed, notSupportedByGroups, streamingFailure

public var description: String {
switch self {
Expand All @@ -31,6 +31,8 @@ public enum GroupError: Error, CustomStringConvertible {
return "GroupError.groupsRequireMessagePassed you cannot call this method without passing a message instead of an envelope"
case .notSupportedByGroups:
return "GroupError.notSupportedByGroups this method is not supported by groups"
case .streamingFailure:
return "GroupError.streamingFailure a stream has failed"
}
}
}
Expand Down Expand Up @@ -92,11 +94,18 @@ public actor Conversations {
public func streamGroups() async throws -> AsyncThrowingStream<Group, Error> {
AsyncThrowingStream { continuation in
Task {
self.streamHolder.stream = try await self.client.v3Client?.conversations().stream(
callback: GroupStreamCallback(client: self.client) { group in
continuation.yield(group)
}
)
let groupCallback = GroupStreamCallback(client: self.client) { group in
continuation.yield(group)
}

guard let stream = try await self.client.v3Client?.conversations().stream(callback: groupCallback) else {
continuation.finish(throwing: GroupError.streamingFailure)
return
}

continuation.onTermination = { @Sendable reason in
stream.end()
}
}
}
}
Expand Down Expand Up @@ -303,7 +312,7 @@ public actor Conversations {
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<DecodedMessage, Error>) async {
do {
var iterator = stream.makeAsyncIterator()
while let element = try await iterator.next() {
while let element = try await iterator.next() {
continuation.yield(element)
}
continuation.finish()
Expand All @@ -315,7 +324,7 @@ public actor Conversations {
Task {
await forwardStreamToMerged(stream: try streamAllV2Messages())
}
if (includeGroups) {
if includeGroups {
Task {
await forwardStreamToMerged(stream: streamAllGroupMessages())
}
Expand Down
25 changes: 25 additions & 0 deletions Tests/XMTPTests/GroupTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,31 @@ class GroupTests: XCTestCase {
await waitForExpectations(timeout: 3)
}

func testStreamGroupsAndAllMessages() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a group")
let expectation2 = expectation(description: "got a message")


Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamGroups() {
expectation1.fulfill()
}
}

Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAllMessages(includeGroups: true) {
expectation2.fulfill()
}
}

let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])
try await group.send(content: "hello")

await waitForExpectations(timeout: 3)
}

func testCanStreamAllMessages() async throws {
let fixtures = try await localFixtures()

Expand Down
2 changes: 1 addition & 1 deletion XMTP.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pod::Spec.new do |spec|
#

spec.name = "XMTP"
spec.version = "0.11.5"
spec.version = "0.11.6"
spec.summary = "XMTP SDK Cocoapod"

# This description is used to generate tags and improve search results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/xmtp/libxmtp-swift",
"state" : {
"revision" : "bc7e65b4db73430ae259bce32b391eefa82d4071",
"version" : "0.5.1-beta0"
"revision" : "c5e9ed9d3ee9de55beec4d7a8f76861c9d43230d",
"version" : "0.5.1-beta1"
}
},
{
Expand Down

0 comments on commit c890044

Please sign in to comment.