From c8900444c62308c2835931c58c050bb3e5ca3b0d Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 12 Jun 2024 23:03:15 -0700 Subject: [PATCH] Fix conflicting streams issue (#348) * write a test to try and repro it * fix iOS conflicting streams * bump the pod * make the error clearer --- Sources/XMTPiOS/Conversations.swift | 25 +++++++++++++------ Tests/XMTPTests/GroupTests.swift | 25 +++++++++++++++++++ XMTP.podspec | 2 +- .../xcshareddata/swiftpm/Package.resolved | 4 +-- 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index 43b3b43c..3ed23a76 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -17,7 +17,7 @@ public enum ConversationError: Error, CustomStringConvertible { } public enum GroupError: Error, CustomStringConvertible { - case alphaMLSNotEnabled, memberCannotBeSelf, memberNotRegistered([String]), groupsRequireMessagePassed, notSupportedByGroups + case alphaMLSNotEnabled, memberCannotBeSelf, memberNotRegistered([String]), groupsRequireMessagePassed, notSupportedByGroups, streamingFailure public var description: String { switch self { @@ -31,6 +31,8 @@ public enum GroupError: Error, CustomStringConvertible { return "GroupError.groupsRequireMessagePassed you cannot call this method without passing a message instead of an envelope" case .notSupportedByGroups: return "GroupError.notSupportedByGroups this method is not supported by groups" + case .streamingFailure: + return "GroupError.streamingFailure a stream has failed" } } } @@ -92,11 +94,18 @@ public actor Conversations { public func streamGroups() async throws -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { - self.streamHolder.stream = try await self.client.v3Client?.conversations().stream( - callback: GroupStreamCallback(client: self.client) { group in - continuation.yield(group) - } - ) + let groupCallback = GroupStreamCallback(client: self.client) { group in + continuation.yield(group) + } + + guard let stream = try await self.client.v3Client?.conversations().stream(callback: groupCallback) else { + continuation.finish(throwing: GroupError.streamingFailure) + return + } + + continuation.onTermination = { @Sendable reason in + stream.end() + } } } } @@ -303,7 +312,7 @@ public actor Conversations { @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { do { var iterator = stream.makeAsyncIterator() - while let element = try await iterator.next() { + while let element = try await iterator.next() { continuation.yield(element) } continuation.finish() @@ -315,7 +324,7 @@ public actor Conversations { Task { await forwardStreamToMerged(stream: try streamAllV2Messages()) } - if (includeGroups) { + if includeGroups { Task { await forwardStreamToMerged(stream: streamAllGroupMessages()) } diff --git a/Tests/XMTPTests/GroupTests.swift b/Tests/XMTPTests/GroupTests.swift index 52d23d48..5585c035 100644 --- a/Tests/XMTPTests/GroupTests.swift +++ b/Tests/XMTPTests/GroupTests.swift @@ -562,6 +562,31 @@ class GroupTests: XCTestCase { await waitForExpectations(timeout: 3) } + func testStreamGroupsAndAllMessages() async throws { + let fixtures = try await localFixtures() + + let expectation1 = expectation(description: "got a group") + let expectation2 = expectation(description: "got a message") + + + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamGroups() { + expectation1.fulfill() + } + } + + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamAllMessages(includeGroups: true) { + expectation2.fulfill() + } + } + + let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + try await group.send(content: "hello") + + await waitForExpectations(timeout: 3) + } + func testCanStreamAllMessages() async throws { let fixtures = try await localFixtures() diff --git a/XMTP.podspec b/XMTP.podspec index b6918162..a6ce80d0 100644 --- a/XMTP.podspec +++ b/XMTP.podspec @@ -16,7 +16,7 @@ Pod::Spec.new do |spec| # spec.name = "XMTP" - spec.version = "0.11.5" + spec.version = "0.11.6" spec.summary = "XMTP SDK Cocoapod" # This description is used to generate tags and improve search results. diff --git a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 1fa3e748..f2d03777 100644 --- a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -59,8 +59,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/xmtp/libxmtp-swift", "state" : { - "revision" : "bc7e65b4db73430ae259bce32b391eefa82d4071", - "version" : "0.5.1-beta0" + "revision" : "c5e9ed9d3ee9de55beec4d7a8f76861c9d43230d", + "version" : "0.5.1-beta1" } }, {