Skip to content

Commit

Permalink
Add streamAllDecryptedMessages
Browse files Browse the repository at this point in the history
  • Loading branch information
nakajima committed Nov 27, 2023
1 parent a1e5d9c commit 7208f37
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions Sources/XMTP/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,46 @@ public actor Conversations {
}
}

public func streamAllDecryptedMessages() async throws -> AsyncThrowingStream<DecryptedMessage, Error> {
return AsyncThrowingStream { continuation in
Task {
while true {
var topics: [String] = [
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description,
]

for conversation in try await list() {
topics.append(conversation.topic)
}

do {
for try await envelope in client.subscribe(topics: topics) {
if let conversation = conversationsByTopic[envelope.contentTopic] {
let decoded = try conversation.decrypt(envelope)
continuation.yield(decoded)
} else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") {
let conversation = try fromInvite(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
break // Break so we can resubscribe with the new conversation
} else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") {
let conversation = try fromIntro(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
let decoded = try conversation.decrypt(envelope)
continuation.yield(decoded)
break // Break so we can resubscribe with the new conversation
} else {
print("huh \(envelope)")
}
}
} catch {
continuation.finish(throwing: error)
}
}
}
}
}

public func fromInvite(envelope: Envelope) throws -> Conversation {
let sealedInvitation = try SealedInvitation(serializedData: envelope.message)
let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys)
Expand Down

0 comments on commit 7208f37

Please sign in to comment.