Skip to content

Commit

Permalink
Merge pull request #253 from xmtp/ar/additional-broadcast-handling
Browse files Browse the repository at this point in the history
feat: Add Additional Broadcast Handling
  • Loading branch information
alexrisch authored Jun 28, 2024
2 parents 5ddd500 + 8a81939 commit 41ba00c
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-rats-admire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@xmtp/broadcast-sdk": minor
---

Message personalization callback, conversation creation callback
2 changes: 1 addition & 1 deletion packages/broadcast/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"packageManager": "[email protected]",
"devDependencies": {
"@rollup/plugin-typescript": "^11.1.6",
"@xmtp/xmtp-js": "^11.3.12",
"@xmtp/xmtp-js": "^11.6.3",
"ethers": "^6.10.0",
"rollup": "^4.13.0",
"rollup-plugin-dts": "^6.1.0",
Expand Down
85 changes: 59 additions & 26 deletions packages/broadcast/src/BroadcastClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import { type Client, type Conversation } from "@xmtp/xmtp-js"

import { BroadcastConstructorParams, BroadcastOptions } from "./types"
import {
BroadcastConstructorParams,
BroadcastOptions,
OnBatchComplete,
OnBatchStart,
OnBroadcastComplete,
OnCanMessageAddressesUpdate,
OnCantMessageAddress,
OnDelay,
OnMessageFailed,
OnMessageSending,
OnMessageSent,
OnWillConversationCreate,
} from "./types"

const GENERAL_RATE_LIMIT = 10000

Expand All @@ -18,39 +31,44 @@ export class BroadcastClient<ContentTypes = unknown> {
/**
* Called when a batch of addresses is about to be sent
*/
onBatchStart?: (addresses: string[]) => void
onBatchStart?: OnBatchStart
/**
* Called when a batch of addresses has been sent/failed
*/
onBatchComplete?: (addresses: string[]) => void
onBatchComplete?: OnBatchComplete
/**
* Called when all addresses have been sent/failed
*/
onBroadcastComplete?: () => void
onBroadcastComplete?: OnBroadcastComplete
/**
* Called when an address can't be messaged
*/
onCantMessageAddress?: (address: string) => void
onCantMessageAddress?: OnCantMessageAddress
/**
* Called when a message is about to be sent
*/
onMessageSending?: (address: string) => void
onMessageSending?: OnMessageSending<ContentTypes>
/**
* Called when a message fails to send
*/
onMessageFailed?: (address: string) => void
onMessageFailed?: OnMessageFailed
/**
* Called when a message is successfully sent
*/
onMessageSent?: (address: string) => void
onMessageSent?: OnMessageSent
/**
* Called when the list of addresses that can be messaged is updated, this is useful for caching
*/
onCanMessageAddressesUpdate?: (addresses: string[]) => void
onCanMessageAddressesUpdate?: OnCanMessageAddressesUpdate
/**
* Called when a delay is about to happen
*/
onDelay?: (ms: number) => void
onDelay?: OnDelay
/**
* Called when a new conversation is about to be created
* This can be used to add additional payload for individual addresses like conversation context and consent proofs
*/
onWillConversationCreate?: OnWillConversationCreate

constructor({
client,
Expand All @@ -67,6 +85,7 @@ export class BroadcastClient<ContentTypes = unknown> {
onMessageSent,
onCanMessageAddressesUpdate,
onDelay,
onWillConversationCreate,
}: BroadcastConstructorParams<ContentTypes>) {
this.client = client
this.addresses = addresses
Expand All @@ -82,6 +101,7 @@ export class BroadcastClient<ContentTypes = unknown> {
this.onMessageSent = onMessageSent
this.onCanMessageAddressesUpdate = onCanMessageAddressesUpdate
this.onDelay = onDelay
this.onWillConversationCreate = onWillConversationCreate
}

public broadcast = async (
Expand Down Expand Up @@ -133,14 +153,18 @@ export class BroadcastClient<ContentTypes = unknown> {
try {
let conversation = this.conversationMapping.get(address)
if (!conversation) {
conversation =
await this.client.conversations.newConversation(address)
const newConversationArgs =
await this.onWillConversationCreate?.(address)
conversation = await this.client.conversations.newConversation(
address,
...(newConversationArgs ?? []),
)
this.conversationMapping.set(address, conversation)
}

for (const message of messages) {
this.onMessageSending?.(address)
await conversation.send(message)
const personalizedMessage = await this.onMessageSending?.(address)
await conversation.send(personalizedMessage || message)
}
this.onMessageSent?.(address)
// Clear up some memory after we are done with the conversation
Expand All @@ -165,10 +189,15 @@ export class BroadcastClient<ContentTypes = unknown> {
const finalErrors: string[] = []
for (const address of this.errorBatch) {
try {
const conversation =
await this.client.conversations.newConversation(address)
const newConversationArgs =
await this.onWillConversationCreate?.(address)
const conversation = await this.client.conversations.newConversation(
address,
...(newConversationArgs ?? []),
)
for (const message of messages) {
await conversation.send(message)
const personalizedMessage = await this.onMessageSending?.(address)
await conversation.send(personalizedMessage ?? message)
}
this.onMessageSent?.(address)
} catch (err) {
Expand Down Expand Up @@ -260,39 +289,43 @@ export class BroadcastClient<ContentTypes = unknown> {
this.rateLimitTime = time
}

setOnBatchStart(callback: (addresses: string[]) => void) {
setOnBatchStart(callback: OnBatchStart) {
this.onBatchStart = callback
}

setOnBatchComplete(callback: (addresses: string[]) => void) {
setOnBatchComplete(callback: OnBatchComplete) {
this.onBatchComplete = callback
}

setOnBroadcastComplete(callback: () => void) {
setOnBroadcastComplete(callback: OnBroadcastComplete) {
this.onBroadcastComplete = callback
}

setOnCantMessageAddress(callback: (address: string) => void) {
setOnCantMessageAddress(callback: OnCantMessageAddress) {
this.onCantMessageAddress = callback
}

setOnMessageSending(callback: (address: string) => void) {
setOnMessageSending(callback: OnMessageSending<ContentTypes>) {
this.onMessageSending = callback
}

setOnMessageFailed(callback: (address: string) => void) {
setOnMessageFailed(callback: OnMessageFailed) {
this.onMessageFailed = callback
}

setOnMessageSent(callback: (address: string) => void) {
setOnMessageSent(callback: OnMessageSent) {
this.onMessageSent = callback
}

setOnCanMessageAddressesUpdate(callback: (addresses: string[]) => void) {
setOnCanMessageAddressesUpdate(callback: OnCanMessageAddressesUpdate) {
this.onCanMessageAddressesUpdate = callback
}

setOnDelay(callback: (ms: number) => void) {
setOnDelay(callback: OnDelay) {
this.onDelay = callback
}

setOnWillConversationCreate(callback: OnWillConversationCreate) {
this.onWillConversationCreate = callback
}
}
59 changes: 59 additions & 0 deletions packages/broadcast/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,63 @@ describe("BroadcastClient", () => {

expect(onDelay).toHaveBeenCalledTimes(4)
})

it("should call onWillConversationCreate and use its return value", async () => {
const onWillConversationCreate = vi
.fn()
.mockResolvedValue(["additionalArg1", "additionalArg2"])
broadcastClient.setOnWillConversationCreate(onWillConversationCreate)
broadcastClient.setAddresses(["address3"])
const messages = ["message1", "message2"]
const options = { skipInitialDelay: true }

await broadcastClient.broadcast(messages, options)

expect(onWillConversationCreate).toHaveBeenCalledWith("address3")
expect(clientMock.conversations.newConversation).toHaveBeenCalledWith(
"address3",
"additionalArg1",
"additionalArg2",
)
})

it("should call onWillConversationCreate and work if undefined", async () => {
broadcastClient.setAddresses(["address3"])
const messages = ["message1", "message2"]
const options = { skipInitialDelay: true }

await broadcastClient.broadcast(messages, options)

expect(clientMock.conversations.newConversation).toHaveBeenCalledWith(
"address3",
)
})

it("should call onMessageSending and use the personalized message if provided", async () => {
const onMessageSending = vi.fn().mockResolvedValue("personalizedMessage")
broadcastClient.setOnMessageSending(onMessageSending)

const messages = ["message1"]
const options = { skipInitialDelay: true }

await broadcastClient.broadcast(messages, options)

expect(onMessageSending).toHaveBeenCalledWith("address1")
expect(onMessageSending).toHaveBeenCalledWith("address2")
expect(conversationMock.send).toHaveBeenCalledWith("personalizedMessage")
})

it("should use the original message if onMessageSending does not return a personalized message", async () => {
const onMessageSending = vi.fn().mockResolvedValue(undefined)
broadcastClient.setOnMessageSending(onMessageSending)

const messages = ["message1"]
const options = { skipInitialDelay: true }

await broadcastClient.broadcast(messages, options)

expect(onMessageSending).toHaveBeenCalledWith("address1")
expect(onMessageSending).toHaveBeenCalledWith("address2")
expect(conversationMock.send).toHaveBeenCalledWith("message1")
})
})
88 changes: 78 additions & 10 deletions packages/broadcast/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,41 @@
import { type Client } from "@xmtp/xmtp-js"

type Conversations = Client["conversations"]
type CreateConvo = Conversations["newConversation"]
type CreateConversationArgs = Parameters<CreateConvo>
type RemoveFirstFromTuple<T extends unknown[]> = T extends [
unknown,
...infer Rest,
]
? Rest
: []

type AdditionalConversationArgs = RemoveFirstFromTuple<CreateConversationArgs>

export type OnBatchStart = (addresses: string[]) => void

export type OnBatchComplete = (addresses: string[]) => void

export type OnBroadcastComplete = () => void

export type OnCantMessageAddress = (address: string) => void

export type OnCanMessageAddressesUpdate = (addresses: string[]) => void

export type OnMessageSending<ContentTypes = unknown> = (
address: string,
) => Promise<Exclude<ContentTypes, undefined>>

export type OnMessageFailed = (address: string) => void

export type OnMessageSent = (address: string) => void

export type OnDelay = (ms: number) => void

export type OnWillConversationCreate = (
address: string,
) => Promise<AdditionalConversationArgs>

export interface BroadcastConstructorParams<ContentTypes = unknown> {
client: Client<ContentTypes>
addresses: string[]
Expand All @@ -8,16 +44,48 @@ export interface BroadcastConstructorParams<ContentTypes = unknown> {
rateLimitTime?: number

// Callbacks
onBatchStart?: (addresses: string[]) => void
onBatchComplete?: (addresses: string[]) => void
onBroadcastComplete?: () => void
onCantMessageAddress?: (address: string) => void
onCanMessageAddreses?: (addresses: string[]) => void
onMessageSending?: (address: string) => void
onMessageFailed?: (address: string) => void
onMessageSent?: (address: string) => void
onCanMessageAddressesUpdate?: (addresses: string[]) => void
onDelay?: (ms: number) => void
/**
* Called when a batch of addresses is about to be sent
*/
onBatchStart?: OnBatchStart
/**
* Called when a batch of addresses has been sent/failed
*/
onBatchComplete?: OnBatchComplete
/**
* Called when all addresses have been sent/failed
*/
onBroadcastComplete?: OnBroadcastComplete
/**
* Called when an address can't be messaged
*/
onCantMessageAddress?: OnCantMessageAddress
/**
* Called when the list of addresses that can be messaged is updated, this is useful for caching
*/
onCanMessageAddressesUpdate?: OnCanMessageAddressesUpdate
/**
* Called when a message is about to be sent
* This can be used to return a different message for each address
*/
onMessageSending?: OnMessageSending<ContentTypes>
/**
* Called when a message fails to send
*/
onMessageFailed?: OnMessageFailed
/**
* Called when a message is successfully sent
*/
onMessageSent?: OnMessageSent
/**
* Called when a delay is about to happen
*/
onDelay?: OnDelay
/**
* Called when a new conversation is about to be created
* This can be used to add additional payload for individual addresses like conversation context and consent proofs
*/
onWillConversationCreate?: OnWillConversationCreate
}

export interface BroadcastOptions {
Expand Down
Loading

0 comments on commit 41ba00c

Please sign in to comment.