Skip to content

Commit

Permalink
Add stream_all_messages to node bindings (#838)
Browse files Browse the repository at this point in the history
* Add stream_all_messages

* Add tests

* Add release
  • Loading branch information
rygine authored Jun 12, 2024
1 parent 67604da commit 8f8b2ac
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 21 deletions.
4 changes: 4 additions & 0 deletions bindings_node/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# @xmtp/mls-client-bindings-node

## 0.0.4

- Added `stream_all_messages`

## 0.0.3

- Fixed default export value
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@xmtp/mls-client-bindings-node",
"version": "0.0.3",
"version": "0.0.4",
"repository": {
"type": "git",
"url": "git+https://[email protected]/xmtp/libxmtp.git",
Expand Down
38 changes: 18 additions & 20 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun
use napi::JsFunction;
use napi_derive::napi;

// use crate::messages::NapiMessage;
use crate::messages::NapiMessage;
use crate::{
groups::{GroupPermissions, NapiGroup},
mls_client::RustXmtpClient,
Expand Down Expand Up @@ -147,23 +147,21 @@ impl NapiConversations {
))
}

// TODO: this fn needs to be sync for it to work with NAPI
// #[napi(ts_args_type = "callback: (err: null | Error, result: NapiGroup) => void")]
// pub async fn stream_all_messages(&self, callback: JsFunction) -> Result<NapiStreamCloser> {
// let tsfn: ThreadsafeFunction<NapiMessage, ErrorStrategy::CalleeHandled> =
// callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
// let stream_closer = RustXmtpClient::stream_all_messages_with_callback(
// self.inner_client.clone(),
// move |message| {
// tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking);
// },
// )
// .await
// .map_err(|e| Error::from_reason(format!("{}", e)))?;

// Ok(NapiStreamCloser::new(
// stream_closer.close_fn,
// stream_closer.is_closed_atomic,
// ))
// }
#[napi(ts_args_type = "callback: (err: null | Error, result: NapiMessage) => void")]
pub fn stream_all_messages(&self, callback: JsFunction) -> Result<NapiStreamCloser> {
let tsfn: ThreadsafeFunction<NapiMessage, ErrorStrategy::CalleeHandled> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
let stream_closer = RustXmtpClient::stream_all_messages_with_callback_sync(
self.inner_client.clone(),
move |message| {
tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking);
},
)
.map_err(|e| Error::from_reason(format!("{}", e)))?;

Ok(NapiStreamCloser::new(
stream_closer.close_fn,
stream_closer.is_closed_atomic,
))
}
}
77 changes: 77 additions & 0 deletions bindings_node/test/AsyncStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
type Value<T, V> = V extends undefined ? T : V

type ResolveValue<T, V> = {
value: Value<T, V> | undefined
done: boolean
}

type ResolveNext<T, V> = (resolveValue: ResolveValue<T, V>) => void

type TransformValue<T, V> = (value: T) => Value<T, V>

export class AsyncStream<T, V = undefined> {
#done = false
#resolveNext: ResolveNext<T, V> | null
#queue: Value<T, V>[]
#transformValue?: TransformValue<T, V>

stopCallback: (() => void) | undefined = undefined

constructor(
transformValue: V extends undefined ? undefined : TransformValue<T, V>
) {
this.#queue = []
this.#resolveNext = null
this.#done = false
this.#transformValue = transformValue
}

callback = (err: Error | null, value: T) => {
if (err) {
console.error('stream error', err)
this.stop()
return
}

if (this.#done) {
return
}

const newValue = this.#transformValue
? this.#transformValue(value)
: // must assert type because TypeScript can't infer that T is assignable
// to Value<T, V> when this.#transformValue is undefined
(value as unknown as Value<T, V>)

if (this.#resolveNext) {
this.#resolveNext({ value: newValue, done: false })
this.#resolveNext = null
} else {
this.#queue.push(newValue)
}
}

stop = () => {
this.#done = true
if (this.#resolveNext) {
this.#resolveNext({ value: undefined, done: true })
}
this.stopCallback?.()
}

next = (): Promise<ResolveValue<T, V>> => {
if (this.#queue.length > 0) {
return Promise.resolve({ value: this.#queue.shift(), done: false })
} else if (this.#done) {
return Promise.resolve({ value: undefined, done: true })
} else {
return new Promise((resolve) => {
this.#resolveNext = resolve
})
}
};

[Symbol.asyncIterator]() {
return this
}
}
128 changes: 128 additions & 0 deletions bindings_node/test/Conversations.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { encode } from 'punycode'
import { describe, expect, it } from 'vitest'
import { AsyncStream } from '@test/AsyncStream'
import {
createRegisteredClient,
createUser,
encodeTextMessage,
} from '@test/helpers'
import { NapiGroup, NapiMessage } from '../dist'

describe('Conversations', () => {
it('should not have initial conversations', async () => {
const user = createUser()
const client = await createRegisteredClient(user)
const conversations = client.conversations().list()
expect((await conversations).length).toBe(0)
})

it('should create a new group', async () => {
const user1 = createUser()
const user2 = createUser()
const client1 = await createRegisteredClient(user1)
const client2 = await createRegisteredClient(user2)
const group = await client1
.conversations()
.createGroup([user2.account.address])
expect(group).toBeDefined()
expect(group.id()).toBeDefined()
expect(group.createdAtNs()).toBeTypeOf('number')
expect(group.isActive()).toBe(true)
expect(group.groupName()).toBe('')
expect(group.addedByInboxId()).toBe(client1.inboxId())
expect(group.findMessages().length).toBe(1)
const members = group.listMembers()
expect(members.length).toBe(2)
const memberInboxIds = members.map((member) => member.inboxId)
expect(memberInboxIds).toContain(client1.inboxId())
expect(memberInboxIds).toContain(client2.inboxId())
expect(group.groupMetadata().conversationType()).toBe('group')
expect(group.groupMetadata().creatorInboxId()).toBe(client1.inboxId())

const group1 = await client1.conversations().list()
expect(group1.length).toBe(1)
expect(group1[0].id).toBe(group.id)

expect((await client2.conversations().list()).length).toBe(0)

await client2.conversations().sync()

const group2 = await client2.conversations().list()
expect(group2.length).toBe(1)
expect(group2[0].id).toBe(group.id)
})

it('should stream new groups', async () => {
const user1 = createUser()
const user2 = createUser()
const user3 = createUser()
const client1 = await createRegisteredClient(user1)
const client2 = await createRegisteredClient(user2)
const client3 = await createRegisteredClient(user3)
const asyncStream = new AsyncStream<NapiGroup>(undefined)
const stream = client3.conversations().stream(asyncStream.callback)
const group1 = await client1
.conversations()
.createGroup([user3.account.address])
const group2 = await client2
.conversations()
.createGroup([user3.account.address])
let count = 0
for await (const convo of asyncStream) {
count++
expect(convo).toBeDefined()
if (count === 1) {
expect(convo!.id).toBe(group1.id)
}
if (count === 2) {
expect(convo!.id).toBe(group2.id)
break
}
}
asyncStream.stop()
stream.end()
})

it('should stream all messages', async () => {
const user1 = createUser()
const user2 = createUser()
const user3 = createUser()
const client1 = await createRegisteredClient(user1)
const client2 = await createRegisteredClient(user2)
const client3 = await createRegisteredClient(user3)
await client1.conversations().createGroup([user2.account.address])
await client1.conversations().createGroup([user3.account.address])

const asyncStream = new AsyncStream<NapiMessage>(undefined)
const stream = client1
.conversations()
.streamAllMessages(asyncStream.callback)

const groups2 = client2.conversations()
await groups2.sync()
const groupsList2 = await groups2.list()

const groups3 = client3.conversations()
await groups3.sync()
const groupsList3 = await groups3.list()

await groupsList2[0].send(encodeTextMessage('gm!'))
await groupsList3[0].send(encodeTextMessage('gm2!'))

let count = 0

for await (const message of asyncStream) {
count++
expect(message).toBeDefined()
if (count === 1) {
expect(message!.senderInboxId).toBe(client2.inboxId())
}
if (count === 2) {
expect(message!.senderInboxId).toBe(client3.inboxId())
break
}
}
asyncStream.stop()
stream.end()
})
})
15 changes: 15 additions & 0 deletions bindings_node/test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,18 @@ export const createRegisteredClient = async (user: User) => {
}
return client
}

export const encodeTextMessage = (text: string) => {
return {
type: {
authorityId: 'xmtp.org',
typeId: 'text',
versionMajor: 1,
versionMinor: 0,
},
parameters: {
encoding: 'UTF-8',
},
content: new TextEncoder().encode(text),
}
}

0 comments on commit 8f8b2ac

Please sign in to comment.