Skip to content

Commit

Permalink
feat: add batch message listing, use updated libxmtp (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmccartney authored Jun 2, 2023
1 parent dc2040a commit 83d76e8
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 124 deletions.
4 changes: 2 additions & 2 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/xmtp/xmtp-rust-swift",
"state" : {
"revision" : "eccfc16bb8f866857ecbb1604c1dab855b1960f7",
"version" : "0.2.2-beta0"
"revision" : "41a1161cf06a86bab0aa886e450584a1191429b1",
"version" : "0.3.0-beta0"
}
}
],
Expand Down
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ let package = Package(
.package(url: "https://github.com/1024jp/GzipSwift", from: "5.2.0"),
.package(url: "https://github.com/bufbuild/connect-swift", from: "0.3.0"),
.package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.0.0"),
.package(url: "https://github.com/xmtp/xmtp-rust-swift", from: "0.2.2-beta0"),
.package(url: "https://github.com/xmtp/xmtp-rust-swift", from: "0.3.0-beta0"),
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
Expand Down
179 changes: 63 additions & 116 deletions Sources/XMTP/ApiClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,51 @@

import Foundation
import XMTPRust
import XMTPRustSwift

typealias PublishRequest = Xmtp_MessageApi_V1_PublishRequest
typealias PublishResponse = Xmtp_MessageApi_V1_PublishResponse
typealias BatchQueryRequest = Xmtp_MessageApi_V1_BatchQueryRequest
typealias BatchQueryResponse = Xmtp_MessageApi_V1_BatchQueryResponse
typealias Cursor = Xmtp_MessageApi_V1_Cursor
typealias QueryRequest = Xmtp_MessageApi_V1_QueryRequest
typealias QueryResponse = Xmtp_MessageApi_V1_QueryResponse
typealias SubscribeRequest = Xmtp_MessageApi_V1_SubscribeRequest

protocol ApiClient {
var environment: XMTPEnvironment { get }
init(environment: XMTPEnvironment, secure: Bool, rustClient: XMTPRust.RustClient) throws
func setAuthToken(_ token: String)
func batchQuery(request: BatchQueryRequest) async throws -> BatchQueryResponse
func query(topic: String, pagination: Pagination?, cursor: Xmtp_MessageApi_V1_Cursor?) async throws -> QueryResponse
func query(topic: Topic, pagination: Pagination?) async throws -> QueryResponse
func query(request: QueryRequest) async throws -> QueryResponse
func envelopes(topic: String, pagination: Pagination?) async throws -> [Envelope]
func publish(envelopes: [Envelope]) async throws -> PublishResponse
func publish(request: PublishRequest) async throws -> PublishResponse
func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error>
}

func makeQueryRequest(topic: String, pagination: Pagination? = nil, cursor: Cursor? = nil) -> QueryRequest {
return QueryRequest.with {
$0.contentTopics = [topic]
if let pagination {
$0.pagingInfo = pagination.pagingInfo
}
if let startAt = pagination?.startTime {
$0.endTimeNs = UInt64(startAt.millisecondsSinceEpoch) * 1_000_000
$0.pagingInfo.direction = .descending
}
if let endAt = pagination?.endTime {
$0.startTimeNs = UInt64(endAt.millisecondsSinceEpoch) * 1_000_000
$0.pagingInfo.direction = .descending
}
if let cursor {
$0.pagingInfo.cursor = cursor
}
}
}

class GRPCApiClient: ApiClient {
let ClientVersionHeaderKey = "X-Client-Version"
let AppVersionHeaderKey = "X-App-Version"
Expand All @@ -48,96 +77,26 @@ class GRPCApiClient: ApiClient {
func setAuthToken(_ token: String) {
authToken = token
}

func rustPagingInfoFromRequest(request: Xmtp_MessageApi_V1_QueryRequest) -> XMTPRust.PagingInfo {
var rustPaging = XMTPRust.PagingInfo(limit: 0, cursor: nil, direction: XMTPRust.SortDirection.Ascending)
rustPaging.limit = request.pagingInfo.limit
if request.hasPagingInfo && request.pagingInfo.hasCursor {
let cursor = request.pagingInfo.cursor;
let digest = RustVec<UInt8>(cursor.index.digest)
let senderTimeNs = cursor.index.senderTimeNs
rustPaging.cursor = XMTPRust.IndexCursor(digest: digest, sender_time_ns: senderTimeNs)
}

// Set rustPaging.direction based off a switch-case conversion
switch request.pagingInfo.direction {
case .ascending:
rustPaging.direction = XMTPRust.SortDirection.Ascending
case .descending:
rustPaging.direction = XMTPRust.SortDirection.Descending
case .unspecified:
rustPaging.direction = XMTPRust.SortDirection.Unspecified
case .UNRECOGNIZED(_):
rustPaging.direction = XMTPRust.SortDirection.Unspecified
}

return rustPaging;
}

func parseRustPagingInfoFromResponse(response: XMTPRust.QueryResponse) -> PagingInfo {
var pagingInfo = PagingInfo()
if let rustPaging = response.paging_info() {
pagingInfo.limit = rustPaging.limit
if let rustCursor = rustPaging.cursor {
var cursor = PagingInfoCursor()
cursor.index.digest = Data(rustCursor.digest)
cursor.index.senderTimeNs = rustCursor.sender_time_ns
pagingInfo.cursor = cursor
}
switch rustPaging.direction {
case XMTPRust.SortDirection.Ascending:
pagingInfo.direction = .ascending
case XMTPRust.SortDirection.Descending:
pagingInfo.direction = .descending
case XMTPRust.SortDirection.Unspecified:
pagingInfo.direction = .unspecified
}
}
return pagingInfo
}

func query(topic: String, pagination: Pagination? = nil, cursor: Xmtp_MessageApi_V1_Cursor? = nil) async throws -> QueryResponse {
var request = Xmtp_MessageApi_V1_QueryRequest()
request.contentTopics = [topic]
func batchQuery(request: BatchQueryRequest) async throws -> BatchQueryResponse {
let req = RustVec<UInt8>(try request.serializedData())
let res: RustVec<UInt8> = try await rustClient.batch_query(req)
return try BatchQueryResponse(serializedData: Data(res))
}

if let pagination {
request.pagingInfo = pagination.pagingInfo
}
func query(request: QueryRequest) async throws -> QueryResponse {
let req = RustVec<UInt8>(try request.serializedData())
let res: RustVec<UInt8> = try await rustClient.query(req)
return try QueryResponse(serializedData: Data(res))
}

if let startAt = pagination?.startTime {
request.endTimeNs = UInt64(startAt.millisecondsSinceEpoch) * 1_000_000
request.pagingInfo.direction = .descending
}
func query(topic: String, pagination: Pagination? = nil, cursor: Cursor? = nil) async throws -> QueryResponse {
return try await query(request: makeQueryRequest(topic: topic, pagination: pagination, cursor: cursor))
}

if let endAt = pagination?.endTime {
request.startTimeNs = UInt64(endAt.millisecondsSinceEpoch) * 1_000_000
request.pagingInfo.direction = .descending
}

if let cursor {
request.pagingInfo.cursor = cursor
}


let startTimeParam: UInt64? = pagination == nil ? nil : request.startTimeNs
let endTimeParam: UInt64? = pagination == nil ? nil : request.endTimeNs
let rustPagingInfo = rustPagingInfoFromRequest(request: request)
let response = try await rustClient.query(topic.intoRustString(), startTimeParam, endTimeParam, rustPagingInfo)
// response has .envelopes() and .paging_info() but the envelopes need to be mapped into Envelope objects that Swift understands
var queryResponse = QueryResponse()
// Build the query response from response fields
queryResponse.envelopes = response.envelopes().map { rustEnvelope in
var envelope = Envelope()
envelope.contentTopic = rustEnvelope.get_topic().toString()
envelope.timestampNs = rustEnvelope.get_sender_time_ns()
envelope.message = Data(rustEnvelope.get_payload())
return envelope
}
if let _ = response.paging_info() {
queryResponse.pagingInfo = parseRustPagingInfoFromResponse(response: response)
}
return queryResponse
}
func query(topic: Topic, pagination: Pagination? = nil) async throws -> QueryResponse {
return try await query(request: makeQueryRequest(topic: topic.description, pagination: pagination))
}

func envelopes(topic: String, pagination: Pagination? = nil) async throws -> [Envelope] {
var envelopes: [Envelope] = []
Expand All @@ -156,47 +115,35 @@ class GRPCApiClient: ApiClient {
return envelopes
}

func query(topic: Topic, pagination: Pagination? = nil) async throws -> Xmtp_MessageApi_V1_QueryResponse {
return try await query(topic: topic.description, pagination: pagination)
}

func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error> {
return AsyncThrowingStream { continuation in
Task {
let topicsVec = RustVec<RustString>()
for topic in topics {
topicsVec.push(value: topic.intoRustString())
}
let subscription = try await self.rustClient.subscribe(topicsVec)
// Run a continuous for loop polling subscription.get_messages() and then waiting for 2 seconds
let request = SubscribeRequest.with { $0.contentTopics = topics }
let req = RustVec<UInt8>(try request.serializedData())
let subscription = try await self.rustClient.subscribe(req)
// Run a continuous for loop polling and sleeping for a bit each loop.
while true {
let rustEnvelopes = try subscription.get_messages()
for rustEnvelope in rustEnvelopes {
var swiftEnvelope = Envelope()
swiftEnvelope.contentTopic = rustEnvelope.get_topic().toString()
swiftEnvelope.timestampNs = rustEnvelope.get_sender_time_ns()
swiftEnvelope.message = Data(rustEnvelope.get_payload())
continuation.yield(swiftEnvelope)
let buf = try subscription.get_envelopes_as_query_response()
// Note: it uses QueryResponse as a convenient envelopes wrapper.
let res = try QueryResponse(serializedData: Data(buf))
for envelope in res.envelopes {
continuation.yield(envelope)
}
try await Task.sleep(nanoseconds: 50_000_000) // 50ms
}
}
}
}

@discardableResult func publish(envelopes: [Envelope]) async throws -> PublishResponse {
var request = Xmtp_MessageApi_V1_PublishRequest()
request.envelopes = envelopes

let envelopesVec = RustVec<XMTPRust.Envelope>()
func publish(request: PublishRequest) async throws -> PublishResponse {
let req = RustVec<UInt8>(try request.serializedData())
let res: RustVec<UInt8> = try await rustClient.publish(authToken.intoRustString(), req)
return try PublishResponse(serializedData: Data(res))
}

envelopes.forEach { envelope in
let rustEnvelope = XMTPRust.create_envelope(envelope.contentTopic.intoRustString(), envelope.timestampNs, RustVec<UInt8>(envelope.message))
envelopesVec.push(value: rustEnvelope)
}
let _ = try await rustClient.publish(authToken.intoRustString(), envelopesVec)
// NOTE: PublishResponse proto has no fields
let publishResponse = PublishResponse()
return publishResponse
@discardableResult func publish(envelopes: [Envelope]) async throws -> PublishResponse {
return try await publish(request: PublishRequest.with {
$0.envelopes = envelopes
})
}
}
9 changes: 8 additions & 1 deletion Sources/XMTP/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ public class Client {
for envelope in res.envelopes {
let encryptedBundle = try EncryptedPrivateKeyBundle(serializedData: envelope.message)
let bundle = try await encryptedBundle.decrypted(with: account)
return bundle.v1
if case .v1 = bundle.version {
return bundle.v1
}
print("discarding unsupported stored key bundle")
}

return nil
Expand Down Expand Up @@ -275,6 +278,10 @@ public class Client {
)
}

func batchQuery(request: BatchQueryRequest) async throws -> BatchQueryResponse {
return try await apiClient.batchQuery(request: request)
}

@discardableResult func publish(envelopes: [Envelope]) async throws -> PublishResponse {
let authorized = AuthorizedIdentity(address: address, authorized: privateKeyBundleV1.identityKey.publicKey, identity: privateKeyBundleV1.identityKey)
let authToken = try await authorized.createAuthToken()
Expand Down
33 changes: 33 additions & 0 deletions Sources/XMTP/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,39 @@ public class Conversations {
self.client = client
}

public func listBatchMessages(topics: [String], limit: Int? = nil, before: Date? = nil, after: Date? = nil) async throws -> [DecodedMessage] {
let pagination = Pagination(limit: limit, startTime: before, endTime: after)
let requests = topics.map { (topic) in
makeQueryRequest(topic: topic, pagination: pagination)
}
/// The maximum number of requests permitted in a single batch call.
let maxQueryRequestsPerBatch = 50
let batches = requests.chunks(maxQueryRequestsPerBatch)
.map { (requests) in BatchQueryRequest.with { $0.requests = requests } }
var messages: [DecodedMessage] = []
// TODO: consider using a task group here for parallel batch calls
for batch in batches {
messages += try await client.apiClient.batchQuery(request: batch)
.responses.flatMap { (res) in
try res.envelopes.compactMap { (envelope) in
let conversation = conversations
.first(where: { $0.topic == envelope.contentTopic })
if conversation == nil {
print("discarding message, unknown conversation \(envelope)")
return nil
}
let msg = try conversation?.decode(envelope)
if msg == nil {
print("discarding message, unable to decode \(envelope)")
return nil
}
return msg
}
}
}
return messages
}

public func streamAllMessages() async throws -> AsyncThrowingStream<DecodedMessage, Error> {
return AsyncThrowingStream { continuation in
Task {
Expand Down
8 changes: 8 additions & 0 deletions Sources/XMTP/Util.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ enum Util {
return data.web3.keccak256
}
}

extension Array {
func chunks(_ chunkSize: Int) -> [[Element]] {
return stride(from: 0, to: self.count, by: chunkSize).map {
Array(self[$0..<Swift.min($0 + chunkSize, self.count)])
}
}
}
13 changes: 13 additions & 0 deletions Sources/XMTPTestHelpers/TestHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ public class FakeApiClient: ApiClient {

return PublishResponse()
}

public func batchQuery(request: XMTP.BatchQueryRequest) async throws -> XMTP.BatchQueryResponse {
abort() // Not supported on Fake
}

public func query(request: XMTP.QueryRequest) async throws -> XMTP.QueryResponse {
abort() // Not supported on Fake
}

public func publish(request: XMTP.PublishRequest) async throws -> XMTP.PublishResponse {
abort() // Not supported on Fake
}

}

@available(iOS 15, *)
Expand Down
4 changes: 2 additions & 2 deletions XMTP.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pod::Spec.new do |spec|
#

spec.name = "XMTP"
spec.version = "0.2.2-alpha0"
spec.version = "0.3.0-alpha0"
spec.summary = "XMTP SDK Cocoapod"

# This description is used to generate tags and improve search results.
Expand Down Expand Up @@ -44,7 +44,7 @@ Pod::Spec.new do |spec|
spec.dependency "web3.swift"
spec.dependency "GzipSwift"
spec.dependency "Connect-Swift"
spec.dependency 'XMTPRust', '= 0.2.2-beta0'
spec.dependency 'XMTPRust', '= 0.3.0-beta0'

spec.xcconfig = {'VALID_ARCHS' => 'arm64' }
end
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/xmtp/xmtp-rust-swift",
"state" : {
"revision" : "eccfc16bb8f866857ecbb1604c1dab855b1960f7",
"version" : "0.2.2-beta0"
"revision" : "41a1161cf06a86bab0aa886e450584a1191429b1",
"version" : "0.3.0-beta0"
}
}
],
Expand Down

0 comments on commit 83d76e8

Please sign in to comment.