diff --git a/Sources/XMTP/Conversation.swift b/Sources/XMTP/Conversation.swift index 20b4dd74..16e1956f 100644 --- a/Sources/XMTP/Conversation.swift +++ b/Sources/XMTP/Conversation.swift @@ -220,6 +220,15 @@ public enum Conversation: Sendable { } } + public func streamDecryptedMessages() -> AsyncThrowingStream { + switch self { + case let .v1(conversation): + return conversation.streamDecryptedMessages() + case let .v2(conversation): + return conversation.streamDecryptedMessages() + } + } + /// List messages in the conversation public func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil, direction: PagingInfoSortDirection? = .descending) async throws -> [DecodedMessage] { switch self { diff --git a/Sources/XMTP/ConversationV1.swift b/Sources/XMTP/ConversationV1.swift index d01510c3..282a11e6 100644 --- a/Sources/XMTP/ConversationV1.swift +++ b/Sources/XMTP/ConversationV1.swift @@ -162,6 +162,17 @@ public struct ConversationV1 { } } + public func streamDecryptedMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task { + for try await envelope in client.subscribe(topics: [topic.description]) { + let decoded = try decrypt(envelope: envelope) + continuation.yield(decoded) + } + } + } + } + var ephemeralTopic: String { topic.description.replacingOccurrences(of: "/xmtp/0/dm-", with: "/xmtp/0/dmE-") } diff --git a/Sources/XMTP/ConversationV2.swift b/Sources/XMTP/ConversationV2.swift index 887aafba..05847157 100644 --- a/Sources/XMTP/ConversationV2.swift +++ b/Sources/XMTP/ConversationV2.swift @@ -176,6 +176,18 @@ public struct ConversationV2 { } } + public func streamDecryptedMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task { + for try await envelope in client.subscribe(topics: [topic.description]) { + let decoded = try decrypt(envelope: envelope) + + continuation.yield(decoded) + } + } + } + } + public var createdAt: Date { Date(timeIntervalSince1970: Double(header.createdNs / 1_000_000) / 1000) }