From 67b94e6092786d5854021ccebdc0f9c09298bbbb Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 1 Nov 2024 13:33:14 -0700 Subject: [PATCH] Performance improvements to 1to1 dm peerinboxId (#413) * update package * add performance test and signing key * improve dm performance * update all streams * remove perf test * fix up the test * swift format is finally here * swift format one other file --- Package.swift | 2 +- Sources/XMTPiOS/ApiClient.swift | 1 + Sources/XMTPiOS/Client.swift | 418 ++++--- Sources/XMTPiOS/Conversations.swift | 1099 +++++++++++------ Sources/XMTPiOS/Dm.swift | 127 +- Sources/XMTPiOS/Group.swift | 336 +++-- Tests/XMTPTests/V3ClientTests.swift | 295 +++-- XMTP.podspec | 4 +- .../xcshareddata/swiftpm/Package.resolved | 4 +- 9 files changed, 1471 insertions(+), 815 deletions(-) diff --git a/Package.swift b/Package.swift index ffa43c0e..6b39be7d 100644 --- a/Package.swift +++ b/Package.swift @@ -25,7 +25,7 @@ let package = Package( .package(url: "https://github.com/1024jp/GzipSwift", from: "5.2.0"), .package(url: "https://github.com/bufbuild/connect-swift", exact: "0.12.0"), .package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.0.0"), - .package(url: "https://github.com/xmtp/libxmtp-swift.git", exact: "0.5.10"), + .package(url: "https://github.com/xmtp/libxmtp-swift.git", exact: "0.6.0"), ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. diff --git a/Sources/XMTPiOS/ApiClient.swift b/Sources/XMTPiOS/ApiClient.swift index 6e06e8e0..bc58da9d 100644 --- a/Sources/XMTPiOS/ApiClient.swift +++ b/Sources/XMTPiOS/ApiClient.swift @@ -39,6 +39,7 @@ extension GenericErrorDescribing { let .GroupMutablePermissions(message), let .SignatureRequestError(message), let .Erc1271SignatureError(message), + let .FailedToConvertToU32(message), let .Verifier(message): return message } diff --git a/Sources/XMTPiOS/Client.swift b/Sources/XMTPiOS/Client.swift index 19ba7106..bbd9cd0a 100644 --- a/Sources/XMTPiOS/Client.swift +++ b/Sources/XMTPiOS/Client.swift @@ -48,7 +48,10 @@ public struct ClientOptions { /// /// Optional: Specify self-reported version e.g. XMTPInbox/v1.0.0. public var appVersion: String? - public init(env: XMTPEnvironment = .dev, isSecure: Bool = true, appVersion: String? = nil) { + public init( + env: XMTPEnvironment = .dev, isSecure: Bool = true, + appVersion: String? = nil + ) { self.env = env self.isSecure = isSecure self.appVersion = appVersion @@ -63,9 +66,9 @@ public struct ClientOptions { /// `preCreateIdentityCallback` will be called immediately before a Create Identity wallet signature is requested from the user. public var preCreateIdentityCallback: PreEventCallback? - - /// `preAuthenticateToInboxCallback` will be called immediately before an Auth Inbox signature is requested from the user - public var preAuthenticateToInboxCallback: PreEventCallback? + + /// `preAuthenticateToInboxCallback` will be called immediately before an Auth Inbox signature is requested from the user + public var preAuthenticateToInboxCallback: PreEventCallback? public var enableV3 = false public var dbEncryptionKey: Data? @@ -77,7 +80,7 @@ public struct ClientOptions { codecs: [any ContentCodec] = [], preEnableIdentityCallback: PreEventCallback? = nil, preCreateIdentityCallback: PreEventCallback? = nil, - preAuthenticateToInboxCallback: PreEventCallback? = nil, + preAuthenticateToInboxCallback: PreEventCallback? = nil, enableV3: Bool = false, encryptionKey: Data? = nil, dbDirectory: String? = nil, @@ -91,14 +94,16 @@ public struct ClientOptions { self.enableV3 = enableV3 self.dbEncryptionKey = encryptionKey self.dbDirectory = dbDirectory - if (historySyncUrl == nil) { + if historySyncUrl == nil { switch api.env { case .production: - self.historySyncUrl = "https://message-history.production.ephemera.network/" + self.historySyncUrl = + "https://message-history.production.ephemera.network/" case .local: self.historySyncUrl = "http://0.0.0.0:5558" default: - self.historySyncUrl = "https://message-history.dev.ephemera.network/" + self.historySyncUrl = + "https://message-history.dev.ephemera.network/" } } else { self.historySyncUrl = historySyncUrl @@ -126,7 +131,6 @@ public final class Client { public let inboxID: String public var hasV2Client: Bool = true - /// Access ``Conversations`` for this Client. public lazy var conversations: Conversations = .init(client: self) @@ -134,7 +138,8 @@ public final class Client { public lazy var contacts: Contacts = .init(client: self) /// The XMTP environment which specifies which network this Client is connected to. - public lazy var environment: XMTPEnvironment = apiClient?.environment ?? .dev + public lazy var environment: XMTPEnvironment = + apiClient?.environment ?? .dev var codecRegistry = CodecRegistry() @@ -143,16 +148,20 @@ public final class Client { } /// Creates a client. - public static func create(account: SigningKey, options: ClientOptions? = nil) async throws -> Client { + public static func create( + account: SigningKey, options: ClientOptions? = nil + ) async throws -> Client { let options = options ?? ClientOptions() do { - let client = try await LibXMTP.createV2Client(host: options.api.env.url, isSecure: options.api.env.isSecure) + let client = try await LibXMTP.createV2Client( + host: options.api.env.url, isSecure: options.api.env.isSecure) let apiClient = try GRPCApiClient( environment: options.api.env, secure: options.api.isSecure, rustClient: client ) - return try await create(account: account, apiClient: apiClient, options: options) + return try await create( + account: account, apiClient: apiClient, options: options) } catch { let detailedErrorMessage: String if let nsError = error as NSError? { @@ -163,7 +172,7 @@ public final class Client { throw ClientError.creationError(detailedErrorMessage) } } - + static func initializeClient( accountAddress: String, options: ClientOptions, @@ -199,9 +208,12 @@ public final class Client { return client } - public static func createV3(account: SigningKey, options: ClientOptions) async throws -> Client { + public static func createV3(account: SigningKey, options: ClientOptions) + async throws -> Client + { let accountAddress = account.address.lowercased() - let inboxId = try await getOrCreateInboxId(options: options, address: accountAddress) + let inboxId = try await getOrCreateInboxId( + options: options, address: accountAddress) return try await initializeClient( accountAddress: accountAddress, @@ -210,10 +222,13 @@ public final class Client { inboxId: inboxId ) } - - public static func buildV3(address: String, options: ClientOptions) async throws -> Client { + + public static func buildV3(address: String, options: ClientOptions) + async throws -> Client + { let accountAddress = address.lowercased() - let inboxId = try await getOrCreateInboxId(options: options, address: accountAddress) + let inboxId = try await getOrCreateInboxId( + options: options, address: accountAddress) return try await initializeClient( accountAddress: accountAddress, @@ -232,18 +247,22 @@ public final class Client { ) async throws -> (FfiXmtpClient?, String) { if options?.enableV3 == true { let address = accountAddress.lowercased() - + let mlsDbDirectory = options?.dbDirectory var directoryURL: URL if let mlsDbDirectory = mlsDbDirectory { let fileManager = FileManager.default - directoryURL = URL(fileURLWithPath: mlsDbDirectory, isDirectory: true) + directoryURL = URL( + fileURLWithPath: mlsDbDirectory, isDirectory: true) // Check if the directory exists, if not, create it if !fileManager.fileExists(atPath: directoryURL.path) { do { - try fileManager.createDirectory(at: directoryURL, withIntermediateDirectories: true, attributes: nil) + try fileManager.createDirectory( + at: directoryURL, withIntermediateDirectories: true, + attributes: nil) } catch { - throw ClientError.creationError("Failed db directory \(mlsDbDirectory)") + throw ClientError.creationError( + "Failed db directory \(mlsDbDirectory)") } } } else { @@ -252,10 +271,12 @@ public final class Client { let alias = "xmtp-\(options?.api.env.rawValue ?? "")-\(inboxId).db3" let dbURL = directoryURL.appendingPathComponent(alias).path - + let encryptionKey = options?.dbEncryptionKey - if (encryptionKey == nil) { - throw ClientError.creationError("No encryption key passed for the database. Please store and provide a secure encryption key.") + if encryptionKey == nil { + throw ClientError.creationError( + "No encryption key passed for the database. Please store and provide a secure encryption key." + ) } let v3Client = try await LibXMTP.createClient( @@ -267,34 +288,48 @@ public final class Client { inboxId: inboxId, accountAddress: address, nonce: 0, - legacySignedPrivateKeyProto: try privateKeyBundleV1?.toV2().identityKey.serializedData(), + legacySignedPrivateKeyProto: try privateKeyBundleV1?.toV2() + .identityKey.serializedData(), historySyncUrl: options?.historySyncUrl ) - - try await options?.preAuthenticateToInboxCallback?() + + try await options?.preAuthenticateToInboxCallback?() if let signatureRequest = v3Client.signatureRequest() { if let signingKey = signingKey { do { if signingKey.type == WalletType.SCW { guard let chainId = signingKey.chainId else { - throw ClientError.creationError("Chain id must be present to sign Smart Contract Wallet") + throw ClientError.creationError( + "Chain id must be present to sign Smart Contract Wallet" + ) } - let signedData = try await signingKey.signSCW(message: signatureRequest.signatureText()) - try await signatureRequest.addScwSignature(signatureBytes: signedData, - address: signingKey.address, - chainId: UInt64(chainId), - blockNumber: signingKey.blockNumber.flatMap { $0 >= 0 ? UInt64($0) : nil }) + let signedData = try await signingKey.signSCW( + message: signatureRequest.signatureText()) + try await signatureRequest.addScwSignature( + signatureBytes: signedData, + address: signingKey.address, + chainId: UInt64(chainId), + blockNumber: signingKey.blockNumber.flatMap { + $0 >= 0 ? UInt64($0) : nil + }) } else { - let signedData = try await signingKey.sign(message: signatureRequest.signatureText()) - try await signatureRequest.addEcdsaSignature(signatureBytes: signedData.rawData) + let signedData = try await signingKey.sign( + message: signatureRequest.signatureText()) + try await signatureRequest.addEcdsaSignature( + signatureBytes: signedData.rawData) } - try await v3Client.registerIdentity(signatureRequest: signatureRequest) + try await v3Client.registerIdentity( + signatureRequest: signatureRequest) } catch { - throw ClientError.creationError("Failed to sign the message: \(error.localizedDescription)") + throw ClientError.creationError( + "Failed to sign the message: \(error.localizedDescription)" + ) } } else { - throw ClientError.creationError("No v3 keys found, you must pass a SigningKey in order to enable alpha MLS features") + throw ClientError.creationError( + "No v3 keys found, you must pass a SigningKey in order to enable alpha MLS features" + ) } } @@ -306,9 +341,13 @@ public final class Client { } } - static func create(account: SigningKey, apiClient: ApiClient, options: ClientOptions? = nil) async throws -> Client { - let privateKeyBundleV1 = try await loadOrCreateKeys(for: account, apiClient: apiClient, options: options) - let inboxId = try await getOrCreateInboxId(options: options ?? ClientOptions(), address: account.address) + static func create( + account: SigningKey, apiClient: ApiClient, options: ClientOptions? = nil + ) async throws -> Client { + let privateKeyBundleV1 = try await loadOrCreateKeys( + for: account, apiClient: apiClient, options: options) + let inboxId = try await getOrCreateInboxId( + options: options ?? ClientOptions(), address: account.address) let (v3Client, dbPath) = try await initV3Client( accountAddress: account.address, @@ -318,7 +357,11 @@ public final class Client { inboxId: inboxId ) - let client = try Client(address: account.address, privateKeyBundleV1: privateKeyBundleV1, apiClient: apiClient, v3Client: v3Client, dbPath: dbPath, installationID: v3Client?.installationId().toHex ?? "", inboxID: v3Client?.inboxId() ?? inboxId) + let client = try Client( + address: account.address, privateKeyBundleV1: privateKeyBundleV1, + apiClient: apiClient, v3Client: v3Client, dbPath: dbPath, + installationID: v3Client?.installationId().toHex ?? "", + inboxID: v3Client?.inboxId() ?? inboxId) let conversations = client.conversations let contacts = client.contacts try await client.ensureUserContactPublished() @@ -330,8 +373,13 @@ public final class Client { return client } - static func loadOrCreateKeys(for account: SigningKey, apiClient: ApiClient, options: ClientOptions? = nil) async throws -> PrivateKeyBundleV1 { - if let keys = try await loadPrivateKeys(for: account, apiClient: apiClient, options: options) { + static func loadOrCreateKeys( + for account: SigningKey, apiClient: ApiClient, + options: ClientOptions? = nil + ) async throws -> PrivateKeyBundleV1 { + if let keys = try await loadPrivateKeys( + for: account, apiClient: apiClient, options: options) + { print("loading existing private keys.") #if DEBUG print("Loaded existing private keys.") @@ -341,31 +389,43 @@ public final class Client { #if DEBUG print("No existing keys found, creating new bundle.") #endif - let keys = try await PrivateKeyBundleV1.generate(wallet: account, options: options) + let keys = try await PrivateKeyBundleV1.generate( + wallet: account, options: options) let keyBundle = PrivateKeyBundle(v1: keys) - let encryptedKeys = try await keyBundle.encrypted(with: account, preEnableIdentityCallback: options?.preEnableIdentityCallback) - var authorizedIdentity = AuthorizedIdentity(privateKeyBundleV1: keys) + let encryptedKeys = try await keyBundle.encrypted( + with: account, + preEnableIdentityCallback: options?.preEnableIdentityCallback) + var authorizedIdentity = AuthorizedIdentity( + privateKeyBundleV1: keys) authorizedIdentity.address = account.address let authToken = try await authorizedIdentity.createAuthToken() let apiClient = apiClient apiClient.setAuthToken(authToken) _ = try await apiClient.publish(envelopes: [ - Envelope(topic: .userPrivateStoreKeyBundle(account.address), timestamp: Date(), message: encryptedKeys.serializedData()), + Envelope( + topic: .userPrivateStoreKeyBundle(account.address), + timestamp: Date(), message: encryptedKeys.serializedData()) ]) return keys } } - static func loadPrivateKeys(for account: SigningKey, apiClient: ApiClient, options: ClientOptions? = nil) async throws -> PrivateKeyBundleV1? { + static func loadPrivateKeys( + for account: SigningKey, apiClient: ApiClient, + options: ClientOptions? = nil + ) async throws -> PrivateKeyBundleV1? { let res = try await apiClient.query( topic: .userPrivateStoreKeyBundle(account.address), pagination: nil ) for envelope in res.envelopes { - let encryptedBundle = try EncryptedPrivateKeyBundle(serializedData: envelope.message) - let bundle = try await encryptedBundle.decrypted(with: account, preEnableIdentityCallback: options?.preEnableIdentityCallback) + let encryptedBundle = try EncryptedPrivateKeyBundle( + serializedData: envelope.message) + let bundle = try await encryptedBundle.decrypted( + with: account, + preEnableIdentityCallback: options?.preEnableIdentityCallback) if case .v1 = bundle.version { return bundle.v1 } @@ -374,16 +434,19 @@ public final class Client { return nil } - - public static func getOrCreateInboxId(options: ClientOptions, address: String) async throws -> String { + + public static func getOrCreateInboxId( + options: ClientOptions, address: String + ) async throws -> String { var inboxId: String do { - inboxId = try await getInboxIdForAddress( - logger: XMTPLogger(), - host: options.api.env.url, - isSecure: options.api.env.isSecure == true, - accountAddress: address - ) ?? generateInboxId(accountAddress: address, nonce: 0) + inboxId = + try await getInboxIdForAddress( + logger: XMTPLogger(), + host: options.api.env.url, + isSecure: options.api.env.isSecure == true, + accountAddress: address + ) ?? generateInboxId(accountAddress: address, nonce: 0) } catch { inboxId = generateInboxId(accountAddress: address, nonce: 0) } @@ -394,11 +457,13 @@ public final class Client { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } - let canMessage = try await client.canMessage(accountAddresses: [address]) + let canMessage = try await client.canMessage(accountAddresses: [address] + ) return canMessage[address.lowercased()] ?? false } - public func canMessageV3(addresses: [String]) async throws -> [String: Bool] { + public func canMessageV3(addresses: [String]) async throws -> [String: Bool] + { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } @@ -406,7 +471,9 @@ public final class Client { return try await client.canMessage(accountAddresses: addresses) } - public static func from(bundle: PrivateKeyBundle, options: ClientOptions? = nil) async throws -> Client { + public static func from( + bundle: PrivateKeyBundle, options: ClientOptions? = nil + ) async throws -> Client { return try await from(v1Bundle: bundle.v1, options: options) } @@ -416,27 +483,34 @@ public final class Client { options: ClientOptions? = nil, signingKey: SigningKey? = nil ) async throws -> Client { - let address = try v1Bundle.identityKey.publicKey.recoverWalletSignerPublicKey().walletAddress + let address = try v1Bundle.identityKey.publicKey + .recoverWalletSignerPublicKey().walletAddress let options = options ?? ClientOptions() - - let inboxId = try await getOrCreateInboxId(options: options, address: address) + + let inboxId = try await getOrCreateInboxId( + options: options, address: address) let (v3Client, dbPath) = try await initV3Client( accountAddress: address, options: options, privateKeyBundleV1: v1Bundle, - signingKey: nil, + signingKey: signingKey, inboxId: inboxId ) - let client = try await LibXMTP.createV2Client(host: options.api.env.url, isSecure: options.api.env.isSecure) + let client = try await LibXMTP.createV2Client( + host: options.api.env.url, isSecure: options.api.env.isSecure) let apiClient = try GRPCApiClient( environment: options.api.env, secure: options.api.isSecure, rustClient: client ) - let result = try Client(address: address, privateKeyBundleV1: v1Bundle, apiClient: apiClient, v3Client: v3Client, dbPath: dbPath, installationID: v3Client?.installationId().toHex ?? "", inboxID: v3Client?.inboxId() ?? inboxId) + let result = try Client( + address: address, privateKeyBundleV1: v1Bundle, + apiClient: apiClient, v3Client: v3Client, dbPath: dbPath, + installationID: v3Client?.installationId().toHex ?? "", + inboxID: v3Client?.inboxId() ?? inboxId) let conversations = result.conversations let contacts = result.contacts for codec in options.codecs { @@ -446,7 +520,11 @@ public final class Client { return result } - init(address: String, privateKeyBundleV1: PrivateKeyBundleV1, apiClient: ApiClient, v3Client: LibXMTP.FfiXmtpClient?, dbPath: String = "", installationID: String, inboxID: String) throws { + init( + address: String, privateKeyBundleV1: PrivateKeyBundleV1, + apiClient: ApiClient, v3Client: LibXMTP.FfiXmtpClient?, + dbPath: String = "", installationID: String, inboxID: String + ) throws { self.address = address self.privateKeyBundleV1 = privateKeyBundleV1 self.apiClient = apiClient @@ -457,8 +535,11 @@ public final class Client { self.hasV2Client = true self.environment = apiClient.environment } - - init(address: String, v3Client: LibXMTP.FfiXmtpClient, dbPath: String, installationID: String, inboxID: String, environment: XMTPEnvironment) throws { + + init( + address: String, v3Client: LibXMTP.FfiXmtpClient, dbPath: String, + installationID: String, inboxID: String, environment: XMTPEnvironment + ) throws { self.address = address self.v3Client = v3Client self.dbPath = dbPath @@ -499,26 +580,34 @@ public final class Client { return try await query(topic: .contact(peerAddress)).envelopes.count > 0 } - public static func canMessage(_ peerAddress: String, options: ClientOptions? = nil) async throws -> Bool { + public static func canMessage( + _ peerAddress: String, options: ClientOptions? = nil + ) async throws -> Bool { let options = options ?? ClientOptions() - let client = try await LibXMTP.createV2Client(host: options.api.env.url, isSecure: options.api.env.isSecure) + let client = try await LibXMTP.createV2Client( + host: options.api.env.url, isSecure: options.api.env.isSecure) let apiClient = try GRPCApiClient( environment: options.api.env, secure: options.api.isSecure, rustClient: client ) - return try await apiClient.query(topic: Topic.contact(peerAddress)).envelopes.count > 0 + return try await apiClient.query(topic: Topic.contact(peerAddress)) + .envelopes.count > 0 } - public func importConversation(from conversationData: Data) throws -> Conversation? { + public func importConversation(from conversationData: Data) throws + -> Conversation? + { let jsonDecoder = JSONDecoder() do { - let v2Export = try jsonDecoder.decode(ConversationV2Export.self, from: conversationData) + let v2Export = try jsonDecoder.decode( + ConversationV2Export.self, from: conversationData) return try importV2Conversation(export: v2Export) } catch { do { - let v1Export = try jsonDecoder.decode(ConversationV1Export.self, from: conversationData) + let v1Export = try jsonDecoder.decode( + ConversationV1Export.self, from: conversationData) return try importV1Conversation(export: v1Export) } catch { throw ConversationImportError.invalidData @@ -526,35 +615,43 @@ public final class Client { } } - func importV2Conversation(export: ConversationV2Export) throws -> Conversation { - guard let keyMaterial = Data(base64Encoded: Data(export.keyMaterial.utf8)) else { + func importV2Conversation(export: ConversationV2Export) throws + -> Conversation + { + guard + let keyMaterial = Data(base64Encoded: Data(export.keyMaterial.utf8)) + else { throw ConversationImportError.invalidData } - var consentProof: ConsentProofPayload? = nil - if let exportConsentProof = export.consentProof { - var proof = ConsentProofPayload() - proof.signature = exportConsentProof.signature - proof.timestamp = exportConsentProof.timestamp - proof.payloadVersion = ConsentProofPayloadVersion.consentProofPayloadVersion1 - consentProof = proof - } - - return .v2(ConversationV2( - topic: export.topic, - keyMaterial: keyMaterial, - context: InvitationV1.Context( - conversationID: export.context?.conversationId ?? "", - metadata: export.context?.metadata ?? [:] - ), - peerAddress: export.peerAddress, - client: self, - header: SealedInvitationHeaderV1(), - consentProof: consentProof - )) - } - - func importV1Conversation(export: ConversationV1Export) throws -> Conversation { + var consentProof: ConsentProofPayload? = nil + if let exportConsentProof = export.consentProof { + var proof = ConsentProofPayload() + proof.signature = exportConsentProof.signature + proof.timestamp = exportConsentProof.timestamp + proof.payloadVersion = + ConsentProofPayloadVersion.consentProofPayloadVersion1 + consentProof = proof + } + + return .v2( + ConversationV2( + topic: export.topic, + keyMaterial: keyMaterial, + context: InvitationV1.Context( + conversationID: export.context?.conversationId ?? "", + metadata: export.context?.metadata ?? [:] + ), + peerAddress: export.peerAddress, + client: self, + header: SealedInvitationHeaderV1(), + consentProof: consentProof + )) + } + + func importV1Conversation(export: ConversationV1Export) throws + -> Conversation + { let formatter = ISO8601DateFormatter() formatter.formatOptions.insert(.withFractionalSeconds) @@ -562,17 +659,18 @@ public final class Client { throw ConversationImportError.invalidData } - return .v1(ConversationV1( - client: self, - peerAddress: export.peerAddress, - sentAt: sentAt - )) + return .v1( + ConversationV1( + client: self, + peerAddress: export.peerAddress, + sentAt: sentAt + )) } func ensureUserContactPublished() async throws { if let contact = try await getUserContact(peerAddress: address), - case .v2 = contact.version, - try keys.getPublicKeyBundle().equals(contact.v2.keyBundle) + case .v2 = contact.version, + try keys.getPublicKeyBundle().equals(contact.v2.keyBundle) { return } @@ -589,7 +687,8 @@ public final class Client { var envelope = Envelope() envelope.contentTopic = Topic.contact(address).description - envelope.timestampNs = UInt64(Date().millisecondsSinceEpoch * 1_000_000) + envelope.timestampNs = UInt64( + Date().millisecondsSinceEpoch * 1_000_000) envelope.message = try contactBundle.serializedData() envelopes.append(envelope) @@ -608,7 +707,9 @@ public final class Client { _ = try await publish(envelopes: envelopes) } - public func query(topic: Topic, pagination: Pagination? = nil) async throws -> QueryResponse { + public func query(topic: Topic, pagination: Pagination? = nil) async throws + -> QueryResponse + { guard let client = apiClient else { throw ClientError.noV2Client("Error no V2 client initialized") } @@ -618,7 +719,9 @@ public final class Client { ) } - public func batchQuery(request: BatchQueryRequest) async throws -> BatchQueryResponse { + public func batchQuery(request: BatchQueryRequest) async throws + -> BatchQueryResponse + { guard let client = apiClient else { throw ClientError.noV2Client("Error no V2 client initialized") } @@ -629,7 +732,9 @@ public final class Client { guard let client = apiClient else { throw ClientError.noV2Client("Error no V2 client initialized") } - let authorized = try AuthorizedIdentity(address: address, authorized: v1keys.identityKey.publicKey, identity: v1keys.identityKey) + let authorized = try AuthorizedIdentity( + address: address, authorized: v1keys.identityKey.publicKey, + identity: v1keys.identityKey) let authToken = try await authorized.createAuthToken() client.setAuthToken(authToken) @@ -641,7 +746,9 @@ public final class Client { topics: [String], callback: FfiV2SubscriptionCallback ) async throws -> FfiV2Subscription { - return try await subscribe2(request: FfiV2SubscribeRequest(contentTopics: topics), callback: callback) + return try await subscribe2( + request: FfiV2SubscribeRequest(contentTopics: topics), + callback: callback) } public func subscribe2( @@ -659,15 +766,19 @@ public final class Client { let fm = FileManager.default try fm.removeItem(atPath: dbPath) } - - @available(*, deprecated, message: "This function is delicate and should be used with caution. App will error if database not properly reconnected. See: reconnectLocalDatabase()") + + @available( + *, deprecated, + message: + "This function is delicate and should be used with caution. App will error if database not properly reconnected. See: reconnectLocalDatabase()" + ) public func dropLocalDatabaseConnection() throws { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } try client.releaseDbConnection() } - + public func reconnectLocalDatabase() async throws { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") @@ -679,37 +790,41 @@ public final class Client { let peerAddress = EthereumAddress(peerAddress).toChecksumAddress() return try await contacts.find(peerAddress) } - + public func inboxIdFromAddress(address: String) async throws -> String? { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } return try await client.findInboxId(address: address.lowercased()) } - + public func findGroup(groupId: String) throws -> Group? { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } do { - return Group(ffiGroup: try client.conversation(conversationId: groupId.hexToData), client: self) + return Group( + ffiGroup: try client.conversation( + conversationId: groupId.hexToData), client: self) } catch { return nil } } - - public func findConversation(conversationId: String) throws -> Conversation? { + + public func findConversation(conversationId: String) throws -> Conversation? + { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } do { - let conversation = try client.conversation(conversationId: conversationId.hexToData) + let conversation = try client.conversation( + conversationId: conversationId.hexToData) return try conversation.toConversation(client: self) } catch { return nil } } - + public func findConversationByTopic(topic: String) throws -> Conversation? { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") @@ -718,9 +833,13 @@ public final class Client { let regexPattern = #"/xmtp/mls/1/g-(.*?)/proto"# if let regex = try? NSRegularExpression(pattern: regexPattern) { let range = NSRange(location: 0, length: topic.utf16.count) - if let match = regex.firstMatch(in: topic, options: [], range: range) { - let conversationId = (topic as NSString).substring(with: match.range(at: 1)) - let conversation = try client.conversation(conversationId: conversationId.hexToData) + if let match = regex.firstMatch( + in: topic, options: [], range: range) + { + let conversationId = (topic as NSString).substring( + with: match.range(at: 1)) + let conversation = try client.conversation( + conversationId: conversationId.hexToData) return try conversation.toConversation(client: self) } } @@ -729,12 +848,13 @@ public final class Client { } return nil } - + public func findDm(address: String) async throws -> Dm? { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } - guard let inboxId = try await inboxIdFromAddress(address: address) else { + guard let inboxId = try await inboxIdFromAddress(address: address) + else { throw ClientError.creationError("No inboxId present") } do { @@ -750,38 +870,48 @@ public final class Client { throw ClientError.noV3Client("Error no V3 client initialized") } do { - return MessageV3(client: self, ffiMessage: try client.message(messageId: messageId.hexToData)) + return MessageV3( + client: self, + ffiMessage: try client.message(messageId: messageId.hexToData)) } catch { return nil } } - + public func requestMessageHistorySync() async throws { guard let client = v3Client else { throw ClientError.noV3Client("Error no V3 client initialized") } try await client.requestHistorySync() } - - public func revokeAllOtherInstallations(signingKey: SigningKey) async throws { + + public func revokeAllOtherInstallations(signingKey: SigningKey) async throws + { guard let client = v3Client else { throw ClientError.noV3Client("Error: No V3 client initialized") } - + let signatureRequest = try await client.revokeAllOtherInstallations() do { - let signedData = try await signingKey.sign(message: signatureRequest.signatureText()) - try await signatureRequest.addEcdsaSignature(signatureBytes: signedData.rawData) - try await client.applySignatureRequest(signatureRequest: signatureRequest) + let signedData = try await signingKey.sign( + message: signatureRequest.signatureText()) + try await signatureRequest.addEcdsaSignature( + signatureBytes: signedData.rawData) + try await client.applySignatureRequest( + signatureRequest: signatureRequest) } catch { - throw ClientError.creationError("Failed to sign the message: \(error.localizedDescription)") + throw ClientError.creationError( + "Failed to sign the message: \(error.localizedDescription)") } } - - public func inboxState(refreshFromNetwork: Bool) async throws -> InboxState { + + public func inboxState(refreshFromNetwork: Bool) async throws -> InboxState + { guard let client = v3Client else { throw ClientError.noV3Client("Error: No V3 client initialized") } - return InboxState(ffiInboxState: try await client.inboxState(refreshFromNetwork: refreshFromNetwork)) + return InboxState( + ffiInboxState: try await client.inboxState( + refreshFromNetwork: refreshFromNetwork)) } } diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index fb0e399b..1691051c 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -2,48 +2,62 @@ import Foundation import LibXMTP public enum ConversationError: Error, CustomStringConvertible, LocalizedError { - case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String), v2NotSupported(String), v3NotSupported(String) + case recipientNotOnNetwork, recipientIsSender + case v1NotSupported(String) + case v2NotSupported(String) + case v3NotSupported(String) public var description: String { switch self { case .recipientIsSender: - return "ConversationError.recipientIsSender: Recipient cannot be sender" + return + "ConversationError.recipientIsSender: Recipient cannot be sender" case .recipientNotOnNetwork: - return "ConversationError.recipientNotOnNetwork: Recipient is not on network" + return + "ConversationError.recipientNotOnNetwork: Recipient is not on network" case .v1NotSupported(let str): - return "ConversationError.v1NotSupported: V1 does not support: \(str)" + return + "ConversationError.v1NotSupported: V1 does not support: \(str)" case .v2NotSupported(let str): - return "ConversationError.v2NotSupported: V2 does not support: \(str)" + return + "ConversationError.v2NotSupported: V2 does not support: \(str)" case .v3NotSupported(let str): - return "ConversationError.v3NotSupported: V3 does not support: \(str)" + return + "ConversationError.v3NotSupported: V3 does not support: \(str)" } } - + public var errorDescription: String? { return description } } public enum GroupError: Error, CustomStringConvertible, LocalizedError { - case alphaMLSNotEnabled, memberCannotBeSelf, memberNotRegistered([String]), groupsRequireMessagePassed, notSupportedByGroups, streamingFailure + case alphaMLSNotEnabled, memberCannotBeSelf + case memberNotRegistered([String]) + case groupsRequireMessagePassed, notSupportedByGroups, streamingFailure public var description: String { switch self { case .alphaMLSNotEnabled: return "GroupError.alphaMLSNotEnabled" case .memberCannotBeSelf: - return "GroupError.memberCannotBeSelf you cannot add yourself to a group" + return + "GroupError.memberCannotBeSelf you cannot add yourself to a group" case .memberNotRegistered(let array): - return "GroupError.memberNotRegistered members not registered: \(array.joined(separator: ", "))" + return + "GroupError.memberNotRegistered members not registered: \(array.joined(separator: ", "))" case .groupsRequireMessagePassed: - return "GroupError.groupsRequireMessagePassed you cannot call this method without passing a message instead of an envelope" + return + "GroupError.groupsRequireMessagePassed you cannot call this method without passing a message instead of an envelope" case .notSupportedByGroups: - return "GroupError.notSupportedByGroups this method is not supported by groups" + return + "GroupError.notSupportedByGroups this method is not supported by groups" case .streamingFailure: return "GroupError.streamingFailure a stream has failed" } } - + public var errorDescription: String? { return description } @@ -54,6 +68,10 @@ public enum ConversationOrder { } final class ConversationStreamCallback: FfiConversationCallback { + func onError(error: LibXMTP.FfiSubscribeError) { + print("Error ConversationStreamCallback \(error)") + } + let callback: (FfiConversation) -> Void init(callback: @escaping (FfiConversation) -> Void) { @@ -66,12 +84,16 @@ final class ConversationStreamCallback: FfiConversationCallback { } final class V2SubscriptionCallback: FfiV2SubscriptionCallback { + func onError(error: LibXMTP.GenericError) { + print("Error V2SubscriptionCallback \(error)") + } + let callback: (Envelope) -> Void init(callback: @escaping (Envelope) -> Void) { self.callback = callback } - + func onMessage(message: LibXMTP.FfiEnvelope) { self.callback(message.fromFFI) } @@ -94,15 +116,15 @@ class StreamManager { } actor FfiStreamActor { - private var ffiStream: FfiStreamCloser? + private var ffiStream: FfiStreamCloser? - func setFfiStream(_ stream: FfiStreamCloser?) { - ffiStream = stream - } + func setFfiStream(_ stream: FfiStreamCloser?) { + ffiStream = stream + } - func endStream() { - ffiStream?.end() - } + func endStream() { + ffiStream?.end() + } } /// Handles listing and creating Conversations. @@ -120,14 +142,14 @@ public actor Conversations { } try await v3Client.conversations().sync() } - - public func syncAllGroups() async throws -> UInt32 { + + public func syncAllGroups() async throws -> UInt32 { guard let v3Client = client.v3Client else { return 0 } return try await v3Client.conversations().syncAllConversations() } - + public func syncAllConversations() async throws -> UInt32 { guard let v3Client = client.v3Client else { return 0 @@ -135,67 +157,93 @@ public actor Conversations { return try await v3Client.conversations().syncAllConversations() } - public func groups(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil) async throws -> [Group] { + public func groups( + createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil + ) async throws -> [Group] { guard let v3Client = client.v3Client else { return [] } - var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil) + var options = FfiListConversationsOptions( + createdAfterNs: nil, createdBeforeNs: nil, limit: nil, + consentState: nil) if let createdAfter { options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) } if let createdBefore { - options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + options.createdBeforeNs = Int64( + createdBefore.millisecondsSinceEpoch) } if let limit { options.limit = Int64(limit) } - return try await v3Client.conversations().listGroups(opts: options).map { $0.groupFromFFI(client: client) } + return try await v3Client.conversations().listGroups(opts: options).map + { $0.groupFromFFI(client: client) } } - - public func dms(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil) async throws -> [Dm] { - if (client.hasV2Client) { - throw ConversationError.v2NotSupported("Only supported with V3 only clients use newConversation instead") + + public func dms( + createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil + ) async throws -> [Dm] { + if client.hasV2Client { + throw ConversationError.v2NotSupported( + "Only supported with V3 only clients use newConversation instead" + ) } guard let v3Client = client.v3Client else { return [] } - var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil) + var options = FfiListConversationsOptions( + createdAfterNs: nil, createdBeforeNs: nil, limit: nil, + consentState: nil) if let createdAfter { options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) } if let createdBefore { - options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + options.createdBeforeNs = Int64( + createdBefore.millisecondsSinceEpoch) } if let limit { options.limit = Int64(limit) } - return try await v3Client.conversations().listDms(opts: options).map { $0.dmFromFFI(client: client) } + return try await v3Client.conversations().listDms(opts: options).map { + $0.dmFromFFI(client: client) + } } - - public func listConversations(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil, order: ConversationOrder = .createdAt, consentState: ConsentState? = nil) async throws -> [Conversation] { - if (client.hasV2Client) { - throw ConversationError.v2NotSupported("Only supported with V3 only clients use list instead") + + public func listConversations( + createdAfter: Date? = nil, createdBefore: Date? = nil, + limit: Int? = nil, order: ConversationOrder = .createdAt, + consentState: ConsentState? = nil + ) async throws -> [Conversation] { + if client.hasV2Client { + throw ConversationError.v2NotSupported( + "Only supported with V3 only clients use list instead") } // Todo: add ability to order and consent state guard let v3Client = client.v3Client else { return [] } - var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil) + var options = FfiListConversationsOptions( + createdAfterNs: nil, createdBeforeNs: nil, limit: nil, + consentState: consentState?.toFFI) if let createdAfter { options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) } if let createdBefore { - options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + options.createdBeforeNs = Int64( + createdBefore.millisecondsSinceEpoch) } if let limit { options.limit = Int64(limit) } - let ffiConversations = try await v3Client.conversations().list(opts: options) + let ffiConversations = try await v3Client.conversations().list( + opts: options) - let filteredConversations = try filterByConsentState(ffiConversations, consentState: consentState) - let sortedConversations = try sortConversations(filteredConversations, order: order) + let sortedConversations = try sortConversations( + ffiConversations, order: order) - return try sortedConversations.map { try $0.toConversation(client: client) } + return try sortedConversations.map { + try $0.toConversation(client: client) + } } private func sortConversations( @@ -204,18 +252,19 @@ public actor Conversations { ) throws -> [FfiConversation] { switch order { case .lastMessage: - let conversationWithTimestamp: [(FfiConversation, Int64?)] = try conversations.map { conversation in - let message = try conversation.findMessages( - opts: FfiListMessagesOptions( - sentBeforeNs: nil, - sentAfterNs: nil, - limit: 1, - deliveryStatus: nil, - direction: .descending - ) - ).first - return (conversation, message?.sentAtNs) - } + let conversationWithTimestamp: [(FfiConversation, Int64?)] = + try conversations.map { conversation in + let message = try conversation.findMessages( + opts: FfiListMessagesOptions( + sentBeforeNs: nil, + sentAfterNs: nil, + limit: 1, + deliveryStatus: nil, + direction: .descending + ) + ).first + return (conversation, message?.sentAtNs) + } let sortedTuples = conversationWithTimestamp.sorted { (lhs, rhs) in (lhs.1 ?? 0) > (rhs.1 ?? 0) @@ -226,99 +275,111 @@ public actor Conversations { } } - private func filterByConsentState( - _ conversations: [FfiConversation], - consentState: ConsentState? - ) throws -> [FfiConversation] { - guard let state = consentState else { return conversations } - return try conversations.filter { try $0.consentState() == state.toFFI } - } - - public func streamGroups() async throws -> AsyncThrowingStream { + public func streamGroups() async throws -> AsyncThrowingStream + { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() - let task = Task { - let groupCallback = ConversationStreamCallback() { group in + let ffiStreamActor = FfiStreamActor() + let task = Task { + let groupCallback = ConversationStreamCallback { group in guard !Task.isCancelled else { continuation.finish() return } continuation.yield(group.groupFromFFI(client: self.client)) } - guard let stream = await self.client.v3Client?.conversations().streamGroups(callback: groupCallback) else { + guard + let stream = await self.client.v3Client?.conversations() + .streamGroups(callback: groupCallback) + else { continuation.finish(throwing: GroupError.streamingFailure) return } - await ffiStreamActor.setFfiStream(stream) + await ffiStreamActor.setFfiStream(stream) continuation.onTermination = { @Sendable reason in - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } continuation.onTermination = { @Sendable reason in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - private func streamGroupConversations() -> AsyncThrowingStream { + private func streamGroupConversations() -> AsyncThrowingStream< + Conversation, Error + > { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() + let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamGroups( - callback: ConversationStreamCallback() { group in - guard !Task.isCancelled else { - continuation.finish() - return + let stream = await self.client.v3Client?.conversations() + .streamGroups( + callback: ConversationStreamCallback { group in + guard !Task.isCancelled else { + continuation.finish() + return + } + continuation.yield( + Conversation.group( + group.groupFromFFI(client: self.client))) } - continuation.yield(Conversation.group(group.groupFromFFI(client: self.client))) - } - ) - await ffiStreamActor.setFfiStream(stream) + ) + await ffiStreamActor.setFfiStream(stream) continuation.onTermination = { @Sendable reason in - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } continuation.onTermination = { @Sendable reason in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - - public func streamConversations() -> AsyncThrowingStream { + + public func streamConversations() -> AsyncThrowingStream< + Conversation, Error + > { AsyncThrowingStream { continuation in - if (client.hasV2Client) { - continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 only clients use stream instead")) + if client.hasV2Client { + continuation.finish( + throwing: ConversationError.v2NotSupported( + "Only supported with V3 only clients use stream instead" + )) return } let ffiStreamActor = FfiStreamActor() let task = Task { let stream = await self.client.v3Client?.conversations().stream( - callback: ConversationStreamCallback() { conversation in + callback: ConversationStreamCallback { conversation in guard !Task.isCancelled else { continuation.finish() return } do { - let conversationType = try conversation.groupMetadata().conversationType() + let conversationType = + try conversation.groupMetadata() + .conversationType() if conversationType == "dm" { continuation.yield( - Conversation.dm(conversation.dmFromFFI(client: self.client)) + Conversation.dm( + conversation.dmFromFFI( + client: self.client)) ) } else if conversationType == "group" { continuation.yield( - Conversation.group(conversation.groupFromFFI(client: self.client)) + Conversation.group( + conversation.groupFromFFI( + client: self.client)) ) } } catch { @@ -342,10 +403,12 @@ public actor Conversations { } } } - + public func findOrCreateDm(with peerAddress: String) async throws -> Dm { - if (client.hasV2Client) { - throw ConversationError.v2NotSupported("Only supported with V3 only clients use newConversation instead") + if client.hasV2Client { + throw ConversationError.v2NotSupported( + "Only supported with V3 only clients use newConversation instead" + ) } guard let v3Client = client.v3Client else { @@ -354,76 +417,87 @@ public actor Conversations { if peerAddress.lowercased() == client.address.lowercased() { throw ConversationError.recipientIsSender } - let canMessage = try await self.client.canMessageV3(address: peerAddress) - if !canMessage { + let canMessage = try await self.client.canMessageV3( + address: peerAddress) + if !canMessage { throw ConversationError.recipientNotOnNetwork } - + try await client.contacts.allow(addresses: [peerAddress]) if let existingDm = try await client.findDm(address: peerAddress) { return existingDm } - + let newDm = try await v3Client.conversations() .createDm(accountAddress: peerAddress.lowercased()) .dmFromFFI(client: client) - + try await client.contacts.allow(addresses: [peerAddress]) return newDm } - - public func newGroup(with addresses: [String], - permissions: GroupPermissionPreconfiguration = .allMembers, - name: String = "", - imageUrlSquare: String = "", - description: String = "", - pinnedFrameUrl: String = "" - ) async throws -> Group { - return try await newGroupInternal( - with: addresses, - permissions: GroupPermissionPreconfiguration.toFfiGroupPermissionOptions(option: permissions), - name: name, - imageUrlSquare: imageUrlSquare, - description: description, - pinnedFrameUrl: pinnedFrameUrl, - permissionPolicySet: nil - ) - } - - public func newGroupCustomPermissions(with addresses: [String], - permissionPolicySet: PermissionPolicySet, - name: String = "", - imageUrlSquare: String = "", - description: String = "", - pinnedFrameUrl: String = "" - ) async throws -> Group { - return try await newGroupInternal( - with: addresses, - permissions: FfiGroupPermissionsOptions.customPolicy, - name: name, - imageUrlSquare: imageUrlSquare, - description: description, - pinnedFrameUrl: pinnedFrameUrl, - permissionPolicySet: PermissionPolicySet.toFfiPermissionPolicySet(permissionPolicySet) - ) - } - - private func newGroupInternal(with addresses: [String], - permissions: FfiGroupPermissionsOptions = .allMembers, - name: String = "", - imageUrlSquare: String = "", - description: String = "", - pinnedFrameUrl: String = "", - permissionPolicySet: FfiPermissionPolicySet? = nil + + public func newGroup( + with addresses: [String], + permissions: GroupPermissionPreconfiguration = .allMembers, + name: String = "", + imageUrlSquare: String = "", + description: String = "", + pinnedFrameUrl: String = "" + ) async throws -> Group { + return try await newGroupInternal( + with: addresses, + permissions: + GroupPermissionPreconfiguration.toFfiGroupPermissionOptions( + option: permissions), + name: name, + imageUrlSquare: imageUrlSquare, + description: description, + pinnedFrameUrl: pinnedFrameUrl, + permissionPolicySet: nil + ) + } + + public func newGroupCustomPermissions( + with addresses: [String], + permissionPolicySet: PermissionPolicySet, + name: String = "", + imageUrlSquare: String = "", + description: String = "", + pinnedFrameUrl: String = "" + ) async throws -> Group { + return try await newGroupInternal( + with: addresses, + permissions: FfiGroupPermissionsOptions.customPolicy, + name: name, + imageUrlSquare: imageUrlSquare, + description: description, + pinnedFrameUrl: pinnedFrameUrl, + permissionPolicySet: PermissionPolicySet.toFfiPermissionPolicySet( + permissionPolicySet) + ) + } + + private func newGroupInternal( + with addresses: [String], + permissions: FfiGroupPermissionsOptions = .allMembers, + name: String = "", + imageUrlSquare: String = "", + description: String = "", + pinnedFrameUrl: String = "", + permissionPolicySet: FfiPermissionPolicySet? = nil ) async throws -> Group { guard let v3Client = client.v3Client else { throw GroupError.alphaMLSNotEnabled } - if addresses.first(where: { $0.lowercased() == client.address.lowercased() }) != nil { + if addresses.first(where: { + $0.lowercased() == client.address.lowercased() + }) != nil { throw GroupError.memberCannotBeSelf } - let erroredAddresses = try await withThrowingTaskGroup(of: (String?).self) { group in + let erroredAddresses = try await withThrowingTaskGroup( + of: (String?).self + ) { group in for address in addresses { group.addTask { if try await self.client.canMessageV3(address: address) { @@ -444,42 +518,55 @@ public actor Conversations { if !erroredAddresses.isEmpty { throw GroupError.memberNotRegistered(erroredAddresses) } - let group = try await v3Client.conversations().createGroup(accountAddresses: addresses, - opts: FfiCreateGroupOptions(permissions: permissions, - groupName: name, - groupImageUrlSquare: imageUrlSquare, - groupDescription: description, - groupPinnedFrameUrl: pinnedFrameUrl, - customPermissionPolicySet: permissionPolicySet - )).groupFromFFI(client: client) + let group = try await v3Client.conversations().createGroup( + accountAddresses: addresses, + opts: FfiCreateGroupOptions( + permissions: permissions, + groupName: name, + groupImageUrlSquare: imageUrlSquare, + groupDescription: description, + groupPinnedFrameUrl: pinnedFrameUrl, + customPermissionPolicySet: permissionPolicySet + ) + ).groupFromFFI(client: client) try await client.contacts.allowGroups(groupIds: [group.id]) return group } - - public func streamAllConversationMessages() -> AsyncThrowingStream { + + public func streamAllConversationMessages() -> AsyncThrowingStream< + DecodedMessage, Error + > { AsyncThrowingStream { continuation in - if (client.hasV2Client) { - continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 clients. Use streamAllMessages instead.")) + if client.hasV2Client { + continuation.finish( + throwing: ConversationError.v2NotSupported( + "Only supported with V3 clients. Use streamAllMessages instead." + )) return } let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation + let stream = await self.client.v3Client?.conversations() + .streamAllMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) + } catch { + print("Error onMessage \(error)") } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) - } catch { - print("Error onMessage \(error)") } - } - ) + ) await ffiStreamActor.setFfiStream(stream) } @@ -491,31 +578,41 @@ public actor Conversations { } } } - - public func streamAllDecryptedConversationMessages() -> AsyncThrowingStream { + + public func streamAllDecryptedConversationMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in - if (client.hasV2Client) { - continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 clients. Use streamAllMessages instead.")) + if client.hasV2Client { + continuation.finish( + throwing: ConversationError.v2NotSupported( + "Only supported with V3 clients. Use streamAllMessages instead." + )) return } let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation + let stream = await self.client.v3Client?.conversations() + .streamAllMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) + } catch { + print("Error onMessage \(error)") } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) - } catch { - print("Error onMessage \(error)") } - } - ) + ) await ffiStreamActor.setFfiStream(stream) } @@ -528,42 +625,52 @@ public actor Conversations { } } - - public func streamAllGroupMessages() -> AsyncThrowingStream { + public func streamAllGroupMessages() -> AsyncThrowingStream< + DecodedMessage, Error + > { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() + let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllGroupMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation - } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) - } catch { - print("Error onMessage \(error)") + let stream = await self.client.v3Client?.conversations() + .streamAllGroupMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) + } catch { + print("Error onMessage \(error)") + } } - } - ) - await ffiStreamActor.setFfiStream(stream) + ) + await ffiStreamActor.setFfiStream(stream) } continuation.onTermination = { _ in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - public func streamAllMessages(includeGroups: Bool = false) -> AsyncThrowingStream { + public func streamAllMessages(includeGroups: Bool = false) + -> AsyncThrowingStream + { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged( + stream: AsyncThrowingStream + ) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { @@ -580,55 +687,69 @@ public actor Conversations { } let task = Task { - await forwardStreamToMerged(stream: streamAllV2Messages()) + await forwardStreamToMerged(stream: streamAllV2Messages()) } - - let groupTask = includeGroups ? Task { - await forwardStreamToMerged(stream: streamAllGroupMessages()) - } : nil + + let groupTask = + includeGroups + ? Task { + await forwardStreamToMerged( + stream: streamAllGroupMessages()) + } : nil continuation.onTermination = { _ in task.cancel() - groupTask?.cancel() + groupTask?.cancel() } } } - public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream { + public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() + let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllGroupMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation - } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) - } catch { - print("Error onMessage \(error)") + let stream = await self.client.v3Client?.conversations() + .streamAllGroupMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) + } catch { + print("Error onMessage \(error)") + } } - } - ) - await ffiStreamActor.setFfiStream(stream) + ) + await ffiStreamActor.setFfiStream(stream) } continuation.onTermination = { _ in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - public func streamAllDecryptedMessages(includeGroups: Bool = false) -> AsyncThrowingStream { + public func streamAllDecryptedMessages(includeGroups: Bool = false) + -> AsyncThrowingStream + { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged( + stream: AsyncThrowingStream + ) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { @@ -645,23 +766,30 @@ public actor Conversations { } let task = Task { - await forwardStreamToMerged(stream: streamAllV2DecryptedMessages()) + await forwardStreamToMerged( + stream: streamAllV2DecryptedMessages()) } - - let groupTask = includeGroups ? Task { - await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages()) - } : nil + + let groupTask = + includeGroups + ? Task { + await forwardStreamToMerged( + stream: streamAllGroupDecryptedMessages()) + } : nil continuation.onTermination = { _ in - task.cancel() - groupTask?.cancel() + task.cancel() + groupTask?.cancel() } } } - private func findExistingConversation(with peerAddress: String, conversationID: String?) throws -> Conversation? { - return try conversationsByTopic.first(where: { try $0.value.peerAddress == peerAddress && - (($0.value.conversationID ?? "") == (conversationID ?? "")) + private func findExistingConversation( + with peerAddress: String, conversationID: String? + ) throws -> Conversation? { + return try conversationsByTopic.first(where: { + try $0.value.peerAddress == peerAddress + && (($0.value.conversationID ?? "") == (conversationID ?? "")) })?.value } @@ -669,31 +797,42 @@ public actor Conversations { guard let v3Client = client.v3Client else { return nil } - let group = try await v3Client.conversations().processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) + let group = try await v3Client.conversations() + .processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) return Group(ffiGroup: group, client: client) } - - public func conversationFromWelcome(envelopeBytes: Data) async throws -> Conversation? { + + public func conversationFromWelcome(envelopeBytes: Data) async throws + -> Conversation? + { guard let v3Client = client.v3Client else { return nil } - let conversation = try await v3Client.conversations().processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) + let conversation = try await v3Client.conversations() + .processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) return try conversation.toConversation(client: client) } - public func newConversation(with peerAddress: String, context: InvitationV1.Context? = nil, consentProofPayload: ConsentProofPayload? = nil) async throws -> Conversation { + public func newConversation( + with peerAddress: String, context: InvitationV1.Context? = nil, + consentProofPayload: ConsentProofPayload? = nil + ) async throws -> Conversation { if peerAddress.lowercased() == client.address.lowercased() { throw ConversationError.recipientIsSender } print("\(client.address) starting conversation with \(peerAddress)") - if let existing = try findExistingConversation(with: peerAddress, conversationID: context?.conversationID) { + if let existing = try findExistingConversation( + with: peerAddress, conversationID: context?.conversationID) + { return existing } guard let contact = try await client.contacts.find(peerAddress) else { throw ConversationError.recipientNotOnNetwork } - _ = try await list() // cache old conversations and check again - if let existing = try findExistingConversation(with: peerAddress, conversationID: context?.conversationID) { + _ = try await list() // cache old conversations and check again + if let existing = try findExistingConversation( + with: peerAddress, conversationID: context?.conversationID) + { return existing } // We don't have an existing conversation, make a v2 one @@ -704,8 +843,11 @@ public actor Conversations { context: context, consentProofPayload: consentProofPayload ) - let sealedInvitation = try await sendInvitation(recipient: recipient, invitation: invitation, created: Date()) - let conversationV2 = try ConversationV2.create(client: client, invitation: invitation, header: sealedInvitation.v1.header) + let sealedInvitation = try await sendInvitation( + recipient: recipient, invitation: invitation, created: Date()) + let conversationV2 = try ConversationV2.create( + client: client, invitation: invitation, + header: sealedInvitation.v1.header) try await client.contacts.allow(addresses: [peerAddress]) let conversation: Conversation = .v2(conversationV2) Task { @@ -714,31 +856,49 @@ public actor Conversations { return conversation } - public func stream() async throws -> AsyncThrowingStream { + public func stream() async throws -> AsyncThrowingStream< + Conversation, Error + > { AsyncThrowingStream { continuation in Task { var streamedConversationTopics: Set = [] let subscriptionCallback = V2SubscriptionCallback { envelope in Task { - if envelope.contentTopic == Topic.userIntro(self.client.address).description { - let conversationV1 = try self.fromIntro(envelope: envelope) - if !streamedConversationTopics.contains(conversationV1.topic.description) { - streamedConversationTopics.insert(conversationV1.topic.description) + if envelope.contentTopic + == Topic.userIntro(self.client.address).description + { + let conversationV1 = try self.fromIntro( + envelope: envelope) + if !streamedConversationTopics.contains( + conversationV1.topic.description) + { + streamedConversationTopics.insert( + conversationV1.topic.description) continuation.yield(conversationV1) } } - if envelope.contentTopic == Topic.userInvite(self.client.address).description { - let conversationV2 = try self.fromInvite(envelope: envelope) - if !streamedConversationTopics.contains(conversationV2.topic) { - streamedConversationTopics.insert(conversationV2.topic) + if envelope.contentTopic + == Topic.userInvite(self.client.address).description + { + let conversationV2 = try self.fromInvite( + envelope: envelope) + if !streamedConversationTopics.contains( + conversationV2.topic) + { + streamedConversationTopics.insert( + conversationV2.topic) continuation.yield(conversationV2) } } } } - - let stream = try await client.subscribe(topics: [Topic.userIntro(client.address).description, Topic.userInvite(client.address).description], callback: subscriptionCallback) - + + let stream = try await client.subscribe( + topics: [ + Topic.userIntro(client.address).description, + Topic.userInvite(client.address).description, + ], callback: subscriptionCallback) + continuation.onTermination = { @Sendable reason in Task { try await stream.end() @@ -750,7 +910,9 @@ public actor Conversations { public func streamAll() -> AsyncThrowingStream { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged( + stream: AsyncThrowingStream + ) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { @@ -769,48 +931,63 @@ public actor Conversations { } } } - - private func validateConsentSignature(signature: String, clientAddress: String, peerAddress: String, timestamp: UInt64) -> Bool { + + private func validateConsentSignature( + signature: String, clientAddress: String, peerAddress: String, + timestamp: UInt64 + ) -> Bool { // timestamp should be in the past if timestamp > UInt64(Date().timeIntervalSince1970 * 1000) { return false } let thirtyDaysAgo = Date().addingTimeInterval(-30 * 24 * 60 * 60) - let thirtyDaysAgoTimestamp = UInt64(thirtyDaysAgo.timeIntervalSince1970 * 1000) + let thirtyDaysAgoTimestamp = UInt64( + thirtyDaysAgo.timeIntervalSince1970 * 1000) if timestamp < thirtyDaysAgoTimestamp { return false } - let message = Signature.consentProofText(peerAddress: peerAddress, timestamp: timestamp) + let message = Signature.consentProofText( + peerAddress: peerAddress, timestamp: timestamp) guard let signatureData = Data(hex: signature) else { print("Invalid signature format") return false } do { let ethMessage = try Signature.ethHash(message) - let recoveredKey = try KeyUtilx.recoverPublicKey(message: ethMessage, signature: signatureData) - let address = KeyUtilx.generateAddress(from: recoveredKey).toChecksumAddress() + let recoveredKey = try KeyUtilx.recoverPublicKey( + message: ethMessage, signature: signatureData) + let address = KeyUtilx.generateAddress(from: recoveredKey) + .toChecksumAddress() return clientAddress == address } catch { return false } } - private func handleConsentProof(consentProof: ConsentProofPayload, peerAddress: String) async throws { + private func handleConsentProof( + consentProof: ConsentProofPayload, peerAddress: String + ) async throws { let signature = consentProof.signature - if (signature == "") { + if signature == "" { return } - if (!validateConsentSignature(signature: signature, clientAddress: client.address, peerAddress: peerAddress, timestamp: consentProof.timestamp)) { + if !validateConsentSignature( + signature: signature, clientAddress: client.address, + peerAddress: peerAddress, timestamp: consentProof.timestamp) + { return } let contacts = client.contacts _ = try await contacts.refreshConsentList() - if try await (contacts.consentList.state(address: peerAddress) == .unknown) { + if try await + (contacts.consentList.state(address: peerAddress) == .unknown) + { try await contacts.allow(addresses: [peerAddress]) } } - public func list(includeGroups: Bool = false) async throws -> [Conversation] { + public func list(includeGroups: Bool = false) async throws -> [Conversation] + { if includeGroups { try await sync() let groups = try await groups() @@ -822,27 +999,40 @@ public actor Conversations { let mostRecent = await self.getMostRecentConversation() let pagination = Pagination(after: mostRecent?.createdAt) do { - let seenPeers = try await listIntroductionPeers(pagination: pagination) + let seenPeers = try await listIntroductionPeers( + pagination: pagination) for (peerAddress, sentAt) in seenPeers { - let newConversation = Conversation.v1(ConversationV1(client: client, peerAddress: peerAddress, sentAt: sentAt)) + let newConversation = Conversation.v1( + ConversationV1( + client: client, peerAddress: peerAddress, sentAt: sentAt + )) newConversations.append(newConversation) } } catch { print("Error loading introduction peers: \(error)") } - for sealedInvitation in try await listInvitations(pagination: pagination) { + for sealedInvitation in try await listInvitations( + pagination: pagination) + { do { - let newConversation = Conversation.v2(try makeConversation(from: sealedInvitation)) + let newConversation = Conversation.v2( + try makeConversation(from: sealedInvitation)) newConversations.append(newConversation) - if let consentProof = newConversation.consentProof, consentProof.signature != "" { - try await self.handleConsentProof(consentProof: consentProof, peerAddress: newConversation.peerAddress) + if let consentProof = newConversation.consentProof, + consentProof.signature != "" + { + try await self.handleConsentProof( + consentProof: consentProof, + peerAddress: newConversation.peerAddress) } } catch { print("Error loading invitations: \(error)") } } for conversation in newConversations { - if try conversation.peerAddress != client.address && Topic.isValidTopic(topic: conversation.topic) { + if try conversation.peerAddress != client.address + && Topic.isValidTopic(topic: conversation.topic) + { await self.addConversation(conversation) } } @@ -865,54 +1055,78 @@ public actor Conversations { } } - public func getHmacKeys(request: Xmtp_KeystoreApi_V1_GetConversationHmacKeysRequest? = nil) -> Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse { - let thirtyDayPeriodsSinceEpoch = Int(Date().timeIntervalSince1970) / (60 * 60 * 24 * 30) - var hmacKeysResponse = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse() + public func getHmacKeys( + request: Xmtp_KeystoreApi_V1_GetConversationHmacKeysRequest? = nil + ) -> Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse { + let thirtyDayPeriodsSinceEpoch = + Int(Date().timeIntervalSince1970) / (60 * 60 * 24 * 30) + var hmacKeysResponse = + Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse() var topics = conversationsByTopic if let requestTopics = request?.topics, !requestTopics.isEmpty { topics = topics.filter { requestTopics.contains($0.key) } } for (topic, conversation) in topics { guard let keyMaterial = conversation.keyMaterial else { continue } - var hmacKeys = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse.HmacKeys() - for period in (thirtyDayPeriodsSinceEpoch - 1)...(thirtyDayPeriodsSinceEpoch + 1) { + var hmacKeys = + Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse.HmacKeys() + for period + in (thirtyDayPeriodsSinceEpoch - 1)...(thirtyDayPeriodsSinceEpoch + + 1) + { let info = "\(period)-\(client.address)" do { - let hmacKey = try Crypto.deriveKey(secret: keyMaterial, nonce: Data(), info: Data(info.utf8)) - var hmacKeyData = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse.HmacKeyData() + let hmacKey = try Crypto.deriveKey( + secret: keyMaterial, nonce: Data(), + info: Data(info.utf8)) + var hmacKeyData = + Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse + .HmacKeyData() hmacKeyData.hmacKey = hmacKey hmacKeyData.thirtyDayPeriodsSinceEpoch = Int32(period) hmacKeys.values.append(hmacKeyData) } catch { - print("Error calculating HMAC key for topic \(topic): \(error)") + print( + "Error calculating HMAC key for topic \(topic): \(error)" + ) } } hmacKeysResponse.hmacKeys[topic] = hmacKeys } return hmacKeysResponse } - + // ------- V1 V2 to be deprecated ------ - + /// Import a previously seen conversation. /// See Conversation.toTopicData() - public func importTopicData(data: Xmtp_KeystoreApi_V1_TopicMap.TopicData) throws -> Conversation { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("importTopicData only supported with V2 clients") + public func importTopicData(data: Xmtp_KeystoreApi_V1_TopicMap.TopicData) + throws -> Conversation + { + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "importTopicData only supported with V2 clients") } let conversation: Conversation if !data.hasInvitation { - let sentAt = Date(timeIntervalSince1970: TimeInterval(data.createdNs / 1_000_000_000)) - conversation = .v1(ConversationV1(client: client, peerAddress: data.peerAddress, sentAt: sentAt)) + let sentAt = Date( + timeIntervalSince1970: TimeInterval( + data.createdNs / 1_000_000_000)) + conversation = .v1( + ConversationV1( + client: client, peerAddress: data.peerAddress, + sentAt: sentAt)) } else { - conversation = .v2(ConversationV2( - topic: data.invitation.topic, - keyMaterial: data.invitation.aes256GcmHkdfSha256.keyMaterial, - context: data.invitation.context, - peerAddress: data.peerAddress, - client: client, - createdAtNs: data.createdNs - )) + conversation = .v2( + ConversationV2( + topic: data.invitation.topic, + keyMaterial: data.invitation.aes256GcmHkdfSha256 + .keyMaterial, + context: data.invitation.context, + peerAddress: data.peerAddress, + client: client, + createdAtNs: data.createdNs + )) } Task { await self.addConversation(conversation) @@ -920,9 +1134,13 @@ public actor Conversations { return conversation } - public func listBatchMessages(topics: [String: Pagination?]) async throws -> [DecodedMessage] { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("listBatchMessages only supported with V2 clients. Use listConversations order lastMessage") + public func listBatchMessages(topics: [String: Pagination?]) async throws + -> [DecodedMessage] + { + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "listBatchMessages only supported with V2 clients. Use listConversations order lastMessage" + ) } let requests = topics.map { topic, page in makeQueryRequest(topic: topic, pagination: page) @@ -930,7 +1148,8 @@ public actor Conversations { /// The maximum number of requests permitted in a single batch call. let maxQueryRequestsPerBatch = 50 let batches = requests.chunks(maxQueryRequestsPerBatch) - .map { requests in BatchQueryRequest.with { $0.requests = requests } } + .map { requests in BatchQueryRequest.with { $0.requests = requests } + } var messages: [DecodedMessage] = [] // TODO: consider using a task group here for parallel batch calls guard let apiClient = client.apiClient else { @@ -940,15 +1159,20 @@ public actor Conversations { messages += try await apiClient.batchQuery(request: batch) .responses.flatMap { res in res.envelopes.compactMap { envelope in - let conversation = conversationsByTopic[envelope.contentTopic] + let conversation = conversationsByTopic[ + envelope.contentTopic] if conversation == nil { - print("discarding message, unknown conversation \(envelope)") + print( + "discarding message, unknown conversation \(envelope)" + ) return nil } do { return try conversation?.decode(envelope) } catch { - print("discarding message, unable to decode \(envelope)") + print( + "discarding message, unable to decode \(envelope)" + ) return nil } } @@ -957,9 +1181,13 @@ public actor Conversations { return messages } - public func listBatchDecryptedMessages(topics: [String: Pagination?]) async throws -> [DecryptedMessage] { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("listBatchMessages only supported with V2 clients. Use listConversations order lastMessage") + public func listBatchDecryptedMessages(topics: [String: Pagination?]) + async throws -> [DecryptedMessage] + { + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "listBatchMessages only supported with V2 clients. Use listConversations order lastMessage" + ) } let requests = topics.map { topic, page in makeQueryRequest(topic: topic, pagination: page) @@ -967,7 +1195,8 @@ public actor Conversations { /// The maximum number of requests permitted in a single batch call. let maxQueryRequestsPerBatch = 50 let batches = requests.chunks(maxQueryRequestsPerBatch) - .map { requests in BatchQueryRequest.with { $0.requests = requests } } + .map { requests in BatchQueryRequest.with { $0.requests = requests } + } var messages: [DecryptedMessage] = [] // TODO: consider using a task group here for parallel batch calls guard let apiClient = client.apiClient else { @@ -977,15 +1206,20 @@ public actor Conversations { messages += try await apiClient.batchQuery(request: batch) .responses.flatMap { res in res.envelopes.compactMap { envelope in - let conversation = conversationsByTopic[envelope.contentTopic] + let conversation = conversationsByTopic[ + envelope.contentTopic] if conversation == nil { - print("discarding message, unknown conversation \(envelope)") + print( + "discarding message, unknown conversation \(envelope)" + ) return nil } do { return try conversation?.decrypt(envelope) } catch { - print("discarding message, unable to decode \(envelope)") + print( + "discarding message, unable to decode \(envelope)" + ) return nil } } @@ -994,51 +1228,72 @@ public actor Conversations { return messages } - private func makeConversation(from sealedInvitation: SealedInvitation) throws -> ConversationV2 { - let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - return try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header) + private func makeConversation(from sealedInvitation: SealedInvitation) + throws -> ConversationV2 + { + let unsealed = try sealedInvitation.v1.getInvitation( + viewer: client.keys) + return try ConversationV2.create( + client: client, invitation: unsealed, + header: sealedInvitation.v1.header) } - + func streamAllV2Messages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in - if (!client.hasV2Client) { - continuation.finish(throwing: ConversationError.v3NotSupported("Only supported with V2 clients. Use streamAllConversationMessages instead.")) + if !client.hasV2Client { + continuation.finish( + throwing: ConversationError.v3NotSupported( + "Only supported with V2 clients. Use streamAllConversationMessages instead." + )) return } let streamManager = StreamManager() - + Task { var topics: [String] = [ Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description + Topic.userIntro(client.address).description, ] for conversation in try await list() { topics.append(conversation.topic) } - var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + var subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) let subscriptionCallback = V2SubscriptionCallback { envelope in Task { do { - if let conversation = self.conversationsByTopic[envelope.contentTopic] { + if let conversation = self.conversationsByTopic[ + envelope.contentTopic] + { let decoded = try conversation.decode(envelope) continuation.yield(decoded) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try self.fromInvite(envelope: envelope) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/invite-") + { + let conversation = try self.fromInvite( + envelope: envelope) await self.addConversation(conversation) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try self.fromIntro(envelope: envelope) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/intro-") + { + let conversation = try self.fromIntro( + envelope: envelope) await self.addConversation(conversation) let decoded = try conversation.decode(envelope) continuation.yield(decoded) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) } else { print("huh \(envelope)") } @@ -1047,9 +1302,11 @@ public actor Conversations { } } } - let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) + let newStream = try await client.subscribe2( + request: subscriptionRequest, callback: subscriptionCallback + ) streamManager.setStream(newStream) - + continuation.onTermination = { @Sendable reason in Task { try await streamManager.endStream() @@ -1058,46 +1315,66 @@ public actor Conversations { } } } - - func streamAllV2DecryptedMessages() -> AsyncThrowingStream { + + func streamAllV2DecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in let streamManager = StreamManager() - if (!client.hasV2Client) { - continuation.finish(throwing: ConversationError.v3NotSupported("Only supported with V2 clients. Use streamAllDecryptedConversationMessages instead.")) + if !client.hasV2Client { + continuation.finish( + throwing: ConversationError.v3NotSupported( + "Only supported with V2 clients. Use streamAllDecryptedConversationMessages instead." + )) return } Task { var topics: [String] = [ Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description + Topic.userIntro(client.address).description, ] for conversation in try await list() { topics.append(conversation.topic) } - var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + var subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) let subscriptionCallback = V2SubscriptionCallback { envelope in Task { do { - if let conversation = self.conversationsByTopic[envelope.contentTopic] { - let decrypted = try conversation.decrypt(envelope) + if let conversation = self.conversationsByTopic[ + envelope.contentTopic] + { + let decrypted = try conversation.decrypt( + envelope) continuation.yield(decrypted) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try self.fromInvite(envelope: envelope) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/invite-") + { + let conversation = try self.fromInvite( + envelope: envelope) await self.addConversation(conversation) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try self.fromIntro(envelope: envelope) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/intro-") + { + let conversation = try self.fromIntro( + envelope: envelope) await self.addConversation(conversation) - let decrypted = try conversation.decrypt(envelope) + let decrypted = try conversation.decrypt( + envelope) continuation.yield(decrypted) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) } else { print("huh \(envelope)") } @@ -1106,9 +1383,11 @@ public actor Conversations { } } } - let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) + let newStream = try await client.subscribe2( + request: subscriptionRequest, callback: subscriptionCallback + ) streamManager.setStream(newStream) - + continuation.onTermination = { @Sendable reason in Task { try await streamManager.endStream() @@ -1119,28 +1398,40 @@ public actor Conversations { } public func fromInvite(envelope: Envelope) throws -> Conversation { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("fromIntro only supported with V2 clients use fromWelcome instead") - } - let sealedInvitation = try SealedInvitation(serializedData: envelope.message) - let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - return try .v2(ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header)) + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "fromIntro only supported with V2 clients use fromWelcome instead" + ) + } + let sealedInvitation = try SealedInvitation( + serializedData: envelope.message) + let unsealed = try sealedInvitation.v1.getInvitation( + viewer: client.keys) + return try .v2( + ConversationV2.create( + client: client, invitation: unsealed, + header: sealedInvitation.v1.header)) } public func fromIntro(envelope: Envelope) throws -> Conversation { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("fromIntro only supported with V2 clients use fromWelcome instead") + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "fromIntro only supported with V2 clients use fromWelcome instead" + ) } let messageV1 = try MessageV1.fromBytes(envelope.message) let senderAddress = try messageV1.header.sender.walletAddress let recipientAddress = try messageV1.header.recipient.walletAddress - let peerAddress = client.address == senderAddress ? recipientAddress : senderAddress - let conversationV1 = ConversationV1(client: client, peerAddress: peerAddress, sentAt: messageV1.sentAt) + let peerAddress = + client.address == senderAddress ? recipientAddress : senderAddress + let conversationV1 = ConversationV1( + client: client, peerAddress: peerAddress, sentAt: messageV1.sentAt) return .v1(conversationV1) } - - private func listIntroductionPeers(pagination: Pagination?) async throws -> [String: Date] { + private func listIntroductionPeers(pagination: Pagination?) async throws + -> [String: Date] + { guard let apiClient = client.apiClient else { throw ClientError.noV2Client("Error no V2 client initialized") } @@ -1161,11 +1452,14 @@ public actor Conversations { var seenPeers: [String: Date] = [:] for message in messages { guard let recipientAddress = message.recipientAddress, - let senderAddress = message.senderAddress else { + let senderAddress = message.senderAddress + else { continue } let sentAt = message.sentAt - let peerAddress = recipientAddress == client.address ? senderAddress : recipientAddress + let peerAddress = + recipientAddress == client.address + ? senderAddress : recipientAddress guard let existing = seenPeers[peerAddress] else { seenPeers[peerAddress] = sentAt continue @@ -1177,7 +1471,9 @@ public actor Conversations { return seenPeers } - private func listInvitations(pagination: Pagination?) async throws -> [SealedInvitation] { + private func listInvitations(pagination: Pagination?) async throws + -> [SealedInvitation] + { guard let apiClient = client.apiClient else { throw ClientError.noV2Client("Error no V2 client initialized") } @@ -1192,7 +1488,10 @@ public actor Conversations { } } - func sendInvitation(recipient: SignedPublicKeyBundle, invitation: InvitationV1, created: Date) async throws -> SealedInvitation { + func sendInvitation( + recipient: SignedPublicKeyBundle, invitation: InvitationV1, + created: Date + ) async throws -> SealedInvitation { let sealed = try SealedInvitation.createV1( sender: client.keys, recipient: recipient, @@ -1201,8 +1500,12 @@ public actor Conversations { ) let peerAddress = try recipient.walletAddress try await client.publish(envelopes: [ - Envelope(topic: .userInvite(client.address), timestamp: created, message: sealed.serializedData()), - Envelope(topic: .userInvite(peerAddress), timestamp: created, message: sealed.serializedData()), + Envelope( + topic: .userInvite(client.address), timestamp: created, + message: sealed.serializedData()), + Envelope( + topic: .userInvite(peerAddress), timestamp: created, + message: sealed.serializedData()), ]) return sealed } diff --git a/Sources/XMTPiOS/Dm.swift b/Sources/XMTPiOS/Dm.swift index d788710f..37522e7e 100644 --- a/Sources/XMTPiOS/Dm.swift +++ b/Sources/XMTPiOS/Dm.swift @@ -16,7 +16,7 @@ public struct Dm: Identifiable, Equatable, Hashable { public var id: String { ffiConversation.id().toHex } - + public var topic: String { Topic.groupMessage(id).description } @@ -44,29 +44,23 @@ public struct Dm: Identifiable, Equatable, Hashable { public func creatorInboxId() throws -> String { return try metadata().creatorInboxId() } - + public func addedByInboxId() throws -> String { return try ffiConversation.addedByInboxId() } public var members: [Member] { get async throws { - return try await ffiConversation.listMembers().map { ffiGroupMember in + return try await ffiConversation.listMembers().map { + ffiGroupMember in Member(ffiGroupMember: ffiGroupMember) } } } public var peerInboxId: String { - get async throws { - var ids = try await members.map(\.inboxId) - if let index = ids.firstIndex(of: client.inboxID) { - ids.remove(at: index) - } - guard let inboxId = ids.first else { - throw ClientError.missingInboxId - } - return inboxId + get throws { + try ffiConversation.dmPeerInboxId() } } @@ -75,8 +69,8 @@ public struct Dm: Identifiable, Equatable, Hashable { } public func updateConsentState(state: ConsentState) async throws { - if (client.hasV2Client) { - switch (state) { + if client.hasV2Client { + switch state { case .allowed: try await client.contacts.allowGroups(groupIds: [id]) case .denied: try await client.contacts.denyGroups(groupIds: [id]) case .unknown: () @@ -86,33 +80,43 @@ public struct Dm: Identifiable, Equatable, Hashable { try ffiConversation.updateConsentState(state: state.toFFI) } - public func consentState() throws -> ConsentState{ + public func consentState() throws -> ConsentState { return try ffiConversation.consentState().fromFFI } - + public func processMessage(envelopeBytes: Data) async throws -> MessageV3 { - let message = try await ffiConversation.processStreamedConversationMessage(envelopeBytes: envelopeBytes) + let message = + try await ffiConversation.processStreamedConversationMessage( + envelopeBytes: envelopeBytes) return MessageV3(client: client, ffiMessage: message) } - public func send(content: T, options: SendOptions? = nil) async throws -> String { - let encodeContent = try await encodeContent(content: content, options: options) + public func send(content: T, options: SendOptions? = nil) async throws + -> String + { + let encodeContent = try await encodeContent( + content: content, options: options) return try await send(encodedContent: encodeContent) } public func send(encodedContent: EncodedContent) async throws -> String { - if (try consentState() == .unknown) { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - let messageId = try await ffiConversation.send(contentBytes: encodedContent.serializedData()) + let messageId = try await ffiConversation.send( + contentBytes: encodedContent.serializedData()) return messageId.toHex } - public func encodeContent(content: T, options: SendOptions?) async throws -> EncodedContent { + public func encodeContent(content: T, options: SendOptions?) async throws + -> EncodedContent + { let codec = client.codecRegistry.find(for: options?.contentType) - func encode(codec: Codec, content: Any) throws -> EncodedContent { + func encode(codec: Codec, content: Any) throws + -> EncodedContent + { if let content = content as? Codec.T { return try codec.encode(content: content, client: client) } else { @@ -122,7 +126,9 @@ public struct Dm: Identifiable, Equatable, Hashable { var encoded = try encode(codec: codec, content: content) - func fallback(codec: Codec, content: Any) throws -> String? { + func fallback(codec: Codec, content: Any) throws + -> String? + { if let content = content as? Codec.T { return try codec.fallback(content: content) } else { @@ -140,14 +146,19 @@ public struct Dm: Identifiable, Equatable, Hashable { return encoded } - - public func prepareMessage(content: T, options: SendOptions? = nil) async throws -> String { - if (try consentState() == .unknown) { + + public func prepareMessage(content: T, options: SendOptions? = nil) + async throws -> String + { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - - let encodeContent = try await encodeContent(content: content, options: options) - return try ffiConversation.sendOptimistic(contentBytes: try encodeContent.serializedData()).toHex + + let encodeContent = try await encodeContent( + content: content, options: options) + return try ffiConversation.sendOptimistic( + contentBytes: try encodeContent.serializedData() + ).toHex } public func publishMessages() async throws { @@ -162,20 +173,24 @@ public struct Dm: Identifiable, Equatable, Hashable { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiConversation.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -188,24 +203,30 @@ public struct Dm: Identifiable, Equatable, Hashable { } } - public func streamDecryptedMessages() -> AsyncThrowingStream { + public func streamDecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiConversation.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -234,11 +255,13 @@ public struct Dm: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { @@ -259,7 +282,7 @@ public struct Dm: Identifiable, Equatable, Hashable { }() options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -271,8 +294,10 @@ public struct Dm: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiConversation.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decodeOrNull() + return try ffiConversation.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decodeOrNull() } } @@ -292,17 +317,19 @@ public struct Dm: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { options.limit = Int64(limit) } - + let status: FfiDeliveryStatus? = { switch deliveryStatus { case .published: @@ -315,9 +342,9 @@ public struct Dm: Identifiable, Equatable, Hashable { return nil } }() - + options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -329,8 +356,10 @@ public struct Dm: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiConversation.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decryptOrNull() + return try ffiConversation.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decryptOrNull() } } } diff --git a/Sources/XMTPiOS/Group.swift b/Sources/XMTPiOS/Group.swift index 3116356a..cef154a6 100644 --- a/Sources/XMTPiOS/Group.swift +++ b/Sources/XMTPiOS/Group.swift @@ -9,6 +9,10 @@ import Foundation import LibXMTP final class MessageCallback: FfiMessageCallback { + func onError(error: LibXMTP.FfiSubscribeError) { + print("Error MessageCallback \(error)") + } + let client: Client let callback: (LibXMTP.FfiMessage) -> Void @@ -34,7 +38,7 @@ public struct Group: Identifiable, Equatable, Hashable { public var id: String { ffiGroup.id().toHex } - + public var topic: String { Topic.groupMessage(id).description } @@ -42,10 +46,10 @@ public struct Group: Identifiable, Equatable, Hashable { func metadata() throws -> FfiConversationMetadata { return try ffiGroup.groupMetadata() } - - func permissions() throws -> FfiGroupPermissions { - return try ffiGroup.groupPermissions() - } + + func permissions() throws -> FfiGroupPermissions { + return try ffiGroup.groupPermissions() + } public func sync() async throws { try await ffiGroup.sync() @@ -66,47 +70,48 @@ public struct Group: Identifiable, Equatable, Hashable { public func isCreator() throws -> Bool { return try metadata().creatorInboxId() == client.inboxID } - - public func isAdmin(inboxId: String) throws -> Bool { - return try ffiGroup.isAdmin(inboxId: inboxId) - } - - public func isSuperAdmin(inboxId: String) throws -> Bool { - return try ffiGroup.isSuperAdmin(inboxId: inboxId) - } - - public func addAdmin(inboxId: String) async throws { - try await ffiGroup.addAdmin(inboxId: inboxId) - } - - public func removeAdmin(inboxId: String) async throws { - try await ffiGroup.removeAdmin(inboxId: inboxId) - } - - public func addSuperAdmin(inboxId: String) async throws { - try await ffiGroup.addSuperAdmin(inboxId: inboxId) - } - - public func removeSuperAdmin(inboxId: String) async throws { - try await ffiGroup.removeSuperAdmin(inboxId: inboxId) - } - - public func listAdmins() throws -> [String] { - try ffiGroup.adminList() - } - - public func listSuperAdmins() throws -> [String] { - try ffiGroup.superAdminList() - } + + public func isAdmin(inboxId: String) throws -> Bool { + return try ffiGroup.isAdmin(inboxId: inboxId) + } + + public func isSuperAdmin(inboxId: String) throws -> Bool { + return try ffiGroup.isSuperAdmin(inboxId: inboxId) + } + + public func addAdmin(inboxId: String) async throws { + try await ffiGroup.addAdmin(inboxId: inboxId) + } + + public func removeAdmin(inboxId: String) async throws { + try await ffiGroup.removeAdmin(inboxId: inboxId) + } + + public func addSuperAdmin(inboxId: String) async throws { + try await ffiGroup.addSuperAdmin(inboxId: inboxId) + } + + public func removeSuperAdmin(inboxId: String) async throws { + try await ffiGroup.removeSuperAdmin(inboxId: inboxId) + } + + public func listAdmins() throws -> [String] { + try ffiGroup.adminList() + } + + public func listSuperAdmins() throws -> [String] { + try ffiGroup.superAdminList() + } public func permissionPolicySet() throws -> PermissionPolicySet { - return PermissionPolicySet.fromFfiPermissionPolicySet(try permissions().policySet()) + return PermissionPolicySet.fromFfiPermissionPolicySet( + try permissions().policySet()) } public func creatorInboxId() throws -> String { return try metadata().creatorInboxId() } - + public func addedByInboxId() throws -> String { return try ffiGroup.addedByInboxId() } @@ -140,7 +145,7 @@ public struct Group: Identifiable, Equatable, Hashable { public func removeMembers(addresses: [String]) async throws { try await ffiGroup.removeMembers(accountAddresses: addresses) } - + public func addMembersByInboxId(inboxIds: [String]) async throws { try await ffiGroup.addMembersByInboxId(inboxIds: inboxIds) } @@ -148,11 +153,11 @@ public struct Group: Identifiable, Equatable, Hashable { public func removeMembersByInboxId(inboxIds: [String]) async throws { try await ffiGroup.removeMembersByInboxId(inboxIds: inboxIds) } - - public func groupName() throws -> String { - return try ffiGroup.groupName() - } - + + public func groupName() throws -> String { + return try ffiGroup.groupName() + } + public func groupImageUrlSquare() throws -> String { return try ffiGroup.groupImageUrlSquare() } @@ -165,57 +170,106 @@ public struct Group: Identifiable, Equatable, Hashable { return try ffiGroup.groupPinnedFrameUrl() } - public func updateGroupName(groupName: String) async throws { - try await ffiGroup.updateGroupName(groupName: groupName) - } - - public func updateGroupImageUrlSquare(imageUrlSquare: String) async throws { - try await ffiGroup.updateGroupImageUrlSquare(groupImageUrlSquare: imageUrlSquare) + public func updateGroupName(groupName: String) async throws { + try await ffiGroup.updateGroupName(groupName: groupName) } - public func updateGroupDescription(groupDescription: String) async throws { - try await ffiGroup.updateGroupDescription(groupDescription: groupDescription) - } - - public func updateGroupPinnedFrameUrl(groupPinnedFrameUrl: String) async throws { - try await ffiGroup.updateGroupPinnedFrameUrl(pinnedFrameUrl: groupPinnedFrameUrl) - } - - public func updateAddMemberPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.addMember, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateRemoveMemberPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.removeMember, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateAddAdminPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.addAdmin, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateRemoveAdminPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.removeAdmin, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateGroupNamePermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.groupName) - } - - public func updateGroupDescriptionPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.description) - } - - public func updateGroupImageUrlSquarePermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.imageUrlSquare) + public func updateGroupImageUrlSquare(imageUrlSquare: String) async throws { + try await ffiGroup.updateGroupImageUrlSquare( + groupImageUrlSquare: imageUrlSquare) } - public func updateGroupPinnedFrameUrlPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.pinnedFrameUrl) + public func updateGroupDescription(groupDescription: String) async throws { + try await ffiGroup.updateGroupDescription( + groupDescription: groupDescription) + } + + public func updateGroupPinnedFrameUrl(groupPinnedFrameUrl: String) + async throws + { + try await ffiGroup.updateGroupPinnedFrameUrl( + pinnedFrameUrl: groupPinnedFrameUrl) + } + + public func updateAddMemberPermission(newPermissionOption: PermissionOption) + async throws + { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.addMember, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateRemoveMemberPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.removeMember, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateAddAdminPermission(newPermissionOption: PermissionOption) + async throws + { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.addAdmin, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateRemoveAdminPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.removeAdmin, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateGroupNamePermission(newPermissionOption: PermissionOption) + async throws + { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.groupName) + } + + public func updateGroupDescriptionPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.description) + } + + public func updateGroupImageUrlSquarePermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.imageUrlSquare) + } + + public func updateGroupPinnedFrameUrlPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.pinnedFrameUrl) } public func updateConsentState(state: ConsentState) async throws { - if (client.hasV2Client) { - switch (state) { + if client.hasV2Client { + switch state { case .allowed: try await client.contacts.allowGroups(groupIds: [id]) case .denied: try await client.contacts.denyGroups(groupIds: [id]) case .unknown: () @@ -225,33 +279,42 @@ public struct Group: Identifiable, Equatable, Hashable { try ffiGroup.updateConsentState(state: state.toFFI) } - public func consentState() throws -> ConsentState{ + public func consentState() throws -> ConsentState { return try ffiGroup.consentState().fromFFI } - + public func processMessage(envelopeBytes: Data) async throws -> MessageV3 { - let message = try await ffiGroup.processStreamedConversationMessage(envelopeBytes: envelopeBytes) + let message = try await ffiGroup.processStreamedConversationMessage( + envelopeBytes: envelopeBytes) return MessageV3(client: client, ffiMessage: message) } - public func send(content: T, options: SendOptions? = nil) async throws -> String { - let encodeContent = try await encodeContent(content: content, options: options) + public func send(content: T, options: SendOptions? = nil) async throws + -> String + { + let encodeContent = try await encodeContent( + content: content, options: options) return try await send(encodedContent: encodeContent) } public func send(encodedContent: EncodedContent) async throws -> String { - if (try consentState() == .unknown) { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - let messageId = try await ffiGroup.send(contentBytes: encodedContent.serializedData()) + let messageId = try await ffiGroup.send( + contentBytes: encodedContent.serializedData()) return messageId.toHex } - public func encodeContent(content: T, options: SendOptions?) async throws -> EncodedContent { + public func encodeContent(content: T, options: SendOptions?) async throws + -> EncodedContent + { let codec = client.codecRegistry.find(for: options?.contentType) - func encode(codec: Codec, content: Any) throws -> EncodedContent { + func encode(codec: Codec, content: Any) throws + -> EncodedContent + { if let content = content as? Codec.T { return try codec.encode(content: content, client: client) } else { @@ -261,7 +324,9 @@ public struct Group: Identifiable, Equatable, Hashable { var encoded = try encode(codec: codec, content: content) - func fallback(codec: Codec, content: Any) throws -> String? { + func fallback(codec: Codec, content: Any) throws + -> String? + { if let content = content as? Codec.T { return try codec.fallback(content: content) } else { @@ -279,14 +344,19 @@ public struct Group: Identifiable, Equatable, Hashable { return encoded } - - public func prepareMessage(content: T, options: SendOptions? = nil) async throws -> String { - if (try consentState() == .unknown) { + + public func prepareMessage(content: T, options: SendOptions? = nil) + async throws -> String + { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - - let encodeContent = try await encodeContent(content: content, options: options) - return try ffiGroup.sendOptimistic(contentBytes: try encodeContent.serializedData()).toHex + + let encodeContent = try await encodeContent( + content: content, options: options) + return try ffiGroup.sendOptimistic( + contentBytes: try encodeContent.serializedData() + ).toHex } public func publishMessages() async throws { @@ -301,20 +371,24 @@ public struct Group: Identifiable, Equatable, Hashable { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiGroup.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -327,24 +401,30 @@ public struct Group: Identifiable, Equatable, Hashable { } } - public func streamDecryptedMessages() -> AsyncThrowingStream { + public func streamDecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiGroup.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -373,11 +453,13 @@ public struct Group: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { @@ -398,7 +480,7 @@ public struct Group: Identifiable, Equatable, Hashable { }() options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -410,8 +492,10 @@ public struct Group: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiGroup.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decodeOrNull() + return try ffiGroup.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decodeOrNull() } } @@ -431,17 +515,19 @@ public struct Group: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { options.limit = Int64(limit) } - + let status: FfiDeliveryStatus? = { switch deliveryStatus { case .published: @@ -454,9 +540,9 @@ public struct Group: Identifiable, Equatable, Hashable { return nil } }() - + options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -468,8 +554,10 @@ public struct Group: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiGroup.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decryptOrNull() + return try ffiGroup.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decryptOrNull() } } } diff --git a/Tests/XMTPTests/V3ClientTests.swift b/Tests/XMTPTests/V3ClientTests.swift index 97a3c333..fe6f8085 100644 --- a/Tests/XMTPTests/V3ClientTests.swift +++ b/Tests/XMTPTests/V3ClientTests.swift @@ -1,15 +1,16 @@ // // V3ClientTests.swift -// +// // // Created by Naomi Plasterer on 9/19/24. // -import XCTest -@testable import XMTPiOS import LibXMTP +import XCTest import XMTPTestHelpers +@testable import XMTPiOS + @available(iOS 16, *) class V3ClientTests: XCTestCase { // Use these fixtures to talk to the local node @@ -21,7 +22,7 @@ class V3ClientTests: XCTestCase { var boV3Client: Client! var caroV2V3Client: Client! } - + func localFixtures() async throws -> LocalFixtures { let key = try Crypto.secureRandomBytes(count: 32) let alixV2 = try PrivateKey.generate() @@ -59,45 +60,60 @@ class V3ClientTests: XCTestCase { caroV2V3Client: caroV2V3Client ) } - + func testsCanCreateGroup() async throws { let fixtures = try await localFixtures() - let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.address]) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.address + ]) let members = try await group.members.map(\.inboxId).sorted() - XCTAssertEqual([fixtures.caroV2V3Client.inboxID, fixtures.boV3Client.inboxID].sorted(), members) + XCTAssertEqual( + [fixtures.caroV2V3Client.inboxID, fixtures.boV3Client.inboxID] + .sorted(), members) await assertThrowsAsyncError( - try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.alixV2.address]) + try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.alixV2.address + ]) ) } func testCanCreateDm() async throws { let fixtures = try await localFixtures() - let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.walletAddress) let members = try await dm.members XCTAssertEqual(members.count, 2) - - let sameDm = try await fixtures.boV3Client.findDm(address: fixtures.caroV2V3.walletAddress) + + let sameDm = try await fixtures.boV3Client.findDm( + address: fixtures.caroV2V3.walletAddress) XCTAssertEqual(sameDm?.id, dm.id) try await fixtures.caroV2V3Client.conversations.sync() - let caroDm = try await fixtures.caroV2V3Client.findDm(address: fixtures.boV3Client.address) + let caroDm = try await fixtures.caroV2V3Client.findDm( + address: fixtures.boV3Client.address) XCTAssertEqual(caroDm?.id, dm.id) await assertThrowsAsyncError( - try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.alixV2.walletAddress) + try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.alixV2.walletAddress) ) } func testCanFindConversationByTopic() async throws { let fixtures = try await localFixtures() - let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) - let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.walletAddress + ]) + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.walletAddress) - let sameDm = try fixtures.boV3Client.findConversationByTopic(topic: dm.topic) - let sameGroup = try fixtures.boV3Client.findConversationByTopic(topic: group.topic) + let sameDm = try fixtures.boV3Client.findConversationByTopic( + topic: dm.topic) + let sameGroup = try fixtures.boV3Client.findConversationByTopic( + topic: group.topic) XCTAssertEqual(group.id, try sameGroup?.id) XCTAssertEqual(dm.id, try sameDm?.id) @@ -106,19 +122,27 @@ class V3ClientTests: XCTestCase { func testCanListConversations() async throws { let fixtures = try await localFixtures() - let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) - let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.walletAddress) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.walletAddress + ]) - let convoCount = try await fixtures.boV3Client.conversations.listConversations().count + let convoCount = try await fixtures.boV3Client.conversations + .listConversations().count let dmCount = try await fixtures.boV3Client.conversations.dms().count - let groupCount = try await fixtures.boV3Client.conversations.groups().count + let groupCount = try await fixtures.boV3Client.conversations.groups() + .count XCTAssertEqual(convoCount, 2) XCTAssertEqual(dmCount, 1) XCTAssertEqual(groupCount, 1) try await fixtures.caroV2V3Client.conversations.sync() - let convoCount2 = try await fixtures.caroV2V3Client.conversations.list(includeGroups: true).count - let groupCount2 = try await fixtures.caroV2V3Client.conversations.groups().count + let convoCount2 = try await fixtures.caroV2V3Client.conversations.list( + includeGroups: true + ).count + let groupCount2 = try await fixtures.caroV2V3Client.conversations + .groups().count XCTAssertEqual(convoCount2, 1) XCTAssertEqual(groupCount2, 1) } @@ -126,19 +150,26 @@ class V3ClientTests: XCTestCase { func testCanListConversationsFiltered() async throws { let fixtures = try await localFixtures() - let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) - let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) - - let convoCount = try await fixtures.boV3Client.conversations.listConversations().count - let convoCountConsent = try await fixtures.boV3Client.conversations.listConversations(consentState: .allowed).count + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.walletAddress) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.walletAddress + ]) + + let convoCount = try await fixtures.boV3Client.conversations + .listConversations().count + let convoCountConsent = try await fixtures.boV3Client.conversations + .listConversations(consentState: .allowed).count XCTAssertEqual(convoCount, 2) XCTAssertEqual(convoCountConsent, 2) try await group.updateConsentState(state: .denied) - - let convoCountAllowed = try await fixtures.boV3Client.conversations.listConversations(consentState: .allowed).count - let convoCountDenied = try await fixtures.boV3Client.conversations.listConversations(consentState: .denied).count + + let convoCountAllowed = try await fixtures.boV3Client.conversations + .listConversations(consentState: .allowed).count + let convoCountDenied = try await fixtures.boV3Client.conversations + .listConversations(consentState: .denied).count XCTAssertEqual(convoCountAllowed, 1) XCTAssertEqual(convoCountDenied, 1) @@ -147,136 +178,176 @@ class V3ClientTests: XCTestCase { func testCanListConversationsOrder() async throws { let fixtures = try await localFixtures() - let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) - let group1 = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) - let group2 = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.walletAddress) + let group1 = try await fixtures.boV3Client.conversations.newGroup( + with: [fixtures.caroV2V3.walletAddress]) + let group2 = try await fixtures.boV3Client.conversations.newGroup( + with: [fixtures.caroV2V3.walletAddress]) _ = try await dm.send(content: "Howdy") _ = try await group2.send(content: "Howdy") _ = try await fixtures.boV3Client.conversations.syncAllConversations() - let conversations = try await fixtures.boV3Client.conversations.listConversations() - let conversationsOrdered = try await fixtures.boV3Client.conversations.listConversations(order: .lastMessage) + let conversations = try await fixtures.boV3Client.conversations + .listConversations() + let conversationsOrdered = try await fixtures.boV3Client.conversations + .listConversations(order: .lastMessage) XCTAssertEqual(conversations.count, 3) XCTAssertEqual(conversationsOrdered.count, 3) - XCTAssertEqual(try conversations.map { try $0.id }, [dm.id, group1.id, group2.id]) - XCTAssertEqual(try conversationsOrdered.map { try $0.id }, [group2.id, dm.id, group1.id]) + XCTAssertEqual( + try conversations.map { try $0.id }, [dm.id, group1.id, group2.id]) + XCTAssertEqual( + try conversationsOrdered.map { try $0.id }, + [group2.id, dm.id, group1.id]) } - + func testsCanSendMessages() async throws { let fixtures = try await localFixtures() - let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.address]) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.address + ]) try await group.send(content: "howdy") let messageId = try await group.send(content: "gm") try await group.sync() - + let groupMessages = try await group.messages() XCTAssertEqual(groupMessages.first?.body, "gm") XCTAssertEqual(groupMessages.first?.id, messageId) XCTAssertEqual(groupMessages.first?.deliveryStatus, .published) XCTAssertEqual(groupMessages.count, 3) - try await fixtures.caroV2V3Client.conversations.sync() - let sameGroup = try await fixtures.caroV2V3Client.conversations.groups().last + let sameGroup = try await fixtures.caroV2V3Client.conversations.groups() + .last try await sameGroup?.sync() let sameGroupMessages = try await sameGroup?.messages() XCTAssertEqual(sameGroupMessages?.count, 2) XCTAssertEqual(sameGroupMessages?.first?.body, "gm") } - + func testsCanSendMessagesToDm() async throws { let fixtures = try await localFixtures() - let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.address) try await dm.send(content: "howdy") let messageId = try await dm.send(content: "gm") try await dm.sync() - + let dmMessages = try await dm.messages() XCTAssertEqual(dmMessages.first?.body, "gm") XCTAssertEqual(dmMessages.first?.id, messageId) XCTAssertEqual(dmMessages.first?.deliveryStatus, .published) XCTAssertEqual(dmMessages.count, 3) - try await fixtures.caroV2V3Client.conversations.sync() - let sameDm = try await fixtures.caroV2V3Client.findDm(address: fixtures.boV3Client.address) + let sameDm = try await fixtures.caroV2V3Client.findDm( + address: fixtures.boV3Client.address) try await sameDm?.sync() let sameDmMessages = try await sameDm?.messages() XCTAssertEqual(sameDmMessages?.count, 2) XCTAssertEqual(sameDmMessages?.first?.body, "gm") } - + func testGroupConsent() async throws { let fixtures = try await localFixtures() - let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.address]) - let isAllowed = try await fixtures.boV3Client.contacts.isGroupAllowed(groupId: group.id) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.address + ]) + let isAllowed = try await fixtures.boV3Client.contacts.isGroupAllowed( + groupId: group.id) XCTAssert(isAllowed) XCTAssertEqual(try group.consentState(), .allowed) - + try await fixtures.boV3Client.contacts.denyGroups(groupIds: [group.id]) - let isDenied = try await fixtures.boV3Client.contacts.isGroupDenied(groupId: group.id) + let isDenied = try await fixtures.boV3Client.contacts.isGroupDenied( + groupId: group.id) XCTAssert(isDenied) XCTAssertEqual(try group.consentState(), .denied) - + try await group.updateConsentState(state: .allowed) - let isAllowed2 = try await fixtures.boV3Client.contacts.isGroupAllowed(groupId: group.id) + let isAllowed2 = try await fixtures.boV3Client.contacts.isGroupAllowed( + groupId: group.id) XCTAssert(isAllowed2) XCTAssertEqual(try group.consentState(), .allowed) } - + func testCanAllowAndDenyInboxId() async throws { let fixtures = try await localFixtures() - let boGroup = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.address]) - var isInboxAllowed = try await fixtures.boV3Client.contacts.isInboxAllowed(inboxId: fixtures.caroV2V3.address) - var isInboxDenied = try await fixtures.boV3Client.contacts.isInboxDenied(inboxId: fixtures.caroV2V3.address) + let boGroup = try await fixtures.boV3Client.conversations.newGroup( + with: [fixtures.caroV2V3.address]) + var isInboxAllowed = try await fixtures.boV3Client.contacts + .isInboxAllowed(inboxId: fixtures.caroV2V3.address) + var isInboxDenied = try await fixtures.boV3Client.contacts + .isInboxDenied(inboxId: fixtures.caroV2V3.address) XCTAssert(!isInboxAllowed) XCTAssert(!isInboxDenied) - - try await fixtures.boV3Client.contacts.allowInboxes(inboxIds: [fixtures.caroV2V3Client.inboxID]) - var caroMember = try await boGroup.members.first(where: { member in member.inboxId == fixtures.caroV2V3Client.inboxID }) + try await fixtures.boV3Client.contacts.allowInboxes(inboxIds: [ + fixtures.caroV2V3Client.inboxID + ]) + var caroMember = try await boGroup.members.first(where: { member in + member.inboxId == fixtures.caroV2V3Client.inboxID + }) XCTAssertEqual(caroMember?.consentState, .allowed) - isInboxAllowed = try await fixtures.boV3Client.contacts.isInboxAllowed(inboxId: fixtures.caroV2V3Client.inboxID) + isInboxAllowed = try await fixtures.boV3Client.contacts.isInboxAllowed( + inboxId: fixtures.caroV2V3Client.inboxID) XCTAssert(isInboxAllowed) - isInboxDenied = try await fixtures.boV3Client.contacts.isInboxDenied(inboxId: fixtures.caroV2V3Client.inboxID) + isInboxDenied = try await fixtures.boV3Client.contacts.isInboxDenied( + inboxId: fixtures.caroV2V3Client.inboxID) XCTAssert(!isInboxDenied) - var isAddressAllowed = try await fixtures.boV3Client.contacts.isAllowed(fixtures.caroV2V3Client.address) + var isAddressAllowed = try await fixtures.boV3Client.contacts.isAllowed( + fixtures.caroV2V3Client.address) XCTAssert(isAddressAllowed) - var isAddressDenied = try await fixtures.boV3Client.contacts.isDenied(fixtures.caroV2V3Client.address) + var isAddressDenied = try await fixtures.boV3Client.contacts.isDenied( + fixtures.caroV2V3Client.address) XCTAssert(!isAddressDenied) - - try await fixtures.boV3Client.contacts.denyInboxes(inboxIds: [fixtures.caroV2V3Client.inboxID]) - caroMember = try await boGroup.members.first(where: { member in member.inboxId == fixtures.caroV2V3Client.inboxID }) + + try await fixtures.boV3Client.contacts.denyInboxes(inboxIds: [ + fixtures.caroV2V3Client.inboxID + ]) + caroMember = try await boGroup.members.first(where: { member in + member.inboxId == fixtures.caroV2V3Client.inboxID + }) XCTAssertEqual(caroMember?.consentState, .denied) - - isInboxAllowed = try await fixtures.boV3Client.contacts.isInboxAllowed(inboxId: fixtures.caroV2V3Client.inboxID) - isInboxDenied = try await fixtures.boV3Client.contacts.isInboxDenied(inboxId: fixtures.caroV2V3Client.inboxID) + + isInboxAllowed = try await fixtures.boV3Client.contacts.isInboxAllowed( + inboxId: fixtures.caroV2V3Client.inboxID) + isInboxDenied = try await fixtures.boV3Client.contacts.isInboxDenied( + inboxId: fixtures.caroV2V3Client.inboxID) XCTAssert(!isInboxAllowed) XCTAssert(isInboxDenied) - - try await fixtures.boV3Client.contacts.allow(addresses: [fixtures.alixV2.address]) - isAddressAllowed = try await fixtures.boV3Client.contacts.isAllowed(fixtures.alixV2.address) - isAddressDenied = try await fixtures.boV3Client.contacts.isDenied(fixtures.alixV2.address) + + try await fixtures.boV3Client.contacts.allow(addresses: [ + fixtures.alixV2.address + ]) + isAddressAllowed = try await fixtures.boV3Client.contacts.isAllowed( + fixtures.alixV2.address) + isAddressDenied = try await fixtures.boV3Client.contacts.isDenied( + fixtures.alixV2.address) XCTAssert(isAddressAllowed) XCTAssert(!isAddressDenied) } - + func testCanStreamAllMessagesFromV3Users() async throws { let fixtures = try await localFixtures() let expectation1 = XCTestExpectation(description: "got a conversation") expectation1.expectedFulfillmentCount = 2 - let convo = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) - let group = try await fixtures.caroV2V3Client.conversations.newGroup(with: [fixtures.boV3.address]) + let convo = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.address) + let group = try await fixtures.caroV2V3Client.conversations.newGroup( + with: [fixtures.boV3.address]) try await fixtures.boV3Client.conversations.sync() Task(priority: .userInitiated) { - for try await _ in await fixtures.boV3Client.conversations.streamAllConversationMessages() { + for try await _ in await fixtures.boV3Client.conversations + .streamAllConversationMessages() + { expectation1.fulfill() } } @@ -286,17 +357,21 @@ class V3ClientTests: XCTestCase { await fulfillment(of: [expectation1], timeout: 3) } - + func testCanStreamAllDecryptedMessagesFromV3Users() async throws { let fixtures = try await localFixtures() let expectation1 = XCTestExpectation(description: "got a conversation") expectation1.expectedFulfillmentCount = 2 - let convo = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) - let group = try await fixtures.caroV2V3Client.conversations.newGroup(with: [fixtures.boV3.address]) + let convo = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.address) + let group = try await fixtures.caroV2V3Client.conversations.newGroup( + with: [fixtures.boV3.address]) try await fixtures.boV3Client.conversations.sync() Task(priority: .userInitiated) { - for try await _ in await fixtures.boV3Client.conversations.streamAllDecryptedConversationMessages() { + for try await _ in await fixtures.boV3Client.conversations + .streamAllDecryptedConversationMessages() + { expectation1.fulfill() } } @@ -306,7 +381,7 @@ class V3ClientTests: XCTestCase { await fulfillment(of: [expectation1], timeout: 3) } - + func testCanStreamGroupsAndConversationsFromV3Users() async throws { let fixtures = try await localFixtures() @@ -314,13 +389,18 @@ class V3ClientTests: XCTestCase { expectation1.expectedFulfillmentCount = 2 Task(priority: .userInitiated) { - for try await _ in await fixtures.boV3Client.conversations.streamConversations() { + for try await _ in await fixtures.boV3Client.conversations + .streamConversations() + { expectation1.fulfill() } } - _ = try await fixtures.caroV2V3Client.conversations.newGroup(with: [fixtures.boV3.address]) - _ = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) + _ = try await fixtures.caroV2V3Client.conversations.newGroup(with: [ + fixtures.boV3.address + ]) + _ = try await fixtures.boV3Client.conversations.findOrCreateDm( + with: fixtures.caroV2V3.address) await fulfillment(of: [expectation1], timeout: 3) } @@ -330,11 +410,16 @@ class V3ClientTests: XCTestCase { let expectation1 = XCTestExpectation(description: "got a conversation") expectation1.expectedFulfillmentCount = 2 - let convo = try await fixtures.alixV2Client.conversations.newConversation(with: fixtures.caroV2V3.address) - let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.address]) + let convo = try await fixtures.alixV2Client.conversations + .newConversation(with: fixtures.caroV2V3.address) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.address + ]) try await fixtures.caroV2V3Client.conversations.sync() Task(priority: .userInitiated) { - for try await _ in await fixtures.caroV2V3Client.conversations.streamAllMessages(includeGroups: true) { + for try await _ in await fixtures.caroV2V3Client.conversations + .streamAllMessages(includeGroups: true) + { expectation1.fulfill() } } @@ -344,7 +429,7 @@ class V3ClientTests: XCTestCase { await fulfillment(of: [expectation1], timeout: 3) } - + func testCanStreamGroupsAndConversationsFromV2andV3Users() async throws { let fixtures = try await localFixtures() @@ -352,14 +437,34 @@ class V3ClientTests: XCTestCase { expectation1.expectedFulfillmentCount = 2 Task(priority: .userInitiated) { - for try await _ in await fixtures.caroV2V3Client.conversations.streamAll() { + for try await _ in await fixtures.caroV2V3Client.conversations + .streamAll() + { expectation1.fulfill() } } - _ = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.address]) - _ = try await fixtures.alixV2Client.conversations.newConversation(with: fixtures.caroV2V3.address) + _ = try await fixtures.boV3Client.conversations.newGroup(with: [ + fixtures.caroV2V3.address + ]) + _ = try await fixtures.alixV2Client.conversations.newConversation( + with: fixtures.caroV2V3.address) await fulfillment(of: [expectation1], timeout: 3) } + + func createDms(client: Client, peers: [Client], numMessages: Int) + async throws -> [Dm] + { + var dms: [Dm] = [] + for peer in peers { + let dm = try await peer.conversations.findOrCreateDm( + with: client.address) + dms.append(dm) + for i in 0..