Skip to content

Commit

Permalink
V3 group streaming (#239)
Browse files Browse the repository at this point in the history
* ui updates, fix lint

* Add streaming

* use convo id

* bump podspec
  • Loading branch information
nakajima authored Feb 8, 2024
1 parent 15d34aa commit 8b5706c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 11 deletions.
2 changes: 0 additions & 2 deletions Sources/XMTPiOS/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ public final class Client {
let address = try v1Bundle.identityKey.publicKey.recoverWalletSignerPublicKey().walletAddress

let options = options ?? ClientOptions()

let client = try await LibXMTP.createV2Client(host: options.api.env.url, isSecure: options.api.env.isSecure)
let apiClient = try GRPCApiClient(
environment: options.api.env,
Expand Down Expand Up @@ -302,7 +301,6 @@ public final class Client {

public static func canMessage(_ peerAddress: String, options: ClientOptions? = nil) async throws -> Bool {
let options = options ?? ClientOptions()

let client = try await LibXMTP.createV2Client(host: options.api.env.url, isSecure: options.api.env.isSecure)
let apiClient = try GRPCApiClient(
environment: options.api.env,
Expand Down
27 changes: 27 additions & 0 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,25 @@ public enum GroupError: Error, CustomStringConvertible {
}
}

final class GroupStreamCallback: FfiConversationCallback {
let client: Client
let callback: (Group) -> Void

init(client: Client, callback: @escaping (Group) -> Void) {
self.client = client
self.callback = callback
}

func onConversation(conversation: FfiGroup) {
self.callback(conversation.fromFFI(client: client))
}
}

/// Handles listing and creating Conversations.
public actor Conversations {
var client: Client
var conversationsByTopic: [String: Conversation] = [:]
let streamHolder = StreamHolder()

init(client: Client) {
self.client = client
Expand Down Expand Up @@ -72,6 +87,18 @@ public actor Conversations {
return try await v3Client.conversations().list(opts: options).map { $0.fromFFI(client: client) }
}

public func streamGroups() async throws -> AsyncThrowingStream<Group, Error> {
AsyncThrowingStream { continuation in
Task {
self.streamHolder.stream = try await self.client.v3Client?.conversations().stream(
callback: GroupStreamCallback(client: self.client) { group in
continuation.yield(group)
}
)
}
}
}

public func newGroup(with addresses: [String]) async throws -> Group {
guard let v3Client = client.v3Client else {
throw GroupError.alphaMLSNotEnabled
Expand Down
16 changes: 16 additions & 0 deletions Sources/XMTPiOS/Extensions/Ffi.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ extension FfiV2SubscribeRequest {
}
}

// MARK: Messages

extension FfiMessage {
func fromFFI(client: Client) throws -> DecodedMessage {
let encodedContent = try EncodedContent(serializedData: content)

return DecodedMessage(
client: client,
topic: convoId.toHex,
encodedContent: encodedContent,
senderAddress: addrFrom,
sent: Date(timeIntervalSince1970: TimeInterval(sentAtNs / 1_000_000_000))
)
}
}

// MARK: Group

extension FfiGroup {
Expand Down
53 changes: 44 additions & 9 deletions Sources/XMTPiOS/Group.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,32 @@
import Foundation
import LibXMTP

final class MessageCallback: FfiMessageCallback {
let client: XMTPiOS.Client
let callback: (DecodedMessage) -> Void

init(client: XMTPiOS.Client, _ callback: @escaping (DecodedMessage) -> Void) {
self.client = client
self.callback = callback
}

func onMessage(message: LibXMTP.FfiMessage) {
do {
try callback(message.fromFFI(client: client))
} catch {
print("Error onMessage \(error)")
}
}
}

final class StreamHolder {
var stream: FfiStreamCloser?
}

public struct Group: Identifiable, Equatable, Hashable {
var ffiGroup: FfiGroup
var client: Client
let streamHolder = StreamHolder()

struct Member {
var ffiGroupMember: FfiGroupMember
Expand Down Expand Up @@ -87,6 +110,26 @@ public struct Group: Identifiable, Equatable, Hashable {
try await ffiGroup.send(contentBytes: encoded.serializedData())
}

public func endStream() {
self.streamHolder.stream?.end()
}

public func streamMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream { continuation in
Task.detached {
do {
self.streamHolder.stream = try await ffiGroup.stream(
messageCallback: MessageCallback(client: self.client) { message in
continuation.yield(message)
}
)
} catch {
print("STREAM ERR: \(error)")
}
}
}
}

public func messages(before: Date? = nil, after: Date? = nil, limit: Int? = nil) async throws -> [DecodedMessage] {
var options = FfiListMessagesOptions(sentBeforeNs: nil, sentAfterNs: nil, limit: nil)

Expand All @@ -105,15 +148,7 @@ public struct Group: Identifiable, Equatable, Hashable {
let messages = try ffiGroup.findMessages(opts: options)

return try messages.map { ffiMessage in
let encodedContent = try EncodedContent(serializedData: ffiMessage.content)

return DecodedMessage(
client: client,
topic: id.toHex,
encodedContent: encodedContent,
senderAddress: ffiMessage.addrFrom,
sent: Date(timeIntervalSince1970: TimeInterval(ffiMessage.sentAtNs / 1_000_000_000))
)
try ffiMessage.fromFFI(client: client)
}
}
}
12 changes: 12 additions & 0 deletions XMTPiOSExample/XMTPiOSExample/Views/ConversationListView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ struct ConversationListView: View {
.task {
await loadConversations()
}
.task {
do {
for try await group in try await client.conversations.streamGroups() {
conversations.insert(.group(group), at: 0)

await add(conversations: [.group(group)])
}

} catch {
print("Error streaming groups: \(error)")
}
}
.task {
do {
for try await conversation in await client.conversations.stream() {
Expand Down
9 changes: 9 additions & 0 deletions XMTPiOSExample/XMTPiOSExample/Views/GroupDetailView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ struct GroupDetailView: View {
.task {
await loadMessages()
}
.task {
do {
for try await _ in group.streamMessages() {
await loadMessages()
}
} catch {
print("Erorr streaming group messages \(error)")
}
}

MessageComposerView { text in
do {
Expand Down

0 comments on commit 8b5706c

Please sign in to comment.