Skip to content

Commit

Permalink
Setup websockets communication between web and workers
Browse files Browse the repository at this point in the history
We want to keep the client informed when evaluations are running. How
many logs are created and how many logs fail.
This PR setup a new Express server that setup a Socket.IO websocket
server with 2 endpoints /websocket and worker-websocket. The one for web
has secure cookies token auth and the one for workers share a secret
token between Sockets server and workers.

I think this setup makes sense
  • Loading branch information
andresgutgon committed Sep 16, 2024
1 parent e5353a6 commit 61e7179
Show file tree
Hide file tree
Showing 40 changed files with 2,340 additions and 1,050 deletions.
3 changes: 2 additions & 1 deletion apps/web/next.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const nextConfig = {
'flydrive/drivers/fs',
'flydrive/drivers/types',
'@sentry/nextjs',
'bullmq'
'bullmq',
'jose',
],
experimental: {
// Dear developer,
Expand Down
2 changes: 2 additions & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"date-fns": "^3.6.0",
"drizzle-orm": "^0.33.0",
"ioredis": "^5.4.1",
"jose": "^5.8.0",
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
"monaco-editor": "^0.50.0",
Expand All @@ -42,6 +43,7 @@
"nprogress": "^0.2.0",
"react": "19.0.0-rc-f994737d14-20240522",
"react-dom": "19.0.0-rc-f994737d14-20240522",
"socket.io-react-hook": "^2.4.5",
"swr": "^2.2.5",
"use-debounce": "^10.0.1",
"zod": "^3.23.8",
Expand Down
3 changes: 1 addition & 2 deletions apps/web/src/actions/invitations/accept.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ export const acceptInvitationAction = createServerAction()
if (!user) throw new NotFoundError('User not found')

await acceptInvitation({ membership, user })

setSession({
await setSession({
sessionData: {
user,
workspace: { id: Number(workspace.id), name: workspace.name },
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/actions/magicLinkTokens/confirm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const confirmMagicLinkTokenAction = createServerAction()
r.unwrap(),
)

setSession({
await setSession({
sessionData: {
user,
workspace,
Expand Down
32 changes: 32 additions & 0 deletions apps/web/src/actions/user/refreshWebsocketTokenAction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use server'

import { verifyWebsocketToken } from '@latitude-data/core/websockets/utils'
import { setWebsocketSessionCookie } from '$/services/auth/setSession'

import { authProcedure } from '../procedures'

export const refreshWebesocketTokenAction = authProcedure
.createServerAction()
.handler(async ({ ctx: { session, user, workspace } }) => {
debugger
const { cookies } = await import('next/headers')
const refreshWebsocketCookie = cookies().get('websocketRefresh')
const refreshToken = refreshWebsocketCookie?.value
const result = await verifyWebsocketToken({
token: refreshToken,
type: 'websocket',
})

if (!result.error) return { success: true }

await setWebsocketSessionCookie({
name: 'websocket',
sessionData: { user, workspace },
})
await setWebsocketSessionCookie({
name: 'websocketRefresh',
sessionData: { user, workspace },
})

return { success: true }
})
2 changes: 1 addition & 1 deletion apps/web/src/actions/user/setupAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const setupAction = errorHandlingProcedure
const result = await setupService(input)
const sessionData = result.unwrap()

setSession({ sessionData })
await setSession({ sessionData })

redirect(ROUTES.root)
})
15 changes: 12 additions & 3 deletions apps/web/src/app/(private)/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { ReactNode } from 'react'

import { SessionProvider } from '@latitude-data/web-ui/browser'
import {
LatitudeWebsocketsProvider,
SocketIOProvider,
} from '$/components/Providers/WebsocketsProvider'
import env from '$/env'
import { getCurrentUser } from '$/services/auth/getCurrentUser'
import { getSession } from '$/services/auth/getSession'
import { ROUTES } from '$/services/routes'
Expand All @@ -17,8 +22,12 @@ export default async function PrivateLayout({
const { workspace, user } = await getCurrentUser()

return (
<SessionProvider currentUser={user} workspace={workspace}>
{children}
</SessionProvider>
<SocketIOProvider>
<SessionProvider currentUser={user} workspace={workspace}>
<LatitudeWebsocketsProvider socketServer={env.WEBSOCKETS_SERVER}>
{children}
</LatitudeWebsocketsProvider>
</SessionProvider>
</SocketIOProvider>
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use client'

import { useCallback, useEffect, useRef, useState } from 'react'

import { EvaluationDto } from '@latitude-data/core/browser'
import { ProgressIndicator, useCurrentDocument } from '@latitude-data/web-ui'
import {
useSockets,
type EventArgs,
} from '$/components/Providers/WebsocketsProvider/useSockets'

export function EvaluationStatusBanner({
evaluation,
}: {
evaluation: EvaluationDto
}) {
const timeoutRef = useRef<number | null>(null)
const [jobs, setJobs] = useState<EventArgs<'evaluationStatus'>[]>([])
const document = useCurrentDocument()
const onMessage = useCallback(
(args: EventArgs<'evaluationStatus'>) => {
if (evaluation.id !== args.evaluationId) return
if (document.documentUuid !== args.documentUuid) return

setJobs((prevJobs) => {
const jobIndex = prevJobs.findIndex(
(job) => job.batchId === args.batchId,
)

if (jobIndex === -1) {
return [...prevJobs, args]
} else {
const newJobs = [...prevJobs]
newJobs[jobIndex] = args

if (args.status && args.status === 'finished') {
setTimeout(() => {
setJobs((currentJobs) => {
return currentJobs.filter((job) => job.batchId !== args.batchId)
})
}, 500)
}

return newJobs
}
})
},
[evaluation.id, document.documentUuid],
)
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current)
}
}
}, [])

useSockets({ event: 'evaluationStatus', onMessage })

return (
<>
{jobs.map((job) => (
<ProgressIndicator
key={job.batchId}
state={job.status === 'finished' ? 'completed' : 'running'}
>
{`Generating batch evaluation (ID: ${job.batchId}) ${job.completed}/${job.initialTotal}`}
</ProgressIndicator>
))}
</>
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ReactNode } from 'react'

import { EvaluationsRepository } from '@latitude-data/core/repositories'
import { TableWithHeader, Text } from '@latitude-data/web-ui'
import { EvaluationStatusBanner } from '$/app/(private)/projects/[projectId]/versions/[commitUuid]/documents/[documentUuid]/evaluations/[evaluationId]/_components/EvaluationStatusBanner'
import BreadcrumpLink from '$/components/BreadcrumpLink'
import { Breadcrump } from '$/components/layouts/AppLayout/Header'
import { getCurrentUser } from '$/services/auth/getCurrentUser'
Expand Down Expand Up @@ -60,7 +61,7 @@ export default async function ConnectedEvaluationLayout({
documentUuid={params.documentUuid}
/>
}
table={<></>}
table={<EvaluationStatusBanner evaluation={evaluation} />}
/>
</div>
)
Expand Down
110 changes: 110 additions & 0 deletions apps/web/src/components/Providers/WebsocketsProvider/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
'use client'

import {
createContext,
ReactNode,
useCallback,
useContext,
useEffect,
} from 'react'

import {
WebClientToServerEvents,
WebServerToClientEvents,
} from '@latitude-data/core/browser'
import { useSession, useToast } from '@latitude-data/web-ui'
import { refreshWebesocketTokenAction } from '$/actions/user/refreshWebsocketTokenAction'
import useCurrentWorkspace from '$/stores/currentWorkspace'
import { IoProvider, useSocket } from 'socket.io-react-hook'

export const SocketIOProvider = ({ children }: { children: ReactNode }) => {
return <IoProvider>{children}</IoProvider>
}

function useJoinWorkspace({ connection }: { connection: IWebsocketConfig }) {
const { currentUser } = useSession()
const { data: workspace } = useCurrentWorkspace()
return useCallback(() => {
connection.socket.emit('joinWorkspace', {
workspaceId: workspace.id,
userId: currentUser.id,
})
}, [workspace.id, connection.socket, connection.connected])
}

export function useSocketConnection({
socketServer,
}: {
socketServer: string
}) {
const { toast } = useToast()
const connection = useSocket<
WebServerToClientEvents,
WebClientToServerEvents
>(
`${socketServer}/web`, // namespace
{
path: '/websocket', // Socket server endpoint
withCredentials: true, // Cookies cross-origin
transports: ['websocket'],
},
)

connection.socket.on('connect_error', async (error) => {
console.error('Connection error:', error.message)

if (error.message.startsWith('AUTH_ERROR')) {
const [data] = await refreshWebesocketTokenAction()

if (data && data.success) {
connection.socket.connect()
} else {
toast({
title: 'We have a problem reconnecting to the server',
description: 'Try logout and login again',
variant: 'destructive',
})
}
}
})

return connection
}

type IWebsocketConfig = ReturnType<typeof useSocketConnection>
const WebsocketConfigContext = createContext<IWebsocketConfig>(
{} as IWebsocketConfig,
)

export const LatitudeWebsocketsProvider = ({
children,
socketServer,
}: {
children: ReactNode
socketServer: string
}) => {
const connection = useSocketConnection({ socketServer })
const joinWorkspace = useJoinWorkspace({ connection })
useEffect(() => {
if (connection.connected) return

joinWorkspace()
}, [connection.connected, joinWorkspace])
return (
<WebsocketConfigContext.Provider value={connection}>
{children}
</WebsocketConfigContext.Provider>
)
}

export const useWebsocketConfig = () => {
const context = useContext(WebsocketConfigContext)

if (!context) {
throw new Error(
'useWebsocketConfig must be used within a WebsocketProvider',
)
}

return context
}
22 changes: 22 additions & 0 deletions apps/web/src/components/Providers/WebsocketsProvider/useSockets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { WebServerToClientEvents } from '@latitude-data/core/browser'
import { useSocketEvent } from 'socket.io-react-hook'

import { useWebsocketConfig } from './index'

type ServerEventType = keyof WebServerToClientEvents
export type EventArgs<T extends ServerEventType> = Parameters<
WebServerToClientEvents[T]
>[0]
export function useSockets<SEName extends ServerEventType>({
event,
onMessage,
}: {
event: SEName
onMessage: (args: EventArgs<SEName>) => void
}) {
const connection = useWebsocketConfig()
useSocketEvent<EventArgs<SEName>>(connection.socket, event, {
onMessage,
})
return connection.socket
}
2 changes: 2 additions & 0 deletions apps/web/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export default createEnv({
DATABASE_URL: z.string(),
GATEWAY_HOSTNAME: z.string(),
GATEWAY_PORT: z.coerce.number().optional(),
WEBSOCKETS_SERVER: z.string(),
GATEWAY_SSL: z
.enum(['true', 'false'])
.transform((value) => value === 'true')
Expand All @@ -23,5 +24,6 @@ export default createEnv({
GATEWAY_HOSTNAME: process.env.GATEWAY_HOSTNAME,
GATEWAY_PORT: process.env.GATEWAY_PORT,
GATEWAY_SSL: process.env.GATEWAY_SSL,
WEBSOCKETS_SERVER: process.env.WEBSOCKETS_SERVER,
},
})
1 change: 0 additions & 1 deletion apps/web/src/helpers/fonts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,3 @@ export const fontMono = localFont({
],
variable: '--font-mono',
})

Loading

0 comments on commit 61e7179

Please sign in to comment.