Skip to content

Commit

Permalink
Merge pull request #110 from xmtp/use-waku-filter
Browse files Browse the repository at this point in the history
Use waku filter
  • Loading branch information
neekolas authored May 31, 2022
2 parents 0ec6b34 + a819cee commit ca6d413
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 141 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const messages = await conversation.messages()
// Send a message
await conversation.send('gm')
// Listen for new messages in the conversation
for await (const message of conversation.streamMessages()) {
for await (const message of await conversation.streamMessages()) {
console.log(`[${message.senderAddress}]: ${message.text}`)
}
```
Expand Down Expand Up @@ -134,7 +134,7 @@ You can also listen for new conversations being started in real-time. This will
_Warning: this stream will continue infinitely. To end the stream you can either break from the loop, or call `await stream.return()`_

```ts
const stream = xmtp.conversations.stream()
const stream = await xmtp.conversations.stream()
for await (const conversation of stream) {
console.log(`New conversation started with ${conversation.peerAddress}`)
// Say hello to your new friend
Expand Down Expand Up @@ -193,7 +193,7 @@ The Stream returned by the `stream` methods is an asynchronous iterator and as s
const conversation = await xmtp.conversations.newConversation(
'0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045'
)
for await (const message of conversation.streamMessages()) {
for await (const message of await conversation.streamMessages()) {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue
Expand Down
1 change: 1 addition & 0 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
- --store
- --message-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable
- --lightpush
- --filter
- --ws-port=9001
- --wait-for-db=30s
ports:
Expand Down
115 changes: 31 additions & 84 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"@stardazed/streams-polyfill": "^2.4.0",
"cross-fetch": "^3.1.5",
"ethers": "^5.5.3",
"js-waku": "^0.22.0",
"js-waku": "^0.24.0",
"protobufjs": "^6.11.2"
},
"devDependencies": {
Expand Down
11 changes: 6 additions & 5 deletions src/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Nodes = { [k: string]: string }

type NodesList = {
testnet: Nodes
dev: Nodes
}

// Default maximum allowed content size
Expand Down Expand Up @@ -104,7 +105,7 @@ export type ClientOptions = NetworkOptions & KeyStoreOptions & ContentOptions
export function defaultOptions(opts?: Partial<ClientOptions>): ClientOptions {
const _defaultOptions: ClientOptions = {
keyStoreType: KeyStoreType.networkTopicStoreV1,
env: 'testnet',
env: 'dev',
waitForPeersTimeoutMs: 10000,
codecs: [new TextCodec()],
maxContentSize: MaxContentSize,
Expand Down Expand Up @@ -353,17 +354,17 @@ export default class Client {
return message
}

streamIntroductionMessages(): Stream<Message> {
return new Stream<Message>(
streamIntroductionMessages(): Promise<Stream<Message>> {
return Stream.create<Message>(
this,
buildUserIntroTopic(this.address),
noTransformation
)
}

streamConversationMessages(peerAddress: string): Stream<Message> {
streamConversationMessages(peerAddress: string): Promise<Stream<Message>> {
const topic = buildDirectMessageTopic(peerAddress, this.address)
return new Stream<Message>(
return Stream.create<Message>(
this,
topic,
noTransformation,
Expand Down
28 changes: 26 additions & 2 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export default class Stream<T> {
// if callback is undefined the stream is closed
callback: ((wakuMsg: WakuMessage) => Promise<void>) | undefined

unsubscribeFn?: () => Promise<void>

constructor(
client: Client,
topic: string,
Expand All @@ -32,7 +34,6 @@ export default class Stream<T> {
this.topic = topic
this.client = client
this.callback = this.newMessageCallback(messageTransformer, messageFilter)
client.waku.relay.addObserver(this.callback, [topic])
}

// returns new closure to handle incoming Waku messages
Expand Down Expand Up @@ -61,6 +62,27 @@ export default class Stream<T> {
}
}

private async start(): Promise<void> {
if (!this.callback) {
throw new Error('Missing callback for stream')
}
this.unsubscribeFn = await this.client.waku.filter.subscribe(
this.callback,
[this.topic]
)
}

static async create<T>(
client: Client,
topic: string,
messageTransformer: MessageTransformer<T>,
messageFilter?: MessageFilter
): Promise<Stream<T>> {
const stream = new Stream(client, topic, messageTransformer, messageFilter)
await stream.start()
return stream
}

// To make Stream proper Async Iterable
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
return this
Expand All @@ -74,7 +96,9 @@ export default class Stream<T> {
if (!this.callback) {
return { value: undefined, done: true }
}
this.client.waku.relay.deleteObserver(this.callback, [this.topic])
if (this.unsubscribeFn) {
await this.unsubscribeFn()
}
this.callback = undefined
this.resolvers.forEach((resolve) =>
resolve({ value: undefined, done: true })
Expand Down
2 changes: 1 addition & 1 deletion src/conversations/Conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export default class Conversation {
/**
* Returns a Stream of any new messages to/from the peerAddress
*/
streamMessages(): Stream<Message> {
streamMessages(): Promise<Stream<Message>> {
return this.client.streamConversationMessages(this.peerAddress)
}

Expand Down
4 changes: 2 additions & 2 deletions src/conversations/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export default class Conversations {
* Will dedupe to not return the same conversation twice in the same stream.
* Does not dedupe any other previously seen conversations
*/
stream(): Stream<Conversation> {
stream(): Promise<Stream<Conversation>> {
const messageTransformer: MessageTransformer<Conversation> = (
msg: Message
) => {
Expand All @@ -72,7 +72,7 @@ export default class Conversations {
return true
}

return new Stream<Conversation>(
return Stream.create<Conversation>(
this.client,
buildUserIntroTopic(this.client.address),
messageTransformer,
Expand Down
Loading

0 comments on commit ca6d413

Please sign in to comment.