From c489f1aaf172136feabc1d11b4577e7dc0dff080 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Mon, 27 Nov 2023 11:42:56 -0800 Subject: [PATCH] Add more decrypted streaming --- Sources/XMTP/Conversation.swift | 9 +++++++++ Sources/XMTP/ConversationV1.swift | 11 +++++++++++ Sources/XMTP/ConversationV2.swift | 12 ++++++++++++ 3 files changed, 32 insertions(+) diff --git a/Sources/XMTP/Conversation.swift b/Sources/XMTP/Conversation.swift index 20b4dd74..16e1956f 100644 --- a/Sources/XMTP/Conversation.swift +++ b/Sources/XMTP/Conversation.swift @@ -220,6 +220,15 @@ public enum Conversation: Sendable { } } + public func streamDecryptedMessages() -> AsyncThrowingStream { + switch self { + case let .v1(conversation): + return conversation.streamDecryptedMessages() + case let .v2(conversation): + return conversation.streamDecryptedMessages() + } + } + /// List messages in the conversation public func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil, direction: PagingInfoSortDirection? = .descending) async throws -> [DecodedMessage] { switch self { diff --git a/Sources/XMTP/ConversationV1.swift b/Sources/XMTP/ConversationV1.swift index d01510c3..282a11e6 100644 --- a/Sources/XMTP/ConversationV1.swift +++ b/Sources/XMTP/ConversationV1.swift @@ -162,6 +162,17 @@ public struct ConversationV1 { } } + public func streamDecryptedMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task { + for try await envelope in client.subscribe(topics: [topic.description]) { + let decoded = try decrypt(envelope: envelope) + continuation.yield(decoded) + } + } + } + } + var ephemeralTopic: String { topic.description.replacingOccurrences(of: "/xmtp/0/dm-", with: "/xmtp/0/dmE-") } diff --git a/Sources/XMTP/ConversationV2.swift b/Sources/XMTP/ConversationV2.swift index 887aafba..05847157 100644 --- a/Sources/XMTP/ConversationV2.swift +++ b/Sources/XMTP/ConversationV2.swift @@ -176,6 +176,18 @@ public struct ConversationV2 { } } + public func streamDecryptedMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task { + for try await envelope in client.subscribe(topics: [topic.description]) { + let decoded = try decrypt(envelope: envelope) + + continuation.yield(decoded) + } + } + } + } + public var createdAt: Date { Date(timeIntervalSince1970: Double(header.createdNs / 1_000_000) / 1000) }