Skip to content

Commit

Permalink
Add more decrypted streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
nakajima committed Nov 27, 2023
1 parent e205664 commit c489f1a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Sources/XMTP/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ public enum Conversation: Sendable {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<DecryptedMessage, Error> {
switch self {
case let .v1(conversation):
return conversation.streamDecryptedMessages()
case let .v2(conversation):
return conversation.streamDecryptedMessages()
}
}

/// List messages in the conversation
public func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil, direction: PagingInfoSortDirection? = .descending) async throws -> [DecodedMessage] {
switch self {
Expand Down
11 changes: 11 additions & 0 deletions Sources/XMTP/ConversationV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ public struct ConversationV1 {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
for try await envelope in client.subscribe(topics: [topic.description]) {
let decoded = try decrypt(envelope: envelope)
continuation.yield(decoded)
}
}
}
}

var ephemeralTopic: String {
topic.description.replacingOccurrences(of: "/xmtp/0/dm-", with: "/xmtp/0/dmE-")
}
Expand Down
12 changes: 12 additions & 0 deletions Sources/XMTP/ConversationV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ public struct ConversationV2 {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
for try await envelope in client.subscribe(topics: [topic.description]) {
let decoded = try decrypt(envelope: envelope)

continuation.yield(decoded)
}
}
}
}

public var createdAt: Date {
Date(timeIntervalSince1970: Double(header.createdNs / 1_000_000) / 1000)
}
Expand Down

0 comments on commit c489f1a

Please sign in to comment.