From 7b1f5f28844b1155d54f891e73bcfaad9973e8f5 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 6 Nov 2024 21:55:11 -0800 Subject: [PATCH] remove decrypted message --- Sources/XMTPiOS/Conversations.swift | 39 ------ Sources/XMTPiOS/Dm.swift | 98 ------------- .../XMTPiOS/{Mls => Libxmtp}/InboxState.swift | 0 .../{Mls => Libxmtp}/Installation.swift | 0 Sources/XMTPiOS/{Mls => Libxmtp}/Member.swift | 0 .../MessageV3.swift => Libxmtp/Message.swift} | 37 +---- .../PermissionPolicySet.swift | 0 .../XMTPiOS/Messages/DecryptedMessage.swift | 17 --- Sources/XMTPiOS/Messages/Message.swift | 33 ----- .../XMTPiOS/Messages/MessageHeaderV1.swift | 19 --- .../XMTPiOS/Messages/MessageHeaderV2.swift | 18 --- Sources/XMTPiOS/Messages/MessageV1.swift | 114 ---------------- Sources/XMTPiOS/Messages/MessageV2.swift | 120 ---------------- Tests/XMTPTests/GroupTests.swift | 129 ------------------ 14 files changed, 4 insertions(+), 620 deletions(-) rename Sources/XMTPiOS/{Mls => Libxmtp}/InboxState.swift (100%) rename Sources/XMTPiOS/{Mls => Libxmtp}/Installation.swift (100%) rename Sources/XMTPiOS/{Mls => Libxmtp}/Member.swift (100%) rename Sources/XMTPiOS/{Mls/MessageV3.swift => Libxmtp/Message.swift} (59%) rename Sources/XMTPiOS/{Mls => Libxmtp}/PermissionPolicySet.swift (100%) delete mode 100644 Sources/XMTPiOS/Messages/DecryptedMessage.swift delete mode 100644 Sources/XMTPiOS/Messages/Message.swift delete mode 100644 Sources/XMTPiOS/Messages/MessageHeaderV1.swift delete mode 100644 Sources/XMTPiOS/Messages/MessageHeaderV2.swift delete mode 100644 Sources/XMTPiOS/Messages/MessageV1.swift delete mode 100644 Sources/XMTPiOS/Messages/MessageV2.swift diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index 799967de..ea6ff6c5 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -423,45 +423,6 @@ public actor Conversations { } } - public func streamAllDecryptedConversationMessages() -> AsyncThrowingStream< - DecryptedMessage, Error - > { - AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() - let task = Task { - let stream = await self.client.v3Client?.conversations() - .streamAllMessages( - messageCallback: MessageCallback(client: self.client) { - message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation - } - return - } - do { - continuation.yield( - try MessageV3( - client: self.client, ffiMessage: message - ).decrypt()) - } catch { - print("Error onMessage \(error)") - } - } - ) - await ffiStreamActor.setFfiStream(stream) - } - - continuation.onTermination = { _ in - task.cancel() - Task { - await ffiStreamActor.endStream() - } - } - } - } - public func fromWelcome(envelopeBytes: Data) async throws -> Conversation? { diff --git a/Sources/XMTPiOS/Dm.swift b/Sources/XMTPiOS/Dm.swift index a3546144..01ba2362 100644 --- a/Sources/XMTPiOS/Dm.swift +++ b/Sources/XMTPiOS/Dm.swift @@ -203,42 +203,6 @@ public struct Dm: Identifiable, Equatable, Hashable { } } - public func streamDecryptedMessages() -> AsyncThrowingStream< - DecryptedMessage, Error - > { - AsyncThrowingStream { continuation in - let task = Task.detached { - self.streamHolder.stream = await self.ffiConversation.stream( - messageCallback: MessageCallback(client: self.client) { - message in - guard !Task.isCancelled else { - continuation.finish() - return - } - do { - continuation.yield( - try MessageV3( - client: self.client, ffiMessage: message - ).decrypt()) - } catch { - print("Error onMessage \(error)") - continuation.finish(throwing: error) - } - } - ) - - continuation.onTermination = { @Sendable reason in - self.streamHolder.stream?.end() - } - } - - continuation.onTermination = { @Sendable reason in - task.cancel() - self.streamHolder.stream?.end() - } - } - } - public func messages( before: Date? = nil, after: Date? = nil, @@ -300,66 +264,4 @@ public struct Dm: Identifiable, Equatable, Hashable { .decodeOrNull() } } - - public func decryptedMessages( - before: Date? = nil, - after: Date? = nil, - limit: Int? = nil, - direction: PagingInfoSortDirection? = .descending, - deliveryStatus: MessageDeliveryStatus? = .all - ) async throws -> [DecryptedMessage] { - var options = FfiListMessagesOptions( - sentBeforeNs: nil, - sentAfterNs: nil, - limit: nil, - deliveryStatus: nil, - direction: nil - ) - - if let before { - options.sentBeforeNs = Int64( - before.millisecondsSinceEpoch * 1_000_000) - } - - if let after { - options.sentAfterNs = Int64( - after.millisecondsSinceEpoch * 1_000_000) - } - - if let limit { - options.limit = Int64(limit) - } - - let status: FfiDeliveryStatus? = { - switch deliveryStatus { - case .published: - return FfiDeliveryStatus.published - case .unpublished: - return FfiDeliveryStatus.unpublished - case .failed: - return FfiDeliveryStatus.failed - default: - return nil - } - }() - - options.deliveryStatus = status - - let direction: FfiDirection? = { - switch direction { - case .ascending: - return FfiDirection.ascending - default: - return FfiDirection.descending - } - }() - - options.direction = direction - - return try ffiConversation.findMessages(opts: options).compactMap { - ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage) - .decryptOrNull() - } - } } diff --git a/Sources/XMTPiOS/Mls/InboxState.swift b/Sources/XMTPiOS/Libxmtp/InboxState.swift similarity index 100% rename from Sources/XMTPiOS/Mls/InboxState.swift rename to Sources/XMTPiOS/Libxmtp/InboxState.swift diff --git a/Sources/XMTPiOS/Mls/Installation.swift b/Sources/XMTPiOS/Libxmtp/Installation.swift similarity index 100% rename from Sources/XMTPiOS/Mls/Installation.swift rename to Sources/XMTPiOS/Libxmtp/Installation.swift diff --git a/Sources/XMTPiOS/Mls/Member.swift b/Sources/XMTPiOS/Libxmtp/Member.swift similarity index 100% rename from Sources/XMTPiOS/Mls/Member.swift rename to Sources/XMTPiOS/Libxmtp/Member.swift diff --git a/Sources/XMTPiOS/Mls/MessageV3.swift b/Sources/XMTPiOS/Libxmtp/Message.swift similarity index 59% rename from Sources/XMTPiOS/Mls/MessageV3.swift rename to Sources/XMTPiOS/Libxmtp/Message.swift index 31301854..6c0bcf05 100644 --- a/Sources/XMTPiOS/Mls/MessageV3.swift +++ b/Sources/XMTPiOS/Libxmtp/Message.swift @@ -8,11 +8,11 @@ import Foundation import LibXMTP -enum MessageV3Error: Error { +enum MessageError: Error { case decodeError(String) } -public struct MessageV3: Identifiable { +public struct Message: Identifiable { let client: Client let ffiMessage: FfiMessage @@ -54,7 +54,6 @@ public struct MessageV3: Identifiable { let decodedMessage = DecodedMessage( id: id, - client: client, topic: Topic.groupMessage(convoId).description, encodedContent: encodedContent, senderAddress: senderInboxId, @@ -63,12 +62,12 @@ public struct MessageV3: Identifiable { ) if decodedMessage.encodedContent.type == ContentTypeGroupUpdated && ffiMessage.kind != .membershipChange { - throw MessageV3Error.decodeError("Error decoding group membership change") + throw MessageError.decodeError("Error decoding group membership change") } return decodedMessage } catch { - throw MessageV3Error.decodeError("Error decoding message: \(error.localizedDescription)") + throw MessageError.decodeError("Error decoding message: \(error.localizedDescription)") } } @@ -80,32 +79,4 @@ public struct MessageV3: Identifiable { return nil } } - - public func decryptOrNull() -> DecryptedMessage? { - do { - return try decrypt() - } catch { - print("MESSAGE_V3: discarding message that failed to decrypt", error) - return nil - } - } - - public func decrypt() throws -> DecryptedMessage { - let encodedContent = try EncodedContent(serializedData: ffiMessage.content) - - let decrytedMessage = DecryptedMessage( - id: id, - encodedContent: encodedContent, - senderAddress: senderInboxId, - sentAt: sentAt, - topic: Topic.groupMessage(convoId).description, - deliveryStatus: deliveryStatus - ) - - if decrytedMessage.encodedContent.type == ContentTypeGroupUpdated && ffiMessage.kind != .membershipChange { - throw MessageV3Error.decodeError("Error decoding group membership change") - } - - return decrytedMessage - } } diff --git a/Sources/XMTPiOS/Mls/PermissionPolicySet.swift b/Sources/XMTPiOS/Libxmtp/PermissionPolicySet.swift similarity index 100% rename from Sources/XMTPiOS/Mls/PermissionPolicySet.swift rename to Sources/XMTPiOS/Libxmtp/PermissionPolicySet.swift diff --git a/Sources/XMTPiOS/Messages/DecryptedMessage.swift b/Sources/XMTPiOS/Messages/DecryptedMessage.swift deleted file mode 100644 index 69a1a57a..00000000 --- a/Sources/XMTPiOS/Messages/DecryptedMessage.swift +++ /dev/null @@ -1,17 +0,0 @@ -// -// DecryptedMessage.swift -// -// -// Created by Pat Nakajima on 11/14/23. -// - -import Foundation - -public struct DecryptedMessage { - public var id: String - public var encodedContent: EncodedContent - public var senderAddress: String - public var sentAt: Date - public var topic: String = "" - public var deliveryStatus: MessageDeliveryStatus = .published -} diff --git a/Sources/XMTPiOS/Messages/Message.swift b/Sources/XMTPiOS/Messages/Message.swift deleted file mode 100644 index 01124160..00000000 --- a/Sources/XMTPiOS/Messages/Message.swift +++ /dev/null @@ -1,33 +0,0 @@ -// -// Message.swift -// -// -// Created by Pat Nakajima on 11/27/22. -// - -/// Handles encryption/decryption for communicating data in conversations -public typealias Message = Xmtp_MessageContents_Message - -public enum MessageVersion: String, RawRepresentable { - case v1, - v2 -} - -public enum MessageDeliveryStatus: String, RawRepresentable, Sendable { - case all, - published, - unpublished, - failed -} - -extension Message { - init(v1: MessageV1) { - self.init() - self.v1 = v1 - } - - init(v2: MessageV2) { - self.init() - self.v2 = v2 - } -} diff --git a/Sources/XMTPiOS/Messages/MessageHeaderV1.swift b/Sources/XMTPiOS/Messages/MessageHeaderV1.swift deleted file mode 100644 index 25c3682f..00000000 --- a/Sources/XMTPiOS/Messages/MessageHeaderV1.swift +++ /dev/null @@ -1,19 +0,0 @@ -// -// MessageHeaderV1.swift -// -// -// Created by Pat Nakajima on 11/27/22. -// - -import Foundation - -typealias MessageHeaderV1 = Xmtp_MessageContents_MessageHeaderV1 - -extension MessageHeaderV1 { - init(sender: PublicKeyBundle, recipient: PublicKeyBundle, timestamp: UInt64) { - self.init() - self.sender = sender - self.recipient = recipient - self.timestamp = timestamp - } -} diff --git a/Sources/XMTPiOS/Messages/MessageHeaderV2.swift b/Sources/XMTPiOS/Messages/MessageHeaderV2.swift deleted file mode 100644 index d967525e..00000000 --- a/Sources/XMTPiOS/Messages/MessageHeaderV2.swift +++ /dev/null @@ -1,18 +0,0 @@ -// -// MessageHeaderV2.swift -// -// -// Created by Pat Nakajima on 12/5/22. -// - -import Foundation - -typealias MessageHeaderV2 = Xmtp_MessageContents_MessageHeaderV2 - -extension MessageHeaderV2 { - init(topic: String, created: Date) { - self.init() - self.topic = topic - createdNs = UInt64(created.millisecondsSinceEpoch * 1_000_000) - } -} diff --git a/Sources/XMTPiOS/Messages/MessageV1.swift b/Sources/XMTPiOS/Messages/MessageV1.swift deleted file mode 100644 index 6a075ca7..00000000 --- a/Sources/XMTPiOS/Messages/MessageV1.swift +++ /dev/null @@ -1,114 +0,0 @@ -// -// MessageV1.swift -// -// -// Created by Pat Nakajima on 11/26/22. -// - -import Foundation - -typealias MessageV1 = Xmtp_MessageContents_MessageV1 - -enum MessageV1Error: Error { - case cannotDecodeFromBytes -} - -extension MessageV1 { - static func encode(sender: PrivateKeyBundleV1, recipient: PublicKeyBundle, message: Data, timestamp: Date) throws -> MessageV1 { - let secret = try sender.sharedSecret( - peer: recipient, - myPreKey: sender.preKeys[0].publicKey, - isRecipient: false - ) - - let header = MessageHeaderV1( - sender: sender.toPublicKeyBundle(), - recipient: recipient, - timestamp: UInt64(timestamp.millisecondsSinceEpoch) - ) - - let headerBytes = try header.serializedData() - let ciphertext = try Crypto.encrypt(secret, message, additionalData: headerBytes) - - return MessageV1(headerBytes: headerBytes, ciphertext: ciphertext) - } - - static func fromBytes(_ bytes: Data) throws -> MessageV1 { - let message = try Message(serializedData: bytes) - var headerBytes: Data - var ciphertext: CipherText - - switch message.version { - case .v1: - headerBytes = message.v1.headerBytes - ciphertext = message.v1.ciphertext - case .v2: - headerBytes = message.v2.headerBytes - ciphertext = message.v2.ciphertext - default: - throw MessageV1Error.cannotDecodeFromBytes - } - - return MessageV1(headerBytes: headerBytes, ciphertext: ciphertext) - } - - init(headerBytes: Data, ciphertext: CipherText) { - self.init() - self.headerBytes = headerBytes - self.ciphertext = ciphertext - } - - var header: MessageHeaderV1 { - get throws { - do { - return try MessageHeaderV1(serializedData: headerBytes) - } catch { - print("Error deserializing MessageHeaderV1 \(error)") - throw error - } - } - } - - var senderAddress: String? { - do { - let senderKey = try header.sender.identityKey.recoverWalletSignerPublicKey() - return senderKey.walletAddress - } catch { - print("Error getting sender address: \(error)") - return nil - } - } - - var sentAt: Date { - // swiftlint:disable force_try - try! Date(timeIntervalSince1970: Double(header.timestamp / 1000)) - // swiftlint:enable force_try - } - - var recipientAddress: String? { - do { - let recipientKey = try header.recipient.identityKey.recoverWalletSignerPublicKey() - - return recipientKey.walletAddress - } catch { - print("Error getting recipient address: \(error)") - return nil - } - } - - func decrypt(with viewer: PrivateKeyBundleV1) throws -> Data { - let header = try MessageHeaderV1(serializedData: headerBytes) - - let recipient = header.recipient - let sender = header.sender - - var secret: Data - if viewer.walletAddress == sender.walletAddress { - secret = try viewer.sharedSecret(peer: recipient, myPreKey: sender.preKey, isRecipient: false) - } else { - secret = try viewer.sharedSecret(peer: sender, myPreKey: recipient.preKey, isRecipient: true) - } - - return try Crypto.decrypt(secret, ciphertext, additionalData: headerBytes) - } -} diff --git a/Sources/XMTPiOS/Messages/MessageV2.swift b/Sources/XMTPiOS/Messages/MessageV2.swift deleted file mode 100644 index 3e5d0090..00000000 --- a/Sources/XMTPiOS/Messages/MessageV2.swift +++ /dev/null @@ -1,120 +0,0 @@ -// -// MessageV2.swift -// -// -// Created by Pat Nakajima on 12/5/22. -// - -import CryptoKit -import Foundation -import LibXMTP - -typealias MessageV2 = Xmtp_MessageContents_MessageV2 - -enum MessageV2Error: Error { - case invalidSignature, decodeError(String), invalidData -} - -extension MessageV2 { - init(headerBytes: Data, ciphertext: CipherText, senderHmac: Data, shouldPush: Bool) { - self.init() - self.headerBytes = headerBytes - self.ciphertext = ciphertext - self.senderHmac = senderHmac - self.shouldPush = shouldPush - } - - static func decrypt(_ id: String, _ topic: String, _ message: MessageV2, keyMaterial: Data, client: Client) throws -> DecryptedMessage { - let decrypted = try Crypto.decrypt(keyMaterial, message.ciphertext, additionalData: message.headerBytes) - let signed = try SignedContent(serializedData: decrypted) - - guard signed.sender.hasPreKey, signed.sender.hasIdentityKey else { - throw MessageV2Error.decodeError("missing sender pre-key or identity key") - } - - let senderPreKey = try PublicKey(signed.sender.preKey) - let senderIdentityKey = try PublicKey(signed.sender.identityKey) - - // This is a bit confusing since we're passing keyBytes as the digest instead of a SHA256 hash. - // That's because our underlying crypto library always SHA256's whatever data is sent to it for this. - if !(try senderPreKey.signature.verify(signedBy: senderIdentityKey, digest: signed.sender.preKey.keyBytes)) { - throw MessageV2Error.decodeError("pre-key not signed by identity key") - } - - // Verify content signature - let key = try PublicKey.with { key in - key.secp256K1Uncompressed.bytes = try KeyUtilx.recoverPublicKeySHA256(from: signed.signature.rawData, message: Data(message.headerBytes + signed.payload)) - } - - if key.walletAddress != (try PublicKey(signed.sender.preKey).walletAddress) { - throw MessageV2Error.invalidSignature - } - - let encodedMessage = try EncodedContent(serializedData: signed.payload) - let header = try MessageHeaderV2(serializedData: message.headerBytes) - - return DecryptedMessage( - id: id, - encodedContent: encodedMessage, - senderAddress: try signed.sender.walletAddress, - sentAt: Date(timeIntervalSince1970: Double(header.createdNs / 1_000_000) / 1000), - topic: topic - ) - } - - static func decode(_ id: String, _ topic: String, _ message: MessageV2, keyMaterial: Data, client: Client) throws -> DecodedMessage { - do { - let decryptedMessage = try decrypt(id, topic, message, keyMaterial: keyMaterial, client: client) - - return DecodedMessage( - id: id, - client: client, - topic: decryptedMessage.topic, - encodedContent: decryptedMessage.encodedContent, - senderAddress: decryptedMessage.senderAddress, - sent: decryptedMessage.sentAt - ) - } catch { - print("ERROR DECODING: \(error)") - throw error - } - } - - static func encode(client: Client, content encodedContent: EncodedContent, topic: String, keyMaterial: Data, codec: Codec) async throws -> MessageV2 { - let payload = try encodedContent.serializedData() - - let date = Date() - let header = MessageHeaderV2(topic: topic, created: date) - let headerBytes = try header.serializedData() - - let digest = SHA256.hash(data: headerBytes + payload) - let preKey = try client.keys.preKeys[0] - let signature = try await preKey.sign(Data(digest)) - - let bundle = try client.v1keys.toV2().getPublicKeyBundle() - - let signedContent = SignedContent(payload: payload, sender: bundle, signature: signature) - let signedBytes = try signedContent.serializedData() - - let ciphertext = try Crypto.encrypt(keyMaterial, signedBytes, additionalData: headerBytes) - - let thirtyDayPeriodsSinceEpoch = Int(date.timeIntervalSince1970 / 60 / 60 / 24 / 30) - let info = "\(thirtyDayPeriodsSinceEpoch)-\(client.address)" - guard let infoEncoded = info.data(using: .utf8) else { - throw MessageV2Error.invalidData - } - - let senderHmac = try Crypto.generateHmacSignature(secret: keyMaterial, info: infoEncoded, message: headerBytes) - - let decoded = try codec.decode(content: encodedContent, client: client) - let shouldPush = try codec.shouldPush(content: decoded) - - - return MessageV2( - headerBytes: headerBytes, - ciphertext: ciphertext, - senderHmac: senderHmac, - shouldPush: shouldPush - ) - } -} diff --git a/Tests/XMTPTests/GroupTests.swift b/Tests/XMTPTests/GroupTests.swift index 1936dc80..48cda592 100644 --- a/Tests/XMTPTests/GroupTests.swift +++ b/Tests/XMTPTests/GroupTests.swift @@ -565,30 +565,6 @@ class GroupTests: XCTestCase { } - func testCanSendMessagesToGroupDecrypted() async throws { - let fixtures = try await localFixtures() - let aliceGroup = try await fixtures.aliceClient.conversations.newGroup(with: [fixtures.bob.address]) - - try await fixtures.bobClient.conversations.sync() - let bobGroup = try await fixtures.bobClient.conversations.listGroups()[0] - - _ = try await aliceGroup.send(content: "sup gang original") - _ = try await aliceGroup.send(content: "sup gang") - - try await aliceGroup.sync() - let aliceGroupsCount = try await aliceGroup.decryptedMessages().count - XCTAssertEqual(3, aliceGroupsCount) - let aliceMessage = try await aliceGroup.decryptedMessages().first! - - try await bobGroup.sync() - let bobGroupsCount = try await bobGroup.decryptedMessages().count - XCTAssertEqual(2, bobGroupsCount) - let bobMessage = try await bobGroup.decryptedMessages().first! - - XCTAssertEqual("sup gang", String(data: Data(aliceMessage.encodedContent.content), encoding: .utf8)) - XCTAssertEqual("sup gang", String(data: Data(bobMessage.encodedContent.content), encoding: .utf8)) - } - func testCanStreamGroupMessages() async throws { let fixtures = try await localFixtures() let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) @@ -740,30 +716,6 @@ class GroupTests: XCTestCase { await fulfillment(of: [expectation1], timeout: 3) } - func testCanStreamAllDecryptedMessages() async throws { - let fixtures = try await localFixtures() - let membershipChange = GroupUpdated() - - let expectation1 = XCTestExpectation(description: "got a conversation") - expectation1.expectedFulfillmentCount = 2 - let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) - let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) - let dm = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) - try await fixtures.aliceClient.conversations.sync() - Task(priority: .userInitiated) { - for try await _ in await fixtures.aliceClient.conversations.streamAllMessages() { - expectation1.fulfill() - } - } - - _ = try await group.send(content: "hi") - _ = try await group.send(content: membershipChange, options: SendOptions(contentType: ContentTypeGroupUpdated)) - _ = try await convo.send(content: "hi") - _ = try await dm.send(content: "hi") - - await fulfillment(of: [expectation1], timeout: 3) - } - func testCanStreamAllGroupMessages() async throws { let fixtures = try await localFixtures() @@ -783,26 +735,6 @@ class GroupTests: XCTestCase { await fulfillment(of: [expectation1], timeout: 3) } - - func testCanStreamAllGroupDecryptedMessages() async throws { - let fixtures = try await localFixtures() - - let expectation1 = XCTestExpectation(description: "got a conversation") - let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) - let dm = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) - - try await fixtures.aliceClient.conversations.sync() - Task(priority: .userInitiated) { - for try await _ in await fixtures.aliceClient.conversations.streamAllMessages() { - expectation1.fulfill() - } - } - - _ = try await group.send(content: "hi") - _ = try await dm.send(content: "hi") - - await fulfillment(of: [expectation1], timeout: 3) - } func testCanUpdateGroupMetadata() async throws { let fixtures = try await localFixtures() @@ -1032,65 +964,4 @@ class GroupTests: XCTestCase { } } } - - func testCanStreamAllDecryptedMessagesAndCancelStream() async throws { - let fixtures = try await localFixtures() - - var messages = 0 - let messagesQueue = DispatchQueue(label: "messages.queue") // Serial queue to synchronize access to `messages` - - let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) - let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) - try await fixtures.aliceClient.conversations.sync() - - let streamingTask = Task(priority: .userInitiated) { - for try await _ in await fixtures.aliceClient.conversations.streamAllMessages() { - messagesQueue.sync { - messages += 1 - } - } - } - - _ = try await group.send(content: "hi") - _ = try await convo.send(content: "hi") - - try await Task.sleep(nanoseconds: 1_000_000_000) - - streamingTask.cancel() - - messagesQueue.sync { - XCTAssertEqual(messages, 2) - } - - try await Task.sleep(nanoseconds: 1_000_000_000) - - _ = try await group.send(content: "hi") - _ = try await group.send(content: "hi") - _ = try await group.send(content: "hi") - _ = try await convo.send(content: "hi") - - try await Task.sleep(nanoseconds: 1_000_000_000) - - messagesQueue.sync { - XCTAssertEqual(messages, 2) - } - - let streamingTask2 = Task(priority: .userInitiated) { - for try await _ in await fixtures.aliceClient.conversations.streamAllMessages() { - // Update the messages count in a thread-safe manner - messagesQueue.sync { - messages += 1 - } - } - } - - _ = try await group.send(content: "hi") - _ = try await convo.send(content: "hi") - - try await Task.sleep(nanoseconds: 1_000_000_000) - - messagesQueue.sync { - XCTAssertEqual(messages, 4) - } - } }