Skip to content

Commit

Permalink
Setup websockets communication between web and workers
Browse files Browse the repository at this point in the history
We want to keep the client informed when evaluations are running. How
many logs are created and how many logs fail.
This PR setup a new Express server that setup a Socket.IO websocket
server with 2 endpoints /websocket and worker-websocket. The one for web
has secure cookies token auth and the one for workers share a secret
token between Sockets server and workers.

I think this setup makes sense
  • Loading branch information
andresgutgon committed Sep 16, 2024
1 parent 2c8576e commit 6ece440
Show file tree
Hide file tree
Showing 43 changed files with 2,423 additions and 1,064 deletions.
3 changes: 2 additions & 1 deletion apps/web/next.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const nextConfig = {
'flydrive/drivers/fs',
'flydrive/drivers/types',
'@sentry/nextjs',
'bullmq'
'bullmq',
'jose',
],
experimental: {
// Dear developer,
Expand Down
2 changes: 2 additions & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"date-fns": "^3.6.0",
"drizzle-orm": "^0.33.0",
"ioredis": "^5.4.1",
"jose": "^5.8.0",
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
"monaco-editor": "^0.50.0",
Expand All @@ -42,6 +43,7 @@
"nprogress": "^0.2.0",
"react": "19.0.0-rc-f994737d14-20240522",
"react-dom": "19.0.0-rc-f994737d14-20240522",
"socket.io-react-hook": "^2.4.5",
"swr": "^2.2.5",
"use-debounce": "^10.0.1",
"zod": "^3.23.8",
Expand Down
3 changes: 1 addition & 2 deletions apps/web/src/actions/invitations/accept.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ export const acceptInvitationAction = createServerAction()
if (!user) throw new NotFoundError('User not found')

await acceptInvitation({ membership, user })

setSession({
await setSession({
sessionData: {
user,
workspace: { id: Number(workspace.id), name: workspace.name },
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/actions/magicLinkTokens/confirm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const confirmMagicLinkTokenAction = createServerAction()
r.unwrap(),
)

setSession({
await setSession({
sessionData: {
user,
workspace,
Expand Down
17 changes: 17 additions & 0 deletions apps/web/src/actions/user/logoutAction.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
'use server'

import { type TokenType } from '@latitude-data/core/websockets/constants'
import { authProcedure } from '$/actions/procedures'
import { removeSession } from '$/services/auth/removeSession'
import { ROUTES } from '$/services/routes'
import { ReadonlyRequestCookies } from 'next/dist/server/web/spec-extension/adapters/request-cookies'
import { redirect } from 'next/navigation'

function removeSocketCookie({
name,
cookies,
}: {
name: TokenType
cookies: ReadonlyRequestCookies
}) {
cookies.delete(name)
}

export const logoutAction = authProcedure
.createServerAction()
.handler(async ({ ctx }) => {
removeSession({ session: ctx.session })
const { cookies: getCookies } = await import('next/headers')

const cookies = getCookies()
removeSocketCookie({ name: 'websocket', cookies })
removeSocketCookie({ name: 'websocketRefresh', cookies })

redirect(ROUTES.auth.login)
})
31 changes: 31 additions & 0 deletions apps/web/src/actions/user/refreshWebsocketTokenAction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use server'

import { verifyWebsocketToken } from '@latitude-data/core/websockets/utils'
import { setWebsocketSessionCookie } from '$/services/auth/setSession'

import { authProcedure } from '../procedures'

export const refreshWebesocketTokenAction = authProcedure
.createServerAction()
.handler(async ({ ctx: { user, workspace } }) => {
const { cookies } = await import('next/headers')
const refreshWebsocketCookie = cookies().get('websocketRefresh')
const refreshToken = refreshWebsocketCookie?.value
const result = await verifyWebsocketToken({
token: refreshToken,
type: 'websocket',
})

if (!result.error) return { success: true }

await setWebsocketSessionCookie({
name: 'websocket',
sessionData: { user, workspace },
})
await setWebsocketSessionCookie({
name: 'websocketRefresh',
sessionData: { user, workspace },
})

return { success: true }
})
2 changes: 1 addition & 1 deletion apps/web/src/actions/user/setupAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const setupAction = errorHandlingProcedure
const result = await setupService(input)
const sessionData = result.unwrap()

setSession({ sessionData })
await setSession({ sessionData })

redirect(ROUTES.root)
})
15 changes: 12 additions & 3 deletions apps/web/src/app/(private)/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { ReactNode } from 'react'

import { SessionProvider } from '@latitude-data/web-ui/browser'
import {
LatitudeWebsocketsProvider,
SocketIOProvider,
} from '$/components/Providers/WebsocketsProvider'
import env from '$/env'
import { getCurrentUser } from '$/services/auth/getCurrentUser'
import { getSession } from '$/services/auth/getSession'
import { ROUTES } from '$/services/routes'
Expand All @@ -17,8 +22,12 @@ export default async function PrivateLayout({
const { workspace, user } = await getCurrentUser()

return (
<SessionProvider currentUser={user} workspace={workspace}>
{children}
</SessionProvider>
<SocketIOProvider>
<SessionProvider currentUser={user} workspace={workspace}>
<LatitudeWebsocketsProvider socketServer={env.WEBSOCKETS_SERVER}>
{children}
</LatitudeWebsocketsProvider>
</SessionProvider>
</SocketIOProvider>
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
'use client'

import { useCallback, useEffect, useRef, useState } from 'react'

import { EvaluationDto } from '@latitude-data/core/browser'
import { ProgressIndicator, useCurrentDocument } from '@latitude-data/web-ui'
import {
useSockets,
type EventArgs,
} from '$/components/Providers/WebsocketsProvider/useSockets'

const DISAPERING_IN_MS = 1500
export function EvaluationStatusBanner({
evaluation,
}: {
evaluation: EvaluationDto
}) {
const timeoutRef = useRef<number | null>(null)
const [jobs, setJobs] = useState<EventArgs<'evaluationStatus'>[]>([])
const document = useCurrentDocument()
const onMessage = useCallback(
(args: EventArgs<'evaluationStatus'>) => {
if (evaluation.id !== args.evaluationId) return
if (document.documentUuid !== args.documentUuid) return

setJobs((prevJobs) => {
const jobIndex = prevJobs.findIndex(
(job) => job.batchId === args.batchId,
)

if (jobIndex === -1) {
return [...prevJobs, args]
} else {
const newJobs = [...prevJobs]
newJobs[jobIndex] = args

if (args.status && args.status === 'finished') {
setTimeout(() => {
setJobs((currentJobs) => {
return currentJobs.filter((job) => job.batchId !== args.batchId)
})
}, DISAPERING_IN_MS)
}

return newJobs
}
})
},
[evaluation.id, document.documentUuid],
)
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current)
}
}
}, [])

useSockets({ event: 'evaluationStatus', onMessage })

return (
<>
{jobs.map((job) => (
<ProgressIndicator
key={job.batchId}
state={job.status === 'finished' ? 'completed' : 'running'}
>
{`Generating batch evaluation (ID: ${job.batchId}) ${job.completed}/${job.initialTotal}`}
</ProgressIndicator>
))}
</>
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { useProviderLog } from '$/stores/providerLogs'

import { EvaluationResultInfo } from './EvaluationResultInfo'
import { EvaluationResultsTable } from './EvaluationResultsTable'
import { EvaluationStatusBanner } from './EvaluationStatusBanner'

export function EvaluationResults({
evaluation,
Expand All @@ -25,6 +26,7 @@ export function EvaluationResults({
return (
<div className='flex flex-col gap-4'>
<Text.H4>Evaluation Results</Text.H4>
<EvaluationStatusBanner evaluation={evaluation} />
<div className='flex flex-row w-full h-full overflow-hidden gap-4'>
<div className='flex-grow min-w-0 h-full'>
<EvaluationResultsTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { useCallback } from 'react'

import { ConversationMetadata } from '@latitude-data/compiler'
import { DocumentVersion, EvaluationDto } from '@latitude-data/core/browser'
import { Button, CloseTrigger, Modal, useToast } from '@latitude-data/web-ui'
import { Button, CloseTrigger, Modal } from '@latitude-data/web-ui'
import { useNavigate } from '$/hooks/useNavigate'
import { ROUTES } from '$/services/routes'

Expand All @@ -25,7 +25,6 @@ export default function CreateBatchEvaluationModal({
evaluation: EvaluationDto
documentMetadata: ConversationMetadata
}) {
const { toast } = useToast()
const navigate = useNavigate()
const documentUuid = document.documentUuid
const goToDetail = useCallback(() => {
Expand All @@ -42,10 +41,6 @@ export default function CreateBatchEvaluationModal({
projectId,
commitUuid,
onSuccess: () => {
toast({
title: 'Success',
description: 'Batch evaluation is processing',
})
goToDetail()
},
})
Expand Down
108 changes: 108 additions & 0 deletions apps/web/src/components/Providers/WebsocketsProvider/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use client'

import {
createContext,
ReactNode,
useCallback,
useContext,
useEffect,
} from 'react'

import {
WebClientToServerEvents,
WebServerToClientEvents,
} from '@latitude-data/core/browser'
import { useSession, useToast } from '@latitude-data/web-ui'
import { refreshWebesocketTokenAction } from '$/actions/user/refreshWebsocketTokenAction'
import useCurrentWorkspace from '$/stores/currentWorkspace'
import { IoProvider, useSocket } from 'socket.io-react-hook'

export const SocketIOProvider = ({ children }: { children: ReactNode }) => {
return <IoProvider>{children}</IoProvider>
}

function useJoinWorkspace({ connection }: { connection: IWebsocketConfig }) {
const { currentUser } = useSession()
const { data: workspace } = useCurrentWorkspace()
return useCallback(() => {
connection.socket.emit('joinWorkspace', {
workspaceId: workspace.id,
userId: currentUser.id,
})
}, [workspace.id, connection.socket, connection.connected])
}

export function useSocketConnection({
socketServer,
}: {
socketServer: string
}) {
const { toast } = useToast()
const connection = useSocket<
WebServerToClientEvents,
WebClientToServerEvents
>(
`${socketServer}/web`, // namespace
{
path: '/websocket', // Socket server endpoint
withCredentials: true, // Cookies cross-origin
transports: ['websocket'],
},
)

connection.socket.on('connect_error', async (error) => {
if (error.message.startsWith('AUTH_ERROR')) {
const [data] = await refreshWebesocketTokenAction()

if (data && data.success) {
connection.socket.connect()
} else {
toast({
title: 'We have a problem reconnecting to the server',
description: 'Try logout and login again',
variant: 'destructive',
})
}
}
})

return connection
}

type IWebsocketConfig = ReturnType<typeof useSocketConnection>
const WebsocketConfigContext = createContext<IWebsocketConfig>(
{} as IWebsocketConfig,
)

export const LatitudeWebsocketsProvider = ({
children,
socketServer,
}: {
children: ReactNode
socketServer: string
}) => {
const connection = useSocketConnection({ socketServer })
const joinWorkspace = useJoinWorkspace({ connection })
useEffect(() => {
if (connection.connected) return

joinWorkspace()
}, [connection.connected, joinWorkspace])
return (
<WebsocketConfigContext.Provider value={connection}>
{children}
</WebsocketConfigContext.Provider>
)
}

export const useWebsocketConfig = () => {
const context = useContext(WebsocketConfigContext)

if (!context) {
throw new Error(
'useWebsocketConfig must be used within a WebsocketProvider',
)
}

return context
}
22 changes: 22 additions & 0 deletions apps/web/src/components/Providers/WebsocketsProvider/useSockets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { WebServerToClientEvents } from '@latitude-data/core/browser'
import { useSocketEvent } from 'socket.io-react-hook'

import { useWebsocketConfig } from './index'

type ServerEventType = keyof WebServerToClientEvents
export type EventArgs<T extends ServerEventType> = Parameters<
WebServerToClientEvents[T]
>[0]
export function useSockets<SEName extends ServerEventType>({
event,
onMessage,
}: {
event: SEName
onMessage: (args: EventArgs<SEName>) => void
}) {
const connection = useWebsocketConfig()
useSocketEvent<EventArgs<SEName>>(connection.socket, event, {
onMessage,
})
return connection.socket
}
Loading

0 comments on commit 6ece440

Please sign in to comment.