Skip to content

Commit

Permalink
Introduce Conversation.prepareMessage (#80)
Browse files Browse the repository at this point in the history
This gives back a PreparedMessage that has a messageID on it. Clients
can use this for optimistic sending purposes.
  • Loading branch information
nakajima authored Mar 14, 2023
1 parent 8e96ec3 commit d9463d7
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 90 deletions.
9 changes: 9 additions & 0 deletions Sources/XMTP/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ public enum Conversation {
}
}

public func prepareMessage<T>(content: T, options: SendOptions? = nil) async throws -> PreparedMessage {
switch self {
case let .v1(conversationV1):
return try await conversationV1.prepareMessage(content: content, options: options ?? .init())
case let .v2(conversationV2):
return try await conversationV2.prepareMessage(content: content, options: options ?? .init())
}
}

@discardableResult public func send<T>(content: T, options: SendOptions? = nil, fallback _: String? = nil) async throws -> String {
switch self {
case let .v1(conversationV1):
Expand Down
84 changes: 42 additions & 42 deletions Sources/XMTP/ConversationV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,11 @@ public struct ConversationV1 {
Topic.directMessageV1(client.address, peerAddress)
}

@discardableResult func send(content: String, options: SendOptions? = nil) async throws -> String {
return try await send(content: content, options: options, sentAt: nil)
}

@discardableResult internal func send(content: String, options: SendOptions? = nil, sentAt: Date? = nil) async throws -> String {
let encoder = TextCodec()
let encodedContent = try encoder.encode(content: content)

return try await send(content: encodedContent, options: options, sentAt: sentAt)
}
func prepareMessage<T>(content: T, options: SendOptions?) async throws -> PreparedMessage {
guard let contact = try await client.contacts.find(peerAddress) else {
throw ContactBundleError.notFound
}

func send<T>(content: T, options: SendOptions? = nil) async throws -> String {
let codec = Client.codecRegistry.find(for: options?.contentType)

func encode<Codec: ContentCodec>(codec: Codec, content: Any) throws -> EncodedContent {
Expand All @@ -63,18 +56,9 @@ public struct ConversationV1 {
let content = content as T
var encoded = try encode(codec: codec, content: content)
encoded.fallback = options?.contentFallback ?? ""
return try await send(content: encoded, options: options)
}

internal func send(content encodedContent: EncodedContent, options: SendOptions? = nil, sentAt: Date? = nil) async throws -> String {
guard let contact = try await client.contacts.find(peerAddress) else {
throw ContactBundleError.notFound
}

var encodedContent = encodedContent

if let compression = options?.compression {
encodedContent = try encodedContent.compress(compression)
encoded = try encoded.compress(compression)
}

let recipient = try contact.toPublicKeyBundle()
Expand All @@ -83,12 +67,12 @@ public struct ConversationV1 {
fatalError("no signature for id key")
}

let date = sentAt ?? Date()
let date = sentAt

let message = try MessageV1.encode(
sender: client.privateKeyBundleV1,
recipient: recipient,
message: try encodedContent.serializedData(),
message: try encoded.serializedData(),
timestamp: date
)

Expand All @@ -98,28 +82,44 @@ public struct ConversationV1 {
message: try Message(v1: message).serializedData()
)

var envelopes = [messageEnvelope]

if client.contacts.needsIntroduction(peerAddress) {
envelopes.append(contentsOf: [
Envelope(
topic: .userIntro(peerAddress),
timestamp: date,
message: try Message(v1: message).serializedData()
),
Envelope(
topic: .userIntro(client.address),
timestamp: date,
message: try Message(v1: message).serializedData()
),
])

client.contacts.hasIntroduced[peerAddress] = true
return PreparedMessage(messageEnvelope: messageEnvelope, conversation: .v1(self)) {
var envelopes = [messageEnvelope]

if client.contacts.needsIntroduction(peerAddress) {
envelopes.append(contentsOf: [
Envelope(
topic: .userIntro(peerAddress),
timestamp: date,
message: try Message(v1: message).serializedData()
),
Envelope(
topic: .userIntro(client.address),
timestamp: date,
message: try Message(v1: message).serializedData()
),
])

client.contacts.hasIntroduced[peerAddress] = true
}

try await client.publish(envelopes: envelopes)
}
}

try await client.publish(envelopes: envelopes)
@discardableResult func send(content: String, options: SendOptions? = nil) async throws -> String {
return try await send(content: content, options: options, sentAt: nil)
}

return generateID(from: messageEnvelope)
@discardableResult internal func send(content: String, options: SendOptions? = nil, sentAt _: Date? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
}

func send<T>(content: T, options: SendOptions? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
}

public func streamMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
Expand Down
80 changes: 38 additions & 42 deletions Sources/XMTP/ConversationV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,37 @@ public struct ConversationV2 {
ConversationV2Container(topic: topic, keyMaterial: keyMaterial, conversationID: context.conversationID, metadata: context.metadata, peerAddress: peerAddress, header: header)
}

func prepareMessage<T>(content: T, options: SendOptions?) async throws -> PreparedMessage {
let codec = Client.codecRegistry.find(for: options?.contentType)

func encode<Codec: ContentCodec>(codec: Codec, content: Any) throws -> EncodedContent {
if let content = content as? Codec.T {
return try codec.encode(content: content)
} else {
throw CodecError.invalidContent
}
}

var encoded = try encode(codec: codec, content: content)
encoded.fallback = options?.contentFallback ?? ""

if let compression = options?.compression {
encoded = try encoded.compress(compression)
}

let message = try await MessageV2.encode(
client: client,
content: encoded,
topic: topic,
keyMaterial: keyMaterial
)

let envelope = Envelope(topic: topic, timestamp: Date(), message: try Message(v2: message).serializedData())
return PreparedMessage(messageEnvelope: envelope, conversation: .v2(self)) {
try await client.publish(envelopes: [envelope])
}
}

func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil) async throws -> [DecodedMessage] {
let pagination = Pagination(limit: limit, startTime: before, endTime: after)

Expand Down Expand Up @@ -118,25 +149,15 @@ public struct ConversationV2 {
}

@discardableResult func send<T>(content: T, options: SendOptions? = nil) async throws -> String {
let codec = Client.codecRegistry.find(for: options?.contentType)

func encode<Codec: ContentCodec>(codec: Codec, content: Any) throws -> EncodedContent {
if let content = content as? Codec.T {
return try codec.encode(content: content)
} else {
throw CodecError.invalidContent
}
}

var encoded = try encode(codec: codec, content: content)
encoded.fallback = options?.contentFallback ?? ""
return try await send(content: encoded, options: options, sentAt: Date())
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
}

@discardableResult func send(content: String, options: SendOptions? = nil, sentAt: Date) async throws -> String {
let encoder = TextCodec()
let encodedContent = try encoder.encode(content: content)
return try await send(content: encodedContent, options: options, sentAt: sentAt)
@discardableResult func send(content: String, options: SendOptions? = nil, sentAt _: Date) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
}

public func encode<Codec: ContentCodec, T>(codec: Codec, content: T) async throws -> Data where Codec.T == T {
Expand All @@ -158,31 +179,6 @@ public struct ConversationV2 {
return try envelope.serializedData()
}

internal func send(content: EncodedContent, options: SendOptions? = nil, sentAt: Date) async throws -> String {
guard try await client.getUserContact(peerAddress: peerAddress) != nil else {
throw ContactBundleError.notFound
}

var content = content

if let compression = options?.compression {
content = try content.compress(compression)
}

let message = try await MessageV2.encode(
client: client,
content: content,
topic: topic,
keyMaterial: keyMaterial
)

let envelope = Envelope(topic: topic, timestamp: sentAt, message: try Message(v2: message).serializedData())

try await client.publish(envelopes: [envelope])

return generateID(from: envelope)
}

@discardableResult func send(content: String, options: SendOptions? = nil) async throws -> String {
return try await send(content: content, options: options, sentAt: Date())
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/XMTP/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Foundation
import XMTPProto

public enum ConversationError: Error {
case recipientNotOnNetwork, recipientIsSender
case recipientNotOnNetwork, recipientIsSender, v1NotSupported(String)
}

/// Handles listing and creating Conversations.
Expand Down
27 changes: 27 additions & 0 deletions Sources/XMTP/PreparedMessage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// PreparedMessage.swift
//
//
// Created by Pat Nakajima on 3/9/23.
//

import CryptoKit
import Foundation

public struct PreparedMessage {
var messageEnvelope: Envelope
var conversation: Conversation
var onSend: () async throws -> Void

public func decodedMessage() throws -> DecodedMessage {
return try conversation.decode(messageEnvelope)
}

public func send() async throws {
try await onSend()
}

var messageID: String {
Data(SHA256.hash(data: messageEnvelope.message)).toHex
}
}
48 changes: 43 additions & 5 deletions Tests/XMTPTests/ConversationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,52 @@ class ConversationTests: XCTestCase {
var bob: PrivateKey!
var bobClient: Client!

var fixtures: Fixtures!

override func setUp() async throws {
alice = try PrivateKey.generate()
bob = try PrivateKey.generate()
fixtures = await fixtures()

alice = fixtures.alice
bob = fixtures.bob

fakeApiClient = fixtures.fakeApiClient

aliceClient = fixtures.aliceClient
bobClient = fixtures.bobClient
}

func testCanPrepareV1Message() async throws {
// Publish legacy contacts so we can get v1 conversations
try await fixtures.publishLegacyContact(client: bobClient)
try await fixtures.publishLegacyContact(client: aliceClient)

let conversation = try await aliceClient.conversations.newConversation(with: bob.address)
XCTAssertEqual(conversation.version, .v1)

let preparedMessage = try await conversation.prepareMessage(content: "hi")
let messageID = preparedMessage.messageID

try await preparedMessage.send()

let messages = try await conversation.messages()
let message = messages[0]

XCTAssertEqual("hi", message.body)
XCTAssertEqual(message.id, messageID)
}

func testCanPrepareV2Message() async throws {
let conversation = try await aliceClient.conversations.newConversation(with: bob.address)
let preparedMessage = try await conversation.prepareMessage(content: "hi")
let messageID = preparedMessage.messageID

try await preparedMessage.send()

fakeApiClient = FakeApiClient()
let messages = try await conversation.messages()
let message = messages[0]

aliceClient = try await Client.create(account: alice, apiClient: fakeApiClient)
bobClient = try await Client.create(account: bob, apiClient: fakeApiClient)
XCTAssertEqual("hi", message.body)
XCTAssertEqual(message.id, messageID)
}

func testV2RejectsSpoofedContactBundles() async throws {
Expand Down

0 comments on commit d9463d7

Please sign in to comment.