diff --git a/README.md b/README.md index 60753cb17..6420cc945 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ const messages = await conversation.messages() // Send a message await conversation.send('gm') // Listen for new messages in the conversation -for await (const message of conversation.streamMessages()) { +for await (const message of await conversation.streamMessages()) { console.log(`[${message.senderAddress}]: ${message.text}`) } ``` @@ -134,7 +134,7 @@ You can also listen for new conversations being started in real-time. This will _Warning: this stream will continue infinitely. To end the stream you can either break from the loop, or call `await stream.return()`_ ```ts -const stream = xmtp.conversations.stream() +const stream = await xmtp.conversations.stream() for await (const conversation of stream) { console.log(`New conversation started with ${conversation.peerAddress}`) // Say hello to your new friend @@ -193,7 +193,7 @@ The Stream returned by the `stream` methods is an asynchronous iterator and as s const conversation = await xmtp.conversations.newConversation( '0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045' ) -for await (const message of conversation.streamMessages()) { +for await (const message of await conversation.streamMessages()) { if (message.senderAddress === xmtp.address) { // This message was sent from me continue diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 2022a5c03..5836b4092 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -8,6 +8,7 @@ services: - --store - --message-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - --lightpush + - --filter - --ws-port=9001 - --wait-for-db=30s ports: diff --git a/package-lock.json b/package-lock.json index 14e7f0b27..c3d4575cb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "@stardazed/streams-polyfill": "^2.4.0", "cross-fetch": "^3.1.5", "ethers": "^5.5.3", - "js-waku": "^0.22.0", + "js-waku": "^0.24.0", "protobufjs": "^6.11.2" }, "devDependencies": { @@ -2040,11 +2040,6 @@ "resolved": "https://registry.npmjs.org/@leichtgewicht/ip-codec/-/ip-codec-2.0.3.tgz", "integrity": "sha512-nkalE/f1RvRGChwBnEIoBfSEYOXnCRdleKuv6+lePbMDrMZXeDQnqak5XDOeBgrPPyPfAdcCu/B5z+v3VhplGg==" }, - "node_modules/@multiformats/base-x": { - "version": "4.0.1", - "resolved": "https://registry.npmjs.org/@multiformats/base-x/-/base-x-4.0.1.tgz", - "integrity": "sha512-eMk0b9ReBbV23xXU693TAIrLyeO5iTgBZGSJfpqriG8UkYvr/hC9u9pyMlAakDNHWmbhMZCDs6KQO0jzKD8OTw==" - }, "node_modules/@noble/ed25519": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.6.0.tgz", @@ -8003,9 +7998,9 @@ "dev": true }, "node_modules/js-waku": { - "version": "0.22.0", - "resolved": "https://registry.npmjs.org/js-waku/-/js-waku-0.22.0.tgz", - "integrity": "sha512-4aMOJI6HKSaJ4eLfanbD1CDifwUgCPfcIDnIrt96SJA5A3nf9VXFZUmR1huwv07JithmkP9orXNGb6u8KdGcrg==", + "version": "0.24.0", + "resolved": "https://registry.npmjs.org/js-waku/-/js-waku-0.24.0.tgz", + "integrity": "sha512-8l7/WuadaaGy6XmVKutZpJ61JohbBe8WamJUtNiTd8WdxhBuz/rXh5RUiD8mjiiG8kEzQ+3+E68rvIpb/+BbXw==", "dependencies": { "@chainsafe/libp2p-noise": "^5.0.0", "@ethersproject/rlp": "^5.5.0", @@ -8015,16 +8010,23 @@ "hi-base32": "^0.5.1", "it-concat": "^2.0.0", "it-length-prefixed": "^5.0.2", + "it-pipe": "^1.1.0", "js-sha3": "^0.8.0", "libp2p": "^0.36.2", "libp2p-bootstrap": "^0.14.0", + "libp2p-crypto": "^0.21.2", "libp2p-gossipsub": "0.13.0", + "libp2p-interfaces": "^4.0.6", "libp2p-mplex": "^0.10.4", "libp2p-websockets": "^0.16.1", + "long": "^4.0.0", "multiaddr": "^10.0.1", - "multihashes": "^4.0.3", + "multiformats": "^9.6.5", + "peer-id": "^0.16.0", "protobufjs": "^6.8.8", - "uuid": "^8.3.2" + "uint8arrays": "^3.0.0", + "uuid": "^8.3.2", + "varint": "^6.0.0" }, "engines": { "node": ">=16" @@ -8947,42 +8949,10 @@ "multiaddr": "^10.0.0" } }, - "node_modules/multibase": { - "version": "4.0.6", - "resolved": "https://registry.npmjs.org/multibase/-/multibase-4.0.6.tgz", - "integrity": "sha512-x23pDe5+svdLz/k5JPGCVdfn7Q5mZVMBETiC+ORfO+sor9Sgs0smJzAjfTbM5tckeCqnaUuMYoz+k3RXMmJClQ==", - "deprecated": "This module has been superseded by the multiformats module", - "dependencies": { - "@multiformats/base-x": "^4.0.1" - }, - "engines": { - "node": ">=12.0.0", - "npm": ">=6.0.0" - } - }, "node_modules/multiformats": { - "version": "9.6.4", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.6.4.tgz", - "integrity": "sha512-fCCB6XMrr6CqJiHNjfFNGT0v//dxOBMrOMqUIzpPc/mmITweLEyhvMpY9bF+jZ9z3vaMAau5E8B68DW77QMXkg==" - }, - "node_modules/multihashes": { - "version": "4.0.3", - "resolved": "https://registry.npmjs.org/multihashes/-/multihashes-4.0.3.tgz", - "integrity": "sha512-0AhMH7Iu95XjDLxIeuCOOE4t9+vQZsACyKZ9Fxw2pcsRmlX4iCn1mby0hS0bb+nQOVpdQYWPpnyusw4da5RPhA==", - "dependencies": { - "multibase": "^4.0.1", - "uint8arrays": "^3.0.0", - "varint": "^5.0.2" - }, - "engines": { - "node": ">=12.0.0", - "npm": ">=6.0.0" - } - }, - "node_modules/multihashes/node_modules/varint": { - "version": "5.0.2", - "resolved": "https://registry.npmjs.org/varint/-/varint-5.0.2.tgz", - "integrity": "sha512-lKxKYG6H03yCZUpAGOPOsMcGxd1RHCu1iKvEHYDPmTyq2HueGhD73ssNBqqQWfvYs04G9iUFRvmAVLW20Jw6ow==" + "version": "9.6.5", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.6.5.tgz", + "integrity": "sha512-vMwf/FUO+qAPvl3vlSZEgEVFY/AxeZq5yg761ScF3CZsXgmTi/HGkicUiNN0CI4PW8FiY2P0OLklOcmQjdQJhw==" }, "node_modules/multistream-select": { "version": "3.0.2", @@ -16294,11 +16264,6 @@ "resolved": "https://registry.npmjs.org/@leichtgewicht/ip-codec/-/ip-codec-2.0.3.tgz", "integrity": "sha512-nkalE/f1RvRGChwBnEIoBfSEYOXnCRdleKuv6+lePbMDrMZXeDQnqak5XDOeBgrPPyPfAdcCu/B5z+v3VhplGg==" }, - "@multiformats/base-x": { - "version": "4.0.1", - "resolved": "https://registry.npmjs.org/@multiformats/base-x/-/base-x-4.0.1.tgz", - "integrity": "sha512-eMk0b9ReBbV23xXU693TAIrLyeO5iTgBZGSJfpqriG8UkYvr/hC9u9pyMlAakDNHWmbhMZCDs6KQO0jzKD8OTw==" - }, "@noble/ed25519": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.6.0.tgz", @@ -21000,9 +20965,9 @@ "dev": true }, "js-waku": { - "version": "0.22.0", - "resolved": "https://registry.npmjs.org/js-waku/-/js-waku-0.22.0.tgz", - "integrity": "sha512-4aMOJI6HKSaJ4eLfanbD1CDifwUgCPfcIDnIrt96SJA5A3nf9VXFZUmR1huwv07JithmkP9orXNGb6u8KdGcrg==", + "version": "0.24.0", + "resolved": "https://registry.npmjs.org/js-waku/-/js-waku-0.24.0.tgz", + "integrity": "sha512-8l7/WuadaaGy6XmVKutZpJ61JohbBe8WamJUtNiTd8WdxhBuz/rXh5RUiD8mjiiG8kEzQ+3+E68rvIpb/+BbXw==", "requires": { "@chainsafe/libp2p-noise": "^5.0.0", "@ethersproject/rlp": "^5.5.0", @@ -21012,16 +20977,23 @@ "hi-base32": "^0.5.1", "it-concat": "^2.0.0", "it-length-prefixed": "^5.0.2", + "it-pipe": "^1.1.0", "js-sha3": "^0.8.0", "libp2p": "^0.36.2", "libp2p-bootstrap": "^0.14.0", + "libp2p-crypto": "^0.21.2", "libp2p-gossipsub": "0.13.0", + "libp2p-interfaces": "^4.0.6", "libp2p-mplex": "^0.10.4", "libp2p-websockets": "^0.16.1", + "long": "^4.0.0", "multiaddr": "^10.0.1", - "multihashes": "^4.0.3", + "multiformats": "^9.6.5", + "peer-id": "^0.16.0", "protobufjs": "^6.8.8", - "uuid": "^8.3.2" + "uint8arrays": "^3.0.0", + "uuid": "^8.3.2", + "varint": "^6.0.0" } }, "js-yaml": { @@ -21769,35 +21741,10 @@ "multiaddr": "^10.0.0" } }, - "multibase": { - "version": "4.0.6", - "resolved": "https://registry.npmjs.org/multibase/-/multibase-4.0.6.tgz", - "integrity": "sha512-x23pDe5+svdLz/k5JPGCVdfn7Q5mZVMBETiC+ORfO+sor9Sgs0smJzAjfTbM5tckeCqnaUuMYoz+k3RXMmJClQ==", - "requires": { - "@multiformats/base-x": "^4.0.1" - } - }, "multiformats": { - "version": "9.6.4", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.6.4.tgz", - "integrity": "sha512-fCCB6XMrr6CqJiHNjfFNGT0v//dxOBMrOMqUIzpPc/mmITweLEyhvMpY9bF+jZ9z3vaMAau5E8B68DW77QMXkg==" - }, - "multihashes": { - "version": "4.0.3", - "resolved": "https://registry.npmjs.org/multihashes/-/multihashes-4.0.3.tgz", - "integrity": "sha512-0AhMH7Iu95XjDLxIeuCOOE4t9+vQZsACyKZ9Fxw2pcsRmlX4iCn1mby0hS0bb+nQOVpdQYWPpnyusw4da5RPhA==", - "requires": { - "multibase": "^4.0.1", - "uint8arrays": "^3.0.0", - "varint": "^5.0.2" - }, - "dependencies": { - "varint": { - "version": "5.0.2", - "resolved": "https://registry.npmjs.org/varint/-/varint-5.0.2.tgz", - "integrity": "sha512-lKxKYG6H03yCZUpAGOPOsMcGxd1RHCu1iKvEHYDPmTyq2HueGhD73ssNBqqQWfvYs04G9iUFRvmAVLW20Jw6ow==" - } - } + "version": "9.6.5", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.6.5.tgz", + "integrity": "sha512-vMwf/FUO+qAPvl3vlSZEgEVFY/AxeZq5yg761ScF3CZsXgmTi/HGkicUiNN0CI4PW8FiY2P0OLklOcmQjdQJhw==" }, "multistream-select": { "version": "3.0.2", diff --git a/package.json b/package.json index ef40eec14..1b2f2a969 100644 --- a/package.json +++ b/package.json @@ -64,7 +64,7 @@ "@stardazed/streams-polyfill": "^2.4.0", "cross-fetch": "^3.1.5", "ethers": "^5.5.3", - "js-waku": "^0.22.0", + "js-waku": "^0.24.0", "protobufjs": "^6.11.2" }, "devDependencies": { diff --git a/src/Client.ts b/src/Client.ts index 0dd73ccda..1589f2d94 100644 --- a/src/Client.ts +++ b/src/Client.ts @@ -35,6 +35,7 @@ type Nodes = { [k: string]: string } type NodesList = { testnet: Nodes + dev: Nodes } // Default maximum allowed content size @@ -104,7 +105,7 @@ export type ClientOptions = NetworkOptions & KeyStoreOptions & ContentOptions export function defaultOptions(opts?: Partial): ClientOptions { const _defaultOptions: ClientOptions = { keyStoreType: KeyStoreType.networkTopicStoreV1, - env: 'testnet', + env: 'dev', waitForPeersTimeoutMs: 10000, codecs: [new TextCodec()], maxContentSize: MaxContentSize, @@ -353,17 +354,17 @@ export default class Client { return message } - streamIntroductionMessages(): Stream { - return new Stream( + streamIntroductionMessages(): Promise> { + return Stream.create( this, buildUserIntroTopic(this.address), noTransformation ) } - streamConversationMessages(peerAddress: string): Stream { + streamConversationMessages(peerAddress: string): Promise> { const topic = buildDirectMessageTopic(peerAddress, this.address) - return new Stream( + return Stream.create( this, topic, noTransformation, diff --git a/src/Stream.ts b/src/Stream.ts index 63d62565b..fc916b691 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -21,6 +21,8 @@ export default class Stream { // if callback is undefined the stream is closed callback: ((wakuMsg: WakuMessage) => Promise) | undefined + unsubscribeFn?: () => Promise + constructor( client: Client, topic: string, @@ -32,7 +34,6 @@ export default class Stream { this.topic = topic this.client = client this.callback = this.newMessageCallback(messageTransformer, messageFilter) - client.waku.relay.addObserver(this.callback, [topic]) } // returns new closure to handle incoming Waku messages @@ -61,6 +62,27 @@ export default class Stream { } } + private async start(): Promise { + if (!this.callback) { + throw new Error('Missing callback for stream') + } + this.unsubscribeFn = await this.client.waku.filter.subscribe( + this.callback, + [this.topic] + ) + } + + static async create( + client: Client, + topic: string, + messageTransformer: MessageTransformer, + messageFilter?: MessageFilter + ): Promise> { + const stream = new Stream(client, topic, messageTransformer, messageFilter) + await stream.start() + return stream + } + // To make Stream proper Async Iterable [Symbol.asyncIterator](): AsyncIterableIterator { return this @@ -74,7 +96,9 @@ export default class Stream { if (!this.callback) { return { value: undefined, done: true } } - this.client.waku.relay.deleteObserver(this.callback, [this.topic]) + if (this.unsubscribeFn) { + await this.unsubscribeFn() + } this.callback = undefined this.resolvers.forEach((resolve) => resolve({ value: undefined, done: true }) diff --git a/src/conversations/Conversation.ts b/src/conversations/Conversation.ts index 156041b03..a791bfa22 100644 --- a/src/conversations/Conversation.ts +++ b/src/conversations/Conversation.ts @@ -26,7 +26,7 @@ export default class Conversation { /** * Returns a Stream of any new messages to/from the peerAddress */ - streamMessages(): Stream { + streamMessages(): Promise> { return this.client.streamConversationMessages(this.peerAddress) } diff --git a/src/conversations/Conversations.ts b/src/conversations/Conversations.ts index 2a4f6f789..389ceecd7 100644 --- a/src/conversations/Conversations.ts +++ b/src/conversations/Conversations.ts @@ -49,7 +49,7 @@ export default class Conversations { * Will dedupe to not return the same conversation twice in the same stream. * Does not dedupe any other previously seen conversations */ - stream(): Stream { + stream(): Promise> { const messageTransformer: MessageTransformer = ( msg: Message ) => { @@ -72,7 +72,7 @@ export default class Conversations { return true } - return new Stream( + return Stream.create( this.client, buildUserIntroTopic(this.client.address), messageTransformer, diff --git a/test/Client.test.ts b/test/Client.test.ts index c2c8fefa2..547835fc6 100644 --- a/test/Client.test.ts +++ b/test/Client.test.ts @@ -1,5 +1,11 @@ import assert from 'assert' -import { pollFor, newWallet, dumpStream } from './helpers' +import { + pollFor, + newWallet, + dumpStream, + newLocalDockerClient, + newDevClient, +} from './helpers' import { publishUserContact, sleep } from '../src/utils' import Client, { KeyStoreType } from '../src/Client' import { TestKeyCodec, ContentTypeTestKey } from './ContentTypeTestKey' @@ -13,16 +19,6 @@ import { PrivateKeyBundle, } from '../src' -const newLocalDockerClient = (): Promise => - Client.create(newWallet(), { - bootstrapAddrs: [ - '/ip4/127.0.0.1/tcp/9001/ws/p2p/16Uiu2HAmNCxLZCkXNbpVPBpSSnHj9iq4HZQj7fxRzw2kj1kKSHHA', - ], - }) - -const newTestnetClient = (): Promise => - Client.create(newWallet(), { env: 'testnet' }) - describe('Client', () => { const tests = [ { @@ -32,8 +28,8 @@ describe('Client', () => { ] if (process.env.CI || process.env.TESTNET) { tests.push({ - name: 'testnet', - newClient: newTestnetClient, + name: 'dev', + newClient: newDevClient, }) } tests.forEach((testCase) => { @@ -72,10 +68,10 @@ describe('Client', () => { }) it('send, stream and list messages', async () => { - const bobIntros = bob.streamIntroductionMessages() - const bobAlice = bob.streamConversationMessages(alice.address) - const aliceIntros = alice.streamIntroductionMessages() - const aliceBob = alice.streamConversationMessages(bob.address) + const bobIntros = await bob.streamIntroductionMessages() + const bobAlice = await bob.streamConversationMessages(alice.address) + const aliceIntros = await alice.streamIntroductionMessages() + const aliceBob = await alice.streamConversationMessages(bob.address) // alice sends intro await alice.sendMessage(bob.address, 'hi bob!') @@ -156,8 +152,8 @@ describe('Client', () => { ) }) it('messaging yourself', async () => { - const convo = alice.streamConversationMessages(alice.address) - const intro = alice.streamIntroductionMessages() + const convo = await alice.streamConversationMessages(alice.address) + const intro = await alice.streamIntroductionMessages() const messages = ['Hey me!', 'Yo!', 'Over and out'] for (let message of messages) { await alice.sendMessage(alice.address, message) @@ -173,7 +169,7 @@ describe('Client', () => { }) it('for-await-of with stream', async () => { - const convo = alice.streamConversationMessages(bob.address) + const convo = await alice.streamConversationMessages(bob.address) let count = 5 await alice.sendMessage(bob.address, 'msg ' + count) for await (const msg of convo) { @@ -204,7 +200,7 @@ describe('Client', () => { }) it('can send compressed messages', async () => { - const convo = bob.streamConversationMessages(alice.address) + const convo = await bob.streamConversationMessages(alice.address) const content = 'A'.repeat(111) await alice.sendMessage(bob.address, content, { contentType: ContentTypeText, @@ -217,7 +213,7 @@ describe('Client', () => { }) it('can send custom content type', async () => { - const stream = bob.streamConversationMessages(alice.address) + const stream = await bob.streamConversationMessages(alice.address) const key = PrivateKey.generate().publicKey // alice doesn't recognize the type @@ -268,7 +264,7 @@ describe('Client', () => { }) it('filters out spoofed messages', async () => { - const stream = bob.streamConversationMessages(alice.address) + const stream = await bob.streamConversationMessages(alice.address) // mallory takes over alice's client const malloryWallet = newWallet() const mallory = await PrivateKeyBundle.generate(malloryWallet) diff --git a/test/conversations/Conversation.test.ts b/test/conversations/Conversation.test.ts index c3599ebff..482d29713 100644 --- a/test/conversations/Conversation.test.ts +++ b/test/conversations/Conversation.test.ts @@ -1,12 +1,6 @@ import { Client } from '../../src' -import { Wallet } from 'ethers' import { sleep } from '../../src/utils' - -const opts = { - bootstrapAddrs: [ - '/ip4/127.0.0.1/tcp/9001/ws/p2p/16Uiu2HAmNCxLZCkXNbpVPBpSSnHj9iq4HZQj7fxRzw2kj1kKSHHA', - ], -} +import { newLocalDockerClient } from '../helpers' jest.setTimeout(20000) @@ -15,8 +9,8 @@ describe('conversations', () => { let bob: Client beforeEach(async () => { - alice = await Client.create(Wallet.createRandom(), opts) - bob = await Client.create(Wallet.createRandom(), opts) + alice = await newLocalDockerClient() + bob = await newLocalDockerClient() }) afterEach(async () => { @@ -53,7 +47,7 @@ describe('conversations', () => { ) // Start the stream before sending the message to ensure delivery - const stream = aliceConversation.streamMessages() + const stream = await aliceConversation.streamMessages() await bobConversation.send('gm') let numMessages = 0 diff --git a/test/conversations/Conversations.test.ts b/test/conversations/Conversations.test.ts index e36414588..8791bbf9c 100644 --- a/test/conversations/Conversations.test.ts +++ b/test/conversations/Conversations.test.ts @@ -1,13 +1,7 @@ +import { newLocalDockerClient } from './../helpers' import { Client } from '../../src' -import { Wallet } from 'ethers' import { sleep } from '../../src/utils' -const opts = { - bootstrapAddrs: [ - '/ip4/127.0.0.1/tcp/9001/ws/p2p/16Uiu2HAmNCxLZCkXNbpVPBpSSnHj9iq4HZQj7fxRzw2kj1kKSHHA', - ], -} - jest.setTimeout(20000) describe('conversations', () => { @@ -15,8 +9,8 @@ describe('conversations', () => { let bob: Client beforeEach(async () => { - alice = await Client.create(Wallet.createRandom(), opts) - bob = await Client.create(Wallet.createRandom(), opts) + alice = await newLocalDockerClient() + bob = await newLocalDockerClient() await sleep(100) }) @@ -43,7 +37,7 @@ describe('conversations', () => { }) it('streams conversations', async () => { - const stream = alice.conversations.stream() + const stream = await alice.conversations.stream() const conversation = await alice.conversations.newConversation(bob.address) await conversation.send('hi bob') diff --git a/test/helpers.ts b/test/helpers.ts index 4b079aa40..d16c7e6e2 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -5,10 +5,14 @@ import { ContentCodec, ContentTypeId, TextCodec, + Client, } from '../src' import Stream from '../src/Stream' import { promiseWithTimeout } from '../src/utils' +const LOCAL_DOCKER_MULTIADDR = + '/ip4/127.0.0.1/tcp/9001/ws/p2p/16Uiu2HAmNCxLZCkXNbpVPBpSSnHj9iq4HZQj7fxRzw2kj1kKSHHA' + export const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)) @@ -89,3 +93,11 @@ export class CodecRegistry { return this._codecs.get(key) } } + +export const newLocalDockerClient = (): Promise => + Client.create(newWallet(), { + bootstrapAddrs: [LOCAL_DOCKER_MULTIADDR], + }) + +export const newDevClient = (): Promise => + Client.create(newWallet(), { env: 'dev' })