Skip to content

Commit

Permalink
Merge pull request #120 from xmtp/np/fix-concurrency-thread-issue
Browse files Browse the repository at this point in the history
fix: concurrent threading issue
  • Loading branch information
nplasterer authored Sep 20, 2023
2 parents 583bbc1 + 3e75bf6 commit 66126de
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 58 deletions.
8 changes: 4 additions & 4 deletions example/ios/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -415,15 +415,15 @@ PODS:
- GenericJSON (~> 2.0)
- Logging (~> 1.0.0)
- secp256k1.swift (~> 0.1)
- XMTP (0.5.7-alpha0):
- XMTP (0.5.8-alpha0):
- Connect-Swift
- GzipSwift
- web3.swift
- XMTPRust (= 0.3.1-beta0)
- XMTPReactNative (0.1.0):
- ExpoModulesCore
- MessagePacker
- XMTP (= 0.5.7-alpha0)
- XMTP (= 0.5.8-alpha0)
- XMTPRust (0.3.1-beta0)
- Yoga (1.14.0)

Expand Down Expand Up @@ -680,8 +680,8 @@ SPEC CHECKSUMS:
secp256k1.swift: a7e7a214f6db6ce5db32cc6b2b45e5c4dd633634
SwiftProtobuf: b70d65f419fbfe61a2d58003456ca5da58e337d6
web3.swift: 2263d1e12e121b2c42ffb63a5a7beb1acaf33959
XMTP: 1b584f662fff8f006987dba2d5b1c9cacc929966
XMTPReactNative: a8057fb0fa8ab1decf2f70414a7359b2b194055c
XMTP: 381e951a5f74ba9751504b50da3bad8c2ab89a20
XMTPReactNative: 2c4fa8f24957002cfef22b73ff58fb8f08c47cd2
XMTPRust: 78f65f77b1454392980da244961777aee955652f
Yoga: 065f0b74dba4832d6e328238de46eb72c5de9556

Expand Down
158 changes: 105 additions & 53 deletions ios/XMTPModule.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,52 @@ extension Conversation {
}

public class XMTPModule: Module {
var clients: [String: XMTP.Client] = [:]
var signer: ReactNativeSigner?
var conversations: [String: Conversation] = [:]
var subscriptions: [String: Task<Void, Never>] = [:]
let clientsManager = ClientsManager()
let conversationsManager = ConversationsManager()
let subscriptionsManager = SubscriptionsManager()

actor ClientsManager {
private var clients: [String: XMTP.Client] = [:]

// A method to update the conversations
func updateClient(key: String, client: XMTP.Client?) {
clients[key] = client
}

// A method to retrieve a conversation
func getClient(key: String) -> XMTP.Client? {
return clients[key]
}
}

actor ConversationsManager {
private var conversations: [String: Conversation] = [:]

// A method to update the conversations
func updateConversation(key: String, conversation: Conversation?) {
conversations[key] = conversation
}

// A method to retrieve a conversation
func getConversation(key: String) -> Conversation? {
return conversations[key]
}
}

actor SubscriptionsManager {
private var subscriptions: [String: Task<Void, Never>] = [:]

// A method to update the subscriptions
func updateSubscription(key: String, task: Task<Void, Never>?) {
subscriptions[key] = task
}

// A method to retrieve a subscription
func getSubscription(key: String) -> Task<Void, Never>? {
return subscriptions[key]
}
}

enum Error: Swift.Error {
case noClient, conversationNotFound(String), noMessage, invalidKeyBundle, invalidDigest, badPreparation(String)
Expand All @@ -85,8 +127,8 @@ public class XMTPModule: Module {

Events("sign", "authed", "conversation", "message")

Function("address") { (clientAddress: String) -> String in
if let client = clients[clientAddress] {
AsyncFunction("address") { (clientAddress: String) -> String in
if let client = await clientsManager.getClient(key: clientAddress) {
return client.address
} else {
return "No Client."
Expand All @@ -100,7 +142,7 @@ public class XMTPModule: Module {
let signer = ReactNativeSigner(module: self, address: address)
self.signer = signer
let options = createClientConfig(env: environment, appVersion: appVersion)
self.clients[address] = try await XMTP.Client.create(account: signer, options: options)
await clientsManager.updateClient(key: address, client: try await XMTP.Client.create(account: signer, options: options))
self.signer = nil
sendEvent("authed")
}
Expand All @@ -115,7 +157,7 @@ public class XMTPModule: Module {
let options = createClientConfig(env: environment, appVersion: appVersion)
let client = try await Client.create(account: privateKey, options: options)

self.clients[client.address] = client
await clientsManager.updateClient(key: client.address, client: client)
return client.address
}

Expand All @@ -129,7 +171,7 @@ public class XMTPModule: Module {

let options = createClientConfig(env: environment, appVersion: appVersion)
let client = try await Client.from(bundle: bundle, options: options)
self.clients[client.address] = client
await clientsManager.updateClient(key: client.address, client: client)
return client.address
} catch {
print("ERRO! Failed to create client: \(error)")
Expand All @@ -139,7 +181,7 @@ public class XMTPModule: Module {

// Export the client's serialized key bundle.
AsyncFunction("exportKeyBundle") { (clientAddress: String) -> String in
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}
let bundle = try client.privateKeyBundle.serializedData().base64EncodedString()
Expand All @@ -156,29 +198,29 @@ public class XMTPModule: Module {

// Import a conversation from its serialized topic data.
AsyncFunction("importConversationTopicData") { (clientAddress: String, topicData: String) -> String in
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}
let data = try Xmtp_KeystoreApi_V1_TopicMap.TopicData(
serializedData: Data(base64Encoded: Data(topicData.utf8))!
)
let conversation = client.conversations.importTopicData(data: data)
conversations[conversation.cacheKey(clientAddress)] = conversation
let conversation = try await client.conversations.importTopicData(data: data)
await conversationsManager.updateConversation(key: conversation.cacheKey(clientAddress), conversation: conversation)
return try ConversationWrapper.encode(conversation, client: client)
}

//
// Client API
AsyncFunction("canMessage") { (clientAddress: String, peerAddress: String) -> Bool in
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}

return try await client.canMessage(peerAddress)
}

AsyncFunction("encryptAttachment") { (clientAddress: String, fileJson: String) -> String in
if clients[clientAddress] == nil {
if await clientsManager.getClient(key: clientAddress) == nil {
throw Error.noClient
}
let file = try DecryptedLocalAttachment.fromJson(fileJson)
Expand All @@ -204,7 +246,7 @@ public class XMTPModule: Module {
}

AsyncFunction("decryptAttachment") { (clientAddress: String, encryptedFileJson: String) -> String in
if clients[clientAddress] == nil {
if await clientsManager.getClient(key: clientAddress) == nil {
throw Error.noClient
}
let encryptedFile = try EncryptedLocalAttachment.fromJson(encryptedFileJson)
Expand All @@ -229,16 +271,26 @@ public class XMTPModule: Module {
}

AsyncFunction("listConversations") { (clientAddress: String) -> [String] in
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}

let conversations = try await client.conversations.list()

return try conversations.map { conversation in
self.conversations[conversation.cacheKey(clientAddress)] = conversation

return try ConversationWrapper.encode(conversation, client: client)

return try await withThrowingTaskGroup(of: String.self) { group in
for conversation in conversations {
group.addTask {
await self.conversationsManager.updateConversation(key: conversation.cacheKey(clientAddress), conversation: conversation)
return try await ConversationWrapper.encode(conversation, client: client)
}
}

var results: [String] = []
for try await result in group {
results.append(result)
}

return results
}
}

Expand Down Expand Up @@ -270,7 +322,7 @@ public class XMTPModule: Module {
}

AsyncFunction("loadBatchMessages") { (clientAddress: String, topics: [String]) -> [String] in
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}

Expand Down Expand Up @@ -363,7 +415,7 @@ public class XMTPModule: Module {
}

AsyncFunction("sendPreparedMessage") { (clientAddress: String, preparedLocalMessageJson: String) -> String in
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}
guard let local = try? PreparedLocalMessage.fromJson(preparedLocalMessageJson) else {
Expand All @@ -386,7 +438,7 @@ public class XMTPModule: Module {
}

AsyncFunction("createConversation") { (clientAddress: String, peerAddress: String, contextJson: String) -> String in
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}

Expand All @@ -405,24 +457,24 @@ public class XMTPModule: Module {
}
}

Function("subscribeToConversations") { (clientAddress: String) in
subscribeToConversations(clientAddress: clientAddress)
AsyncFunction("subscribeToConversations") { (clientAddress: String) in
try await subscribeToConversations(clientAddress: clientAddress)
}

Function("subscribeToAllMessages") { (clientAddress: String) in
subscribeToAllMessages(clientAddress: clientAddress)
AsyncFunction("subscribeToAllMessages") { (clientAddress: String) in
try await subscribeToAllMessages(clientAddress: clientAddress)
}

AsyncFunction("subscribeToMessages") { (clientAddress: String, topic: String) in
try await subscribeToMessages(clientAddress: clientAddress, topic: topic)
}

Function("unsubscribeFromConversations") {
subscriptions["conversations"]?.cancel()
AsyncFunction("unsubscribeFromConversations") {
await subscriptionsManager.getSubscription(key: "conversations")?.cancel()
}

Function("unsubscribeFromAllMessages") {
subscriptions["messages"]?.cancel()
AsyncFunction("unsubscribeFromAllMessages") {
await subscriptionsManager.getSubscription(key: "messages")?.cancel()
}

AsyncFunction("unsubscribeFromMessages") { (clientAddress: String, topic: String) in
Expand Down Expand Up @@ -494,49 +546,49 @@ public class XMTPModule: Module {
}

func findConversation(clientAddress: String, topic: String) async throws -> Conversation? {
guard let client = clients[clientAddress] else {
guard let client = await clientsManager.getClient(key: clientAddress) else {
throw Error.noClient
}

let cacheKey = Conversation.cacheKeyForTopic(clientAddress: clientAddress, topic: topic)
if let conversation = conversations[cacheKey] {
if let conversation = await conversationsManager.getConversation(key: cacheKey) {
return conversation
} else if let conversation = try await client.conversations.list().first(where: { $0.topic == topic }) {
conversations[cacheKey] = conversation
await conversationsManager.updateConversation(key: cacheKey, conversation: conversation)
return conversation
}

return nil
}

func subscribeToConversations(clientAddress: String) {
guard let client = clients[clientAddress] else {
func subscribeToConversations(clientAddress: String) async throws {
guard let client = await clientsManager.getClient(key: clientAddress) else {
return
}

subscriptions["conversations"]?.cancel()
subscriptions["conversations"] = Task {
await subscriptionsManager.getSubscription(key: "conversations")?.cancel()
await subscriptionsManager.updateSubscription(key: "conversations", task: Task {
do {
for try await conversation in client.conversations.stream() {
for try await conversation in try await client.conversations.stream() {
sendEvent("conversation", [
"clientAddress": clientAddress,
"conversation": try ConversationWrapper.encodeToObj(conversation, client: client)
])
}
} catch {
print("Error in conversations subscription: \(error)")
subscriptions["conversations"]?.cancel()
await subscriptionsManager.getSubscription(key: "conversations")?.cancel()
}
}
})
}

func subscribeToAllMessages(clientAddress: String) {
guard let client = clients[clientAddress] else {
func subscribeToAllMessages(clientAddress: String) async throws {
guard let client = await clientsManager.getClient(key: clientAddress) else {
return
}

subscriptions["messages"]?.cancel()
subscriptions["messages"] = Task {
await subscriptionsManager.getSubscription(key: "messages")?.cancel()
await subscriptionsManager.updateSubscription(key: "messages", task: Task {
do {
for try await message in try await client.conversations.streamAllMessages() {
do {
Expand All @@ -550,18 +602,18 @@ public class XMTPModule: Module {
}
} catch {
print("Error in all messages subscription: \(error)")
subscriptions["messages"]?.cancel()
await subscriptionsManager.getSubscription(key: "messages")?.cancel()
}
}
})
}

func subscribeToMessages(clientAddress: String, topic: String) async throws {
guard let conversation = try await findConversation(clientAddress: clientAddress, topic: topic) else {
return
}

subscriptions[conversation.cacheKey(clientAddress)]?.cancel()
subscriptions[conversation.cacheKey(clientAddress)] = Task {
await subscriptionsManager.getSubscription(key: conversation.cacheKey(clientAddress))?.cancel()
await subscriptionsManager.updateSubscription(key: conversation.cacheKey(clientAddress), task: Task {
do {
for try await message in conversation.streamMessages() {
do {
Expand All @@ -575,16 +627,16 @@ public class XMTPModule: Module {
}
} catch {
print("Error in messages subscription: \(error)")
subscriptions[conversation.cacheKey(clientAddress)]?.cancel()
await subscriptionsManager.getSubscription(key: conversation.cacheKey(clientAddress))?.cancel()
}
}
})
}

func unsubscribeFromMessages(clientAddress: String, topic: String) async throws {
guard let conversation = try await findConversation(clientAddress: clientAddress, topic: topic) else {
return
}

subscriptions[conversation.cacheKey(clientAddress)]?.cancel()
await subscriptionsManager.getSubscription(key: conversation.cacheKey(clientAddress))?.cancel()
}
}
2 changes: 1 addition & 1 deletion ios/XMTPReactNative.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ Pod::Spec.new do |s|

s.source_files = "**/*.{h,m,swift}"
s.dependency "MessagePacker"
s.dependency "XMTP", "= 0.5.7-alpha0"
s.dependency "XMTP", "= 0.5.8-alpha0"
end

0 comments on commit 66126de

Please sign in to comment.