From 2ce9bcb3c10e4204dde21fd8b697926eb38be911 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Mon, 29 Jan 2024 09:55:35 -0800 Subject: [PATCH 1/8] no longer need to break --- Sources/XMTPiOS/Conversations.swift | 2 -- .../project.xcworkspace/xcshareddata/swiftpm/Package.resolved | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index b6e2072a..be54948c 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -117,13 +117,11 @@ public actor Conversations { } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { let conversation = try fromInvite(envelope: envelope) conversationsByTopic[conversation.topic] = conversation - break // Break so we can resubscribe with the new conversation } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { let conversation = try fromIntro(envelope: envelope) conversationsByTopic[conversation.topic] = conversation let decoded = try conversation.decode(envelope) continuation.yield(decoded) - break // Break so we can resubscribe with the new conversation } else { print("huh \(envelope)") } diff --git a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 24ae34d9..910fd639 100644 --- a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -59,8 +59,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/xmtp/libxmtp-swift", "state" : { - "branch" : "60b99d0", - "revision" : "60b99d04f642a7e2b15156d818dabc4aab65b1c0" + "branch" : "ccbf6ac", + "revision" : "ccbf6ac71b8c5a89c3078d8dc4057123bea8a291" } }, { From 66ce4f56673747e7b2adbad98217eb288c530fb2 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 30 Jan 2024 17:29:54 -0800 Subject: [PATCH 2/8] try some new update methods --- Sources/XMTPiOS/ApiClient.swift | 16 +++++-- Sources/XMTPiOS/Client.swift | 8 ++++ Sources/XMTPiOS/Conversations.swift | 68 +++++++++++++++++------------ 3 files changed, 62 insertions(+), 30 deletions(-) diff --git a/Sources/XMTPiOS/ApiClient.swift b/Sources/XMTPiOS/ApiClient.swift index 9309d5da..2b6462be 100644 --- a/Sources/XMTPiOS/ApiClient.swift +++ b/Sources/XMTPiOS/ApiClient.swift @@ -36,6 +36,8 @@ protocol ApiClient: Sendable { func publish(envelopes: [Envelope]) async throws func publish(request: PublishRequest) async throws func subscribe(topics: [String]) -> AsyncThrowingStream + func subscribe2(request: SubscribeRequest) async throws -> FfiV2Subscription + func makeSubscribeRequest(topics: [String]) -> SubscribeRequest } func makeQueryRequest(topic: String, pagination: Pagination? = nil, cursor: Cursor? = nil) -> QueryRequest { @@ -132,20 +134,28 @@ final class GRPCApiClient: ApiClient { return envelopes } - + + func makeSubscribeRequest(topics: [String]) -> SubscribeRequest { + return SubscribeRequest.with { $0.contentTopics = topics } + } + + func subscribe2(request: SubscribeRequest) async throws -> LibXMTP.FfiV2Subscription { + return try await rustClient.subscribe(request: request.toFFI) + } + func subscribe(topics: [String]) -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { let request = SubscribeRequest.with { $0.contentTopics = topics } do { let subscription = try await rustClient.subscribe(request: request.toFFI) - + defer { Task { await subscription.end() } } - + while true { let nextEnvelope = try await subscription.next() continuation.yield(nextEnvelope.fromFFI) diff --git a/Sources/XMTPiOS/Client.swift b/Sources/XMTPiOS/Client.swift index b50bfd46..35ce35c8 100644 --- a/Sources/XMTPiOS/Client.swift +++ b/Sources/XMTPiOS/Client.swift @@ -328,6 +328,14 @@ public final class Client: Sendable { public func subscribe(topics: [Topic]) -> AsyncThrowingStream { return subscribe(topics: topics.map(\.description)) } + + public func subscribe2(request: SubscribeRequest) async throws -> FfiV2Subscription { + return try await apiClient.subscribe2(request: request) + } + + public func makeSubscribeRequest(topics: [String]) -> SubscribeRequest { + return apiClient.makeSubscribeRequest(topics: topics) + } func getUserContact(peerAddress: String) async throws -> ContactBundle? { let peerAddress = EthereumAddress(peerAddress).toChecksumAddress() diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index be54948c..af09ebba 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -1,7 +1,7 @@ import Foundation public enum ConversationError: Error { - case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String) + case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String), streamingIssue(String) } /// Handles listing and creating Conversations. @@ -98,42 +98,56 @@ public actor Conversations { public func streamAllMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in - Task { - while true { - var topics: [String] = [ - Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description, - ] + var topics: [String] = [ + Topic.userInvite(client.address).description, + Topic.userIntro(client.address).description, + ] - for conversation in try await list() { - topics.append(conversation.topic) + for conversation in try await list() { + topics.append(conversation.topic) + } + + var subscribeRequest = client.makeSubscribeRequest(topics: topics) + var subscription = client.subscribe2(request: subscribeRequest) + + Task { + let request = SubscribeRequest.with { $0.contentTopics = topics } + do { + defer { + Task { + await subscription.end() + } } - do { - for try await envelope in client.subscribe(topics: topics) { - if let conversation = conversationsByTopic[envelope.contentTopic] { - let decoded = try conversation.decode(envelope) - continuation.yield(decoded) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try fromInvite(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try fromIntro(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - let decoded = try conversation.decode(envelope) - continuation.yield(decoded) - } else { - print("huh \(envelope)") - } + while true { + let nextEnvelope = try await subscription.next() + let envelope = nextEnvelope.fromFFI + if let conversation = conversationsByTopic[envelope.contentTopic] { + let decoded = try conversation.decode(envelope) + continuation.yield(decoded) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { + let conversation = try fromInvite(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + subscribeRequest = client.makeSubscribeRequest(topics: topics) + subscription.update(req: subscribeRequest.toFFI) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { + let conversation = try fromIntro(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + let decoded = try conversation.decode(envelope) + subscription.update(req: subscribeRequest.toFFI) + continuation.yield(decoded) + } else { + print("huh \(envelope)") } - } catch { - continuation.finish(throwing: error) } + } catch { + throw ConversationError.streamingIssue(error.localizedDescription) } } } } + public func streamAllDecryptedMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { From 96d521a12c7bc59036c6db1bccd491454d8464e4 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 30 Jan 2024 20:09:02 -0800 Subject: [PATCH 3/8] get the code closer --- Sources/XMTPiOS/ApiClient.swift | 4 +-- Sources/XMTPiOS/Conversations.swift | 39 +++++++++++++---------------- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/Sources/XMTPiOS/ApiClient.swift b/Sources/XMTPiOS/ApiClient.swift index 2b6462be..f24c8c5b 100644 --- a/Sources/XMTPiOS/ApiClient.swift +++ b/Sources/XMTPiOS/ApiClient.swift @@ -149,13 +149,13 @@ final class GRPCApiClient: ApiClient { let request = SubscribeRequest.with { $0.contentTopics = topics } do { let subscription = try await rustClient.subscribe(request: request.toFFI) - + defer { Task { await subscription.end() } } - + while true { let nextEnvelope = try await subscription.next() continuation.yield(nextEnvelope.fromFFI) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index af09ebba..9ffae77b 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -98,27 +98,21 @@ public actor Conversations { public func streamAllMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in - var topics: [String] = [ - Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description, - ] + Task { + var topics: [String] = [ + Topic.userInvite(client.address).description, + Topic.userIntro(client.address).description, + ] - for conversation in try await list() { - topics.append(conversation.topic) - } + for conversation in try await list() { + topics.append(conversation.topic) + } - var subscribeRequest = client.makeSubscribeRequest(topics: topics) - var subscription = client.subscribe2(request: subscribeRequest) + var subscribeRequest = client.makeSubscribeRequest(topics: topics) + var subscription = try await client.subscribe2(request: subscribeRequest) - Task { let request = SubscribeRequest.with { $0.contentTopics = topics } do { - defer { - Task { - await subscription.end() - } - } - while true { let nextEnvelope = try await subscription.next() let envelope = nextEnvelope.fromFFI @@ -128,26 +122,27 @@ public actor Conversations { } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { let conversation = try fromInvite(envelope: envelope) conversationsByTopic[conversation.topic] = conversation + topics.append(conversation.topic) subscribeRequest = client.makeSubscribeRequest(topics: topics) - subscription.update(req: subscribeRequest.toFFI) + try await subscription.update(req: subscribeRequest.toFFI) } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { let conversation = try fromIntro(envelope: envelope) conversationsByTopic[conversation.topic] = conversation let decoded = try conversation.decode(envelope) - subscription.update(req: subscribeRequest.toFFI) continuation.yield(decoded) - } else { - print("huh \(envelope)") + topics.append(conversation.topic) + subscribeRequest = client.makeSubscribeRequest(topics: topics) + try await subscription.update(req: subscribeRequest.toFFI) } } } catch { - throw ConversationError.streamingIssue(error.localizedDescription) + await subscription.end() + continuation.finish(throwing: error) } } } } - public func streamAllDecryptedMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { From 42c21c518bbc81830b5f345e5b4e6b67d7a2fcf4 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 30 Jan 2024 20:12:06 -0800 Subject: [PATCH 4/8] update it for decrypted as well --- Sources/XMTPiOS/Conversations.swift | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index 9ffae77b..a30b6c75 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -111,7 +111,6 @@ public actor Conversations { var subscribeRequest = client.makeSubscribeRequest(topics: topics) var subscription = try await client.subscribe2(request: subscribeRequest) - let request = SubscribeRequest.with { $0.contentTopics = topics } do { while true { let nextEnvelope = try await subscription.next() @@ -146,7 +145,6 @@ public actor Conversations { public func streamAllDecryptedMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { - while true { var topics: [String] = [ Topic.userInvite(client.address).description, Topic.userIntro(client.address).description, @@ -156,26 +154,34 @@ public actor Conversations { topics.append(conversation.topic) } + var subscribeRequest = client.makeSubscribeRequest(topics: topics) + var subscription = try await client.subscribe2(request: subscribeRequest) + do { - for try await envelope in client.subscribe(topics: topics) { + while true { + let nextEnvelope = try await subscription.next() + let envelope = nextEnvelope.fromFFI if let conversation = conversationsByTopic[envelope.contentTopic] { let decoded = try conversation.decrypt(envelope) continuation.yield(decoded) } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { let conversation = try fromInvite(envelope: envelope) conversationsByTopic[conversation.topic] = conversation - break // Break so we can resubscribe with the new conversation + topics.append(conversation.topic) + subscribeRequest = client.makeSubscribeRequest(topics: topics) + try await subscription.update(req: subscribeRequest.toFFI) } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { let conversation = try fromIntro(envelope: envelope) conversationsByTopic[conversation.topic] = conversation let decoded = try conversation.decrypt(envelope) continuation.yield(decoded) - break // Break so we can resubscribe with the new conversation - } else { - print("huh \(envelope)") + topics.append(conversation.topic) + subscribeRequest = client.makeSubscribeRequest(topics: topics) + try await subscription.update(req: subscribeRequest.toFFI) } } } catch { + await subscription.end() continuation.finish(throwing: error) } } From c13cd56931b112176e3e351b9cbc4ecd3b293213 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 30 Jan 2024 21:49:28 -0800 Subject: [PATCH 5/8] get the tests passing --- Sources/XMTPTestHelpers/TestHelpers.swift | 30 +++++++- Sources/XMTPiOS/ApiClient.swift | 13 +--- Sources/XMTPiOS/Client.swift | 10 +-- Sources/XMTPiOS/ConversationV1.swift | 6 +- Sources/XMTPiOS/ConversationV2.swift | 6 +- Sources/XMTPiOS/Conversations.swift | 90 ++++++++++------------- 6 files changed, 80 insertions(+), 75 deletions(-) diff --git a/Sources/XMTPTestHelpers/TestHelpers.swift b/Sources/XMTPTestHelpers/TestHelpers.swift index 9ef4a82e..3f9cb6f5 100644 --- a/Sources/XMTPTestHelpers/TestHelpers.swift +++ b/Sources/XMTPTestHelpers/TestHelpers.swift @@ -81,6 +81,27 @@ class FakeStreamHolder: ObservableObject { @available(iOS 15, *) public class FakeApiClient: ApiClient { + public func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: XMTPiOS.Envelope, subscription: LibXMTP.FfiV2Subscription), Error> { + AsyncThrowingStream { continuation in + self.cancellable = stream.$envelope.sink(receiveValue: { env in + if let env, topics.contains(env.contentTopic) { + Task { + let request = SubscribeRequest.with { $0.contentTopics = topics } + try continuation.yield((env, await self.subscribe2(request: request))) + } + } + }) + } + } + + public func subscribe2(request: XMTPiOS.SubscribeRequest) async throws -> LibXMTP.FfiV2Subscription { + return try await rustClient.subscribe(request: request.toFFI) + } + + public func makeSubscribeRequest(topics: [String]) -> XMTPiOS.SubscribeRequest { + return SubscribeRequest.with { $0.contentTopics = topics } + } + public func envelopes(topic: String, pagination: XMTPiOS.Pagination?) async throws -> [XMTPiOS.Envelope] { try await query(topic: topic, pagination: pagination).envelopes } @@ -88,6 +109,7 @@ public class FakeApiClient: ApiClient { public var environment: XMTPEnvironment public var authToken: String = "" public var appVersion: String + public var rustClient: LibXMTP.FfiV2ApiClient private var responses: [String: [XMTPiOS.Envelope]] = [:] private var stream = FakeStreamHolder() public var published: [XMTPiOS.Envelope] = [] @@ -118,9 +140,10 @@ public class FakeApiClient: ApiClient { responses[topic.description] = responsesForTopic } - public init() { + public init() async throws { environment = .local appVersion = "test/0.0.0" + rustClient = try await LibXMTP.createV2Client(host: GRPCApiClient.envToUrl(env: .local), isSecure: false) } public func send(envelope: XMTPiOS.Envelope) { @@ -137,9 +160,10 @@ public class FakeApiClient: ApiClient { // MARK: ApiClient conformance - public required init(environment: XMTPiOS.XMTPEnvironment, secure _: Bool, rustClient _: LibXMTP.FfiV2ApiClient, appVersion: String?) throws { + public required init(environment: XMTPiOS.XMTPEnvironment, secure _: Bool, rustClient: LibXMTP.FfiV2ApiClient, appVersion: String?) throws { self.environment = environment self.appVersion = appVersion ?? "0.0.0" + self.rustClient = rustClient } public func subscribe(topics: [String]) -> AsyncThrowingStream { @@ -271,7 +295,7 @@ public struct Fixtures { alice = try PrivateKey.generate() bob = try PrivateKey.generate() - fakeApiClient = FakeApiClient() + fakeApiClient = try await FakeApiClient() aliceClient = try await Client.create(account: alice, apiClient: fakeApiClient) bobClient = try await Client.create(account: bob, apiClient: fakeApiClient) diff --git a/Sources/XMTPiOS/ApiClient.swift b/Sources/XMTPiOS/ApiClient.swift index f24c8c5b..e8a99e77 100644 --- a/Sources/XMTPiOS/ApiClient.swift +++ b/Sources/XMTPiOS/ApiClient.swift @@ -35,8 +35,7 @@ protocol ApiClient: Sendable { func envelopes(topic: String, pagination: Pagination?) async throws -> [Envelope] func publish(envelopes: [Envelope]) async throws func publish(request: PublishRequest) async throws - func subscribe(topics: [String]) -> AsyncThrowingStream - func subscribe2(request: SubscribeRequest) async throws -> FfiV2Subscription + func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> func makeSubscribeRequest(topics: [String]) -> SubscribeRequest } @@ -138,12 +137,8 @@ final class GRPCApiClient: ApiClient { func makeSubscribeRequest(topics: [String]) -> SubscribeRequest { return SubscribeRequest.with { $0.contentTopics = topics } } - - func subscribe2(request: SubscribeRequest) async throws -> LibXMTP.FfiV2Subscription { - return try await rustClient.subscribe(request: request.toFFI) - } - - func subscribe(topics: [String]) -> AsyncThrowingStream { + + func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> { return AsyncThrowingStream { continuation in Task { let request = SubscribeRequest.with { $0.contentTopics = topics } @@ -158,7 +153,7 @@ final class GRPCApiClient: ApiClient { while true { let nextEnvelope = try await subscription.next() - continuation.yield(nextEnvelope.fromFFI) + continuation.yield((envelope: nextEnvelope.fromFFI, subscription: subscription)) } } catch { throw ApiClientError.subscribeError(error.localizedDescription) diff --git a/Sources/XMTPiOS/Client.swift b/Sources/XMTPiOS/Client.swift index 35ce35c8..7229dc44 100644 --- a/Sources/XMTPiOS/Client.swift +++ b/Sources/XMTPiOS/Client.swift @@ -321,18 +321,14 @@ public final class Client: Sendable { try await apiClient.publish(envelopes: envelopes) } - public func subscribe(topics: [String]) -> AsyncThrowingStream { + public func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> { return apiClient.subscribe(topics: topics) } - public func subscribe(topics: [Topic]) -> AsyncThrowingStream { + public func subscribe(topics: [Topic]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> { return subscribe(topics: topics.map(\.description)) } - - public func subscribe2(request: SubscribeRequest) async throws -> FfiV2Subscription { - return try await apiClient.subscribe2(request: request) - } - + public func makeSubscribeRequest(topics: [String]) -> SubscribeRequest { return apiClient.makeSubscribeRequest(topics: topics) } diff --git a/Sources/XMTPiOS/ConversationV1.swift b/Sources/XMTPiOS/ConversationV1.swift index 282a11e6..278e7d6d 100644 --- a/Sources/XMTPiOS/ConversationV1.swift +++ b/Sources/XMTPiOS/ConversationV1.swift @@ -154,7 +154,7 @@ public struct ConversationV1 { public func streamMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { - for try await envelope in client.subscribe(topics: [topic.description]) { + for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) { let decoded = try decode(envelope: envelope) continuation.yield(decoded) } @@ -165,7 +165,7 @@ public struct ConversationV1 { public func streamDecryptedMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { - for try await envelope in client.subscribe(topics: [topic.description]) { + for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) { let decoded = try decrypt(envelope: envelope) continuation.yield(decoded) } @@ -181,7 +181,7 @@ public struct ConversationV1 { AsyncThrowingStream { continuation in Task { do { - for try await envelope in client.subscribe(topics: [ephemeralTopic]) { + for try await (envelope, subscription) in client.subscribe(topics: [ephemeralTopic]) { continuation.yield(envelope) } } catch { diff --git a/Sources/XMTPiOS/ConversationV2.swift b/Sources/XMTPiOS/ConversationV2.swift index cd345d9b..e2033f52 100644 --- a/Sources/XMTPiOS/ConversationV2.swift +++ b/Sources/XMTPiOS/ConversationV2.swift @@ -159,7 +159,7 @@ public struct ConversationV2 { AsyncThrowingStream { continuation in Task { do { - for try await envelope in client.subscribe(topics: [ephemeralTopic]) { + for try await (envelope, subscription) in client.subscribe(topics: [ephemeralTopic]) { continuation.yield(envelope) } } catch { @@ -172,7 +172,7 @@ public struct ConversationV2 { public func streamMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { - for try await envelope in client.subscribe(topics: [topic.description]) { + for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) { let decoded = try decode(envelope: envelope) continuation.yield(decoded) @@ -184,7 +184,7 @@ public struct ConversationV2 { public func streamDecryptedMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { - for try await envelope in client.subscribe(topics: [topic.description]) { + for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) { let decoded = try decrypt(envelope: envelope) continuation.yield(decoded) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index a30b6c75..50d4744d 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -108,34 +108,29 @@ public actor Conversations { topics.append(conversation.topic) } - var subscribeRequest = client.makeSubscribeRequest(topics: topics) - var subscription = try await client.subscribe2(request: subscribeRequest) - do { - while true { - let nextEnvelope = try await subscription.next() - let envelope = nextEnvelope.fromFFI - if let conversation = conversationsByTopic[envelope.contentTopic] { - let decoded = try conversation.decode(envelope) - continuation.yield(decoded) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try fromInvite(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - topics.append(conversation.topic) - subscribeRequest = client.makeSubscribeRequest(topics: topics) - try await subscription.update(req: subscribeRequest.toFFI) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try fromIntro(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - let decoded = try conversation.decode(envelope) - continuation.yield(decoded) - topics.append(conversation.topic) - subscribeRequest = client.makeSubscribeRequest(topics: topics) - try await subscription.update(req: subscribeRequest.toFFI) + for try await (envelope, subscription) in client.subscribe(topics: topics) { + while true { + if let conversation = conversationsByTopic[envelope.contentTopic] { + let decoded = try conversation.decode(envelope) + continuation.yield(decoded) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { + let conversation = try fromInvite(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + topics.append(conversation.topic) + try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { + let conversation = try fromIntro(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + let decoded = try conversation.decode(envelope) + continuation.yield(decoded) + topics.append(conversation.topic) + try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) + + } } } } catch { - await subscription.end() continuation.finish(throwing: error) } } @@ -154,40 +149,35 @@ public actor Conversations { topics.append(conversation.topic) } - var subscribeRequest = client.makeSubscribeRequest(topics: topics) - var subscription = try await client.subscribe2(request: subscribeRequest) - do { - while true { - let nextEnvelope = try await subscription.next() - let envelope = nextEnvelope.fromFFI - if let conversation = conversationsByTopic[envelope.contentTopic] { - let decoded = try conversation.decrypt(envelope) - continuation.yield(decoded) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try fromInvite(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - topics.append(conversation.topic) - subscribeRequest = client.makeSubscribeRequest(topics: topics) - try await subscription.update(req: subscribeRequest.toFFI) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try fromIntro(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - let decoded = try conversation.decrypt(envelope) - continuation.yield(decoded) - topics.append(conversation.topic) - subscribeRequest = client.makeSubscribeRequest(topics: topics) - try await subscription.update(req: subscribeRequest.toFFI) + for try await (envelope, subscription) in client.subscribe(topics: topics) { + while true { + let nextEnvelope = try await subscription.next() + let envelope = nextEnvelope.fromFFI + if let conversation = conversationsByTopic[envelope.contentTopic] { + let decoded = try conversation.decrypt(envelope) + continuation.yield(decoded) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { + let conversation = try fromInvite(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + topics.append(conversation.topic) + try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { + let conversation = try fromIntro(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + let decoded = try conversation.decrypt(envelope) + continuation.yield(decoded) + topics.append(conversation.topic) + try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) + } } } } catch { - await subscription.end() continuation.finish(throwing: error) } } } } - } public func fromInvite(envelope: Envelope) throws -> Conversation { let sealedInvitation = try SealedInvitation(serializedData: envelope.message) @@ -253,7 +243,7 @@ public actor Conversations { Task { var streamedConversationTopics: Set = [] - for try await envelope in client.subscribe(topics: [.userIntro(client.address), .userInvite(client.address)]) { + for try await (envelope, subscription) in client.subscribe(topics: [.userIntro(client.address), .userInvite(client.address)]) { if envelope.contentTopic == Topic.userIntro(client.address).description { let conversationV1 = try fromIntro(envelope: envelope) From e183f356dad8c56b4bc533158d5fd029e83453b6 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 30 Jan 2024 21:55:25 -0800 Subject: [PATCH 6/8] fix up some spacing --- Sources/XMTPiOS/Conversations.swift | 65 ++++++++++++++--------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index 50d4744d..2d7a747e 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -108,9 +108,9 @@ public actor Conversations { topics.append(conversation.topic) } - do { - for try await (envelope, subscription) in client.subscribe(topics: topics) { - while true { + while(true) { + do { + for try await (envelope, subscription) in client.subscribe(topics: topics) { if let conversation = conversationsByTopic[envelope.contentTopic] { let decoded = try conversation.decode(envelope) continuation.yield(decoded) @@ -126,12 +126,11 @@ public actor Conversations { continuation.yield(decoded) topics.append(conversation.topic) try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) - } } + } catch { + continuation.finish(throwing: error) } - } catch { - continuation.finish(throwing: error) } } } @@ -140,36 +139,33 @@ public actor Conversations { public func streamAllDecryptedMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { - var topics: [String] = [ - Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description, - ] - - for conversation in try await list() { - topics.append(conversation.topic) - } - + var topics: [String] = [ + Topic.userInvite(client.address).description, + Topic.userIntro(client.address).description, + ] + + for conversation in try await list() { + topics.append(conversation.topic) + } + + while true { do { for try await (envelope, subscription) in client.subscribe(topics: topics) { - while true { - let nextEnvelope = try await subscription.next() - let envelope = nextEnvelope.fromFFI - if let conversation = conversationsByTopic[envelope.contentTopic] { - let decoded = try conversation.decrypt(envelope) - continuation.yield(decoded) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try fromInvite(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - topics.append(conversation.topic) - try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try fromIntro(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation - let decoded = try conversation.decrypt(envelope) - continuation.yield(decoded) - topics.append(conversation.topic) - try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) - } + if let conversation = conversationsByTopic[envelope.contentTopic] { + let decoded = try conversation.decrypt(envelope) + continuation.yield(decoded) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { + let conversation = try fromInvite(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + topics.append(conversation.topic) + try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { + let conversation = try fromIntro(envelope: envelope) + conversationsByTopic[conversation.topic] = conversation + let decoded = try conversation.decrypt(envelope) + continuation.yield(decoded) + topics.append(conversation.topic) + try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI) } } } catch { @@ -178,6 +174,7 @@ public actor Conversations { } } } + } public func fromInvite(envelope: Envelope) throws -> Conversation { let sealedInvitation = try SealedInvitation(serializedData: envelope.message) From 9b1c3206160f54b1308b28b369c5bbb837280765 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 29 Feb 2024 09:01:30 -0700 Subject: [PATCH 7/8] fix up the resolved --- .../xcshareddata/swiftpm/Package.resolved | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 9b0b4e31..3612b1e6 100644 --- a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -59,32 +59,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/xmtp/libxmtp-swift", "state" : { -<<<<<<< HEAD -<<<<<<< HEAD - "branch" : "ccbf6ac", - "revision" : "ccbf6ac71b8c5a89c3078d8dc4057123bea8a291" -||||||| 7632a9b - "branch" : "60b99d0", - "revision" : "60b99d04f642a7e2b15156d818dabc4aab65b1c0" -======= "revision" : "f6571b6b771b4c0a2a9469bd407ce124a9d91fed", "version" : "0.4.2-beta3" ->>>>>>> d6a719d3e84816098eeb8c5787f0c78a511860f4 -||||||| merged common ancestors -<<<<<<<<< Temporary merge branch 1 - "branch" : "92274fe", - "revision" : "92274fe0dde1fc7f8f716ebcffa3d252813be56d" -||||||||| 7632a9b - "branch" : "60b99d0", - "revision" : "60b99d04f642a7e2b15156d818dabc4aab65b1c0" -========= - "branch" : "ccbf6ac", - "revision" : "ccbf6ac71b8c5a89c3078d8dc4057123bea8a291" ->>>>>>>>> Temporary merge branch 2 -======= - "branch" : "92274fe", - "revision" : "92274fe0dde1fc7f8f716ebcffa3d252813be56d" ->>>>>>> 7b4eba89bf2c502d5df695c19cc27016e003da62 } }, { From 2d1addc9ad565bb1f951e92e91f6251518dbf185 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 29 Feb 2024 11:56:36 -0700 Subject: [PATCH 8/8] more updates to resolved --- .../project.xcworkspace/xcshareddata/swiftpm/Package.resolved | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 3612b1e6..42929c12 100644 --- a/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/XMTPiOSExample/XMTPiOSExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -59,8 +59,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/xmtp/libxmtp-swift", "state" : { - "revision" : "f6571b6b771b4c0a2a9469bd407ce124a9d91fed", - "version" : "0.4.2-beta3" + "revision" : "28ee27a4ded8b996a74850e366247e9fe51d782a", + "version" : "0.4.2-beta4" } }, {