Skip to content

Commit

Permalink
Setup websockets communication between web and workers (#171)
Browse files Browse the repository at this point in the history
* Setup websockets communication between web and workers
We want to keep the client informed when evaluations are running. How
many logs are created and how many logs fail.
This PR setup a new Express server that setup a Socket.IO websocket
server with 2 endpoints /websocket and worker-websocket. The one for web
has secure cookies token auth and the one for workers share a secret
token between Sockets server and workers.

I think this setup makes sense

* Use existing env variables for latitude domain and subdomain and disable
websockets until infra is ready
  • Loading branch information
andresgutgon authored Sep 16, 2024
1 parent 2c8576e commit 1295d48
Show file tree
Hide file tree
Showing 44 changed files with 2,414 additions and 1,065 deletions.
1 change: 1 addition & 0 deletions apps/infra/src/deployments/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export const environment = pulumi
{ name: 'GATEWAY_SSL', value: 'true' },
{ name: 'LATITUDE_DOMAIN', value: 'latitude.so' },
{ name: 'LATITUDE_URL', value: 'https://app.latitude.so' },
{ name: 'WEBSOCKETS_SERVER', value: 'ws.latitude.so' },
{ name: 'FROM_MAILER_EMAIL', value: '[email protected]' },
{ name: 'MAILER_API_KEY', value: mailerApiKey },
{ name: 'SENTRY_DSN', value: sentryDsn },
Expand Down
3 changes: 2 additions & 1 deletion apps/web/next.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const nextConfig = {
'flydrive/drivers/fs',
'flydrive/drivers/types',
'@sentry/nextjs',
'bullmq'
'bullmq',
'jose',
],
experimental: {
// Dear developer,
Expand Down
2 changes: 2 additions & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"date-fns": "^3.6.0",
"drizzle-orm": "^0.33.0",
"ioredis": "^5.4.1",
"jose": "^5.8.0",
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
"monaco-editor": "^0.50.0",
Expand All @@ -42,6 +43,7 @@
"nprogress": "^0.2.0",
"react": "19.0.0-rc-f994737d14-20240522",
"react-dom": "19.0.0-rc-f994737d14-20240522",
"socket.io-react-hook": "^2.4.5",
"swr": "^2.2.5",
"use-debounce": "^10.0.1",
"zod": "^3.23.8",
Expand Down
3 changes: 1 addition & 2 deletions apps/web/src/actions/invitations/accept.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ export const acceptInvitationAction = createServerAction()
if (!user) throw new NotFoundError('User not found')

await acceptInvitation({ membership, user })

setSession({
await setSession({
sessionData: {
user,
workspace: { id: Number(workspace.id), name: workspace.name },
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/actions/magicLinkTokens/confirm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const confirmMagicLinkTokenAction = createServerAction()
r.unwrap(),
)

setSession({
await setSession({
sessionData: {
user,
workspace,
Expand Down
17 changes: 17 additions & 0 deletions apps/web/src/actions/user/logoutAction.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
'use server'

import { type TokenType } from '@latitude-data/core/websockets/constants'
import { authProcedure } from '$/actions/procedures'
import { removeSession } from '$/services/auth/removeSession'
import { ROUTES } from '$/services/routes'
import { ReadonlyRequestCookies } from 'next/dist/server/web/spec-extension/adapters/request-cookies'
import { redirect } from 'next/navigation'

function removeSocketCookie({
name,
cookies,
}: {
name: TokenType
cookies: ReadonlyRequestCookies
}) {
cookies.delete(name)
}

export const logoutAction = authProcedure
.createServerAction()
.handler(async ({ ctx }) => {
removeSession({ session: ctx.session })
const { cookies: getCookies } = await import('next/headers')

const cookies = getCookies()
removeSocketCookie({ name: 'websocket', cookies })
removeSocketCookie({ name: 'websocketRefresh', cookies })

redirect(ROUTES.auth.login)
})
31 changes: 31 additions & 0 deletions apps/web/src/actions/user/refreshWebsocketTokenAction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use server'

import { verifyWebsocketToken } from '@latitude-data/core/websockets/utils'
import { setWebsocketSessionCookie } from '$/services/auth/setSession'

import { authProcedure } from '../procedures'

export const refreshWebesocketTokenAction = authProcedure
.createServerAction()
.handler(async ({ ctx: { user, workspace } }) => {
const { cookies } = await import('next/headers')
const refreshWebsocketCookie = cookies().get('websocketRefresh')
const refreshToken = refreshWebsocketCookie?.value
const result = await verifyWebsocketToken({
token: refreshToken,
type: 'websocket',
})

if (!result.error) return { success: true }

await setWebsocketSessionCookie({
name: 'websocket',
sessionData: { user, workspace },
})
await setWebsocketSessionCookie({
name: 'websocketRefresh',
sessionData: { user, workspace },
})

return { success: true }
})
2 changes: 1 addition & 1 deletion apps/web/src/actions/user/setupAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const setupAction = errorHandlingProcedure
const result = await setupService(input)
const sessionData = result.unwrap()

setSession({ sessionData })
await setSession({ sessionData })

redirect(ROUTES.root)
})
15 changes: 12 additions & 3 deletions apps/web/src/app/(private)/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { ReactNode } from 'react'

import { SessionProvider } from '@latitude-data/web-ui/browser'
import {
LatitudeWebsocketsProvider,
SocketIOProvider,
} from '$/components/Providers/WebsocketsProvider'
import env from '$/env'
import { getCurrentUser } from '$/services/auth/getCurrentUser'
import { getSession } from '$/services/auth/getSession'
import { ROUTES } from '$/services/routes'
Expand All @@ -17,8 +22,12 @@ export default async function PrivateLayout({
const { workspace, user } = await getCurrentUser()

return (
<SessionProvider currentUser={user} workspace={workspace}>
{children}
</SessionProvider>
<SocketIOProvider>
<SessionProvider currentUser={user} workspace={workspace}>
<LatitudeWebsocketsProvider socketServer={env.WEBSOCKETS_SERVER}>
{children}
</LatitudeWebsocketsProvider>
</SessionProvider>
</SocketIOProvider>
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'use client'

import { useCallback, useEffect, useRef, useState } from 'react'

import { EvaluationDto } from '@latitude-data/core/browser'
import { ProgressIndicator, useCurrentDocument } from '@latitude-data/web-ui'
import { type EventArgs } from '$/components/Providers/WebsocketsProvider/useSockets'

const DISAPERING_IN_MS = 1500
export function EvaluationStatusBanner({
evaluation,
}: {
evaluation: EvaluationDto
}) {
const timeoutRef = useRef<number | null>(null)
const [jobs, setJobs] = useState<EventArgs<'evaluationStatus'>[]>([])
const document = useCurrentDocument()

// FIXME: Remove this line when infra is ready
// @ts-ignore
const _onMessage = useCallback(
(args: EventArgs<'evaluationStatus'>) => {
if (evaluation.id !== args.evaluationId) return
if (document.documentUuid !== args.documentUuid) return

setJobs((prevJobs) => {
const jobIndex = prevJobs.findIndex(
(job) => job.batchId === args.batchId,
)

if (jobIndex === -1) {
return [...prevJobs, args]
} else {
const newJobs = [...prevJobs]
newJobs[jobIndex] = args

if (args.status && args.status === 'finished') {
setTimeout(() => {
setJobs((currentJobs) => {
return currentJobs.filter((job) => job.batchId !== args.batchId)
})
}, DISAPERING_IN_MS)
}

return newJobs
}
})
},
[evaluation.id, document.documentUuid],
)
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current)
}
}
}, [])

// TODO: Implement websockets infra
// useSockets({ event: 'evaluationStatus', onMessage })

return (
<>
{jobs.map((job) => (
<ProgressIndicator
key={job.batchId}
state={job.status === 'finished' ? 'completed' : 'running'}
>
{`Generating batch evaluation (ID: ${job.batchId}) ${job.completed}/${job.initialTotal}`}
</ProgressIndicator>
))}
</>
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { useProviderLog } from '$/stores/providerLogs'

import { EvaluationResultInfo } from './EvaluationResultInfo'
import { EvaluationResultsTable } from './EvaluationResultsTable'
import { EvaluationStatusBanner } from './EvaluationStatusBanner'

export function EvaluationResults({
evaluation,
Expand All @@ -25,6 +26,7 @@ export function EvaluationResults({
return (
<div className='flex flex-col gap-4'>
<Text.H4>Evaluation Results</Text.H4>
<EvaluationStatusBanner evaluation={evaluation} />
<div className='flex flex-row w-full h-full overflow-hidden gap-4'>
<div className='flex-grow min-w-0 h-full'>
<EvaluationResultsTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { useCallback } from 'react'

import { ConversationMetadata } from '@latitude-data/compiler'
import { DocumentVersion, EvaluationDto } from '@latitude-data/core/browser'
import { Button, CloseTrigger, Modal, useToast } from '@latitude-data/web-ui'
import { Button, CloseTrigger, Modal } from '@latitude-data/web-ui'
import { useNavigate } from '$/hooks/useNavigate'
import { ROUTES } from '$/services/routes'

Expand All @@ -25,7 +25,6 @@ export default function CreateBatchEvaluationModal({
evaluation: EvaluationDto
documentMetadata: ConversationMetadata
}) {
const { toast } = useToast()
const navigate = useNavigate()
const documentUuid = document.documentUuid
const goToDetail = useCallback(() => {
Expand All @@ -42,10 +41,6 @@ export default function CreateBatchEvaluationModal({
projectId,
commitUuid,
onSuccess: () => {
toast({
title: 'Success',
description: 'Batch evaluation is processing',
})
goToDetail()
},
})
Expand Down
110 changes: 110 additions & 0 deletions apps/web/src/components/Providers/WebsocketsProvider/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
'use client'

import {
createContext,
ReactNode,
useCallback,
useContext,
useEffect,
} from 'react'

import {
WebClientToServerEvents,
WebServerToClientEvents,
} from '@latitude-data/core/browser'
import { useSession, useToast } from '@latitude-data/web-ui'
import { refreshWebesocketTokenAction } from '$/actions/user/refreshWebsocketTokenAction'
import useCurrentWorkspace from '$/stores/currentWorkspace'
import { IoProvider, useSocket } from 'socket.io-react-hook'

export const SocketIOProvider = ({ children }: { children: ReactNode }) => {
return <IoProvider>{children}</IoProvider>
}

function useJoinWorkspace({ connection }: { connection: IWebsocketConfig }) {
const { currentUser } = useSession()
const { data: workspace } = useCurrentWorkspace()
return useCallback(() => {
connection.socket.emit('joinWorkspace', {
workspaceId: workspace.id,
userId: currentUser.id,
})
}, [workspace.id, connection.socket, connection.connected])
}

export function useSocketConnection({
socketServer,
}: {
socketServer: string
}) {
const { toast } = useToast()
const connection = useSocket<
WebServerToClientEvents,
WebClientToServerEvents
>(
`${socketServer}/web`, // namespace
{
// FIXME: Remove this line when infra is ready
autoConnect: false,
path: '/websocket', // Socket server endpoint
withCredentials: true, // Cookies cross-origin
transports: ['websocket'],
},
)

connection.socket.on('connect_error', async (error) => {
if (error.message.startsWith('AUTH_ERROR')) {
const [data] = await refreshWebesocketTokenAction()

if (data && data.success) {
connection.socket.connect()
} else {
toast({
title: 'We have a problem reconnecting to the server',
description: 'Try logout and login again',
variant: 'destructive',
})
}
}
})

return connection
}

type IWebsocketConfig = ReturnType<typeof useSocketConnection>
const WebsocketConfigContext = createContext<IWebsocketConfig>(
{} as IWebsocketConfig,
)

export const LatitudeWebsocketsProvider = ({
children,
socketServer,
}: {
children: ReactNode
socketServer: string
}) => {
const connection = useSocketConnection({ socketServer })
const joinWorkspace = useJoinWorkspace({ connection })
useEffect(() => {
if (connection.connected) return

joinWorkspace()
}, [connection.connected, joinWorkspace])
return (
<WebsocketConfigContext.Provider value={connection}>
{children}
</WebsocketConfigContext.Provider>
)
}

export const useWebsocketConfig = () => {
const context = useContext(WebsocketConfigContext)

if (!context) {
throw new Error(
'useWebsocketConfig must be used within a WebsocketProvider',
)
}

return context
}
Loading

0 comments on commit 1295d48

Please sign in to comment.