Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe2 #226

Closed
wants to merge 11 commits into from
30 changes: 27 additions & 3 deletions Sources/XMTPTestHelpers/TestHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,35 @@ class FakeStreamHolder: ObservableObject {

@available(iOS 15, *)
public class FakeApiClient: ApiClient {
public func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: XMTPiOS.Envelope, subscription: LibXMTP.FfiV2Subscription), Error> {
AsyncThrowingStream { continuation in
self.cancellable = stream.$envelope.sink(receiveValue: { env in
if let env, topics.contains(env.contentTopic) {
Task {
let request = SubscribeRequest.with { $0.contentTopics = topics }
try continuation.yield((env, await self.subscribe2(request: request)))
}
}
})
}
}

public func subscribe2(request: XMTPiOS.SubscribeRequest) async throws -> LibXMTP.FfiV2Subscription {
return try await rustClient.subscribe(request: request.toFFI)
}

public func makeSubscribeRequest(topics: [String]) -> XMTPiOS.SubscribeRequest {
return SubscribeRequest.with { $0.contentTopics = topics }
}

public func envelopes(topic: String, pagination: XMTPiOS.Pagination?) async throws -> [XMTPiOS.Envelope] {
try await query(topic: topic, pagination: pagination).envelopes
}

public var environment: XMTPEnvironment
public var authToken: String = ""
public var appVersion: String
public var rustClient: LibXMTP.FfiV2ApiClient
private var responses: [String: [XMTPiOS.Envelope]] = [:]
private var stream = FakeStreamHolder()
public var published: [XMTPiOS.Envelope] = []
Expand Down Expand Up @@ -118,9 +140,10 @@ public class FakeApiClient: ApiClient {
responses[topic.description] = responsesForTopic
}

public init() {
public init() async throws {
environment = .local
appVersion = "test/0.0.0"
rustClient = try await LibXMTP.createV2Client(host: GRPCApiClient.envToUrl(env: .local), isSecure: false)
}

public func send(envelope: XMTPiOS.Envelope) {
Expand All @@ -137,9 +160,10 @@ public class FakeApiClient: ApiClient {

// MARK: ApiClient conformance

public required init(environment: XMTPiOS.XMTPEnvironment, secure _: Bool, rustClient _: LibXMTP.FfiV2ApiClient, appVersion: String?) throws {
public required init(environment: XMTPiOS.XMTPEnvironment, secure _: Bool, rustClient: LibXMTP.FfiV2ApiClient, appVersion: String?) throws {
self.environment = environment
self.appVersion = appVersion ?? "0.0.0"
self.rustClient = rustClient
}

public func subscribe(topics: [String]) -> AsyncThrowingStream<XMTPiOS.Envelope, Error> {
Expand Down Expand Up @@ -269,7 +293,7 @@ public struct Fixtures {
alice = try PrivateKey.generate()
bob = try PrivateKey.generate()

fakeApiClient = FakeApiClient()
fakeApiClient = try await FakeApiClient()

aliceClient = try await Client.create(account: alice, apiClient: fakeApiClient)
bobClient = try await Client.create(account: bob, apiClient: fakeApiClient)
Expand Down
11 changes: 8 additions & 3 deletions Sources/XMTPiOS/ApiClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ protocol ApiClient: Sendable {
func envelopes(topic: String, pagination: Pagination?) async throws -> [Envelope]
func publish(envelopes: [Envelope]) async throws
func publish(request: PublishRequest) async throws
func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error>
func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error>
func makeSubscribeRequest(topics: [String]) -> SubscribeRequest
}

func makeQueryRequest(topic: String, pagination: Pagination? = nil, cursor: Cursor? = nil) -> QueryRequest {
Expand Down Expand Up @@ -137,8 +138,12 @@ final class GRPCApiClient: ApiClient {

return envelopes
}

func makeSubscribeRequest(topics: [String]) -> SubscribeRequest {
return SubscribeRequest.with { $0.contentTopics = topics }
}

func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error> {
func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Touple probably isn't the best idea to solve this. But the simplest for now.

return AsyncThrowingStream { continuation in
Task {
let request = SubscribeRequest.with { $0.contentTopics = topics }
Expand All @@ -153,7 +158,7 @@ final class GRPCApiClient: ApiClient {

while true {
let nextEnvelope = try await subscription.next()
continuation.yield(nextEnvelope.fromFFI)
continuation.yield((envelope: nextEnvelope.fromFFI, subscription: subscription))
}
} catch {
throw ApiClientError.subscribeError(error.localizedDescription)
Expand Down
8 changes: 6 additions & 2 deletions Sources/XMTPiOS/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,18 @@ public final class Client {
try await apiClient.publish(envelopes: envelopes)
}

public func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error> {
public func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> {
return apiClient.subscribe(topics: topics)
}

public func subscribe(topics: [Topic]) -> AsyncThrowingStream<Envelope, Error> {
public func subscribe(topics: [Topic]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> {
return subscribe(topics: topics.map(\.description))
}

public func makeSubscribeRequest(topics: [String]) -> SubscribeRequest {
return apiClient.makeSubscribeRequest(topics: topics)
}

public func deleteLocalDatabase() throws {
let fm = FileManager.default
try fm.removeItem(atPath: dbPath)
Expand Down
6 changes: 3 additions & 3 deletions Sources/XMTPiOS/ConversationV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public struct ConversationV1 {
public func streamMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
for try await envelope in client.subscribe(topics: [topic.description]) {
for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) {
let decoded = try decode(envelope: envelope)
continuation.yield(decoded)
}
Expand All @@ -165,7 +165,7 @@ public struct ConversationV1 {
public func streamDecryptedMessages() -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
for try await envelope in client.subscribe(topics: [topic.description]) {
for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) {
let decoded = try decrypt(envelope: envelope)
continuation.yield(decoded)
}
Expand All @@ -181,7 +181,7 @@ public struct ConversationV1 {
AsyncThrowingStream { continuation in
Task {
do {
for try await envelope in client.subscribe(topics: [ephemeralTopic]) {
for try await (envelope, subscription) in client.subscribe(topics: [ephemeralTopic]) {
continuation.yield(envelope)
}
} catch {
Expand Down
6 changes: 3 additions & 3 deletions Sources/XMTPiOS/ConversationV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public struct ConversationV2 {
AsyncThrowingStream { continuation in
Task {
do {
for try await envelope in client.subscribe(topics: [ephemeralTopic]) {
for try await (envelope, subscription) in client.subscribe(topics: [ephemeralTopic]) {
continuation.yield(envelope)
}
} catch {
Expand All @@ -175,7 +175,7 @@ public struct ConversationV2 {
public func streamMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
for try await envelope in client.subscribe(topics: [topic.description]) {
for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) {
let decoded = try decode(envelope: envelope)

continuation.yield(decoded)
Expand All @@ -187,7 +187,7 @@ public struct ConversationV2 {
public func streamDecryptedMessages() -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
for try await envelope in client.subscribe(topics: [topic.description]) {
for try await (envelope, subscription) in client.subscribe(topics: [topic.description]) {
let decoded = try decrypt(envelope: envelope)

continuation.yield(decoded)
Expand Down
61 changes: 32 additions & 29 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Foundation
import LibXMTP

public enum ConversationError: Error, CustomStringConvertible {
case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String)
case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String), streamingIssue(String)

public var description: String {
switch self {
Expand All @@ -13,6 +13,9 @@ public enum ConversationError: Error, CustomStringConvertible {
case .v1NotSupported(let str):
return "ConversationError.v1NotSupported: V1 does not support: \(str)"
}
case .streamingIssue(let str):
return "ConversationError.streamingIssue: \(str)"
}
}
}

Expand Down Expand Up @@ -242,33 +245,33 @@ public actor Conversations {
func streamAllV2Messages() async throws -> AsyncThrowingStream<DecodedMessage, Error> {
return AsyncThrowingStream { continuation in
Task {
while true {
var topics: [String] = [
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description,
]
var topics: [String] = [
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description,
]

for conversation in try await list() {
topics.append(conversation.topic)
}
for conversation in try await list() {
topics.append(conversation.topic)
}

while(true) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure where this while loop should actually live. 🤔

do {
for try await envelope in client.subscribe(topics: topics) {
for try await (envelope, subscription) in client.subscribe(topics: topics) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried a scenario where we returned the subscription independent of this for loop but the await function never completed. It may be my lack of knowledge of iOS threads/streams. But you can see that work in the previous commit history.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zombieobject I've got some iOS threading questions for you next week 🙏

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nplasterer Thanks for looping me in, I'll have a look. 👨🏼‍🚒

if let conversation = conversationsByTopic[envelope.contentTopic] {
let decoded = try conversation.decode(envelope)
continuation.yield(decoded)
} else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") {
let conversation = try fromInvite(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
break // Break so we can resubscribe with the new conversation
topics.append(conversation.topic)
try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this update method want the entire list of topics or just the new topics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is also blocking from the for loop continuing..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes the entire list of topics

} else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") {
let conversation = try fromIntro(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
let decoded = try conversation.decode(envelope)
continuation.yield(decoded)
break // Break so we can resubscribe with the new conversation
} else {
print("huh \(envelope)")
topics.append(conversation.topic)
try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI)
}
}
} catch {
Expand Down Expand Up @@ -373,33 +376,33 @@ public actor Conversations {
func streamAllV2DecryptedMessages() async throws -> AsyncThrowingStream<DecryptedMessage, Error> {
return AsyncThrowingStream { continuation in
Task {
var topics: [String] = [
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description,
]

for conversation in try await list() {
topics.append(conversation.topic)
}

while true {
var topics: [String] = [
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description,
]

for conversation in try await list() {
topics.append(conversation.topic)
}

do {
for try await envelope in client.subscribe(topics: topics) {
for try await (envelope, subscription) in client.subscribe(topics: topics) {
if let conversation = conversationsByTopic[envelope.contentTopic] {
let decoded = try conversation.decrypt(envelope)
continuation.yield(decoded)
} else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") {
let conversation = try fromInvite(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
break // Break so we can resubscribe with the new conversation
topics.append(conversation.topic)
try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI)
} else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") {
let conversation = try fromIntro(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
let decoded = try conversation.decrypt(envelope)
continuation.yield(decoded)
break // Break so we can resubscribe with the new conversation
} else {
print("huh \(envelope)")
topics.append(conversation.topic)
try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI)
}
}
} catch {
Expand Down Expand Up @@ -474,7 +477,7 @@ public actor Conversations {
Task {
var streamedConversationTopics: Set<String> = []

for try await envelope in client.subscribe(topics: [.userIntro(client.address), .userInvite(client.address)]) {
for try await (envelope, subscription) in client.subscribe(topics: [.userIntro(client.address), .userInvite(client.address)]) {
if envelope.contentTopic == Topic.userIntro(client.address).description {
let conversationV1 = try fromIntro(envelope: envelope)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/xmtp/libxmtp-swift",
"state" : {
"revision" : "f6571b6b771b4c0a2a9469bd407ce124a9d91fed",
"version" : "0.4.2-beta3"
"revision" : "28ee27a4ded8b996a74850e366247e9fe51d782a",
"version" : "0.4.2-beta4"
}
},
{
Expand Down
Loading