From e64e419380b5ec5ef1ac5beda5a1c62d62db87e3 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 1 Nov 2024 13:25:12 -0700 Subject: [PATCH] swift format one other file --- Sources/XMTPiOS/Conversations.swift | 1086 +++++++++++++++++---------- Sources/XMTPiOS/Dm.swift | 116 ++- Sources/XMTPiOS/Group.swift | 334 +++++--- 3 files changed, 980 insertions(+), 556 deletions(-) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index b3a8e112..1691051c 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -2,48 +2,62 @@ import Foundation import LibXMTP public enum ConversationError: Error, CustomStringConvertible, LocalizedError { - case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String), v2NotSupported(String), v3NotSupported(String) + case recipientNotOnNetwork, recipientIsSender + case v1NotSupported(String) + case v2NotSupported(String) + case v3NotSupported(String) public var description: String { switch self { case .recipientIsSender: - return "ConversationError.recipientIsSender: Recipient cannot be sender" + return + "ConversationError.recipientIsSender: Recipient cannot be sender" case .recipientNotOnNetwork: - return "ConversationError.recipientNotOnNetwork: Recipient is not on network" + return + "ConversationError.recipientNotOnNetwork: Recipient is not on network" case .v1NotSupported(let str): - return "ConversationError.v1NotSupported: V1 does not support: \(str)" + return + "ConversationError.v1NotSupported: V1 does not support: \(str)" case .v2NotSupported(let str): - return "ConversationError.v2NotSupported: V2 does not support: \(str)" + return + "ConversationError.v2NotSupported: V2 does not support: \(str)" case .v3NotSupported(let str): - return "ConversationError.v3NotSupported: V3 does not support: \(str)" + return + "ConversationError.v3NotSupported: V3 does not support: \(str)" } } - + public var errorDescription: String? { return description } } public enum GroupError: Error, CustomStringConvertible, LocalizedError { - case alphaMLSNotEnabled, memberCannotBeSelf, memberNotRegistered([String]), groupsRequireMessagePassed, notSupportedByGroups, streamingFailure + case alphaMLSNotEnabled, memberCannotBeSelf + case memberNotRegistered([String]) + case groupsRequireMessagePassed, notSupportedByGroups, streamingFailure public var description: String { switch self { case .alphaMLSNotEnabled: return "GroupError.alphaMLSNotEnabled" case .memberCannotBeSelf: - return "GroupError.memberCannotBeSelf you cannot add yourself to a group" + return + "GroupError.memberCannotBeSelf you cannot add yourself to a group" case .memberNotRegistered(let array): - return "GroupError.memberNotRegistered members not registered: \(array.joined(separator: ", "))" + return + "GroupError.memberNotRegistered members not registered: \(array.joined(separator: ", "))" case .groupsRequireMessagePassed: - return "GroupError.groupsRequireMessagePassed you cannot call this method without passing a message instead of an envelope" + return + "GroupError.groupsRequireMessagePassed you cannot call this method without passing a message instead of an envelope" case .notSupportedByGroups: - return "GroupError.notSupportedByGroups this method is not supported by groups" + return + "GroupError.notSupportedByGroups this method is not supported by groups" case .streamingFailure: return "GroupError.streamingFailure a stream has failed" } } - + public var errorDescription: String? { return description } @@ -57,7 +71,7 @@ final class ConversationStreamCallback: FfiConversationCallback { func onError(error: LibXMTP.FfiSubscribeError) { print("Error ConversationStreamCallback \(error)") } - + let callback: (FfiConversation) -> Void init(callback: @escaping (FfiConversation) -> Void) { @@ -73,13 +87,13 @@ final class V2SubscriptionCallback: FfiV2SubscriptionCallback { func onError(error: LibXMTP.GenericError) { print("Error V2SubscriptionCallback \(error)") } - + let callback: (Envelope) -> Void init(callback: @escaping (Envelope) -> Void) { self.callback = callback } - + func onMessage(message: LibXMTP.FfiEnvelope) { self.callback(message.fromFFI) } @@ -102,15 +116,15 @@ class StreamManager { } actor FfiStreamActor { - private var ffiStream: FfiStreamCloser? + private var ffiStream: FfiStreamCloser? - func setFfiStream(_ stream: FfiStreamCloser?) { - ffiStream = stream - } + func setFfiStream(_ stream: FfiStreamCloser?) { + ffiStream = stream + } - func endStream() { - ffiStream?.end() - } + func endStream() { + ffiStream?.end() + } } /// Handles listing and creating Conversations. @@ -128,14 +142,14 @@ public actor Conversations { } try await v3Client.conversations().sync() } - - public func syncAllGroups() async throws -> UInt32 { + + public func syncAllGroups() async throws -> UInt32 { guard let v3Client = client.v3Client else { return 0 } return try await v3Client.conversations().syncAllConversations() } - + public func syncAllConversations() async throws -> UInt32 { guard let v3Client = client.v3Client else { return 0 @@ -143,66 +157,93 @@ public actor Conversations { return try await v3Client.conversations().syncAllConversations() } - public func groups(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil) async throws -> [Group] { + public func groups( + createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil + ) async throws -> [Group] { guard let v3Client = client.v3Client else { return [] } - var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil, consentState: nil) + var options = FfiListConversationsOptions( + createdAfterNs: nil, createdBeforeNs: nil, limit: nil, + consentState: nil) if let createdAfter { options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) } if let createdBefore { - options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + options.createdBeforeNs = Int64( + createdBefore.millisecondsSinceEpoch) } if let limit { options.limit = Int64(limit) } - return try await v3Client.conversations().listGroups(opts: options).map { $0.groupFromFFI(client: client) } + return try await v3Client.conversations().listGroups(opts: options).map + { $0.groupFromFFI(client: client) } } - - public func dms(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil) async throws -> [Dm] { - if (client.hasV2Client) { - throw ConversationError.v2NotSupported("Only supported with V3 only clients use newConversation instead") + + public func dms( + createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil + ) async throws -> [Dm] { + if client.hasV2Client { + throw ConversationError.v2NotSupported( + "Only supported with V3 only clients use newConversation instead" + ) } guard let v3Client = client.v3Client else { return [] } - var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil, consentState: nil) + var options = FfiListConversationsOptions( + createdAfterNs: nil, createdBeforeNs: nil, limit: nil, + consentState: nil) if let createdAfter { options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) } if let createdBefore { - options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + options.createdBeforeNs = Int64( + createdBefore.millisecondsSinceEpoch) } if let limit { options.limit = Int64(limit) } - return try await v3Client.conversations().listDms(opts: options).map { $0.dmFromFFI(client: client) } + return try await v3Client.conversations().listDms(opts: options).map { + $0.dmFromFFI(client: client) + } } - - public func listConversations(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil, order: ConversationOrder = .createdAt, consentState: ConsentState? = nil) async throws -> [Conversation] { - if (client.hasV2Client) { - throw ConversationError.v2NotSupported("Only supported with V3 only clients use list instead") + + public func listConversations( + createdAfter: Date? = nil, createdBefore: Date? = nil, + limit: Int? = nil, order: ConversationOrder = .createdAt, + consentState: ConsentState? = nil + ) async throws -> [Conversation] { + if client.hasV2Client { + throw ConversationError.v2NotSupported( + "Only supported with V3 only clients use list instead") } // Todo: add ability to order and consent state guard let v3Client = client.v3Client else { return [] } - var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil, consentState: consentState?.toFFI) + var options = FfiListConversationsOptions( + createdAfterNs: nil, createdBeforeNs: nil, limit: nil, + consentState: consentState?.toFFI) if let createdAfter { options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) } if let createdBefore { - options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + options.createdBeforeNs = Int64( + createdBefore.millisecondsSinceEpoch) } if let limit { options.limit = Int64(limit) } - let ffiConversations = try await v3Client.conversations().list(opts: options) + let ffiConversations = try await v3Client.conversations().list( + opts: options) - let sortedConversations = try sortConversations(ffiConversations, order: order) + let sortedConversations = try sortConversations( + ffiConversations, order: order) - return try sortedConversations.map { try $0.toConversation(client: client) } + return try sortedConversations.map { + try $0.toConversation(client: client) + } } private func sortConversations( @@ -211,18 +252,19 @@ public actor Conversations { ) throws -> [FfiConversation] { switch order { case .lastMessage: - let conversationWithTimestamp: [(FfiConversation, Int64?)] = try conversations.map { conversation in - let message = try conversation.findMessages( - opts: FfiListMessagesOptions( - sentBeforeNs: nil, - sentAfterNs: nil, - limit: 1, - deliveryStatus: nil, - direction: .descending - ) - ).first - return (conversation, message?.sentAtNs) - } + let conversationWithTimestamp: [(FfiConversation, Int64?)] = + try conversations.map { conversation in + let message = try conversation.findMessages( + opts: FfiListMessagesOptions( + sentBeforeNs: nil, + sentAfterNs: nil, + limit: 1, + deliveryStatus: nil, + direction: .descending + ) + ).first + return (conversation, message?.sentAtNs) + } let sortedTuples = conversationWithTimestamp.sorted { (lhs, rhs) in (lhs.1 ?? 0) > (rhs.1 ?? 0) @@ -233,91 +275,111 @@ public actor Conversations { } } - public func streamGroups() async throws -> AsyncThrowingStream { + public func streamGroups() async throws -> AsyncThrowingStream + { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() - let task = Task { - let groupCallback = ConversationStreamCallback() { group in + let ffiStreamActor = FfiStreamActor() + let task = Task { + let groupCallback = ConversationStreamCallback { group in guard !Task.isCancelled else { continuation.finish() return } continuation.yield(group.groupFromFFI(client: self.client)) } - guard let stream = await self.client.v3Client?.conversations().streamGroups(callback: groupCallback) else { + guard + let stream = await self.client.v3Client?.conversations() + .streamGroups(callback: groupCallback) + else { continuation.finish(throwing: GroupError.streamingFailure) return } - await ffiStreamActor.setFfiStream(stream) + await ffiStreamActor.setFfiStream(stream) continuation.onTermination = { @Sendable reason in - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } continuation.onTermination = { @Sendable reason in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - private func streamGroupConversations() -> AsyncThrowingStream { + private func streamGroupConversations() -> AsyncThrowingStream< + Conversation, Error + > { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() + let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamGroups( - callback: ConversationStreamCallback() { group in - guard !Task.isCancelled else { - continuation.finish() - return + let stream = await self.client.v3Client?.conversations() + .streamGroups( + callback: ConversationStreamCallback { group in + guard !Task.isCancelled else { + continuation.finish() + return + } + continuation.yield( + Conversation.group( + group.groupFromFFI(client: self.client))) } - continuation.yield(Conversation.group(group.groupFromFFI(client: self.client))) - } - ) - await ffiStreamActor.setFfiStream(stream) + ) + await ffiStreamActor.setFfiStream(stream) continuation.onTermination = { @Sendable reason in - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } continuation.onTermination = { @Sendable reason in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - - public func streamConversations() -> AsyncThrowingStream { + + public func streamConversations() -> AsyncThrowingStream< + Conversation, Error + > { AsyncThrowingStream { continuation in - if (client.hasV2Client) { - continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 only clients use stream instead")) + if client.hasV2Client { + continuation.finish( + throwing: ConversationError.v2NotSupported( + "Only supported with V3 only clients use stream instead" + )) return } let ffiStreamActor = FfiStreamActor() let task = Task { let stream = await self.client.v3Client?.conversations().stream( - callback: ConversationStreamCallback() { conversation in + callback: ConversationStreamCallback { conversation in guard !Task.isCancelled else { continuation.finish() return } do { - let conversationType = try conversation.groupMetadata().conversationType() + let conversationType = + try conversation.groupMetadata() + .conversationType() if conversationType == "dm" { continuation.yield( - Conversation.dm(conversation.dmFromFFI(client: self.client)) + Conversation.dm( + conversation.dmFromFFI( + client: self.client)) ) } else if conversationType == "group" { continuation.yield( - Conversation.group(conversation.groupFromFFI(client: self.client)) + Conversation.group( + conversation.groupFromFFI( + client: self.client)) ) } } catch { @@ -341,10 +403,12 @@ public actor Conversations { } } } - + public func findOrCreateDm(with peerAddress: String) async throws -> Dm { - if (client.hasV2Client) { - throw ConversationError.v2NotSupported("Only supported with V3 only clients use newConversation instead") + if client.hasV2Client { + throw ConversationError.v2NotSupported( + "Only supported with V3 only clients use newConversation instead" + ) } guard let v3Client = client.v3Client else { @@ -353,76 +417,87 @@ public actor Conversations { if peerAddress.lowercased() == client.address.lowercased() { throw ConversationError.recipientIsSender } - let canMessage = try await self.client.canMessageV3(address: peerAddress) - if !canMessage { + let canMessage = try await self.client.canMessageV3( + address: peerAddress) + if !canMessage { throw ConversationError.recipientNotOnNetwork } - + try await client.contacts.allow(addresses: [peerAddress]) if let existingDm = try await client.findDm(address: peerAddress) { return existingDm } - + let newDm = try await v3Client.conversations() .createDm(accountAddress: peerAddress.lowercased()) .dmFromFFI(client: client) - + try await client.contacts.allow(addresses: [peerAddress]) return newDm } - - public func newGroup(with addresses: [String], - permissions: GroupPermissionPreconfiguration = .allMembers, - name: String = "", - imageUrlSquare: String = "", - description: String = "", - pinnedFrameUrl: String = "" - ) async throws -> Group { - return try await newGroupInternal( - with: addresses, - permissions: GroupPermissionPreconfiguration.toFfiGroupPermissionOptions(option: permissions), - name: name, - imageUrlSquare: imageUrlSquare, - description: description, - pinnedFrameUrl: pinnedFrameUrl, - permissionPolicySet: nil - ) - } - - public func newGroupCustomPermissions(with addresses: [String], - permissionPolicySet: PermissionPolicySet, - name: String = "", - imageUrlSquare: String = "", - description: String = "", - pinnedFrameUrl: String = "" - ) async throws -> Group { - return try await newGroupInternal( - with: addresses, - permissions: FfiGroupPermissionsOptions.customPolicy, - name: name, - imageUrlSquare: imageUrlSquare, - description: description, - pinnedFrameUrl: pinnedFrameUrl, - permissionPolicySet: PermissionPolicySet.toFfiPermissionPolicySet(permissionPolicySet) - ) - } - - private func newGroupInternal(with addresses: [String], - permissions: FfiGroupPermissionsOptions = .allMembers, - name: String = "", - imageUrlSquare: String = "", - description: String = "", - pinnedFrameUrl: String = "", - permissionPolicySet: FfiPermissionPolicySet? = nil + + public func newGroup( + with addresses: [String], + permissions: GroupPermissionPreconfiguration = .allMembers, + name: String = "", + imageUrlSquare: String = "", + description: String = "", + pinnedFrameUrl: String = "" + ) async throws -> Group { + return try await newGroupInternal( + with: addresses, + permissions: + GroupPermissionPreconfiguration.toFfiGroupPermissionOptions( + option: permissions), + name: name, + imageUrlSquare: imageUrlSquare, + description: description, + pinnedFrameUrl: pinnedFrameUrl, + permissionPolicySet: nil + ) + } + + public func newGroupCustomPermissions( + with addresses: [String], + permissionPolicySet: PermissionPolicySet, + name: String = "", + imageUrlSquare: String = "", + description: String = "", + pinnedFrameUrl: String = "" + ) async throws -> Group { + return try await newGroupInternal( + with: addresses, + permissions: FfiGroupPermissionsOptions.customPolicy, + name: name, + imageUrlSquare: imageUrlSquare, + description: description, + pinnedFrameUrl: pinnedFrameUrl, + permissionPolicySet: PermissionPolicySet.toFfiPermissionPolicySet( + permissionPolicySet) + ) + } + + private func newGroupInternal( + with addresses: [String], + permissions: FfiGroupPermissionsOptions = .allMembers, + name: String = "", + imageUrlSquare: String = "", + description: String = "", + pinnedFrameUrl: String = "", + permissionPolicySet: FfiPermissionPolicySet? = nil ) async throws -> Group { guard let v3Client = client.v3Client else { throw GroupError.alphaMLSNotEnabled } - if addresses.first(where: { $0.lowercased() == client.address.lowercased() }) != nil { + if addresses.first(where: { + $0.lowercased() == client.address.lowercased() + }) != nil { throw GroupError.memberCannotBeSelf } - let erroredAddresses = try await withThrowingTaskGroup(of: (String?).self) { group in + let erroredAddresses = try await withThrowingTaskGroup( + of: (String?).self + ) { group in for address in addresses { group.addTask { if try await self.client.canMessageV3(address: address) { @@ -443,42 +518,55 @@ public actor Conversations { if !erroredAddresses.isEmpty { throw GroupError.memberNotRegistered(erroredAddresses) } - let group = try await v3Client.conversations().createGroup(accountAddresses: addresses, - opts: FfiCreateGroupOptions(permissions: permissions, - groupName: name, - groupImageUrlSquare: imageUrlSquare, - groupDescription: description, - groupPinnedFrameUrl: pinnedFrameUrl, - customPermissionPolicySet: permissionPolicySet - )).groupFromFFI(client: client) + let group = try await v3Client.conversations().createGroup( + accountAddresses: addresses, + opts: FfiCreateGroupOptions( + permissions: permissions, + groupName: name, + groupImageUrlSquare: imageUrlSquare, + groupDescription: description, + groupPinnedFrameUrl: pinnedFrameUrl, + customPermissionPolicySet: permissionPolicySet + ) + ).groupFromFFI(client: client) try await client.contacts.allowGroups(groupIds: [group.id]) return group } - - public func streamAllConversationMessages() -> AsyncThrowingStream { + + public func streamAllConversationMessages() -> AsyncThrowingStream< + DecodedMessage, Error + > { AsyncThrowingStream { continuation in - if (client.hasV2Client) { - continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 clients. Use streamAllMessages instead.")) + if client.hasV2Client { + continuation.finish( + throwing: ConversationError.v2NotSupported( + "Only supported with V3 clients. Use streamAllMessages instead." + )) return } let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation + let stream = await self.client.v3Client?.conversations() + .streamAllMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) + } catch { + print("Error onMessage \(error)") } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) - } catch { - print("Error onMessage \(error)") } - } - ) + ) await ffiStreamActor.setFfiStream(stream) } @@ -490,31 +578,41 @@ public actor Conversations { } } } - - public func streamAllDecryptedConversationMessages() -> AsyncThrowingStream { + + public func streamAllDecryptedConversationMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in - if (client.hasV2Client) { - continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 clients. Use streamAllMessages instead.")) + if client.hasV2Client { + continuation.finish( + throwing: ConversationError.v2NotSupported( + "Only supported with V3 clients. Use streamAllMessages instead." + )) return } let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation + let stream = await self.client.v3Client?.conversations() + .streamAllMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) + } catch { + print("Error onMessage \(error)") } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) - } catch { - print("Error onMessage \(error)") } - } - ) + ) await ffiStreamActor.setFfiStream(stream) } @@ -527,42 +625,52 @@ public actor Conversations { } } - - public func streamAllGroupMessages() -> AsyncThrowingStream { + public func streamAllGroupMessages() -> AsyncThrowingStream< + DecodedMessage, Error + > { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() + let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllGroupMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation - } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) - } catch { - print("Error onMessage \(error)") + let stream = await self.client.v3Client?.conversations() + .streamAllGroupMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) + } catch { + print("Error onMessage \(error)") + } } - } - ) - await ffiStreamActor.setFfiStream(stream) + ) + await ffiStreamActor.setFfiStream(stream) } continuation.onTermination = { _ in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - public func streamAllMessages(includeGroups: Bool = false) -> AsyncThrowingStream { + public func streamAllMessages(includeGroups: Bool = false) + -> AsyncThrowingStream + { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged( + stream: AsyncThrowingStream + ) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { @@ -579,55 +687,69 @@ public actor Conversations { } let task = Task { - await forwardStreamToMerged(stream: streamAllV2Messages()) + await forwardStreamToMerged(stream: streamAllV2Messages()) } - - let groupTask = includeGroups ? Task { - await forwardStreamToMerged(stream: streamAllGroupMessages()) - } : nil + + let groupTask = + includeGroups + ? Task { + await forwardStreamToMerged( + stream: streamAllGroupMessages()) + } : nil continuation.onTermination = { _ in task.cancel() - groupTask?.cancel() + groupTask?.cancel() } } } - public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream { + public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in - let ffiStreamActor = FfiStreamActor() + let ffiStreamActor = FfiStreamActor() let task = Task { - let stream = await self.client.v3Client?.conversations().streamAllGroupMessages( - messageCallback: MessageCallback(client: self.client) { message in - guard !Task.isCancelled else { - continuation.finish() - Task { - await ffiStreamActor.endStream() // End the stream upon cancellation - } - return - } - do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) - } catch { - print("Error onMessage \(error)") + let stream = await self.client.v3Client?.conversations() + .streamAllGroupMessages( + messageCallback: MessageCallback(client: self.client) { + message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return + } + do { + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) + } catch { + print("Error onMessage \(error)") + } } - } - ) - await ffiStreamActor.setFfiStream(stream) + ) + await ffiStreamActor.setFfiStream(stream) } continuation.onTermination = { _ in task.cancel() - Task { - await ffiStreamActor.endStream() - } + Task { + await ffiStreamActor.endStream() + } } } } - public func streamAllDecryptedMessages(includeGroups: Bool = false) -> AsyncThrowingStream { + public func streamAllDecryptedMessages(includeGroups: Bool = false) + -> AsyncThrowingStream + { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged( + stream: AsyncThrowingStream + ) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { @@ -644,23 +766,30 @@ public actor Conversations { } let task = Task { - await forwardStreamToMerged(stream: streamAllV2DecryptedMessages()) + await forwardStreamToMerged( + stream: streamAllV2DecryptedMessages()) } - - let groupTask = includeGroups ? Task { - await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages()) - } : nil + + let groupTask = + includeGroups + ? Task { + await forwardStreamToMerged( + stream: streamAllGroupDecryptedMessages()) + } : nil continuation.onTermination = { _ in - task.cancel() - groupTask?.cancel() + task.cancel() + groupTask?.cancel() } } } - private func findExistingConversation(with peerAddress: String, conversationID: String?) throws -> Conversation? { - return try conversationsByTopic.first(where: { try $0.value.peerAddress == peerAddress && - (($0.value.conversationID ?? "") == (conversationID ?? "")) + private func findExistingConversation( + with peerAddress: String, conversationID: String? + ) throws -> Conversation? { + return try conversationsByTopic.first(where: { + try $0.value.peerAddress == peerAddress + && (($0.value.conversationID ?? "") == (conversationID ?? "")) })?.value } @@ -668,31 +797,42 @@ public actor Conversations { guard let v3Client = client.v3Client else { return nil } - let group = try await v3Client.conversations().processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) + let group = try await v3Client.conversations() + .processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) return Group(ffiGroup: group, client: client) } - - public func conversationFromWelcome(envelopeBytes: Data) async throws -> Conversation? { + + public func conversationFromWelcome(envelopeBytes: Data) async throws + -> Conversation? + { guard let v3Client = client.v3Client else { return nil } - let conversation = try await v3Client.conversations().processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) + let conversation = try await v3Client.conversations() + .processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) return try conversation.toConversation(client: client) } - public func newConversation(with peerAddress: String, context: InvitationV1.Context? = nil, consentProofPayload: ConsentProofPayload? = nil) async throws -> Conversation { + public func newConversation( + with peerAddress: String, context: InvitationV1.Context? = nil, + consentProofPayload: ConsentProofPayload? = nil + ) async throws -> Conversation { if peerAddress.lowercased() == client.address.lowercased() { throw ConversationError.recipientIsSender } print("\(client.address) starting conversation with \(peerAddress)") - if let existing = try findExistingConversation(with: peerAddress, conversationID: context?.conversationID) { + if let existing = try findExistingConversation( + with: peerAddress, conversationID: context?.conversationID) + { return existing } guard let contact = try await client.contacts.find(peerAddress) else { throw ConversationError.recipientNotOnNetwork } - _ = try await list() // cache old conversations and check again - if let existing = try findExistingConversation(with: peerAddress, conversationID: context?.conversationID) { + _ = try await list() // cache old conversations and check again + if let existing = try findExistingConversation( + with: peerAddress, conversationID: context?.conversationID) + { return existing } // We don't have an existing conversation, make a v2 one @@ -703,8 +843,11 @@ public actor Conversations { context: context, consentProofPayload: consentProofPayload ) - let sealedInvitation = try await sendInvitation(recipient: recipient, invitation: invitation, created: Date()) - let conversationV2 = try ConversationV2.create(client: client, invitation: invitation, header: sealedInvitation.v1.header) + let sealedInvitation = try await sendInvitation( + recipient: recipient, invitation: invitation, created: Date()) + let conversationV2 = try ConversationV2.create( + client: client, invitation: invitation, + header: sealedInvitation.v1.header) try await client.contacts.allow(addresses: [peerAddress]) let conversation: Conversation = .v2(conversationV2) Task { @@ -713,31 +856,49 @@ public actor Conversations { return conversation } - public func stream() async throws -> AsyncThrowingStream { + public func stream() async throws -> AsyncThrowingStream< + Conversation, Error + > { AsyncThrowingStream { continuation in Task { var streamedConversationTopics: Set = [] let subscriptionCallback = V2SubscriptionCallback { envelope in Task { - if envelope.contentTopic == Topic.userIntro(self.client.address).description { - let conversationV1 = try self.fromIntro(envelope: envelope) - if !streamedConversationTopics.contains(conversationV1.topic.description) { - streamedConversationTopics.insert(conversationV1.topic.description) + if envelope.contentTopic + == Topic.userIntro(self.client.address).description + { + let conversationV1 = try self.fromIntro( + envelope: envelope) + if !streamedConversationTopics.contains( + conversationV1.topic.description) + { + streamedConversationTopics.insert( + conversationV1.topic.description) continuation.yield(conversationV1) } } - if envelope.contentTopic == Topic.userInvite(self.client.address).description { - let conversationV2 = try self.fromInvite(envelope: envelope) - if !streamedConversationTopics.contains(conversationV2.topic) { - streamedConversationTopics.insert(conversationV2.topic) + if envelope.contentTopic + == Topic.userInvite(self.client.address).description + { + let conversationV2 = try self.fromInvite( + envelope: envelope) + if !streamedConversationTopics.contains( + conversationV2.topic) + { + streamedConversationTopics.insert( + conversationV2.topic) continuation.yield(conversationV2) } } } } - - let stream = try await client.subscribe(topics: [Topic.userIntro(client.address).description, Topic.userInvite(client.address).description], callback: subscriptionCallback) - + + let stream = try await client.subscribe( + topics: [ + Topic.userIntro(client.address).description, + Topic.userInvite(client.address).description, + ], callback: subscriptionCallback) + continuation.onTermination = { @Sendable reason in Task { try await stream.end() @@ -749,7 +910,9 @@ public actor Conversations { public func streamAll() -> AsyncThrowingStream { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged( + stream: AsyncThrowingStream + ) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { @@ -768,48 +931,63 @@ public actor Conversations { } } } - - private func validateConsentSignature(signature: String, clientAddress: String, peerAddress: String, timestamp: UInt64) -> Bool { + + private func validateConsentSignature( + signature: String, clientAddress: String, peerAddress: String, + timestamp: UInt64 + ) -> Bool { // timestamp should be in the past if timestamp > UInt64(Date().timeIntervalSince1970 * 1000) { return false } let thirtyDaysAgo = Date().addingTimeInterval(-30 * 24 * 60 * 60) - let thirtyDaysAgoTimestamp = UInt64(thirtyDaysAgo.timeIntervalSince1970 * 1000) + let thirtyDaysAgoTimestamp = UInt64( + thirtyDaysAgo.timeIntervalSince1970 * 1000) if timestamp < thirtyDaysAgoTimestamp { return false } - let message = Signature.consentProofText(peerAddress: peerAddress, timestamp: timestamp) + let message = Signature.consentProofText( + peerAddress: peerAddress, timestamp: timestamp) guard let signatureData = Data(hex: signature) else { print("Invalid signature format") return false } do { let ethMessage = try Signature.ethHash(message) - let recoveredKey = try KeyUtilx.recoverPublicKey(message: ethMessage, signature: signatureData) - let address = KeyUtilx.generateAddress(from: recoveredKey).toChecksumAddress() + let recoveredKey = try KeyUtilx.recoverPublicKey( + message: ethMessage, signature: signatureData) + let address = KeyUtilx.generateAddress(from: recoveredKey) + .toChecksumAddress() return clientAddress == address } catch { return false } } - private func handleConsentProof(consentProof: ConsentProofPayload, peerAddress: String) async throws { + private func handleConsentProof( + consentProof: ConsentProofPayload, peerAddress: String + ) async throws { let signature = consentProof.signature - if (signature == "") { + if signature == "" { return } - if (!validateConsentSignature(signature: signature, clientAddress: client.address, peerAddress: peerAddress, timestamp: consentProof.timestamp)) { + if !validateConsentSignature( + signature: signature, clientAddress: client.address, + peerAddress: peerAddress, timestamp: consentProof.timestamp) + { return } let contacts = client.contacts _ = try await contacts.refreshConsentList() - if try await (contacts.consentList.state(address: peerAddress) == .unknown) { + if try await + (contacts.consentList.state(address: peerAddress) == .unknown) + { try await contacts.allow(addresses: [peerAddress]) } } - public func list(includeGroups: Bool = false) async throws -> [Conversation] { + public func list(includeGroups: Bool = false) async throws -> [Conversation] + { if includeGroups { try await sync() let groups = try await groups() @@ -821,27 +999,40 @@ public actor Conversations { let mostRecent = await self.getMostRecentConversation() let pagination = Pagination(after: mostRecent?.createdAt) do { - let seenPeers = try await listIntroductionPeers(pagination: pagination) + let seenPeers = try await listIntroductionPeers( + pagination: pagination) for (peerAddress, sentAt) in seenPeers { - let newConversation = Conversation.v1(ConversationV1(client: client, peerAddress: peerAddress, sentAt: sentAt)) + let newConversation = Conversation.v1( + ConversationV1( + client: client, peerAddress: peerAddress, sentAt: sentAt + )) newConversations.append(newConversation) } } catch { print("Error loading introduction peers: \(error)") } - for sealedInvitation in try await listInvitations(pagination: pagination) { + for sealedInvitation in try await listInvitations( + pagination: pagination) + { do { - let newConversation = Conversation.v2(try makeConversation(from: sealedInvitation)) + let newConversation = Conversation.v2( + try makeConversation(from: sealedInvitation)) newConversations.append(newConversation) - if let consentProof = newConversation.consentProof, consentProof.signature != "" { - try await self.handleConsentProof(consentProof: consentProof, peerAddress: newConversation.peerAddress) + if let consentProof = newConversation.consentProof, + consentProof.signature != "" + { + try await self.handleConsentProof( + consentProof: consentProof, + peerAddress: newConversation.peerAddress) } } catch { print("Error loading invitations: \(error)") } } for conversation in newConversations { - if try conversation.peerAddress != client.address && Topic.isValidTopic(topic: conversation.topic) { + if try conversation.peerAddress != client.address + && Topic.isValidTopic(topic: conversation.topic) + { await self.addConversation(conversation) } } @@ -864,54 +1055,78 @@ public actor Conversations { } } - public func getHmacKeys(request: Xmtp_KeystoreApi_V1_GetConversationHmacKeysRequest? = nil) -> Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse { - let thirtyDayPeriodsSinceEpoch = Int(Date().timeIntervalSince1970) / (60 * 60 * 24 * 30) - var hmacKeysResponse = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse() + public func getHmacKeys( + request: Xmtp_KeystoreApi_V1_GetConversationHmacKeysRequest? = nil + ) -> Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse { + let thirtyDayPeriodsSinceEpoch = + Int(Date().timeIntervalSince1970) / (60 * 60 * 24 * 30) + var hmacKeysResponse = + Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse() var topics = conversationsByTopic if let requestTopics = request?.topics, !requestTopics.isEmpty { topics = topics.filter { requestTopics.contains($0.key) } } for (topic, conversation) in topics { guard let keyMaterial = conversation.keyMaterial else { continue } - var hmacKeys = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse.HmacKeys() - for period in (thirtyDayPeriodsSinceEpoch - 1)...(thirtyDayPeriodsSinceEpoch + 1) { + var hmacKeys = + Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse.HmacKeys() + for period + in (thirtyDayPeriodsSinceEpoch - 1)...(thirtyDayPeriodsSinceEpoch + + 1) + { let info = "\(period)-\(client.address)" do { - let hmacKey = try Crypto.deriveKey(secret: keyMaterial, nonce: Data(), info: Data(info.utf8)) - var hmacKeyData = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse.HmacKeyData() + let hmacKey = try Crypto.deriveKey( + secret: keyMaterial, nonce: Data(), + info: Data(info.utf8)) + var hmacKeyData = + Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse + .HmacKeyData() hmacKeyData.hmacKey = hmacKey hmacKeyData.thirtyDayPeriodsSinceEpoch = Int32(period) hmacKeys.values.append(hmacKeyData) } catch { - print("Error calculating HMAC key for topic \(topic): \(error)") + print( + "Error calculating HMAC key for topic \(topic): \(error)" + ) } } hmacKeysResponse.hmacKeys[topic] = hmacKeys } return hmacKeysResponse } - + // ------- V1 V2 to be deprecated ------ - + /// Import a previously seen conversation. /// See Conversation.toTopicData() - public func importTopicData(data: Xmtp_KeystoreApi_V1_TopicMap.TopicData) throws -> Conversation { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("importTopicData only supported with V2 clients") + public func importTopicData(data: Xmtp_KeystoreApi_V1_TopicMap.TopicData) + throws -> Conversation + { + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "importTopicData only supported with V2 clients") } let conversation: Conversation if !data.hasInvitation { - let sentAt = Date(timeIntervalSince1970: TimeInterval(data.createdNs / 1_000_000_000)) - conversation = .v1(ConversationV1(client: client, peerAddress: data.peerAddress, sentAt: sentAt)) + let sentAt = Date( + timeIntervalSince1970: TimeInterval( + data.createdNs / 1_000_000_000)) + conversation = .v1( + ConversationV1( + client: client, peerAddress: data.peerAddress, + sentAt: sentAt)) } else { - conversation = .v2(ConversationV2( - topic: data.invitation.topic, - keyMaterial: data.invitation.aes256GcmHkdfSha256.keyMaterial, - context: data.invitation.context, - peerAddress: data.peerAddress, - client: client, - createdAtNs: data.createdNs - )) + conversation = .v2( + ConversationV2( + topic: data.invitation.topic, + keyMaterial: data.invitation.aes256GcmHkdfSha256 + .keyMaterial, + context: data.invitation.context, + peerAddress: data.peerAddress, + client: client, + createdAtNs: data.createdNs + )) } Task { await self.addConversation(conversation) @@ -919,9 +1134,13 @@ public actor Conversations { return conversation } - public func listBatchMessages(topics: [String: Pagination?]) async throws -> [DecodedMessage] { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("listBatchMessages only supported with V2 clients. Use listConversations order lastMessage") + public func listBatchMessages(topics: [String: Pagination?]) async throws + -> [DecodedMessage] + { + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "listBatchMessages only supported with V2 clients. Use listConversations order lastMessage" + ) } let requests = topics.map { topic, page in makeQueryRequest(topic: topic, pagination: page) @@ -929,7 +1148,8 @@ public actor Conversations { /// The maximum number of requests permitted in a single batch call. let maxQueryRequestsPerBatch = 50 let batches = requests.chunks(maxQueryRequestsPerBatch) - .map { requests in BatchQueryRequest.with { $0.requests = requests } } + .map { requests in BatchQueryRequest.with { $0.requests = requests } + } var messages: [DecodedMessage] = [] // TODO: consider using a task group here for parallel batch calls guard let apiClient = client.apiClient else { @@ -939,15 +1159,20 @@ public actor Conversations { messages += try await apiClient.batchQuery(request: batch) .responses.flatMap { res in res.envelopes.compactMap { envelope in - let conversation = conversationsByTopic[envelope.contentTopic] + let conversation = conversationsByTopic[ + envelope.contentTopic] if conversation == nil { - print("discarding message, unknown conversation \(envelope)") + print( + "discarding message, unknown conversation \(envelope)" + ) return nil } do { return try conversation?.decode(envelope) } catch { - print("discarding message, unable to decode \(envelope)") + print( + "discarding message, unable to decode \(envelope)" + ) return nil } } @@ -956,9 +1181,13 @@ public actor Conversations { return messages } - public func listBatchDecryptedMessages(topics: [String: Pagination?]) async throws -> [DecryptedMessage] { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("listBatchMessages only supported with V2 clients. Use listConversations order lastMessage") + public func listBatchDecryptedMessages(topics: [String: Pagination?]) + async throws -> [DecryptedMessage] + { + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "listBatchMessages only supported with V2 clients. Use listConversations order lastMessage" + ) } let requests = topics.map { topic, page in makeQueryRequest(topic: topic, pagination: page) @@ -966,7 +1195,8 @@ public actor Conversations { /// The maximum number of requests permitted in a single batch call. let maxQueryRequestsPerBatch = 50 let batches = requests.chunks(maxQueryRequestsPerBatch) - .map { requests in BatchQueryRequest.with { $0.requests = requests } } + .map { requests in BatchQueryRequest.with { $0.requests = requests } + } var messages: [DecryptedMessage] = [] // TODO: consider using a task group here for parallel batch calls guard let apiClient = client.apiClient else { @@ -976,15 +1206,20 @@ public actor Conversations { messages += try await apiClient.batchQuery(request: batch) .responses.flatMap { res in res.envelopes.compactMap { envelope in - let conversation = conversationsByTopic[envelope.contentTopic] + let conversation = conversationsByTopic[ + envelope.contentTopic] if conversation == nil { - print("discarding message, unknown conversation \(envelope)") + print( + "discarding message, unknown conversation \(envelope)" + ) return nil } do { return try conversation?.decrypt(envelope) } catch { - print("discarding message, unable to decode \(envelope)") + print( + "discarding message, unable to decode \(envelope)" + ) return nil } } @@ -993,51 +1228,72 @@ public actor Conversations { return messages } - private func makeConversation(from sealedInvitation: SealedInvitation) throws -> ConversationV2 { - let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - return try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header) + private func makeConversation(from sealedInvitation: SealedInvitation) + throws -> ConversationV2 + { + let unsealed = try sealedInvitation.v1.getInvitation( + viewer: client.keys) + return try ConversationV2.create( + client: client, invitation: unsealed, + header: sealedInvitation.v1.header) } - + func streamAllV2Messages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in - if (!client.hasV2Client) { - continuation.finish(throwing: ConversationError.v3NotSupported("Only supported with V2 clients. Use streamAllConversationMessages instead.")) + if !client.hasV2Client { + continuation.finish( + throwing: ConversationError.v3NotSupported( + "Only supported with V2 clients. Use streamAllConversationMessages instead." + )) return } let streamManager = StreamManager() - + Task { var topics: [String] = [ Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description + Topic.userIntro(client.address).description, ] for conversation in try await list() { topics.append(conversation.topic) } - var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + var subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) let subscriptionCallback = V2SubscriptionCallback { envelope in Task { do { - if let conversation = self.conversationsByTopic[envelope.contentTopic] { + if let conversation = self.conversationsByTopic[ + envelope.contentTopic] + { let decoded = try conversation.decode(envelope) continuation.yield(decoded) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try self.fromInvite(envelope: envelope) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/invite-") + { + let conversation = try self.fromInvite( + envelope: envelope) await self.addConversation(conversation) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try self.fromIntro(envelope: envelope) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/intro-") + { + let conversation = try self.fromIntro( + envelope: envelope) await self.addConversation(conversation) let decoded = try conversation.decode(envelope) continuation.yield(decoded) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) } else { print("huh \(envelope)") } @@ -1046,9 +1302,11 @@ public actor Conversations { } } } - let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) + let newStream = try await client.subscribe2( + request: subscriptionRequest, callback: subscriptionCallback + ) streamManager.setStream(newStream) - + continuation.onTermination = { @Sendable reason in Task { try await streamManager.endStream() @@ -1057,46 +1315,66 @@ public actor Conversations { } } } - - func streamAllV2DecryptedMessages() -> AsyncThrowingStream { + + func streamAllV2DecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in let streamManager = StreamManager() - if (!client.hasV2Client) { - continuation.finish(throwing: ConversationError.v3NotSupported("Only supported with V2 clients. Use streamAllDecryptedConversationMessages instead.")) + if !client.hasV2Client { + continuation.finish( + throwing: ConversationError.v3NotSupported( + "Only supported with V2 clients. Use streamAllDecryptedConversationMessages instead." + )) return } Task { var topics: [String] = [ Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description + Topic.userIntro(client.address).description, ] for conversation in try await list() { topics.append(conversation.topic) } - var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + var subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) let subscriptionCallback = V2SubscriptionCallback { envelope in Task { do { - if let conversation = self.conversationsByTopic[envelope.contentTopic] { - let decrypted = try conversation.decrypt(envelope) + if let conversation = self.conversationsByTopic[ + envelope.contentTopic] + { + let decrypted = try conversation.decrypt( + envelope) continuation.yield(decrypted) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try self.fromInvite(envelope: envelope) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/invite-") + { + let conversation = try self.fromInvite( + envelope: envelope) await self.addConversation(conversation) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try self.fromIntro(envelope: envelope) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) + } else if envelope.contentTopic.hasPrefix( + "/xmtp/0/intro-") + { + let conversation = try self.fromIntro( + envelope: envelope) await self.addConversation(conversation) - let decrypted = try conversation.decrypt(envelope) + let decrypted = try conversation.decrypt( + envelope) continuation.yield(decrypted) topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) + subscriptionRequest = FfiV2SubscribeRequest( + contentTopics: topics) + try await streamManager.updateStream( + with: subscriptionRequest) } else { print("huh \(envelope)") } @@ -1105,9 +1383,11 @@ public actor Conversations { } } } - let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) + let newStream = try await client.subscribe2( + request: subscriptionRequest, callback: subscriptionCallback + ) streamManager.setStream(newStream) - + continuation.onTermination = { @Sendable reason in Task { try await streamManager.endStream() @@ -1118,28 +1398,40 @@ public actor Conversations { } public func fromInvite(envelope: Envelope) throws -> Conversation { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("fromIntro only supported with V2 clients use fromWelcome instead") - } - let sealedInvitation = try SealedInvitation(serializedData: envelope.message) - let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - return try .v2(ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header)) + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "fromIntro only supported with V2 clients use fromWelcome instead" + ) + } + let sealedInvitation = try SealedInvitation( + serializedData: envelope.message) + let unsealed = try sealedInvitation.v1.getInvitation( + viewer: client.keys) + return try .v2( + ConversationV2.create( + client: client, invitation: unsealed, + header: sealedInvitation.v1.header)) } public func fromIntro(envelope: Envelope) throws -> Conversation { - if (!client.hasV2Client) { - throw ConversationError.v3NotSupported("fromIntro only supported with V2 clients use fromWelcome instead") + if !client.hasV2Client { + throw ConversationError.v3NotSupported( + "fromIntro only supported with V2 clients use fromWelcome instead" + ) } let messageV1 = try MessageV1.fromBytes(envelope.message) let senderAddress = try messageV1.header.sender.walletAddress let recipientAddress = try messageV1.header.recipient.walletAddress - let peerAddress = client.address == senderAddress ? recipientAddress : senderAddress - let conversationV1 = ConversationV1(client: client, peerAddress: peerAddress, sentAt: messageV1.sentAt) + let peerAddress = + client.address == senderAddress ? recipientAddress : senderAddress + let conversationV1 = ConversationV1( + client: client, peerAddress: peerAddress, sentAt: messageV1.sentAt) return .v1(conversationV1) } - - private func listIntroductionPeers(pagination: Pagination?) async throws -> [String: Date] { + private func listIntroductionPeers(pagination: Pagination?) async throws + -> [String: Date] + { guard let apiClient = client.apiClient else { throw ClientError.noV2Client("Error no V2 client initialized") } @@ -1160,11 +1452,14 @@ public actor Conversations { var seenPeers: [String: Date] = [:] for message in messages { guard let recipientAddress = message.recipientAddress, - let senderAddress = message.senderAddress else { + let senderAddress = message.senderAddress + else { continue } let sentAt = message.sentAt - let peerAddress = recipientAddress == client.address ? senderAddress : recipientAddress + let peerAddress = + recipientAddress == client.address + ? senderAddress : recipientAddress guard let existing = seenPeers[peerAddress] else { seenPeers[peerAddress] = sentAt continue @@ -1176,7 +1471,9 @@ public actor Conversations { return seenPeers } - private func listInvitations(pagination: Pagination?) async throws -> [SealedInvitation] { + private func listInvitations(pagination: Pagination?) async throws + -> [SealedInvitation] + { guard let apiClient = client.apiClient else { throw ClientError.noV2Client("Error no V2 client initialized") } @@ -1191,7 +1488,10 @@ public actor Conversations { } } - func sendInvitation(recipient: SignedPublicKeyBundle, invitation: InvitationV1, created: Date) async throws -> SealedInvitation { + func sendInvitation( + recipient: SignedPublicKeyBundle, invitation: InvitationV1, + created: Date + ) async throws -> SealedInvitation { let sealed = try SealedInvitation.createV1( sender: client.keys, recipient: recipient, @@ -1200,8 +1500,12 @@ public actor Conversations { ) let peerAddress = try recipient.walletAddress try await client.publish(envelopes: [ - Envelope(topic: .userInvite(client.address), timestamp: created, message: sealed.serializedData()), - Envelope(topic: .userInvite(peerAddress), timestamp: created, message: sealed.serializedData()), + Envelope( + topic: .userInvite(client.address), timestamp: created, + message: sealed.serializedData()), + Envelope( + topic: .userInvite(peerAddress), timestamp: created, + message: sealed.serializedData()), ]) return sealed } diff --git a/Sources/XMTPiOS/Dm.swift b/Sources/XMTPiOS/Dm.swift index 2a8f5fca..37522e7e 100644 --- a/Sources/XMTPiOS/Dm.swift +++ b/Sources/XMTPiOS/Dm.swift @@ -16,7 +16,7 @@ public struct Dm: Identifiable, Equatable, Hashable { public var id: String { ffiConversation.id().toHex } - + public var topic: String { Topic.groupMessage(id).description } @@ -44,14 +44,15 @@ public struct Dm: Identifiable, Equatable, Hashable { public func creatorInboxId() throws -> String { return try metadata().creatorInboxId() } - + public func addedByInboxId() throws -> String { return try ffiConversation.addedByInboxId() } public var members: [Member] { get async throws { - return try await ffiConversation.listMembers().map { ffiGroupMember in + return try await ffiConversation.listMembers().map { + ffiGroupMember in Member(ffiGroupMember: ffiGroupMember) } } @@ -68,8 +69,8 @@ public struct Dm: Identifiable, Equatable, Hashable { } public func updateConsentState(state: ConsentState) async throws { - if (client.hasV2Client) { - switch (state) { + if client.hasV2Client { + switch state { case .allowed: try await client.contacts.allowGroups(groupIds: [id]) case .denied: try await client.contacts.denyGroups(groupIds: [id]) case .unknown: () @@ -79,33 +80,43 @@ public struct Dm: Identifiable, Equatable, Hashable { try ffiConversation.updateConsentState(state: state.toFFI) } - public func consentState() throws -> ConsentState{ + public func consentState() throws -> ConsentState { return try ffiConversation.consentState().fromFFI } - + public func processMessage(envelopeBytes: Data) async throws -> MessageV3 { - let message = try await ffiConversation.processStreamedConversationMessage(envelopeBytes: envelopeBytes) + let message = + try await ffiConversation.processStreamedConversationMessage( + envelopeBytes: envelopeBytes) return MessageV3(client: client, ffiMessage: message) } - public func send(content: T, options: SendOptions? = nil) async throws -> String { - let encodeContent = try await encodeContent(content: content, options: options) + public func send(content: T, options: SendOptions? = nil) async throws + -> String + { + let encodeContent = try await encodeContent( + content: content, options: options) return try await send(encodedContent: encodeContent) } public func send(encodedContent: EncodedContent) async throws -> String { - if (try consentState() == .unknown) { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - let messageId = try await ffiConversation.send(contentBytes: encodedContent.serializedData()) + let messageId = try await ffiConversation.send( + contentBytes: encodedContent.serializedData()) return messageId.toHex } - public func encodeContent(content: T, options: SendOptions?) async throws -> EncodedContent { + public func encodeContent(content: T, options: SendOptions?) async throws + -> EncodedContent + { let codec = client.codecRegistry.find(for: options?.contentType) - func encode(codec: Codec, content: Any) throws -> EncodedContent { + func encode(codec: Codec, content: Any) throws + -> EncodedContent + { if let content = content as? Codec.T { return try codec.encode(content: content, client: client) } else { @@ -115,7 +126,9 @@ public struct Dm: Identifiable, Equatable, Hashable { var encoded = try encode(codec: codec, content: content) - func fallback(codec: Codec, content: Any) throws -> String? { + func fallback(codec: Codec, content: Any) throws + -> String? + { if let content = content as? Codec.T { return try codec.fallback(content: content) } else { @@ -133,14 +146,19 @@ public struct Dm: Identifiable, Equatable, Hashable { return encoded } - - public func prepareMessage(content: T, options: SendOptions? = nil) async throws -> String { - if (try consentState() == .unknown) { + + public func prepareMessage(content: T, options: SendOptions? = nil) + async throws -> String + { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - - let encodeContent = try await encodeContent(content: content, options: options) - return try ffiConversation.sendOptimistic(contentBytes: try encodeContent.serializedData()).toHex + + let encodeContent = try await encodeContent( + content: content, options: options) + return try ffiConversation.sendOptimistic( + contentBytes: try encodeContent.serializedData() + ).toHex } public func publishMessages() async throws { @@ -155,20 +173,24 @@ public struct Dm: Identifiable, Equatable, Hashable { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiConversation.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -181,24 +203,30 @@ public struct Dm: Identifiable, Equatable, Hashable { } } - public func streamDecryptedMessages() -> AsyncThrowingStream { + public func streamDecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiConversation.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -227,11 +255,13 @@ public struct Dm: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { @@ -252,7 +282,7 @@ public struct Dm: Identifiable, Equatable, Hashable { }() options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -264,8 +294,10 @@ public struct Dm: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiConversation.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decodeOrNull() + return try ffiConversation.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decodeOrNull() } } @@ -285,17 +317,19 @@ public struct Dm: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { options.limit = Int64(limit) } - + let status: FfiDeliveryStatus? = { switch deliveryStatus { case .published: @@ -308,9 +342,9 @@ public struct Dm: Identifiable, Equatable, Hashable { return nil } }() - + options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -322,8 +356,10 @@ public struct Dm: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiConversation.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decryptOrNull() + return try ffiConversation.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decryptOrNull() } } } diff --git a/Sources/XMTPiOS/Group.swift b/Sources/XMTPiOS/Group.swift index 1b95bf2d..cef154a6 100644 --- a/Sources/XMTPiOS/Group.swift +++ b/Sources/XMTPiOS/Group.swift @@ -12,7 +12,7 @@ final class MessageCallback: FfiMessageCallback { func onError(error: LibXMTP.FfiSubscribeError) { print("Error MessageCallback \(error)") } - + let client: Client let callback: (LibXMTP.FfiMessage) -> Void @@ -38,7 +38,7 @@ public struct Group: Identifiable, Equatable, Hashable { public var id: String { ffiGroup.id().toHex } - + public var topic: String { Topic.groupMessage(id).description } @@ -46,10 +46,10 @@ public struct Group: Identifiable, Equatable, Hashable { func metadata() throws -> FfiConversationMetadata { return try ffiGroup.groupMetadata() } - - func permissions() throws -> FfiGroupPermissions { - return try ffiGroup.groupPermissions() - } + + func permissions() throws -> FfiGroupPermissions { + return try ffiGroup.groupPermissions() + } public func sync() async throws { try await ffiGroup.sync() @@ -70,47 +70,48 @@ public struct Group: Identifiable, Equatable, Hashable { public func isCreator() throws -> Bool { return try metadata().creatorInboxId() == client.inboxID } - - public func isAdmin(inboxId: String) throws -> Bool { - return try ffiGroup.isAdmin(inboxId: inboxId) - } - - public func isSuperAdmin(inboxId: String) throws -> Bool { - return try ffiGroup.isSuperAdmin(inboxId: inboxId) - } - - public func addAdmin(inboxId: String) async throws { - try await ffiGroup.addAdmin(inboxId: inboxId) - } - - public func removeAdmin(inboxId: String) async throws { - try await ffiGroup.removeAdmin(inboxId: inboxId) - } - - public func addSuperAdmin(inboxId: String) async throws { - try await ffiGroup.addSuperAdmin(inboxId: inboxId) - } - - public func removeSuperAdmin(inboxId: String) async throws { - try await ffiGroup.removeSuperAdmin(inboxId: inboxId) - } - - public func listAdmins() throws -> [String] { - try ffiGroup.adminList() - } - - public func listSuperAdmins() throws -> [String] { - try ffiGroup.superAdminList() - } + + public func isAdmin(inboxId: String) throws -> Bool { + return try ffiGroup.isAdmin(inboxId: inboxId) + } + + public func isSuperAdmin(inboxId: String) throws -> Bool { + return try ffiGroup.isSuperAdmin(inboxId: inboxId) + } + + public func addAdmin(inboxId: String) async throws { + try await ffiGroup.addAdmin(inboxId: inboxId) + } + + public func removeAdmin(inboxId: String) async throws { + try await ffiGroup.removeAdmin(inboxId: inboxId) + } + + public func addSuperAdmin(inboxId: String) async throws { + try await ffiGroup.addSuperAdmin(inboxId: inboxId) + } + + public func removeSuperAdmin(inboxId: String) async throws { + try await ffiGroup.removeSuperAdmin(inboxId: inboxId) + } + + public func listAdmins() throws -> [String] { + try ffiGroup.adminList() + } + + public func listSuperAdmins() throws -> [String] { + try ffiGroup.superAdminList() + } public func permissionPolicySet() throws -> PermissionPolicySet { - return PermissionPolicySet.fromFfiPermissionPolicySet(try permissions().policySet()) + return PermissionPolicySet.fromFfiPermissionPolicySet( + try permissions().policySet()) } public func creatorInboxId() throws -> String { return try metadata().creatorInboxId() } - + public func addedByInboxId() throws -> String { return try ffiGroup.addedByInboxId() } @@ -144,7 +145,7 @@ public struct Group: Identifiable, Equatable, Hashable { public func removeMembers(addresses: [String]) async throws { try await ffiGroup.removeMembers(accountAddresses: addresses) } - + public func addMembersByInboxId(inboxIds: [String]) async throws { try await ffiGroup.addMembersByInboxId(inboxIds: inboxIds) } @@ -152,11 +153,11 @@ public struct Group: Identifiable, Equatable, Hashable { public func removeMembersByInboxId(inboxIds: [String]) async throws { try await ffiGroup.removeMembersByInboxId(inboxIds: inboxIds) } - - public func groupName() throws -> String { - return try ffiGroup.groupName() - } - + + public func groupName() throws -> String { + return try ffiGroup.groupName() + } + public func groupImageUrlSquare() throws -> String { return try ffiGroup.groupImageUrlSquare() } @@ -169,57 +170,106 @@ public struct Group: Identifiable, Equatable, Hashable { return try ffiGroup.groupPinnedFrameUrl() } - public func updateGroupName(groupName: String) async throws { - try await ffiGroup.updateGroupName(groupName: groupName) - } - - public func updateGroupImageUrlSquare(imageUrlSquare: String) async throws { - try await ffiGroup.updateGroupImageUrlSquare(groupImageUrlSquare: imageUrlSquare) - } - - public func updateGroupDescription(groupDescription: String) async throws { - try await ffiGroup.updateGroupDescription(groupDescription: groupDescription) - } - - public func updateGroupPinnedFrameUrl(groupPinnedFrameUrl: String) async throws { - try await ffiGroup.updateGroupPinnedFrameUrl(pinnedFrameUrl: groupPinnedFrameUrl) - } - - public func updateAddMemberPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.addMember, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateRemoveMemberPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.removeMember, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateAddAdminPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.addAdmin, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateRemoveAdminPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.removeAdmin, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: nil) - } - - public func updateGroupNamePermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.groupName) + public func updateGroupName(groupName: String) async throws { + try await ffiGroup.updateGroupName(groupName: groupName) } - public func updateGroupDescriptionPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.description) - } - - public func updateGroupImageUrlSquarePermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.imageUrlSquare) + public func updateGroupImageUrlSquare(imageUrlSquare: String) async throws { + try await ffiGroup.updateGroupImageUrlSquare( + groupImageUrlSquare: imageUrlSquare) } - public func updateGroupPinnedFrameUrlPermission(newPermissionOption: PermissionOption) async throws { - try await ffiGroup.updatePermissionPolicy(permissionUpdateType: FfiPermissionUpdateType.updateMetadata, permissionPolicyOption: PermissionOption.toFfiPermissionPolicy(option: newPermissionOption), metadataField: FfiMetadataField.pinnedFrameUrl) + public func updateGroupDescription(groupDescription: String) async throws { + try await ffiGroup.updateGroupDescription( + groupDescription: groupDescription) + } + + public func updateGroupPinnedFrameUrl(groupPinnedFrameUrl: String) + async throws + { + try await ffiGroup.updateGroupPinnedFrameUrl( + pinnedFrameUrl: groupPinnedFrameUrl) + } + + public func updateAddMemberPermission(newPermissionOption: PermissionOption) + async throws + { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.addMember, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateRemoveMemberPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.removeMember, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateAddAdminPermission(newPermissionOption: PermissionOption) + async throws + { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.addAdmin, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateRemoveAdminPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.removeAdmin, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), metadataField: nil) + } + + public func updateGroupNamePermission(newPermissionOption: PermissionOption) + async throws + { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.groupName) + } + + public func updateGroupDescriptionPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.description) + } + + public func updateGroupImageUrlSquarePermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.imageUrlSquare) + } + + public func updateGroupPinnedFrameUrlPermission( + newPermissionOption: PermissionOption + ) async throws { + try await ffiGroup.updatePermissionPolicy( + permissionUpdateType: FfiPermissionUpdateType.updateMetadata, + permissionPolicyOption: PermissionOption.toFfiPermissionPolicy( + option: newPermissionOption), + metadataField: FfiMetadataField.pinnedFrameUrl) } public func updateConsentState(state: ConsentState) async throws { - if (client.hasV2Client) { - switch (state) { + if client.hasV2Client { + switch state { case .allowed: try await client.contacts.allowGroups(groupIds: [id]) case .denied: try await client.contacts.denyGroups(groupIds: [id]) case .unknown: () @@ -229,33 +279,42 @@ public struct Group: Identifiable, Equatable, Hashable { try ffiGroup.updateConsentState(state: state.toFFI) } - public func consentState() throws -> ConsentState{ + public func consentState() throws -> ConsentState { return try ffiGroup.consentState().fromFFI } - + public func processMessage(envelopeBytes: Data) async throws -> MessageV3 { - let message = try await ffiGroup.processStreamedConversationMessage(envelopeBytes: envelopeBytes) + let message = try await ffiGroup.processStreamedConversationMessage( + envelopeBytes: envelopeBytes) return MessageV3(client: client, ffiMessage: message) } - public func send(content: T, options: SendOptions? = nil) async throws -> String { - let encodeContent = try await encodeContent(content: content, options: options) + public func send(content: T, options: SendOptions? = nil) async throws + -> String + { + let encodeContent = try await encodeContent( + content: content, options: options) return try await send(encodedContent: encodeContent) } public func send(encodedContent: EncodedContent) async throws -> String { - if (try consentState() == .unknown) { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - let messageId = try await ffiGroup.send(contentBytes: encodedContent.serializedData()) + let messageId = try await ffiGroup.send( + contentBytes: encodedContent.serializedData()) return messageId.toHex } - public func encodeContent(content: T, options: SendOptions?) async throws -> EncodedContent { + public func encodeContent(content: T, options: SendOptions?) async throws + -> EncodedContent + { let codec = client.codecRegistry.find(for: options?.contentType) - func encode(codec: Codec, content: Any) throws -> EncodedContent { + func encode(codec: Codec, content: Any) throws + -> EncodedContent + { if let content = content as? Codec.T { return try codec.encode(content: content, client: client) } else { @@ -265,7 +324,9 @@ public struct Group: Identifiable, Equatable, Hashable { var encoded = try encode(codec: codec, content: content) - func fallback(codec: Codec, content: Any) throws -> String? { + func fallback(codec: Codec, content: Any) throws + -> String? + { if let content = content as? Codec.T { return try codec.fallback(content: content) } else { @@ -283,14 +344,19 @@ public struct Group: Identifiable, Equatable, Hashable { return encoded } - - public func prepareMessage(content: T, options: SendOptions? = nil) async throws -> String { - if (try consentState() == .unknown) { + + public func prepareMessage(content: T, options: SendOptions? = nil) + async throws -> String + { + if try consentState() == .unknown { try await updateConsentState(state: .allowed) } - - let encodeContent = try await encodeContent(content: content, options: options) - return try ffiGroup.sendOptimistic(contentBytes: try encodeContent.serializedData()).toHex + + let encodeContent = try await encodeContent( + content: content, options: options) + return try ffiGroup.sendOptimistic( + contentBytes: try encodeContent.serializedData() + ).toHex } public func publishMessages() async throws { @@ -305,20 +371,24 @@ public struct Group: Identifiable, Equatable, Hashable { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiGroup.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decode()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -331,24 +401,30 @@ public struct Group: Identifiable, Equatable, Hashable { } } - public func streamDecryptedMessages() -> AsyncThrowingStream { + public func streamDecryptedMessages() -> AsyncThrowingStream< + DecryptedMessage, Error + > { AsyncThrowingStream { continuation in let task = Task.detached { self.streamHolder.stream = await self.ffiGroup.stream( - messageCallback: MessageCallback(client: self.client) { message in + messageCallback: MessageCallback(client: self.client) { + message in guard !Task.isCancelled else { continuation.finish() return } do { - continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) + continuation.yield( + try MessageV3( + client: self.client, ffiMessage: message + ).decrypt()) } catch { print("Error onMessage \(error)") continuation.finish(throwing: error) } } ) - + continuation.onTermination = { @Sendable reason in self.streamHolder.stream?.end() } @@ -377,11 +453,13 @@ public struct Group: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { @@ -402,7 +480,7 @@ public struct Group: Identifiable, Equatable, Hashable { }() options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -414,8 +492,10 @@ public struct Group: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiGroup.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decodeOrNull() + return try ffiGroup.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decodeOrNull() } } @@ -435,17 +515,19 @@ public struct Group: Identifiable, Equatable, Hashable { ) if let before { - options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + options.sentBeforeNs = Int64( + before.millisecondsSinceEpoch * 1_000_000) } if let after { - options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + options.sentAfterNs = Int64( + after.millisecondsSinceEpoch * 1_000_000) } if let limit { options.limit = Int64(limit) } - + let status: FfiDeliveryStatus? = { switch deliveryStatus { case .published: @@ -458,9 +540,9 @@ public struct Group: Identifiable, Equatable, Hashable { return nil } }() - + options.deliveryStatus = status - + let direction: FfiDirection? = { switch direction { case .ascending: @@ -472,8 +554,10 @@ public struct Group: Identifiable, Equatable, Hashable { options.direction = direction - return try ffiGroup.findMessages(opts: options).compactMap { ffiMessage in - return MessageV3(client: self.client, ffiMessage: ffiMessage).decryptOrNull() + return try ffiGroup.findMessages(opts: options).compactMap { + ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage) + .decryptOrNull() } } }