Skip to content

Commit

Permalink
feature: improve server sent event in typescript sdk (#303)
Browse files Browse the repository at this point in the history
Our home-made parser was failing with partial chunks of data (in case of
very large amounts of data), so i'm reverting to using
eventsource-parser which is a pretty well supported library for sse
parsing that is environment agnostic.
  • Loading branch information
geclos authored Sep 28, 2024
1 parent a799133 commit ec11c19
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 87 deletions.
6 changes: 5 additions & 1 deletion packages/core/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,9 @@ export type ChainEventDto =
}
| {
type: ChainEventTypes.Error
error: Error
error: {
name: string
message: string
stack?: string
}
}
1 change: 1 addition & 0 deletions packages/sdks/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"types": "dist/index.d.ts",
"dependencies": {
"@t3-oss/env-core": "^0.10.1",
"eventsource-parser": "^2.0.1",
"zod": "^3.23.8"
},
"devDependencies": {
Expand Down
12 changes: 10 additions & 2 deletions packages/sdks/typescript/src/chat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ describe('message', () => {
const stream = new ReadableStream({
start(controller) {
CHUNKS.forEach((chunk, index) => {
controller.enqueue(encoder.encode(chunk))
// @ts-expect-error
const { event, data } = parseSSE(chunk)
controller.enqueue(
encoder.encode(`event: ${event}\ndata: ${data}\n\n`),
)
if (index === CHUNKS.length - 1) {
controller.close()
}
Expand Down Expand Up @@ -97,7 +101,11 @@ describe('message', () => {
const stream = new ReadableStream({
start(controller) {
CHUNKS.forEach((chunk, index) => {
controller.enqueue(encoder.encode(chunk))
// @ts-expect-error
const { event, data } = parseSSE(chunk)
controller.enqueue(
encoder.encode(`event: ${event}\ndata: ${data}\n\n`),
)
if (index === CHUNKS.length - 1) {
controller.close()
}
Expand Down
125 changes: 48 additions & 77 deletions packages/sdks/typescript/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
import env from '$sdk/env'
import { GatewayApiConfig, RouteResolver } from '$sdk/utils'
import { BodyParams, HandlerType, UrlParams } from '$sdk/utils/types'
import { ParsedEvent, ReconnectInterval } from 'eventsource-parser'
import { EventSourceParserStream } from 'eventsource-parser/stream'

export type StreamChainResponse = {
conversation: Message[]
Expand Down Expand Up @@ -128,83 +130,62 @@ export class Latitude {
let conversation: Message[] = []
let uuid: string | undefined
let chainResponse: ChainCallResponseDto | undefined
const consumeEvent = (event: { event: string; data: string }) => {
if (!event.event) {
onError?.(new Error(`Invalid SSE event: ${event}`))
return
}
if (!event.data) {
onError?.(new Error(`Invalid data in server event:\n${event}`))
return
}
if (event.event === 'error') {
onError?.(new Error(event.data))
return
}

const json = this.parseJSON(event.data)
if (!json) {
const error = new Error(`Invalid JSON in server event:\n${event}`)
onError?.(error)
return
}

if (event.event === 'latitude-event') {
const messages = 'messages' in json ? (json.messages! as Message[]) : []

if (json.type === 'chain-error') {
const error = new Error((json.error as Error).message)
onError?.(error)
const parser = new EventSourceParserStream()
const eventStream = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(parser)

return
}

if (messages.length > 0) {
conversation.push(...messages)
}

if (json.type === 'chain-complete') {
uuid = json.uuid!
chainResponse = json.response!
try {
for await (const event of eventStream as unknown as AsyncIterable<
ParsedEvent | ReconnectInterval
>) {
const parsedEvent = event as ParsedEvent | ReconnectInterval

if (parsedEvent.type === 'event') {
const data = this.parseJSON(parsedEvent.data)
if (!data) {
throw new Error(
`Invalid JSON in server event:\n${parsedEvent.data}`,
)
}

if (parsedEvent.event === 'latitude-event') {
const messages =
'messages' in data ? (data.messages! as Message[]) : []

if (data.type === 'chain-error') {
throw new Error(data.error.message)
}

if (messages.length > 0) {
conversation.push(...messages)
}

if (data.type === 'chain-complete') {
uuid = data.uuid!
chainResponse = data.response!
}
}

onEvent?.({ event: parsedEvent.event as StreamEventTypes, data })
}
}

onEvent?.({ event: event.event as StreamEventTypes, data: json })
}
if (!uuid || !chainResponse) return

const reader = stream.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break

const chunks = new TextDecoder('utf-8').decode(value).trim()
for (const chunk of chunks.split('\n\n')) {
const event = this.parseEvent(chunk)
if (!event) {
onError?.(new Error(`Invalid SSE event: ${chunk}`))
return
}
const data = this.parseData(chunk)
if (data === null || data === undefined) {
onError?.(new Error(`Invalid data in server event:\n${chunk}`))
return
}

consumeEvent({ event, data })
const finalResponse = {
conversation,
uuid,
response: chainResponse,
}
}

if (!uuid || !chainResponse) return
onFinished?.(finalResponse)

const finalResponse = {
conversation,
uuid,
response: chainResponse,
return finalResponse
} catch (error) {
onError?.(error as Error)
}

onFinished?.(finalResponse)

return finalResponse
}

private async request<H extends HandlerType>({
Expand Down Expand Up @@ -233,16 +214,6 @@ export class Latitude {
}
}

private parseEvent(chunk: string) {
const event = chunk.split('\n')[0]
return event?.split('event: ')[1]
}

private parseData(chunk: string) {
const data = chunk.split('\n')[1]
return data?.split('data: ')[1]
}

private get authHeader() {
return {
Authorization: `Bearer ${this.apiKey}`,
Expand Down
12 changes: 10 additions & 2 deletions packages/sdks/typescript/src/runDocument.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ describe('run', () => {
const stream = new ReadableStream({
start(controller) {
CHUNKS.forEach((chunk, index) => {
controller.enqueue(encoder.encode(chunk))
// @ts-expect-error
const { event, data } = parseSSE(chunk)
controller.enqueue(
encoder.encode(`event: ${event}\ndata: ${data}\n\n`),
)
if (index === CHUNKS.length - 1) {
controller.close()
}
Expand Down Expand Up @@ -171,7 +175,11 @@ describe('run', () => {
const stream = new ReadableStream({
start(controller) {
CHUNKS.forEach((chunk, index) => {
controller.enqueue(encoder.encode(chunk))
// @ts-expect-error
const { event, data } = parseSSE(chunk)
controller.enqueue(
encoder.encode(`event: ${event}\ndata: ${data}\n\n`),
)
if (index === CHUNKS.length - 1) {
controller.close()
}
Expand Down
15 changes: 10 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ec11c19

Please sign in to comment.