Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup websockets communication between web and workers #171

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/infra/src/deployments/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export const environment = pulumi
{ name: 'GATEWAY_SSL', value: 'true' },
{ name: 'LATITUDE_DOMAIN', value: 'latitude.so' },
{ name: 'LATITUDE_URL', value: 'https://app.latitude.so' },
{ name: 'WEBSOCKETS_SERVER', value: 'ws.latitude.so' },
{ name: 'FROM_MAILER_EMAIL', value: '[email protected]' },
{ name: 'MAILER_API_KEY', value: mailerApiKey },
{ name: 'SENTRY_DSN', value: sentryDsn },
Expand Down
3 changes: 2 additions & 1 deletion apps/web/next.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const nextConfig = {
'flydrive/drivers/fs',
'flydrive/drivers/types',
'@sentry/nextjs',
'bullmq'
'bullmq',
'jose',
],
experimental: {
// Dear developer,
Expand Down
2 changes: 2 additions & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"date-fns": "^3.6.0",
"drizzle-orm": "^0.33.0",
"ioredis": "^5.4.1",
"jose": "^5.8.0",
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
"monaco-editor": "^0.50.0",
Expand All @@ -42,6 +43,7 @@
"nprogress": "^0.2.0",
"react": "19.0.0-rc-f994737d14-20240522",
"react-dom": "19.0.0-rc-f994737d14-20240522",
"socket.io-react-hook": "^2.4.5",
"swr": "^2.2.5",
"use-debounce": "^10.0.1",
"zod": "^3.23.8",
Expand Down
3 changes: 1 addition & 2 deletions apps/web/src/actions/invitations/accept.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ export const acceptInvitationAction = createServerAction()
if (!user) throw new NotFoundError('User not found')

await acceptInvitation({ membership, user })

setSession({
await setSession({
sessionData: {
user,
workspace: { id: Number(workspace.id), name: workspace.name },
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/actions/magicLinkTokens/confirm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const confirmMagicLinkTokenAction = createServerAction()
r.unwrap(),
)

setSession({
await setSession({
sessionData: {
user,
workspace,
Expand Down
17 changes: 17 additions & 0 deletions apps/web/src/actions/user/logoutAction.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
'use server'

import { type TokenType } from '@latitude-data/core/websockets/constants'
import { authProcedure } from '$/actions/procedures'
import { removeSession } from '$/services/auth/removeSession'
import { ROUTES } from '$/services/routes'
import { ReadonlyRequestCookies } from 'next/dist/server/web/spec-extension/adapters/request-cookies'
import { redirect } from 'next/navigation'

function removeSocketCookie({
name,
cookies,
}: {
name: TokenType
cookies: ReadonlyRequestCookies
}) {
cookies.delete(name)
}

export const logoutAction = authProcedure
.createServerAction()
.handler(async ({ ctx }) => {
removeSession({ session: ctx.session })
const { cookies: getCookies } = await import('next/headers')

const cookies = getCookies()
removeSocketCookie({ name: 'websocket', cookies })
removeSocketCookie({ name: 'websocketRefresh', cookies })

redirect(ROUTES.auth.login)
})
31 changes: 31 additions & 0 deletions apps/web/src/actions/user/refreshWebsocketTokenAction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use server'

import { verifyWebsocketToken } from '@latitude-data/core/websockets/utils'
import { setWebsocketSessionCookie } from '$/services/auth/setSession'

import { authProcedure } from '../procedures'

export const refreshWebesocketTokenAction = authProcedure
.createServerAction()
.handler(async ({ ctx: { user, workspace } }) => {
const { cookies } = await import('next/headers')
const refreshWebsocketCookie = cookies().get('websocketRefresh')
const refreshToken = refreshWebsocketCookie?.value
const result = await verifyWebsocketToken({
token: refreshToken,
type: 'websocket',
})

if (!result.error) return { success: true }

await setWebsocketSessionCookie({
name: 'websocket',
sessionData: { user, workspace },
})
await setWebsocketSessionCookie({
name: 'websocketRefresh',
sessionData: { user, workspace },
})

return { success: true }
})
2 changes: 1 addition & 1 deletion apps/web/src/actions/user/setupAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const setupAction = errorHandlingProcedure
const result = await setupService(input)
const sessionData = result.unwrap()

setSession({ sessionData })
await setSession({ sessionData })

redirect(ROUTES.root)
})
15 changes: 12 additions & 3 deletions apps/web/src/app/(private)/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { ReactNode } from 'react'

import { SessionProvider } from '@latitude-data/web-ui/browser'
import {
LatitudeWebsocketsProvider,
SocketIOProvider,
} from '$/components/Providers/WebsocketsProvider'
import env from '$/env'
import { getCurrentUser } from '$/services/auth/getCurrentUser'
import { getSession } from '$/services/auth/getSession'
import { ROUTES } from '$/services/routes'
Expand All @@ -17,8 +22,12 @@ export default async function PrivateLayout({
const { workspace, user } = await getCurrentUser()

return (
<SessionProvider currentUser={user} workspace={workspace}>
{children}
</SessionProvider>
<SocketIOProvider>
<SessionProvider currentUser={user} workspace={workspace}>
<LatitudeWebsocketsProvider socketServer={env.WEBSOCKETS_SERVER}>
{children}
</LatitudeWebsocketsProvider>
</SessionProvider>
</SocketIOProvider>
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'use client'

import { useCallback, useEffect, useRef, useState } from 'react'

import { EvaluationDto } from '@latitude-data/core/browser'
import { ProgressIndicator, useCurrentDocument } from '@latitude-data/web-ui'
import { type EventArgs } from '$/components/Providers/WebsocketsProvider/useSockets'

const DISAPERING_IN_MS = 1500
export function EvaluationStatusBanner({
evaluation,
}: {
evaluation: EvaluationDto
}) {
const timeoutRef = useRef<number | null>(null)
const [jobs, setJobs] = useState<EventArgs<'evaluationStatus'>[]>([])
const document = useCurrentDocument()

// FIXME: Remove this line when infra is ready
// @ts-ignore
const _onMessage = useCallback(
(args: EventArgs<'evaluationStatus'>) => {
if (evaluation.id !== args.evaluationId) return
if (document.documentUuid !== args.documentUuid) return

setJobs((prevJobs) => {
const jobIndex = prevJobs.findIndex(
(job) => job.batchId === args.batchId,
)

if (jobIndex === -1) {
return [...prevJobs, args]
} else {
const newJobs = [...prevJobs]
newJobs[jobIndex] = args

if (args.status && args.status === 'finished') {
setTimeout(() => {
setJobs((currentJobs) => {
return currentJobs.filter((job) => job.batchId !== args.batchId)
})
}, DISAPERING_IN_MS)
}

return newJobs
}
})
},
[evaluation.id, document.documentUuid],
)
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current)
}
}
}, [])

// TODO: Implement websockets infra
// useSockets({ event: 'evaluationStatus', onMessage })

return (
<>
{jobs.map((job) => (
<ProgressIndicator
key={job.batchId}
state={job.status === 'finished' ? 'completed' : 'running'}
>
{`Generating batch evaluation (ID: ${job.batchId}) ${job.completed}/${job.initialTotal}`}
</ProgressIndicator>
))}
</>
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { useProviderLog } from '$/stores/providerLogs'

import { EvaluationResultInfo } from './EvaluationResultInfo'
import { EvaluationResultsTable } from './EvaluationResultsTable'
import { EvaluationStatusBanner } from './EvaluationStatusBanner'

export function EvaluationResults({
evaluation,
Expand All @@ -25,6 +26,7 @@ export function EvaluationResults({
return (
<div className='flex flex-col gap-4'>
<Text.H4>Evaluation Results</Text.H4>
<EvaluationStatusBanner evaluation={evaluation} />
<div className='flex flex-row w-full h-full overflow-hidden gap-4'>
<div className='flex-grow min-w-0 h-full'>
<EvaluationResultsTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { useCallback } from 'react'

import { ConversationMetadata } from '@latitude-data/compiler'
import { DocumentVersion, EvaluationDto } from '@latitude-data/core/browser'
import { Button, CloseTrigger, Modal, useToast } from '@latitude-data/web-ui'
import { Button, CloseTrigger, Modal } from '@latitude-data/web-ui'
import { useNavigate } from '$/hooks/useNavigate'
import { ROUTES } from '$/services/routes'

Expand All @@ -25,7 +25,6 @@ export default function CreateBatchEvaluationModal({
evaluation: EvaluationDto
documentMetadata: ConversationMetadata
}) {
const { toast } = useToast()
const navigate = useNavigate()
const documentUuid = document.documentUuid
const goToDetail = useCallback(() => {
Expand All @@ -42,10 +41,6 @@ export default function CreateBatchEvaluationModal({
projectId,
commitUuid,
onSuccess: () => {
toast({
title: 'Success',
description: 'Batch evaluation is processing',
})
goToDetail()
},
})
Expand Down
110 changes: 110 additions & 0 deletions apps/web/src/components/Providers/WebsocketsProvider/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
'use client'

import {
createContext,
ReactNode,
useCallback,
useContext,
useEffect,
} from 'react'

import {
WebClientToServerEvents,
WebServerToClientEvents,
} from '@latitude-data/core/browser'
import { useSession, useToast } from '@latitude-data/web-ui'
import { refreshWebesocketTokenAction } from '$/actions/user/refreshWebsocketTokenAction'
import useCurrentWorkspace from '$/stores/currentWorkspace'
import { IoProvider, useSocket } from 'socket.io-react-hook'

export const SocketIOProvider = ({ children }: { children: ReactNode }) => {
return <IoProvider>{children}</IoProvider>
}

function useJoinWorkspace({ connection }: { connection: IWebsocketConfig }) {
const { currentUser } = useSession()
const { data: workspace } = useCurrentWorkspace()
return useCallback(() => {
connection.socket.emit('joinWorkspace', {
workspaceId: workspace.id,
userId: currentUser.id,
})
}, [workspace.id, connection.socket, connection.connected])
}

export function useSocketConnection({
socketServer,
}: {
socketServer: string
}) {
const { toast } = useToast()
const connection = useSocket<
WebServerToClientEvents,
WebClientToServerEvents
>(
`${socketServer}/web`, // namespace
{
// FIXME: Remove this line when infra is ready
autoConnect: false,
path: '/websocket', // Socket server endpoint
withCredentials: true, // Cookies cross-origin
transports: ['websocket'],
},
)

connection.socket.on('connect_error', async (error) => {
if (error.message.startsWith('AUTH_ERROR')) {
const [data] = await refreshWebesocketTokenAction()

if (data && data.success) {
connection.socket.connect()
} else {
toast({
title: 'We have a problem reconnecting to the server',
description: 'Try logout and login again',
variant: 'destructive',
})
}
}
})

return connection
}

type IWebsocketConfig = ReturnType<typeof useSocketConnection>
const WebsocketConfigContext = createContext<IWebsocketConfig>(
{} as IWebsocketConfig,
)

export const LatitudeWebsocketsProvider = ({
children,
socketServer,
}: {
children: ReactNode
socketServer: string
}) => {
const connection = useSocketConnection({ socketServer })
const joinWorkspace = useJoinWorkspace({ connection })
useEffect(() => {
if (connection.connected) return

joinWorkspace()
}, [connection.connected, joinWorkspace])
return (
<WebsocketConfigContext.Provider value={connection}>
{children}
</WebsocketConfigContext.Provider>
)
}

export const useWebsocketConfig = () => {
const context = useContext(WebsocketConfigContext)

if (!context) {
throw new Error(
'useWebsocketConfig must be used within a WebsocketProvider',
)
}

return context
}
Loading
Loading