From 0a8b7080f02a8b8fee08750ac4654b3b631aa21e Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 24 Oct 2024 17:14:15 -0700 Subject: [PATCH] feat: V3 only dms (#411) * update package * add chain id and SCW check * add implementation * fix a little formatting * change defaults * make a test release pod * bump the latest libxmtp * fix up all the async tests * add installation timestamps and async members * fix up the tests and bump the pod * bump to the next version * bad merge * update the package * fix up bad merge * make block number optional * add a test to reproduce the scw error * update to latest libxmtp * update the signers * update to the latest libxmtp functions * fix the linter * get on a working version * check the chain id * chain id is optional * fix the lint issue * tag * remove chain id from inbox id creation * update the SCW functionality and message listing * small tweak to message listing * get closer * small test clean up * add the basic functionality to conversation and client * put V2 stuff below the line * more common functions * reorder the conversations class and add additional functionality * get everything compiling * write a bunch of dm tests * more tests * beefing up all the tests * fix up some tests * Update Sources/XMTPiOS/Conversations.swift Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update Tests/XMTPTests/V3ClientTests.swift Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update Sources/XMTPiOS/Conversation.swift Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update Tests/XMTPTests/GroupTests.swift Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * add return statements * get all the tests passing --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- Sources/XMTPiOS/Client.swift | 57 +- Sources/XMTPiOS/Conversation.swift | 458 ++++++++++------ Sources/XMTPiOS/Conversations.swift | 719 +++++++++++++++++-------- Sources/XMTPiOS/Dm.swift | 336 ++++++++++++ Sources/XMTPiOS/Extensions/Ffi.swift | 14 +- Sources/XMTPiOS/Group.swift | 9 +- Tests/XMTPTests/DmTests.swift | 205 +++++++ Tests/XMTPTests/GroupTests.swift | 63 ++- Tests/XMTPTests/IntegrationTests.swift | 2 +- Tests/XMTPTests/V3ClientTests.swift | 174 ++++++ XMTP.podspec | 2 +- 11 files changed, 1625 insertions(+), 414 deletions(-) create mode 100644 Sources/XMTPiOS/Dm.swift create mode 100644 Tests/XMTPTests/DmTests.swift diff --git a/Sources/XMTPiOS/Client.swift b/Sources/XMTPiOS/Client.swift index 68ecf5be..19ba7106 100644 --- a/Sources/XMTPiOS/Client.swift +++ b/Sources/XMTPiOS/Client.swift @@ -15,6 +15,7 @@ public enum ClientError: Error, CustomStringConvertible, LocalizedError { case creationError(String) case noV3Client(String) case noV2Client(String) + case missingInboxId public var description: String { switch self { @@ -24,6 +25,8 @@ public enum ClientError: Error, CustomStringConvertible, LocalizedError { return "ClientError.noV3Client: \(err)" case .noV2Client(let err): return "ClientError.noV2Client: \(err)" + case .missingInboxId: + return "ClientError.missingInboxId" } } @@ -197,7 +200,7 @@ public final class Client { } public static func createV3(account: SigningKey, options: ClientOptions) async throws -> Client { - let accountAddress = account.address + let accountAddress = account.address.lowercased() let inboxId = try await getOrCreateInboxId(options: options, address: accountAddress) return try await initializeClient( @@ -209,10 +212,11 @@ public final class Client { } public static func buildV3(address: String, options: ClientOptions) async throws -> Client { - let inboxId = try await getOrCreateInboxId(options: options, address: address) + let accountAddress = address.lowercased() + let inboxId = try await getOrCreateInboxId(options: options, address: accountAddress) return try await initializeClient( - accountAddress: address, + accountAddress: accountAddress, options: options, signingKey: nil, inboxId: inboxId @@ -693,6 +697,53 @@ public final class Client { return nil } } + + public func findConversation(conversationId: String) throws -> Conversation? { + guard let client = v3Client else { + throw ClientError.noV3Client("Error no V3 client initialized") + } + do { + let conversation = try client.conversation(conversationId: conversationId.hexToData) + return try conversation.toConversation(client: self) + } catch { + return nil + } + } + + public func findConversationByTopic(topic: String) throws -> Conversation? { + guard let client = v3Client else { + throw ClientError.noV3Client("Error no V3 client initialized") + } + do { + let regexPattern = #"/xmtp/mls/1/g-(.*?)/proto"# + if let regex = try? NSRegularExpression(pattern: regexPattern) { + let range = NSRange(location: 0, length: topic.utf16.count) + if let match = regex.firstMatch(in: topic, options: [], range: range) { + let conversationId = (topic as NSString).substring(with: match.range(at: 1)) + let conversation = try client.conversation(conversationId: conversationId.hexToData) + return try conversation.toConversation(client: self) + } + } + } catch { + return nil + } + return nil + } + + public func findDm(address: String) async throws -> Dm? { + guard let client = v3Client else { + throw ClientError.noV3Client("Error no V3 client initialized") + } + guard let inboxId = try await inboxIdFromAddress(address: address) else { + throw ClientError.creationError("No inboxId present") + } + do { + let conversation = try client.dmConversation(targetInboxId: inboxId) + return Dm(ffiConversation: conversation, client: self) + } catch { + return nil + } + } public func findMessage(messageId: String) throws -> MessageV3? { guard let client = v3Client else { diff --git a/Sources/XMTPiOS/Conversation.swift b/Sources/XMTPiOS/Conversation.swift index 737bb1f2..0509b924 100644 --- a/Sources/XMTPiOS/Conversation.swift +++ b/Sources/XMTPiOS/Conversation.swift @@ -24,10 +24,51 @@ public enum ConversationContainer: Codable { /// Wrapper that provides a common interface between ``ConversationV1`` and ``ConversationV2`` objects. public enum Conversation: Sendable { // TODO: It'd be nice to not have to expose these types as public, maybe we make this a struct with an enum prop instead of just an enum - case v1(ConversationV1), v2(ConversationV2), group(Group) + case v1(ConversationV1), v2(ConversationV2), group(Group), dm(Dm) public enum Version { - case v1, v2, group + case v1, v2, group, dm + } + + public var id: String { + get throws { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("id") + case .v2(_): + throw ConversationError.v2NotSupported("id") + case let .group(group): + return group.id + case let .dm(dm): + return dm.id + } + } + } + + public func isCreator() async throws -> Bool { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("isCreator") + case .v2(_): + throw ConversationError.v2NotSupported("isCreator") + case let .group(group): + return try group.isCreator() + case let .dm(dm): + return try dm.isCreator() + } + } + + public func members() async throws -> [Member] { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("members") + case .v2(_): + throw ConversationError.v2NotSupported("members") + case let .group(group): + return try await group.members + case let .dm(dm): + return try await dm.members + } } public func consentState() async throws -> ConsentState { @@ -38,6 +79,60 @@ public enum Conversation: Sendable { return try await conversationV2.client.contacts.consentList.state(address: peerAddress) case let .group(group): return try group.consentState() + case let .dm(dm): + return try dm.consentState() + } + } + + public func updateConsentState(state: ConsentState) async throws { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("updateConsentState use contact.allowAddresses instead") + case .v2(_): + throw ConversationError.v2NotSupported("updateConsentState use contact.allowAddresses instead") + case let .group(group): + try await group.updateConsentState(state: state) + case let .dm(dm): + try await dm.updateConsentState(state: state) + } + } + + public func sync() async throws { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("sync") + case .v2(_): + throw ConversationError.v2NotSupported("sync") + case let .group(group): + try await group.sync() + case let .dm(dm): + try await dm.sync() + } + } + + public func processMessage(envelopeBytes: Data) async throws -> MessageV3 { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("processMessage") + case .v2(_): + throw ConversationError.v2NotSupported("processMessage") + case let .group(group): + return try await group.processMessage(envelopeBytes: envelopeBytes) + case let .dm(dm): + return try await dm.processMessage(envelopeBytes: envelopeBytes) + } + } + + public func prepareMessageV3(content: T, options: SendOptions? = nil) async throws -> String { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("prepareMessageV3 use prepareMessage instead") + case .v2(_): + throw ConversationError.v2NotSupported("prepareMessageV3 use prepareMessage instead") + case let .group(group): + return try await group.prepareMessage(content: content, options: options) + case let .dm(dm): + return try await dm.prepareMessage(content: content, options: options) } } @@ -49,6 +144,8 @@ public enum Conversation: Sendable { return .v2 case .group: return .group + case let .dm(dm): + return .dm } } @@ -60,17 +157,164 @@ public enum Conversation: Sendable { return conversationV2.createdAt case let .group(group): return group.createdAt + case let .dm(dm): + return dm.createdAt + } + } + + @discardableResult public func send(content: T, options: SendOptions? = nil, fallback _: String? = nil) async throws -> String { + switch self { + case let .v1(conversationV1): + return try await conversationV1.send(content: content, options: options) + case let .v2(conversationV2): + return try await conversationV2.send(content: content, options: options) + case let .group(group): + return try await group.send(content: content, options: options) + case let .dm(dm): + return try await dm.send(content: content, options: options) + } + } + + @discardableResult public func send(encodedContent: EncodedContent, options: SendOptions? = nil) async throws -> String { + switch self { + case let .v1(conversationV1): + return try await conversationV1.send(encodedContent: encodedContent, options: options) + case let .v2(conversationV2): + return try await conversationV2.send(encodedContent: encodedContent, options: options) + case let .group(group): + return try await group.send(content: encodedContent, options: options) + case let .dm(dm): + return try await dm.send(content: encodedContent, options: options) + } + } + + /// Send a message to the conversation + public func send(text: String, options: SendOptions? = nil) async throws -> String { + switch self { + case let .v1(conversationV1): + return try await conversationV1.send(content: text, options: options) + case let .v2(conversationV2): + return try await conversationV2.send(content: text, options: options) + case let .group(group): + return try await group.send(content: text, options: options) + case let .dm(dm): + return try await dm.send(content: text, options: options) + } + } + + public var clientAddress: String { + return client.address + } + + /// The topic identifier for this conversation + public var topic: String { + switch self { + case let .v1(conversation): + return conversation.topic.description + case let .v2(conversation): + return conversation.topic + case let .group(group): + return group.topic + case let .dm(dm): + return dm.topic + } + } + + /// Returns a stream you can iterate through to receive new messages in this conversation. + /// + /// > Note: All messages in the conversation are returned by this stream. If you want to filter out messages + /// by a sender, you can check the ``Client`` address against the message's ``peerAddress``. + public func streamMessages() -> AsyncThrowingStream { + switch self { + case let .v1(conversation): + return conversation.streamMessages() + case let .v2(conversation): + return conversation.streamMessages() + case let .group(group): + return group.streamMessages() + case let .dm(dm): + return dm.streamMessages() + } + } + + public func streamDecryptedMessages() -> AsyncThrowingStream { + switch self { + case let .v1(conversation): + return conversation.streamDecryptedMessages() + case let .v2(conversation): + return conversation.streamDecryptedMessages() + case let .group(group): + return group.streamDecryptedMessages() + case let .dm(dm): + return dm.streamDecryptedMessages() + } + } + + /// List messages in the conversation + public func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil, direction: PagingInfoSortDirection? = .descending) async throws -> [DecodedMessage] { + switch self { + case let .v1(conversationV1): + return try await conversationV1.messages(limit: limit, before: before, after: after, direction: direction) + case let .v2(conversationV2): + return try await conversationV2.messages(limit: limit, before: before, after: after, direction: direction) + case let .group(group): + return try await group.messages(before: before, after: after, limit: limit, direction: direction) + case let .dm(dm): + return try await dm.messages(before: before, after: after, limit: limit, direction: direction) + } + } + + public func decryptedMessages(limit: Int? = nil, before: Date? = nil, after: Date? = nil, direction: PagingInfoSortDirection? = .descending) async throws -> [DecryptedMessage] { + switch self { + case let .v1(conversationV1): + return try await conversationV1.decryptedMessages(limit: limit, before: before, after: after, direction: direction) + case let .v2(conversationV2): + return try await conversationV2.decryptedMessages(limit: limit, before: before, after: after, direction: direction) + case let .group(group): + return try await group.decryptedMessages(before: before, after: after, limit: limit, direction: direction) + case let .dm(dm): + return try await dm.decryptedMessages(before: before, after: after, limit: limit, direction: direction) } } + public var consentProof: ConsentProofPayload? { + switch self { + case .v1(_): + return nil + case let .v2(conversationV2): + return conversationV2.consentProof + case .group(_): + return nil + case let .dm(dm): + return nil + } + } + + var client: Client { + switch self { + case let .v1(conversationV1): + return conversationV1.client + case let .v2(conversationV2): + return conversationV2.client + case let .group(group): + return group.client + case let .dm(dm): + return dm.client + } + } + + // ------- V1 V2 to be deprecated ------ + public func encodedContainer() throws -> ConversationContainer { switch self { case let .v1(conversationV1): return .v1(conversationV1.encodedContainer) case let .v2(conversationV2): return .v2(conversationV2.encodedContainer) - case let .group(group): - throw GroupError.notSupportedByGroups + case .group(_): + throw ConversationError.v3NotSupported("encodedContainer") + case .dm(_): + throw ConversationError.v3NotSupported("encodedContainer") } } @@ -82,8 +326,10 @@ public enum Conversation: Sendable { return conversationV1.peerAddress case let .v2(conversationV2): return conversationV2.peerAddress - case let .group(group): - throw GroupError.notSupportedByGroups + case .group(_): + throw ConversationError.v3NotSupported("peerAddress use members inboxId instead") + case .dm(_): + throw ConversationError.v3NotSupported("peerAddress use members inboxId instead") } } } @@ -95,8 +341,10 @@ public enum Conversation: Sendable { return [conversationV1.peerAddress] case let .v2(conversationV2): return [conversationV2.peerAddress] - case let .group(group): - throw GroupError.notSupportedByGroups + case .group(_): + throw ConversationError.v3NotSupported("peerAddresses use members inboxIds instead") + case .dm(_): + throw ConversationError.v3NotSupported("peerAddresses use members inboxIds instead") } } } @@ -107,7 +355,9 @@ public enum Conversation: Sendable { return nil case let .v2(conversationV2): return conversationV2.keyMaterial - case let .group(group): + case .group(_): + return nil + case .dm(_): return nil } } @@ -121,7 +371,9 @@ public enum Conversation: Sendable { return nil case let .v2(conversation): return conversation.context.conversationID - case let .group(group): + case .group(_): + return nil + case .dm(_): return nil } } @@ -144,31 +396,29 @@ public enum Conversation: Sendable { } } - public func decode(_ envelope: Envelope, message: FfiMessage? = nil) throws -> DecodedMessage { + public func decode(_ envelope: Envelope) throws -> DecodedMessage { switch self { case let .v1(conversationV1): return try conversationV1.decode(envelope: envelope) case let .v2(conversationV2): return try conversationV2.decode(envelope: envelope) - case let .group(group): - guard let message = message else { - throw GroupError.groupsRequireMessagePassed - } - return try MessageV3(client: client, ffiMessage: message).decode() + case .group(_): + throw ConversationError.v3NotSupported("decode use decodeV3 instead") + case .dm(_): + throw ConversationError.v3NotSupported("decode use decodeV3 instead") } } - public func decrypt(_ envelope: Envelope, message: FfiMessage? = nil) throws -> DecryptedMessage { + public func decrypt(_ envelope: Envelope) throws -> DecryptedMessage { switch self { case let .v1(conversationV1): return try conversationV1.decrypt(envelope: envelope) case let .v2(conversationV2): return try conversationV2.decrypt(envelope: envelope) - case let .group(group): - guard let message = message else { - throw GroupError.groupsRequireMessagePassed - } - return try MessageV3(client: client, ffiMessage: message).decrypt() + case .group(_): + throw ConversationError.v3NotSupported("decrypt use decryptV3 instead") + case .dm(_): + throw ConversationError.v3NotSupported("decrypt use decryptV3 instead") } } @@ -178,21 +428,25 @@ public enum Conversation: Sendable { throw RemoteAttachmentError.v1NotSupported case let .v2(conversationV2): return try await conversationV2.encode(codec: codec, content: content) - case let .group(group): - throw GroupError.notSupportedByGroups + case .group(_): + throw ConversationError.v3NotSupported("encode") + case .dm(_): + throw ConversationError.v3NotSupported("encode") } } - public func prepareMessage(encodedContent: EncodedContent, options: SendOptions? = nil) async throws -> PreparedMessage { - switch self { - case let .v1(conversationV1): - return try await conversationV1.prepareMessage(encodedContent: encodedContent, options: options) - case let .v2(conversationV2): - return try await conversationV2.prepareMessage(encodedContent: encodedContent, options: options) - case let .group(group): - throw GroupError.notSupportedByGroups - } - } + public func prepareMessage(encodedContent: EncodedContent, options: SendOptions? = nil) async throws -> PreparedMessage { + switch self { + case let .v1(conversationV1): + return try await conversationV1.prepareMessage(encodedContent: encodedContent, options: options) + case let .v2(conversationV2): + return try await conversationV2.prepareMessage(encodedContent: encodedContent, options: options) + case .group(_): + throw ConversationError.v3NotSupported("prepareMessage use prepareMessageV3 instead") + case .dm(_): + throw ConversationError.v3NotSupported("prepareMessage use prepareMessageV3 instead") + } + } public func prepareMessage(content: T, options: SendOptions? = nil) async throws -> PreparedMessage { switch self { @@ -200,8 +454,10 @@ public enum Conversation: Sendable { return try await conversationV1.prepareMessage(content: content, options: options ?? .init()) case let .v2(conversationV2): return try await conversationV2.prepareMessage(content: content, options: options ?? .init()) - case let .group(group): - throw GroupError.notSupportedByGroups + case .group(_): + throw ConversationError.v3NotSupported("prepareMessage use prepareMessageV3 instead") + case .dm(_): + throw ConversationError.v3NotSupported("prepareMessage use prepareMessageV3 instead") } } @@ -213,60 +469,13 @@ public enum Conversation: Sendable { return try await conversationV1.send(prepared: prepared) case let .v2(conversationV2): return try await conversationV2.send(prepared: prepared) - case let .group(group): - throw GroupError.notSupportedByGroups - } - } - - @discardableResult public func send(content: T, options: SendOptions? = nil, fallback _: String? = nil) async throws -> String { - switch self { - case let .v1(conversationV1): - return try await conversationV1.send(content: content, options: options) - case let .v2(conversationV2): - return try await conversationV2.send(content: content, options: options) - case let .group(group): - return try await group.send(content: content, options: options) - } - } - - @discardableResult public func send(encodedContent: EncodedContent, options: SendOptions? = nil) async throws -> String { - switch self { - case let .v1(conversationV1): - return try await conversationV1.send(encodedContent: encodedContent, options: options) - case let .v2(conversationV2): - return try await conversationV2.send(encodedContent: encodedContent, options: options) - case let .group(group): - return try await group.send(content: encodedContent, options: options) - } - } - - /// Send a message to the conversation - public func send(text: String, options: SendOptions? = nil) async throws -> String { - switch self { - case let .v1(conversationV1): - return try await conversationV1.send(content: text, options: options) - case let .v2(conversationV2): - return try await conversationV2.send(content: text, options: options) - case let .group(group): - return try await group.send(content: text, options: options) + case .group(_): + throw ConversationError.v3NotSupported("send(prepareMessage) use send(content) instead") + case .dm(_): + throw ConversationError.v3NotSupported("send(prepareMessage) use send(content) instead") } } - public var clientAddress: String { - return client.address - } - - /// The topic identifier for this conversation - public var topic: String { - switch self { - case let .v1(conversation): - return conversation.topic.description - case let .v2(conversation): - return conversation.topic - case let .group(group): - return group.topic - } - } public func streamEphemeral() throws -> AsyncThrowingStream? { switch self { @@ -274,81 +483,14 @@ public enum Conversation: Sendable { return conversation.streamEphemeral() case let .v2(conversation): return conversation.streamEphemeral() - case let .group(group): - throw GroupError.notSupportedByGroups - } - } - - /// Returns a stream you can iterate through to receive new messages in this conversation. - /// - /// > Note: All messages in the conversation are returned by this stream. If you want to filter out messages - /// by a sender, you can check the ``Client`` address against the message's ``peerAddress``. - public func streamMessages() -> AsyncThrowingStream { - switch self { - case let .v1(conversation): - return conversation.streamMessages() - case let .v2(conversation): - return conversation.streamMessages() - case let .group(group): - return group.streamMessages() - } - } - - public func streamDecryptedMessages() -> AsyncThrowingStream { - switch self { - case let .v1(conversation): - return conversation.streamDecryptedMessages() - case let .v2(conversation): - return conversation.streamDecryptedMessages() - case let .group(group): - return group.streamDecryptedMessages() - } - } - - /// List messages in the conversation - public func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil, direction: PagingInfoSortDirection? = .descending) async throws -> [DecodedMessage] { - switch self { - case let .v1(conversationV1): - return try await conversationV1.messages(limit: limit, before: before, after: after, direction: direction) - case let .v2(conversationV2): - return try await conversationV2.messages(limit: limit, before: before, after: after, direction: direction) - case let .group(group): - return try await group.messages(before: before, after: after, limit: limit, direction: direction) - } - } - - public func decryptedMessages(limit: Int? = nil, before: Date? = nil, after: Date? = nil, direction: PagingInfoSortDirection? = .descending) async throws -> [DecryptedMessage] { - switch self { - case let .v1(conversationV1): - return try await conversationV1.decryptedMessages(limit: limit, before: before, after: after, direction: direction) - case let .v2(conversationV2): - return try await conversationV2.decryptedMessages(limit: limit, before: before, after: after, direction: direction) - case let .group(group): - return try await group.decryptedMessages(before: before, after: after, limit: limit, direction: direction) - } - } - - public var consentProof: ConsentProofPayload? { - switch self { - case .v1(_): - return nil - case let .v2(conversationV2): - return conversationV2.consentProof case .group(_): - return nil + throw ConversationError.v3NotSupported("streamEphemeral") + case .dm(_): + throw ConversationError.v3NotSupported("streamEphemeral") } } - var client: Client { - switch self { - case let .v1(conversationV1): - return conversationV1.client - case let .v2(conversationV2): - return conversationV2.client - case let .group(group): - return group.client - } - } + } extension Conversation: Hashable, Equatable { diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index 98dc482b..fb0e399b 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -2,7 +2,7 @@ import Foundation import LibXMTP public enum ConversationError: Error, CustomStringConvertible, LocalizedError { - case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String) + case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String), v2NotSupported(String), v3NotSupported(String) public var description: String { switch self { @@ -12,6 +12,10 @@ public enum ConversationError: Error, CustomStringConvertible, LocalizedError { return "ConversationError.recipientNotOnNetwork: Recipient is not on network" case .v1NotSupported(let str): return "ConversationError.v1NotSupported: V1 does not support: \(str)" + case .v2NotSupported(let str): + return "ConversationError.v2NotSupported: V2 does not support: \(str)" + case .v3NotSupported(let str): + return "ConversationError.v3NotSupported: V3 does not support: \(str)" } } @@ -45,17 +49,19 @@ public enum GroupError: Error, CustomStringConvertible, LocalizedError { } } -final class GroupStreamCallback: FfiConversationCallback { - let client: Client - let callback: (Group) -> Void +public enum ConversationOrder { + case createdAt, lastMessage +} - init(client: Client, callback: @escaping (Group) -> Void) { - self.client = client +final class ConversationStreamCallback: FfiConversationCallback { + let callback: (FfiConversation) -> Void + + init(callback: @escaping (FfiConversation) -> Void) { self.callback = callback } func onConversation(conversation: FfiConversation) { - self.callback(conversation.fromFFI(client: client)) + self.callback(conversation) } } @@ -121,6 +127,13 @@ public actor Conversations { } return try await v3Client.conversations().syncAllConversations() } + + public func syncAllConversations() async throws -> UInt32 { + guard let v3Client = client.v3Client else { + return 0 + } + return try await v3Client.conversations().syncAllConversations() + } public func groups(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil) async throws -> [Group] { guard let v3Client = client.v3Client else { @@ -136,19 +149,101 @@ public actor Conversations { if let limit { options.limit = Int64(limit) } - return try await v3Client.conversations().listGroups(opts: options).map { $0.fromFFI(client: client) } + return try await v3Client.conversations().listGroups(opts: options).map { $0.groupFromFFI(client: client) } + } + + public func dms(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil) async throws -> [Dm] { + if (client.hasV2Client) { + throw ConversationError.v2NotSupported("Only supported with V3 only clients use newConversation instead") + } + guard let v3Client = client.v3Client else { + return [] + } + var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil) + if let createdAfter { + options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) + } + if let createdBefore { + options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + } + if let limit { + options.limit = Int64(limit) + } + return try await v3Client.conversations().listDms(opts: options).map { $0.dmFromFFI(client: client) } + } + + public func listConversations(createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil, order: ConversationOrder = .createdAt, consentState: ConsentState? = nil) async throws -> [Conversation] { + if (client.hasV2Client) { + throw ConversationError.v2NotSupported("Only supported with V3 only clients use list instead") + } + // Todo: add ability to order and consent state + guard let v3Client = client.v3Client else { + return [] + } + var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil) + if let createdAfter { + options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) + } + if let createdBefore { + options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) + } + if let limit { + options.limit = Int64(limit) + } + let ffiConversations = try await v3Client.conversations().list(opts: options) + + let filteredConversations = try filterByConsentState(ffiConversations, consentState: consentState) + let sortedConversations = try sortConversations(filteredConversations, order: order) + + return try sortedConversations.map { try $0.toConversation(client: client) } + } + + private func sortConversations( + _ conversations: [FfiConversation], + order: ConversationOrder + ) throws -> [FfiConversation] { + switch order { + case .lastMessage: + let conversationWithTimestamp: [(FfiConversation, Int64?)] = try conversations.map { conversation in + let message = try conversation.findMessages( + opts: FfiListMessagesOptions( + sentBeforeNs: nil, + sentAfterNs: nil, + limit: 1, + deliveryStatus: nil, + direction: .descending + ) + ).first + return (conversation, message?.sentAtNs) + } + + let sortedTuples = conversationWithTimestamp.sorted { (lhs, rhs) in + (lhs.1 ?? 0) > (rhs.1 ?? 0) + } + return sortedTuples.map { $0.0 } + case .createdAt: + return conversations + } + } + + private func filterByConsentState( + _ conversations: [FfiConversation], + consentState: ConsentState? + ) throws -> [FfiConversation] { + guard let state = consentState else { return conversations } + return try conversations.filter { try $0.consentState() == state.toFFI } } public func streamGroups() async throws -> AsyncThrowingStream { AsyncThrowingStream { continuation in let ffiStreamActor = FfiStreamActor() let task = Task { - let groupCallback = GroupStreamCallback(client: self.client) { group in + let groupCallback = ConversationStreamCallback() { group in guard !Task.isCancelled else { continuation.finish() return } - continuation.yield(group) + continuation.yield(group.groupFromFFI(client: self.client)) } guard let stream = await self.client.v3Client?.conversations().streamGroups(callback: groupCallback) else { continuation.finish(throwing: GroupError.streamingFailure) @@ -176,12 +271,12 @@ public actor Conversations { let ffiStreamActor = FfiStreamActor() let task = Task { let stream = await self.client.v3Client?.conversations().streamGroups( - callback: GroupStreamCallback(client: self.client) { group in + callback: ConversationStreamCallback() { group in guard !Task.isCancelled else { continuation.finish() return } - continuation.yield(Conversation.group(group)) + continuation.yield(Conversation.group(group.groupFromFFI(client: self.client))) } ) await ffiStreamActor.setFfiStream(stream) @@ -200,6 +295,83 @@ public actor Conversations { } } } + + public func streamConversations() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + if (client.hasV2Client) { + continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 only clients use stream instead")) + return + } + let ffiStreamActor = FfiStreamActor() + let task = Task { + let stream = await self.client.v3Client?.conversations().stream( + callback: ConversationStreamCallback() { conversation in + guard !Task.isCancelled else { + continuation.finish() + return + } + do { + let conversationType = try conversation.groupMetadata().conversationType() + if conversationType == "dm" { + continuation.yield( + Conversation.dm(conversation.dmFromFFI(client: self.client)) + ) + } else if conversationType == "group" { + continuation.yield( + Conversation.group(conversation.groupFromFFI(client: self.client)) + ) + } + } catch { + // Do nothing if the conversation type is neither a group or dm + } + } + ) + await ffiStreamActor.setFfiStream(stream) + continuation.onTermination = { @Sendable reason in + Task { + await ffiStreamActor.endStream() + } + } + } + + continuation.onTermination = { @Sendable reason in + task.cancel() + Task { + await ffiStreamActor.endStream() + } + } + } + } + + public func findOrCreateDm(with peerAddress: String) async throws -> Dm { + if (client.hasV2Client) { + throw ConversationError.v2NotSupported("Only supported with V3 only clients use newConversation instead") + } + + guard let v3Client = client.v3Client else { + throw GroupError.alphaMLSNotEnabled + } + if peerAddress.lowercased() == client.address.lowercased() { + throw ConversationError.recipientIsSender + } + let canMessage = try await self.client.canMessageV3(address: peerAddress) + if !canMessage { + throw ConversationError.recipientNotOnNetwork + } + + try await client.contacts.allow(addresses: [peerAddress]) + + if let existingDm = try await client.findDm(address: peerAddress) { + return existingDm + } + + let newDm = try await v3Client.conversations() + .createDm(accountAddress: peerAddress.lowercased()) + .dmFromFFI(client: client) + + try await client.contacts.allow(addresses: [peerAddress]) + return newDm + } public func newGroup(with addresses: [String], permissions: GroupPermissionPreconfiguration = .allMembers, @@ -279,158 +451,84 @@ public actor Conversations { groupDescription: description, groupPinnedFrameUrl: pinnedFrameUrl, customPermissionPolicySet: permissionPolicySet - )).fromFFI(client: client) + )).groupFromFFI(client: client) try await client.contacts.allowGroups(groupIds: [group.id]) return group } - - /// Import a previously seen conversation. - /// See Conversation.toTopicData() - public func importTopicData(data: Xmtp_KeystoreApi_V1_TopicMap.TopicData) -> Conversation { - let conversation: Conversation - if !data.hasInvitation { - let sentAt = Date(timeIntervalSince1970: TimeInterval(data.createdNs / 1_000_000_000)) - conversation = .v1(ConversationV1(client: client, peerAddress: data.peerAddress, sentAt: sentAt)) - } else { - conversation = .v2(ConversationV2( - topic: data.invitation.topic, - keyMaterial: data.invitation.aes256GcmHkdfSha256.keyMaterial, - context: data.invitation.context, - peerAddress: data.peerAddress, - client: client, - createdAtNs: data.createdNs - )) - } - Task { - await self.addConversation(conversation) - } - return conversation - } - - public func listBatchMessages(topics: [String: Pagination?]) async throws -> [DecodedMessage] { - let requests = topics.map { topic, page in - makeQueryRequest(topic: topic, pagination: page) - } - /// The maximum number of requests permitted in a single batch call. - let maxQueryRequestsPerBatch = 50 - let batches = requests.chunks(maxQueryRequestsPerBatch) - .map { requests in BatchQueryRequest.with { $0.requests = requests } } - var messages: [DecodedMessage] = [] - // TODO: consider using a task group here for parallel batch calls - guard let apiClient = client.apiClient else { - throw ClientError.noV2Client("Error no V2 client initialized") - } - for batch in batches { - messages += try await apiClient.batchQuery(request: batch) - .responses.flatMap { res in - res.envelopes.compactMap { envelope in - let conversation = conversationsByTopic[envelope.contentTopic] - if conversation == nil { - print("discarding message, unknown conversation \(envelope)") - return nil + + public func streamAllConversationMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + if (client.hasV2Client) { + continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 clients. Use streamAllMessages instead.")) + return + } + let ffiStreamActor = FfiStreamActor() + let task = Task { + let stream = await self.client.v3Client?.conversations().streamAllMessages( + messageCallback: MessageCallback(client: self.client) { message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } + return } do { - return try conversation?.decode(envelope) + continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) } catch { - print("discarding message, unable to decode \(envelope)") - return nil + print("Error onMessage \(error)") } } - } - } - return messages - } + ) + await ffiStreamActor.setFfiStream(stream) + } - public func listBatchDecryptedMessages(topics: [String: Pagination?]) async throws -> [DecryptedMessage] { - let requests = topics.map { topic, page in - makeQueryRequest(topic: topic, pagination: page) - } - /// The maximum number of requests permitted in a single batch call. - let maxQueryRequestsPerBatch = 50 - let batches = requests.chunks(maxQueryRequestsPerBatch) - .map { requests in BatchQueryRequest.with { $0.requests = requests } } - var messages: [DecryptedMessage] = [] - // TODO: consider using a task group here for parallel batch calls - guard let apiClient = client.apiClient else { - throw ClientError.noV2Client("Error no V2 client initialized") - } - for batch in batches { - messages += try await apiClient.batchQuery(request: batch) - .responses.flatMap { res in - res.envelopes.compactMap { envelope in - let conversation = conversationsByTopic[envelope.contentTopic] - if conversation == nil { - print("discarding message, unknown conversation \(envelope)") - return nil - } - do { - return try conversation?.decrypt(envelope) - } catch { - print("discarding message, unable to decode \(envelope)") - return nil - } - } + continuation.onTermination = { _ in + task.cancel() + Task { + await ffiStreamActor.endStream() } + } } - return messages } - - func streamAllV2Messages() -> AsyncThrowingStream { + + public func streamAllDecryptedConversationMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in - let streamManager = StreamManager() - - Task { - var topics: [String] = [ - Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description - ] - - for conversation in try await list() { - topics.append(conversation.topic) - } - - var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - - let subscriptionCallback = V2SubscriptionCallback { envelope in - Task { - do { - if let conversation = self.conversationsByTopic[envelope.contentTopic] { - let decoded = try conversation.decode(envelope) - continuation.yield(decoded) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try self.fromInvite(envelope: envelope) - await self.addConversation(conversation) - topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try self.fromIntro(envelope: envelope) - await self.addConversation(conversation) - let decoded = try conversation.decode(envelope) - continuation.yield(decoded) - topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else { - print("huh \(envelope)") + if (client.hasV2Client) { + continuation.finish(throwing: ConversationError.v2NotSupported("Only supported with V3 clients. Use streamAllMessages instead.")) + return + } + let ffiStreamActor = FfiStreamActor() + let task = Task { + let stream = await self.client.v3Client?.conversations().streamAllMessages( + messageCallback: MessageCallback(client: self.client) { message in + guard !Task.isCancelled else { + continuation.finish() + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation } + return + } + do { + continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) } catch { - continuation.finish(throwing: error) + print("Error onMessage \(error)") } } - } - let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) - streamManager.setStream(newStream) - - continuation.onTermination = { @Sendable reason in - Task { - try await streamManager.endStream() - } + ) + await ffiStreamActor.setFfiStream(stream) + } + + continuation.onTermination = { _ in + task.cancel() + Task { + await ffiStreamActor.endStream() } } } } + public func streamAllGroupMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in let ffiStreamActor = FfiStreamActor() @@ -561,80 +659,6 @@ public actor Conversations { } } - - - - func streamAllV2DecryptedMessages() -> AsyncThrowingStream { - AsyncThrowingStream { continuation in - let streamManager = StreamManager() - - Task { - var topics: [String] = [ - Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description - ] - - for conversation in try await list() { - topics.append(conversation.topic) - } - - var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - - let subscriptionCallback = V2SubscriptionCallback { envelope in - Task { - do { - if let conversation = self.conversationsByTopic[envelope.contentTopic] { - let decrypted = try conversation.decrypt(envelope) - continuation.yield(decrypted) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { - let conversation = try self.fromInvite(envelope: envelope) - await self.addConversation(conversation) - topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { - let conversation = try self.fromIntro(envelope: envelope) - await self.addConversation(conversation) - let decrypted = try conversation.decrypt(envelope) - continuation.yield(decrypted) - topics.append(conversation.topic) - subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) - try await streamManager.updateStream(with: subscriptionRequest) - } else { - print("huh \(envelope)") - } - } catch { - continuation.finish(throwing: error) - } - } - } - let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) - streamManager.setStream(newStream) - - continuation.onTermination = { @Sendable reason in - Task { - try await streamManager.endStream() - } - } - } - } - } - - public func fromInvite(envelope: Envelope) throws -> Conversation { - let sealedInvitation = try SealedInvitation(serializedData: envelope.message) - let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - return try .v2(ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header)) - } - - public func fromIntro(envelope: Envelope) throws -> Conversation { - let messageV1 = try MessageV1.fromBytes(envelope.message) - let senderAddress = try messageV1.header.sender.walletAddress - let recipientAddress = try messageV1.header.recipient.walletAddress - let peerAddress = client.address == senderAddress ? recipientAddress : senderAddress - let conversationV1 = ConversationV1(client: client, peerAddress: peerAddress, sentAt: messageV1.sentAt) - return .v1(conversationV1) - } - private func findExistingConversation(with peerAddress: String, conversationID: String?) throws -> Conversation? { return try conversationsByTopic.first(where: { try $0.value.peerAddress == peerAddress && (($0.value.conversationID ?? "") == (conversationID ?? "")) @@ -648,6 +672,14 @@ public actor Conversations { let group = try await v3Client.conversations().processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) return Group(ffiGroup: group, client: client) } + + public func conversationFromWelcome(envelopeBytes: Data) async throws -> Conversation? { + guard let v3Client = client.v3Client else { + return nil + } + let conversation = try await v3Client.conversations().processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) + return try conversation.toConversation(client: client) + } public func newConversation(with peerAddress: String, context: InvitationV1.Context? = nil, consentProofPayload: ConsentProofPayload? = nil) async throws -> Conversation { if peerAddress.lowercased() == client.address.lowercased() { @@ -737,12 +769,7 @@ public actor Conversations { } } } - - private func makeConversation(from sealedInvitation: SealedInvitation) throws -> ConversationV2 { - let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - return try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header) - } - + private func validateConsentSignature(signature: String, clientAddress: String, peerAddress: String, timestamp: UInt64) -> Bool { // timestamp should be in the past if timestamp > UInt64(Date().timeIntervalSince1970 * 1000) { @@ -864,6 +891,254 @@ public actor Conversations { } return hmacKeysResponse } + + // ------- V1 V2 to be deprecated ------ + + /// Import a previously seen conversation. + /// See Conversation.toTopicData() + public func importTopicData(data: Xmtp_KeystoreApi_V1_TopicMap.TopicData) throws -> Conversation { + if (!client.hasV2Client) { + throw ConversationError.v3NotSupported("importTopicData only supported with V2 clients") + } + let conversation: Conversation + if !data.hasInvitation { + let sentAt = Date(timeIntervalSince1970: TimeInterval(data.createdNs / 1_000_000_000)) + conversation = .v1(ConversationV1(client: client, peerAddress: data.peerAddress, sentAt: sentAt)) + } else { + conversation = .v2(ConversationV2( + topic: data.invitation.topic, + keyMaterial: data.invitation.aes256GcmHkdfSha256.keyMaterial, + context: data.invitation.context, + peerAddress: data.peerAddress, + client: client, + createdAtNs: data.createdNs + )) + } + Task { + await self.addConversation(conversation) + } + return conversation + } + + public func listBatchMessages(topics: [String: Pagination?]) async throws -> [DecodedMessage] { + if (!client.hasV2Client) { + throw ConversationError.v3NotSupported("listBatchMessages only supported with V2 clients. Use listConversations order lastMessage") + } + let requests = topics.map { topic, page in + makeQueryRequest(topic: topic, pagination: page) + } + /// The maximum number of requests permitted in a single batch call. + let maxQueryRequestsPerBatch = 50 + let batches = requests.chunks(maxQueryRequestsPerBatch) + .map { requests in BatchQueryRequest.with { $0.requests = requests } } + var messages: [DecodedMessage] = [] + // TODO: consider using a task group here for parallel batch calls + guard let apiClient = client.apiClient else { + throw ClientError.noV2Client("Error no V2 client initialized") + } + for batch in batches { + messages += try await apiClient.batchQuery(request: batch) + .responses.flatMap { res in + res.envelopes.compactMap { envelope in + let conversation = conversationsByTopic[envelope.contentTopic] + if conversation == nil { + print("discarding message, unknown conversation \(envelope)") + return nil + } + do { + return try conversation?.decode(envelope) + } catch { + print("discarding message, unable to decode \(envelope)") + return nil + } + } + } + } + return messages + } + + public func listBatchDecryptedMessages(topics: [String: Pagination?]) async throws -> [DecryptedMessage] { + if (!client.hasV2Client) { + throw ConversationError.v3NotSupported("listBatchMessages only supported with V2 clients. Use listConversations order lastMessage") + } + let requests = topics.map { topic, page in + makeQueryRequest(topic: topic, pagination: page) + } + /// The maximum number of requests permitted in a single batch call. + let maxQueryRequestsPerBatch = 50 + let batches = requests.chunks(maxQueryRequestsPerBatch) + .map { requests in BatchQueryRequest.with { $0.requests = requests } } + var messages: [DecryptedMessage] = [] + // TODO: consider using a task group here for parallel batch calls + guard let apiClient = client.apiClient else { + throw ClientError.noV2Client("Error no V2 client initialized") + } + for batch in batches { + messages += try await apiClient.batchQuery(request: batch) + .responses.flatMap { res in + res.envelopes.compactMap { envelope in + let conversation = conversationsByTopic[envelope.contentTopic] + if conversation == nil { + print("discarding message, unknown conversation \(envelope)") + return nil + } + do { + return try conversation?.decrypt(envelope) + } catch { + print("discarding message, unable to decode \(envelope)") + return nil + } + } + } + } + return messages + } + + private func makeConversation(from sealedInvitation: SealedInvitation) throws -> ConversationV2 { + let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) + return try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header) + } + + func streamAllV2Messages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + if (!client.hasV2Client) { + continuation.finish(throwing: ConversationError.v3NotSupported("Only supported with V2 clients. Use streamAllConversationMessages instead.")) + return + } + let streamManager = StreamManager() + + Task { + var topics: [String] = [ + Topic.userInvite(client.address).description, + Topic.userIntro(client.address).description + ] + + for conversation in try await list() { + topics.append(conversation.topic) + } + + var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + + let subscriptionCallback = V2SubscriptionCallback { envelope in + Task { + do { + if let conversation = self.conversationsByTopic[envelope.contentTopic] { + let decoded = try conversation.decode(envelope) + continuation.yield(decoded) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { + let conversation = try self.fromInvite(envelope: envelope) + await self.addConversation(conversation) + topics.append(conversation.topic) + subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + try await streamManager.updateStream(with: subscriptionRequest) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { + let conversation = try self.fromIntro(envelope: envelope) + await self.addConversation(conversation) + let decoded = try conversation.decode(envelope) + continuation.yield(decoded) + topics.append(conversation.topic) + subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + try await streamManager.updateStream(with: subscriptionRequest) + } else { + print("huh \(envelope)") + } + } catch { + continuation.finish(throwing: error) + } + } + } + let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) + streamManager.setStream(newStream) + + continuation.onTermination = { @Sendable reason in + Task { + try await streamManager.endStream() + } + } + } + } + } + + func streamAllV2DecryptedMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + let streamManager = StreamManager() + if (!client.hasV2Client) { + continuation.finish(throwing: ConversationError.v3NotSupported("Only supported with V2 clients. Use streamAllDecryptedConversationMessages instead.")) + return + } + Task { + var topics: [String] = [ + Topic.userInvite(client.address).description, + Topic.userIntro(client.address).description + ] + + for conversation in try await list() { + topics.append(conversation.topic) + } + + var subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + + let subscriptionCallback = V2SubscriptionCallback { envelope in + Task { + do { + if let conversation = self.conversationsByTopic[envelope.contentTopic] { + let decrypted = try conversation.decrypt(envelope) + continuation.yield(decrypted) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { + let conversation = try self.fromInvite(envelope: envelope) + await self.addConversation(conversation) + topics.append(conversation.topic) + subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + try await streamManager.updateStream(with: subscriptionRequest) + } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { + let conversation = try self.fromIntro(envelope: envelope) + await self.addConversation(conversation) + let decrypted = try conversation.decrypt(envelope) + continuation.yield(decrypted) + topics.append(conversation.topic) + subscriptionRequest = FfiV2SubscribeRequest(contentTopics: topics) + try await streamManager.updateStream(with: subscriptionRequest) + } else { + print("huh \(envelope)") + } + } catch { + continuation.finish(throwing: error) + } + } + } + let newStream = try await client.subscribe2(request: subscriptionRequest, callback: subscriptionCallback) + streamManager.setStream(newStream) + + continuation.onTermination = { @Sendable reason in + Task { + try await streamManager.endStream() + } + } + } + } + } + + public func fromInvite(envelope: Envelope) throws -> Conversation { + if (!client.hasV2Client) { + throw ConversationError.v3NotSupported("fromIntro only supported with V2 clients use fromWelcome instead") + } + let sealedInvitation = try SealedInvitation(serializedData: envelope.message) + let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) + return try .v2(ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header)) + } + + public func fromIntro(envelope: Envelope) throws -> Conversation { + if (!client.hasV2Client) { + throw ConversationError.v3NotSupported("fromIntro only supported with V2 clients use fromWelcome instead") + } + let messageV1 = try MessageV1.fromBytes(envelope.message) + let senderAddress = try messageV1.header.sender.walletAddress + let recipientAddress = try messageV1.header.recipient.walletAddress + let peerAddress = client.address == senderAddress ? recipientAddress : senderAddress + let conversationV1 = ConversationV1(client: client, peerAddress: peerAddress, sentAt: messageV1.sentAt) + return .v1(conversationV1) + } + private func listIntroductionPeers(pagination: Pagination?) async throws -> [String: Date] { guard let apiClient = client.apiClient else { diff --git a/Sources/XMTPiOS/Dm.swift b/Sources/XMTPiOS/Dm.swift new file mode 100644 index 00000000..d788710f --- /dev/null +++ b/Sources/XMTPiOS/Dm.swift @@ -0,0 +1,336 @@ +// +// Dm.swift +// XMTPiOS +// +// Created by Naomi Plasterer on 10/23/24. +// + +import Foundation +import LibXMTP + +public struct Dm: Identifiable, Equatable, Hashable { + var ffiConversation: FfiConversation + var client: Client + let streamHolder = StreamHolder() + + public var id: String { + ffiConversation.id().toHex + } + + public var topic: String { + Topic.groupMessage(id).description + } + + func metadata() throws -> FfiConversationMetadata { + return try ffiConversation.groupMetadata() + } + + public func sync() async throws { + try await ffiConversation.sync() + } + + public static func == (lhs: Dm, rhs: Dm) -> Bool { + lhs.id == rhs.id + } + + public func hash(into hasher: inout Hasher) { + id.hash(into: &hasher) + } + + public func isCreator() throws -> Bool { + return try metadata().creatorInboxId() == client.inboxID + } + + public func creatorInboxId() throws -> String { + return try metadata().creatorInboxId() + } + + public func addedByInboxId() throws -> String { + return try ffiConversation.addedByInboxId() + } + + public var members: [Member] { + get async throws { + return try await ffiConversation.listMembers().map { ffiGroupMember in + Member(ffiGroupMember: ffiGroupMember) + } + } + } + + public var peerInboxId: String { + get async throws { + var ids = try await members.map(\.inboxId) + if let index = ids.firstIndex(of: client.inboxID) { + ids.remove(at: index) + } + guard let inboxId = ids.first else { + throw ClientError.missingInboxId + } + return inboxId + } + } + + public var createdAt: Date { + Date(millisecondsSinceEpoch: ffiConversation.createdAtNs()) + } + + public func updateConsentState(state: ConsentState) async throws { + if (client.hasV2Client) { + switch (state) { + case .allowed: try await client.contacts.allowGroups(groupIds: [id]) + case .denied: try await client.contacts.denyGroups(groupIds: [id]) + case .unknown: () + } + } + + try ffiConversation.updateConsentState(state: state.toFFI) + } + + public func consentState() throws -> ConsentState{ + return try ffiConversation.consentState().fromFFI + } + + public func processMessage(envelopeBytes: Data) async throws -> MessageV3 { + let message = try await ffiConversation.processStreamedConversationMessage(envelopeBytes: envelopeBytes) + return MessageV3(client: client, ffiMessage: message) + } + + public func send(content: T, options: SendOptions? = nil) async throws -> String { + let encodeContent = try await encodeContent(content: content, options: options) + return try await send(encodedContent: encodeContent) + } + + public func send(encodedContent: EncodedContent) async throws -> String { + if (try consentState() == .unknown) { + try await updateConsentState(state: .allowed) + } + + let messageId = try await ffiConversation.send(contentBytes: encodedContent.serializedData()) + return messageId.toHex + } + + public func encodeContent(content: T, options: SendOptions?) async throws -> EncodedContent { + let codec = client.codecRegistry.find(for: options?.contentType) + + func encode(codec: Codec, content: Any) throws -> EncodedContent { + if let content = content as? Codec.T { + return try codec.encode(content: content, client: client) + } else { + throw CodecError.invalidContent + } + } + + var encoded = try encode(codec: codec, content: content) + + func fallback(codec: Codec, content: Any) throws -> String? { + if let content = content as? Codec.T { + return try codec.fallback(content: content) + } else { + throw CodecError.invalidContent + } + } + + if let fallback = try fallback(codec: codec, content: content) { + encoded.fallback = fallback + } + + if let compression = options?.compression { + encoded = try encoded.compress(compression) + } + + return encoded + } + + public func prepareMessage(content: T, options: SendOptions? = nil) async throws -> String { + if (try consentState() == .unknown) { + try await updateConsentState(state: .allowed) + } + + let encodeContent = try await encodeContent(content: content, options: options) + return try ffiConversation.sendOptimistic(contentBytes: try encodeContent.serializedData()).toHex + } + + public func publishMessages() async throws { + try await ffiConversation.publishMessages() + } + + public func endStream() { + self.streamHolder.stream?.end() + } + + public func streamMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + let task = Task.detached { + self.streamHolder.stream = await self.ffiConversation.stream( + messageCallback: MessageCallback(client: self.client) { message in + guard !Task.isCancelled else { + continuation.finish() + return + } + do { + continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decode()) + } catch { + print("Error onMessage \(error)") + continuation.finish(throwing: error) + } + } + ) + + continuation.onTermination = { @Sendable reason in + self.streamHolder.stream?.end() + } + } + + continuation.onTermination = { @Sendable reason in + task.cancel() + self.streamHolder.stream?.end() + } + } + } + + public func streamDecryptedMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + let task = Task.detached { + self.streamHolder.stream = await self.ffiConversation.stream( + messageCallback: MessageCallback(client: self.client) { message in + guard !Task.isCancelled else { + continuation.finish() + return + } + do { + continuation.yield(try MessageV3(client: self.client, ffiMessage: message).decrypt()) + } catch { + print("Error onMessage \(error)") + continuation.finish(throwing: error) + } + } + ) + + continuation.onTermination = { @Sendable reason in + self.streamHolder.stream?.end() + } + } + + continuation.onTermination = { @Sendable reason in + task.cancel() + self.streamHolder.stream?.end() + } + } + } + + public func messages( + before: Date? = nil, + after: Date? = nil, + limit: Int? = nil, + direction: PagingInfoSortDirection? = .descending, + deliveryStatus: MessageDeliveryStatus = .all + ) async throws -> [DecodedMessage] { + var options = FfiListMessagesOptions( + sentBeforeNs: nil, + sentAfterNs: nil, + limit: nil, + deliveryStatus: nil, + direction: nil + ) + + if let before { + options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + } + + if let after { + options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + } + + if let limit { + options.limit = Int64(limit) + } + + let status: FfiDeliveryStatus? = { + switch deliveryStatus { + case .published: + return FfiDeliveryStatus.published + case .unpublished: + return FfiDeliveryStatus.unpublished + case .failed: + return FfiDeliveryStatus.failed + default: + return nil + } + }() + + options.deliveryStatus = status + + let direction: FfiDirection? = { + switch direction { + case .ascending: + return FfiDirection.ascending + default: + return FfiDirection.descending + } + }() + + options.direction = direction + + return try ffiConversation.findMessages(opts: options).compactMap { ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage).decodeOrNull() + } + } + + public func decryptedMessages( + before: Date? = nil, + after: Date? = nil, + limit: Int? = nil, + direction: PagingInfoSortDirection? = .descending, + deliveryStatus: MessageDeliveryStatus? = .all + ) async throws -> [DecryptedMessage] { + var options = FfiListMessagesOptions( + sentBeforeNs: nil, + sentAfterNs: nil, + limit: nil, + deliveryStatus: nil, + direction: nil + ) + + if let before { + options.sentBeforeNs = Int64(before.millisecondsSinceEpoch * 1_000_000) + } + + if let after { + options.sentAfterNs = Int64(after.millisecondsSinceEpoch * 1_000_000) + } + + if let limit { + options.limit = Int64(limit) + } + + let status: FfiDeliveryStatus? = { + switch deliveryStatus { + case .published: + return FfiDeliveryStatus.published + case .unpublished: + return FfiDeliveryStatus.unpublished + case .failed: + return FfiDeliveryStatus.failed + default: + return nil + } + }() + + options.deliveryStatus = status + + let direction: FfiDirection? = { + switch direction { + case .ascending: + return FfiDirection.ascending + default: + return FfiDirection.descending + } + }() + + options.direction = direction + + return try ffiConversation.findMessages(opts: options).compactMap { ffiMessage in + return MessageV3(client: self.client, ffiMessage: ffiMessage).decryptOrNull() + } + } +} diff --git a/Sources/XMTPiOS/Extensions/Ffi.swift b/Sources/XMTPiOS/Extensions/Ffi.swift index 60e6787b..9da6402c 100644 --- a/Sources/XMTPiOS/Extensions/Ffi.swift +++ b/Sources/XMTPiOS/Extensions/Ffi.swift @@ -196,9 +196,21 @@ extension FfiV2SubscribeRequest { // MARK: Group extension FfiConversation { - func fromFFI(client: Client) -> Group { + func groupFromFFI(client: Client) -> Group { Group(ffiGroup: self, client: client) } + + func dmFromFFI(client: Client) -> Dm { + Dm(ffiConversation: self, client: client) + } + + func toConversation(client: Client) throws -> Conversation { + if (try groupMetadata().conversationType() == "dm") { + return Conversation.dm(self.dmFromFFI(client: client)) + } else { + return Conversation.group(self.groupFromFFI(client: client)) + } + } } extension FfiConversationMember { diff --git a/Sources/XMTPiOS/Group.swift b/Sources/XMTPiOS/Group.swift index 5c57cf9a..3116356a 100644 --- a/Sources/XMTPiOS/Group.swift +++ b/Sources/XMTPiOS/Group.swift @@ -229,14 +229,9 @@ public struct Group: Identifiable, Equatable, Hashable { return try ffiGroup.consentState().fromFFI } - public func processMessage(envelopeBytes: Data) async throws -> DecodedMessage { + public func processMessage(envelopeBytes: Data) async throws -> MessageV3 { let message = try await ffiGroup.processStreamedConversationMessage(envelopeBytes: envelopeBytes) - return try MessageV3(client: client, ffiMessage: message).decode() - } - - public func processMessageDecrypted(envelopeBytes: Data) async throws -> DecryptedMessage { - let message = try await ffiGroup.processStreamedConversationMessage(envelopeBytes: envelopeBytes) - return try MessageV3(client: client, ffiMessage: message).decrypt() + return MessageV3(client: client, ffiMessage: message) } public func send(content: T, options: SendOptions? = nil) async throws -> String { diff --git a/Tests/XMTPTests/DmTests.swift b/Tests/XMTPTests/DmTests.swift new file mode 100644 index 00000000..443352b4 --- /dev/null +++ b/Tests/XMTPTests/DmTests.swift @@ -0,0 +1,205 @@ +// +// DmTests.swift +// XMTPiOS +// +// Created by Naomi Plasterer on 10/23/24. +// + +import CryptoKit +import XCTest +@testable import XMTPiOS +import LibXMTP +import XMTPTestHelpers + +@available(iOS 16, *) +class DmTests: XCTestCase { + struct LocalFixtures { + var alix: PrivateKey! + var bo: PrivateKey! + var caro: PrivateKey! + var alixClient: Client! + var boClient: Client! + var caroClient: Client! + } + + func localFixtures() async throws -> LocalFixtures { + let key = try Crypto.secureRandomBytes(count: 32) + let alix = try PrivateKey.generate() + let alixClient = try await Client.createV3( + account: alix, + options: .init( + api: .init(env: .local, isSecure: false), + codecs: [GroupUpdatedCodec()], + enableV3: true, + encryptionKey: key + ) + ) + let bo = try PrivateKey.generate() + let boClient = try await Client.createV3( + account: bo, + options: .init( + api: .init(env: .local, isSecure: false), + codecs: [GroupUpdatedCodec()], + enableV3: true, + encryptionKey: key + ) + ) + let caro = try PrivateKey.generate() + let caroClient = try await Client.createV3( + account: caro, + options: .init( + api: .init(env: .local, isSecure: false), + codecs: [GroupUpdatedCodec()], + enableV3: true, + encryptionKey: key + ) + ) + + return .init( + alix: alix, + bo: bo, + caro: caro, + alixClient: alixClient, + boClient: boClient, + caroClient: caroClient + ) + } + + func testCanCreateADm() async throws { + let fixtures = try await localFixtures() + + let convo1 = try await fixtures.boClient.conversations.findOrCreateDm(with: fixtures.alix.walletAddress) + try await fixtures.alixClient.conversations.sync() + let sameConvo1 = try await fixtures.alixClient.conversations.findOrCreateDm(with: fixtures.bo.walletAddress) + XCTAssertEqual(convo1.id, sameConvo1.id) + } + + func testCanListDmMembers() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boClient.conversations.findOrCreateDm(with: fixtures.alix.walletAddress) + var members = try await dm.members + XCTAssertEqual(members.count, 2) + + let peer = try await dm.peerInboxId + XCTAssertEqual(peer, fixtures.alixClient.inboxID) + } + + func testCannotStartGroupWithSelf() async throws { + let fixtures = try await localFixtures() + + await assertThrowsAsyncError( + try await fixtures.alixClient.conversations.findOrCreateDm(with: fixtures.alix.address) + ) + } + + func testCannotStartGroupWithNonRegisteredIdentity() async throws { + let fixtures = try await localFixtures() + let nonRegistered = try PrivateKey.generate() + + await assertThrowsAsyncError( + try await fixtures.alixClient.conversations.findOrCreateDm(with: nonRegistered.address) + ) + } + + func testDmStartsWithAllowedState() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boClient.conversations.findOrCreateDm(with: fixtures.alix.walletAddress) + _ = try await dm.send(content: "howdy") + _ = try await dm.send(content: "gm") + try await dm.sync() + + let isAllowed = try await fixtures.boClient.contacts.isGroupAllowed(groupId: dm.id) + let dmState = try await fixtures.boClient.contacts.consentList.groupState(groupId: dm.id) + XCTAssertTrue(isAllowed) + XCTAssertEqual(dmState, .allowed) + XCTAssertEqual(try dm.consentState(), .allowed) + } + + func testCanSendMessageToDm() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boClient.conversations.findOrCreateDm(with: fixtures.alix.walletAddress) + _ = try await dm.send(content: "howdy") + let messageId = try await dm.send(content: "gm") + try await dm.sync() + + let firstMessage = try await dm.messages().first! + XCTAssertEqual(firstMessage.body, "gm") + XCTAssertEqual(firstMessage.id, messageId) + XCTAssertEqual(firstMessage.deliveryStatus, .published) + let messages = try await dm.messages() + XCTAssertEqual(messages.count, 3) + + try await fixtures.alixClient.conversations.sync() + let sameDm = try await fixtures.alixClient.conversations.dms().last! + try await sameDm.sync() + + let sameMessages = try await sameDm.messages() + XCTAssertEqual(sameMessages.count, 2) + XCTAssertEqual(sameMessages.first!.body, "gm") + } + + func testCanStreamDmMessages() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boClient.conversations.findOrCreateDm(with: fixtures.alix.walletAddress) + try await fixtures.alixClient.conversations.sync() + + let expectation1 = XCTestExpectation(description: "got a message") + expectation1.expectedFulfillmentCount = 1 + + Task(priority: .userInitiated) { + for try await _ in dm.streamMessages() { + expectation1.fulfill() + } + } + + _ = try await dm.send(content: "hi") + + await fulfillment(of: [expectation1], timeout: 3) + } + + func testCanStreamAllDecryptedDmMessages() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boClient.conversations.findOrCreateDm(with: fixtures.alix.walletAddress) + try await fixtures.alixClient.conversations.sync() + + let expectation1 = XCTestExpectation(description: "got a message") + expectation1.expectedFulfillmentCount = 2 + + Task(priority: .userInitiated) { + for try await _ in await fixtures.alixClient.conversations.streamAllConversationMessages() { + expectation1.fulfill() + } + } + + _ = try await dm.send(content: "hi") + let caroDm = try await fixtures.caroClient.conversations.findOrCreateDm(with: fixtures.alixClient.address) + _ = try await caroDm.send(content: "hi") + + await fulfillment(of: [expectation1], timeout: 3) + } + + func testDmConsent() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boClient.conversations.findOrCreateDm(with: fixtures.alix.walletAddress) + + let isGroup = try await fixtures.boClient.contacts.isGroupAllowed(groupId: dm.id) + XCTAssertTrue(isGroup) + XCTAssertEqual(try dm.consentState(), .allowed) + + try await fixtures.boClient.contacts.denyGroups(groupIds: [dm.id]) + let isDenied = try await fixtures.boClient.contacts.isGroupDenied(groupId: dm.id) + XCTAssertTrue(isDenied) + XCTAssertEqual(try dm.consentState(), .denied) + + try await dm.updateConsentState(state: .allowed) + let isAllowed = try await fixtures.boClient.contacts.isGroupAllowed(groupId: dm.id) + XCTAssertTrue(isAllowed) + XCTAssertEqual(try dm.consentState(), .allowed) + } +} diff --git a/Tests/XMTPTests/GroupTests.swift b/Tests/XMTPTests/GroupTests.swift index ba43bdbd..80945344 100644 --- a/Tests/XMTPTests/GroupTests.swift +++ b/Tests/XMTPTests/GroupTests.swift @@ -39,51 +39,52 @@ class GroupTests: XCTestCase { var alice: PrivateKey! var bob: PrivateKey! var fred: PrivateKey! + var davonV3: PrivateKey! var aliceClient: Client! var bobClient: Client! var fredClient: Client! + var davonV3Client: Client! } func localFixtures() async throws -> LocalFixtures { let key = try Crypto.secureRandomBytes(count: 32) + let options = ClientOptions.init( + api: .init(env: .local, isSecure: false), + codecs: [GroupUpdatedCodec()], + enableV3: true, + encryptionKey: key + ) let alice = try PrivateKey.generate() let aliceClient = try await Client.create( account: alice, - options: .init( - api: .init(env: .local, isSecure: false), - codecs: [GroupUpdatedCodec()], - enableV3: true, - encryptionKey: key - ) + options: options ) let bob = try PrivateKey.generate() let bobClient = try await Client.create( account: bob, - options: .init( - api: .init(env: .local, isSecure: false), - codecs: [GroupUpdatedCodec()], - enableV3: true, - encryptionKey: key - ) + options: options ) let fred = try PrivateKey.generate() let fredClient = try await Client.create( account: fred, - options: .init( - api: .init(env: .local, isSecure: false), - codecs: [GroupUpdatedCodec()], - enableV3: true, - encryptionKey: key - ) + options: options + ) + + let davonV3 = try PrivateKey.generate() + let davonV3Client = try await Client.createV3( + account: davonV3, + options: options ) return .init( alice: alice, bob: bob, fred: fred, + davonV3: davonV3, aliceClient: aliceClient, bobClient: bobClient, - fredClient: fredClient + fredClient: fredClient, + davonV3Client: davonV3Client ) } @@ -195,7 +196,10 @@ class GroupTests: XCTestCase { func testCanListGroups() async throws { let fixtures = try await localFixtures() _ = try await fixtures.aliceClient.conversations.newGroup(with: [fixtures.bob.address]) - + _ = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.bob.address) + _ = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) + + try await fixtures.aliceClient.conversations.sync() let aliceGroupCount = try await fixtures.aliceClient.conversations.groups().count try await fixtures.bobClient.conversations.sync() @@ -209,6 +213,8 @@ class GroupTests: XCTestCase { let fixtures = try await localFixtures() _ = try await fixtures.aliceClient.conversations.newGroup(with: [fixtures.bob.address]) _ = try await fixtures.aliceClient.conversations.newConversation(with: fixtures.bob.address) + _ = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.bob.walletAddress) + _ = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.walletAddress) let aliceGroupCount = try await fixtures.aliceClient.conversations.list(includeGroups: true).count @@ -557,6 +563,7 @@ class GroupTests: XCTestCase { } _ = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + _ = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) await fulfillment(of: [expectation1], timeout: 3) } @@ -575,6 +582,7 @@ class GroupTests: XCTestCase { _ = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) + _ = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) await fulfillment(of: [expectation1], timeout: 3) } @@ -659,6 +667,8 @@ class GroupTests: XCTestCase { expectation1.expectedFulfillmentCount = 2 let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + let dm = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) + try await fixtures.aliceClient.conversations.sync() Task(priority: .userInitiated) { for try await _ in try await fixtures.aliceClient.conversations.streamAllMessages(includeGroups: true) { @@ -668,6 +678,7 @@ class GroupTests: XCTestCase { _ = try await group.send(content: "hi") _ = try await convo.send(content: "hi") + _ = try await dm.send(content: "hi") await fulfillment(of: [expectation1], timeout: 3) } @@ -680,6 +691,7 @@ class GroupTests: XCTestCase { expectation1.expectedFulfillmentCount = 2 let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + let dm = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) try await fixtures.aliceClient.conversations.sync() Task(priority: .userInitiated) { for try await _ in await fixtures.aliceClient.conversations.streamAllDecryptedMessages(includeGroups: true) { @@ -690,6 +702,7 @@ class GroupTests: XCTestCase { _ = try await group.send(content: "hi") _ = try await group.send(content: membershipChange, options: SendOptions(contentType: ContentTypeGroupUpdated)) _ = try await convo.send(content: "hi") + _ = try await dm.send(content: "hi") await fulfillment(of: [expectation1], timeout: 3) } @@ -700,6 +713,7 @@ class GroupTests: XCTestCase { let expectation1 = XCTestExpectation(description: "got a conversation") let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + let dm = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) try await fixtures.aliceClient.conversations.sync() Task(priority: .userInitiated) { for try await _ in await fixtures.aliceClient.conversations.streamAllGroupMessages() { @@ -708,6 +722,7 @@ class GroupTests: XCTestCase { } _ = try await group.send(content: "hi") + _ = try await dm.send(content: "hi") await fulfillment(of: [expectation1], timeout: 3) } @@ -717,6 +732,8 @@ class GroupTests: XCTestCase { let expectation1 = XCTestExpectation(description: "got a conversation") let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + let dm = try await fixtures.davonV3Client.conversations.findOrCreateDm(with: fixtures.alice.address) + try await fixtures.aliceClient.conversations.sync() Task(priority: .userInitiated) { for try await _ in await fixtures.aliceClient.conversations.streamAllGroupDecryptedMessages() { @@ -725,6 +742,7 @@ class GroupTests: XCTestCase { } _ = try await group.send(content: "hi") + _ = try await dm.send(content: "hi") await fulfillment(of: [expectation1], timeout: 3) } @@ -760,7 +778,10 @@ class GroupTests: XCTestCase { return case .group(let group): bobGroup = group - } + case .dm(_): + XCTFail("failed converting conversation to group") + return + } groupName = try bobGroup.groupName() XCTAssertEqual(groupName, "Start Name") diff --git a/Tests/XMTPTests/IntegrationTests.swift b/Tests/XMTPTests/IntegrationTests.swift index 8cccdd17..835e374a 100644 --- a/Tests/XMTPTests/IntegrationTests.swift +++ b/Tests/XMTPTests/IntegrationTests.swift @@ -154,7 +154,7 @@ final class IntegrationTests: XCTestCase { options: opt ) // And it uses the saved topic data for the conversation - let aliceConvo2 = await alice2.conversations.importTopicData( + let aliceConvo2 = try await alice2.conversations.importTopicData( data: try Xmtp_KeystoreApi_V1_TopicMap.TopicData(serializedData: topicData)) XCTAssertEqual("example.com/alice-bob-1", aliceConvo2.conversationID) diff --git a/Tests/XMTPTests/V3ClientTests.swift b/Tests/XMTPTests/V3ClientTests.swift index 4b0015ff..97a3c333 100644 --- a/Tests/XMTPTests/V3ClientTests.swift +++ b/Tests/XMTPTests/V3ClientTests.swift @@ -70,6 +70,100 @@ class V3ClientTests: XCTestCase { try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.alixV2.address]) ) } + + func testCanCreateDm() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) + let members = try await dm.members + XCTAssertEqual(members.count, 2) + + let sameDm = try await fixtures.boV3Client.findDm(address: fixtures.caroV2V3.walletAddress) + XCTAssertEqual(sameDm?.id, dm.id) + + try await fixtures.caroV2V3Client.conversations.sync() + let caroDm = try await fixtures.caroV2V3Client.findDm(address: fixtures.boV3Client.address) + XCTAssertEqual(caroDm?.id, dm.id) + + await assertThrowsAsyncError( + try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.alixV2.walletAddress) + ) + } + + func testCanFindConversationByTopic() async throws { + let fixtures = try await localFixtures() + + let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) + + let sameDm = try fixtures.boV3Client.findConversationByTopic(topic: dm.topic) + let sameGroup = try fixtures.boV3Client.findConversationByTopic(topic: group.topic) + + XCTAssertEqual(group.id, try sameGroup?.id) + XCTAssertEqual(dm.id, try sameDm?.id) + } + + func testCanListConversations() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) + + let convoCount = try await fixtures.boV3Client.conversations.listConversations().count + let dmCount = try await fixtures.boV3Client.conversations.dms().count + let groupCount = try await fixtures.boV3Client.conversations.groups().count + XCTAssertEqual(convoCount, 2) + XCTAssertEqual(dmCount, 1) + XCTAssertEqual(groupCount, 1) + + try await fixtures.caroV2V3Client.conversations.sync() + let convoCount2 = try await fixtures.caroV2V3Client.conversations.list(includeGroups: true).count + let groupCount2 = try await fixtures.caroV2V3Client.conversations.groups().count + XCTAssertEqual(convoCount2, 1) + XCTAssertEqual(groupCount2, 1) + } + + func testCanListConversationsFiltered() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) + let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) + + let convoCount = try await fixtures.boV3Client.conversations.listConversations().count + let convoCountConsent = try await fixtures.boV3Client.conversations.listConversations(consentState: .allowed).count + + XCTAssertEqual(convoCount, 2) + XCTAssertEqual(convoCountConsent, 2) + + try await group.updateConsentState(state: .denied) + + let convoCountAllowed = try await fixtures.boV3Client.conversations.listConversations(consentState: .allowed).count + let convoCountDenied = try await fixtures.boV3Client.conversations.listConversations(consentState: .denied).count + + XCTAssertEqual(convoCountAllowed, 1) + XCTAssertEqual(convoCountDenied, 1) + } + + func testCanListConversationsOrder() async throws { + let fixtures = try await localFixtures() + + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.walletAddress) + let group1 = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) + let group2 = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.walletAddress]) + + _ = try await dm.send(content: "Howdy") + _ = try await group2.send(content: "Howdy") + _ = try await fixtures.boV3Client.conversations.syncAllConversations() + + let conversations = try await fixtures.boV3Client.conversations.listConversations() + let conversationsOrdered = try await fixtures.boV3Client.conversations.listConversations(order: .lastMessage) + + XCTAssertEqual(conversations.count, 3) + XCTAssertEqual(conversationsOrdered.count, 3) + + XCTAssertEqual(try conversations.map { try $0.id }, [dm.id, group1.id, group2.id]) + XCTAssertEqual(try conversationsOrdered.map { try $0.id }, [group2.id, dm.id, group1.id]) + } func testsCanSendMessages() async throws { let fixtures = try await localFixtures() @@ -94,6 +188,29 @@ class V3ClientTests: XCTestCase { XCTAssertEqual(sameGroupMessages?.first?.body, "gm") } + func testsCanSendMessagesToDm() async throws { + let fixtures = try await localFixtures() + let dm = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) + try await dm.send(content: "howdy") + let messageId = try await dm.send(content: "gm") + try await dm.sync() + + let dmMessages = try await dm.messages() + XCTAssertEqual(dmMessages.first?.body, "gm") + XCTAssertEqual(dmMessages.first?.id, messageId) + XCTAssertEqual(dmMessages.first?.deliveryStatus, .published) + XCTAssertEqual(dmMessages.count, 3) + + + try await fixtures.caroV2V3Client.conversations.sync() + let sameDm = try await fixtures.caroV2V3Client.findDm(address: fixtures.boV3Client.address) + try await sameDm?.sync() + + let sameDmMessages = try await sameDm?.messages() + XCTAssertEqual(sameDmMessages?.count, 2) + XCTAssertEqual(sameDmMessages?.first?.body, "gm") + } + func testGroupConsent() async throws { let fixtures = try await localFixtures() let group = try await fixtures.boV3Client.conversations.newGroup(with: [fixtures.caroV2V3.address]) @@ -149,7 +266,64 @@ class V3ClientTests: XCTestCase { XCTAssert(isAddressAllowed) XCTAssert(!isAddressDenied) } + + func testCanStreamAllMessagesFromV3Users() async throws { + let fixtures = try await localFixtures() + + let expectation1 = XCTestExpectation(description: "got a conversation") + expectation1.expectedFulfillmentCount = 2 + let convo = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) + let group = try await fixtures.caroV2V3Client.conversations.newGroup(with: [fixtures.boV3.address]) + try await fixtures.boV3Client.conversations.sync() + Task(priority: .userInitiated) { + for try await _ in await fixtures.boV3Client.conversations.streamAllConversationMessages() { + expectation1.fulfill() + } + } + _ = try await group.send(content: "hi") + _ = try await convo.send(content: "hi") + + await fulfillment(of: [expectation1], timeout: 3) + } + + func testCanStreamAllDecryptedMessagesFromV3Users() async throws { + let fixtures = try await localFixtures() + + let expectation1 = XCTestExpectation(description: "got a conversation") + expectation1.expectedFulfillmentCount = 2 + let convo = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) + let group = try await fixtures.caroV2V3Client.conversations.newGroup(with: [fixtures.boV3.address]) + try await fixtures.boV3Client.conversations.sync() + Task(priority: .userInitiated) { + for try await _ in await fixtures.boV3Client.conversations.streamAllDecryptedConversationMessages() { + expectation1.fulfill() + } + } + + _ = try await group.send(content: "hi") + _ = try await convo.send(content: "hi") + + await fulfillment(of: [expectation1], timeout: 3) + } + + func testCanStreamGroupsAndConversationsFromV3Users() async throws { + let fixtures = try await localFixtures() + + let expectation1 = XCTestExpectation(description: "got a conversation") + expectation1.expectedFulfillmentCount = 2 + + Task(priority: .userInitiated) { + for try await _ in await fixtures.boV3Client.conversations.streamConversations() { + expectation1.fulfill() + } + } + + _ = try await fixtures.caroV2V3Client.conversations.newGroup(with: [fixtures.boV3.address]) + _ = try await fixtures.boV3Client.conversations.findOrCreateDm(with: fixtures.caroV2V3.address) + + await fulfillment(of: [expectation1], timeout: 3) + } func testCanStreamAllMessagesFromV2andV3Users() async throws { let fixtures = try await localFixtures() diff --git a/XMTP.podspec b/XMTP.podspec index e3f0e969..505cf860 100644 --- a/XMTP.podspec +++ b/XMTP.podspec @@ -16,7 +16,7 @@ Pod::Spec.new do |spec| # spec.name = "XMTP" - spec.version = "0.15.2" + spec.version = "0.16.0" spec.summary = "XMTP SDK Cocoapod" # This description is used to generate tags and improve search results.