Skip to content

Commit

Permalink
remove all decrypted messages
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer committed Nov 7, 2024
1 parent 15efb9f commit 9c97e77
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 161 deletions.
4 changes: 2 additions & 2 deletions Sources/XMTPiOS/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -793,12 +793,12 @@ public final class Client {
}
}

public func findMessage(messageId: String) throws -> MessageV3? {
public func findMessage(messageId: String) throws -> Message? {
guard let client = v3Client else {
throw ClientError.noV3Client("Error no V3 client initialized")
}
do {
return MessageV3(
return Message(
client: self,
ffiMessage: try client.message(messageId: messageId.hexToData))
} catch {
Expand Down
29 changes: 1 addition & 28 deletions Sources/XMTPiOS/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum Conversation {
}
}

public func processMessage(messageBytes: Data) async throws -> MessageV3 {
public func processMessage(messageBytes: Data) async throws -> Message {
switch self {
case let .group(group):
return try await group.processMessage(messageBytes: messageBytes)
Expand Down Expand Up @@ -168,17 +168,6 @@ public enum Conversation {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<
DecryptedMessage, Error
> {
switch self {
case let .group(group):
return group.streamDecryptedMessages()
case let .dm(dm):
return dm.streamDecryptedMessages()
}
}

public func messages(
limit: Int? = nil, before: Date? = nil, after: Date? = nil,
direction: PagingInfoSortDirection? = .descending
Expand All @@ -195,22 +184,6 @@ public enum Conversation {
}
}

public func decryptedMessages(
limit: Int? = nil, before: Date? = nil, after: Date? = nil,
direction: PagingInfoSortDirection? = .descending
) async throws -> [DecryptedMessage] {
switch self {
case let .group(group):
return try await group.decryptedMessages(
before: before, after: after, limit: limit, direction: direction
)
case let .dm(dm):
return try await dm.decryptedMessages(
before: before, after: after, limit: limit, direction: direction
)
}
}

var client: Client {
switch self {
case let .group(group):
Expand Down
2 changes: 1 addition & 1 deletion Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public actor Conversations {
}
do {
continuation.yield(
try MessageV3(
try Message(
client: self.client, ffiMessage: message
).decode())
} catch {
Expand Down
8 changes: 4 additions & 4 deletions Sources/XMTPiOS/Dm.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ public struct Dm: Identifiable, Equatable, Hashable {
return try ffiConversation.consentState().fromFFI
}

public func processMessage(messageBytes: Data) async throws -> MessageV3 {
public func processMessage(messageBytes: Data) async throws -> Message {
let message =
try await ffiConversation.processStreamedConversationMessage(
envelopeBytes: messageBytes)
return MessageV3(client: client, ffiMessage: message)
return Message(client: client, ffiMessage: message)
}

public func send<T>(content: T, options: SendOptions? = nil) async throws
Expand Down Expand Up @@ -181,7 +181,7 @@ public struct Dm: Identifiable, Equatable, Hashable {
}
do {
continuation.yield(
try MessageV3(
try Message(
client: self.client, ffiMessage: message
).decode())
} catch {
Expand Down Expand Up @@ -260,7 +260,7 @@ public struct Dm: Identifiable, Equatable, Hashable {

return try ffiConversation.findMessages(opts: options).compactMap {
ffiMessage in
return MessageV3(client: self.client, ffiMessage: ffiMessage)
return Message(client: self.client, ffiMessage: ffiMessage)
.decodeOrNull()
}
}
Expand Down
106 changes: 4 additions & 102 deletions Sources/XMTPiOS/Group.swift
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ public struct Group: Identifiable, Equatable, Hashable {
return try ffiGroup.consentState().fromFFI
}

public func processMessage(messageBytes: Data) async throws -> MessageV3 {
public func processMessage(messageBytes: Data) async throws -> Message {
let message = try await ffiGroup.processStreamedConversationMessage(
envelopeBytes: messageBytes)
return MessageV3(client: client, ffiMessage: message)
return Message(client: client, ffiMessage: message)
}

public func send<T>(content: T, options: SendOptions? = nil) async throws
Expand Down Expand Up @@ -379,7 +379,7 @@ public struct Group: Identifiable, Equatable, Hashable {
}
do {
continuation.yield(
try MessageV3(
try Message(
client: self.client, ffiMessage: message
).decode())
} catch {
Expand All @@ -401,42 +401,6 @@ public struct Group: Identifiable, Equatable, Hashable {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<
DecryptedMessage, Error
> {
AsyncThrowingStream { continuation in
let task = Task.detached {
self.streamHolder.stream = await self.ffiGroup.stream(
messageCallback: MessageCallback(client: self.client) {
message in
guard !Task.isCancelled else {
continuation.finish()
return
}
do {
continuation.yield(
try MessageV3(
client: self.client, ffiMessage: message
).decrypt())
} catch {
print("Error onMessage \(error)")
continuation.finish(throwing: error)
}
}
)

continuation.onTermination = { @Sendable reason in
self.streamHolder.stream?.end()
}
}

continuation.onTermination = { @Sendable reason in
task.cancel()
self.streamHolder.stream?.end()
}
}
}

public func messages(
before: Date? = nil,
after: Date? = nil,
Expand Down Expand Up @@ -494,70 +458,8 @@ public struct Group: Identifiable, Equatable, Hashable {

return try ffiGroup.findMessages(opts: options).compactMap {
ffiMessage in
return MessageV3(client: self.client, ffiMessage: ffiMessage)
return Message(client: self.client, ffiMessage: ffiMessage)
.decodeOrNull()
}
}

public func decryptedMessages(
before: Date? = nil,
after: Date? = nil,
limit: Int? = nil,
direction: PagingInfoSortDirection? = .descending,
deliveryStatus: MessageDeliveryStatus? = .all
) async throws -> [DecryptedMessage] {
var options = FfiListMessagesOptions(
sentBeforeNs: nil,
sentAfterNs: nil,
limit: nil,
deliveryStatus: nil,
direction: nil
)

if let before {
options.sentBeforeNs = Int64(
before.millisecondsSinceEpoch * 1_000_000)
}

if let after {
options.sentAfterNs = Int64(
after.millisecondsSinceEpoch * 1_000_000)
}

if let limit {
options.limit = Int64(limit)
}

let status: FfiDeliveryStatus? = {
switch deliveryStatus {
case .published:
return FfiDeliveryStatus.published
case .unpublished:
return FfiDeliveryStatus.unpublished
case .failed:
return FfiDeliveryStatus.failed
default:
return nil
}
}()

options.deliveryStatus = status

let direction: FfiDirection? = {
switch direction {
case .ascending:
return FfiDirection.ascending
default:
return FfiDirection.descending
}
}()

options.direction = direction

return try ffiGroup.findMessages(opts: options).compactMap {
ffiMessage in
return MessageV3(client: self.client, ffiMessage: ffiMessage)
.decryptOrNull()
}
}
}
24 changes: 0 additions & 24 deletions Tests/XMTPTests/V3ClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -356,30 +356,6 @@ class V3ClientTests: XCTestCase {
await fulfillment(of: [expectation1], timeout: 3)
}

func testCanStreamAllDecryptedMessagesFromV3Users() async throws {
let fixtures = try await localFixtures()

let expectation1 = XCTestExpectation(description: "got a conversation")
expectation1.expectedFulfillmentCount = 2
let convo = try await fixtures.boV3Client.conversations.findOrCreateDm(
with: fixtures.caroV2V3.address)
let group = try await fixtures.caroV2V3Client.conversations.newGroup(
with: [fixtures.boV3.address])
try await fixtures.boV3Client.conversations.sync()
Task(priority: .userInitiated) {
for try await _ in await fixtures.boV3Client.conversations
.streamAllDecryptedConversationMessages()
{
expectation1.fulfill()
}
}

_ = try await group.send(content: "hi")
_ = try await convo.send(content: "hi")

await fulfillment(of: [expectation1], timeout: 3)
}

func testCanStreamGroupsAndConversationsFromV3Users() async throws {
let fixtures = try await localFixtures()

Expand Down

0 comments on commit 9c97e77

Please sign in to comment.