Skip to content

Commit

Permalink
Document logs
Browse files Browse the repository at this point in the history
  • Loading branch information
csansoon committed Aug 8, 2024
1 parent f7cdb1e commit 61615d4
Show file tree
Hide file tree
Showing 41 changed files with 1,625 additions and 97 deletions.
1 change: 1 addition & 0 deletions apps/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"@hono/zod-validator": "^0.2.2",
"@latitude-data/core": "workspace:^",
"@latitude-data/env": "workspace:^",
"@latitude-data/jobs": "workspace:^",
"@t3-oss/env-core": "^0.10.1",
"hono": "^4.5.3",
"zod": "^3.23.8"
Expand Down
19 changes: 10 additions & 9 deletions apps/gateway/src/common/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@ import '@latitude-data/env'
import { createEnv } from '@t3-oss/env-core'
import { z } from 'zod'

let env
let localEnv = {}
if (process.env.NODE_ENV === 'development') {
env = await import('./env/development').then((r) => r.default)
localEnv = await import('./env/development').then((r) => r.default)
} else if (process.env.NODE_ENV === 'test') {
env = await import('./env/test').then((r) => r.default)
} else {
env = process.env as {
GATEWAY_PORT: string
GATEWAY_HOST: string
}
localEnv = await import('./env/test').then((r) => r.default)
}

export default createEnv({
Expand All @@ -21,6 +16,12 @@ export default createEnv({
server: {
GATEWAY_PORT: z.string().optional().default('8787'),
GATEWAY_HOST: z.string().optional().default('localhost'),
REDIS_HOST: z.string(),
REDIS_PORT: z.string(),
REDIS_PASSWORD: z.string().optional(),
},
runtimeEnv: {
...process.env,
...localEnv,
},
runtimeEnv: env,
})
10 changes: 10 additions & 0 deletions apps/gateway/src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { setupJobs } from '@latitude-data/jobs'
import env from '$/common/env'

export const { queues } = setupJobs({
connectionParams: {
host: env.REDIS_HOST,
port: Number(env.REDIS_PORT),
password: env.REDIS_PASSWORD,
},
})
1 change: 1 addition & 0 deletions apps/gateway/src/middlewares/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const authMiddleware = () =>
if (workspaceResult.error) return false

c.set('workspace', workspaceResult.value)
c.set('apiKey', apiKeyResult.value)

return true
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ import {
} from '@latitude-data/core'
import type { Workspace } from '@latitude-data/core/browser'

const toDocumentPath = (path: string) => {
if (path.startsWith('/')) {
return path
}

return `/${path}`
}

export const getData = async ({
workspace,
projectId,
Expand All @@ -35,10 +27,7 @@ export const getData = async ({
.getCommitByUuid({ project, uuid: commitUuid })
.then((r) => r.unwrap())
const document = await docsScope
.getDocumentByPath({
commit,
path: toDocumentPath(documentPath),
})
.getDocumentByPath({ commit, path: documentPath })
.then((r) => r.unwrap())

return { project, commit, document }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import {
} from '@latitude-data/core'
import app from '$/index'
import { eq } from 'drizzle-orm'
import { describe, expect, it } from 'vitest'
import { describe, expect, it, vi } from 'vitest'

vi.mock('$/jobs', () => ({
queues: { jobs: { enqueueUpdateApiKeyProviderJob: vi.fn() } },
}))

describe('GET documents', () => {
describe('unauthorized', () => {
Expand All @@ -26,7 +30,7 @@ describe('GET documents', () => {
const apikey = await database.query.apiKeys.findFirst({
where: eq(apiKeys.workspaceId, workspace.id),
})
const path = '/path/to/document'
const path = 'path/to/document'
const { commit } = await factories.createDraft({
project,
user,
Expand All @@ -42,7 +46,7 @@ describe('GET documents', () => {
.getDocumentByPath({ commit, path })
.then((r) => r.unwrap())

const route = `/api/v1/projects/${project!.id}/commits/${commit!.uuid}/documents/${document.documentVersion.path.slice(1)}`
const route = `/api/v1/projects/${project!.id}/commits/${commit!.uuid}/documents/${document.documentVersion.path}`
const res = await app.request(route, {
headers: {
Authorization: `Bearer ${apikey!.token}`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { describe, expect, it, vi } from 'vitest'

const mocks = vi.hoisted(() => ({
runDocumentAtCommit: vi.fn(),
queues: { jobs: { enqueueUpdateApiKeyProviderJob: vi.fn() } },
}))

vi.mock('@latitude-data/core', async (importOriginal) => {
Expand All @@ -23,6 +24,10 @@ vi.mock('@latitude-data/core', async (importOriginal) => {
}
})

vi.mock('$/jobs', () => ({
queues: mocks.queues,
}))

describe('POST /run', () => {
describe('unauthorized', () => {
it('fails', async () => {
Expand Down Expand Up @@ -112,6 +117,7 @@ describe('POST /run', () => {
},
})

expect(mocks.queues)
expect(res.status).toBe(200)
expect(res.body).toBeInstanceOf(ReadableStream)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { zValidator } from '@hono/zod-validator'
import { runDocumentAtCommit } from '@latitude-data/core'
import {
LogSources,
runDocumentAtCommit,
streamToGenerator,
} from '@latitude-data/core'
import { queues } from '$/jobs'
import { Factory } from 'hono/factory'
import { SSEStreamingApi, streamSSE } from 'hono/streaming'
import { z } from 'zod'
Expand All @@ -21,6 +26,7 @@ export const runHandler = factory.createHandlers(
const { documentPath, parameters } = c.req.valid('json')

const workspace = c.get('workspace')
const apiKey = c.get('apiKey')

const { document, commit } = await getData({
workspace,
Expand All @@ -33,6 +39,13 @@ export const runHandler = factory.createHandlers(
documentUuid: document.documentUuid,
commit,
parameters,
logHandler: (log) => {
queues.defaultQueue.jobs.enqueueCreateProviderLogJob({
...log,
source: LogSources.API,
apiKeyId: apiKey.id,
})
},
}).then((r) => r.unwrap())

await pipeToStream(stream, result.stream)
Expand All @@ -45,12 +58,7 @@ async function pipeToStream(
readableStream: ReadableStream,
) {
let id = 0
const reader = readableStream.getReader()

while (true) {
const { done, value } = await reader.read()
if (done) break

for await (const value of streamToGenerator(readableStream)) {
stream.write(
JSON.stringify({
...value,
Expand Down
1 change: 1 addition & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"ai": "^3.2.42",
"bcrypt": "^5.1.1",
"bullmq": "^5.8.5",
"date-fns": "^3.6.0",
"ioredis": "^5.4.1",
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
Expand Down
27 changes: 18 additions & 9 deletions apps/web/src/actions/documents/streamTextAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
streamToGenerator,
validateConfig,
} from '@latitude-data/core'
import { PROVIDER_EVENT } from '@latitude-data/core/browser'
import { LogSources, PROVIDER_EVENT } from '@latitude-data/core/browser'
import { queues } from '$/jobs'
import { getCurrentUser } from '$/services/auth/getCurrentUser'
import { createStreamableValue, StreamableValue } from 'ai/rsc'

Expand All @@ -26,7 +27,7 @@ export async function streamTextAction({
messages,
}: StreamTextActionProps): StreamTextActionResponse {
const { workspace } = await getCurrentUser()
const { provider, model, ...rest } = validateConfig(config)
const { provider, ...rest } = validateConfig(config)
const providerApiKeysScope = new ProviderApiKeysRepository(workspace.id)
const apiKey = await providerApiKeysScope
.findByName(provider)
Expand All @@ -35,13 +36,21 @@ export async function streamTextAction({

;(async () => {
try {
const result = await ai({
apiKey: apiKey.token,
provider: apiKey.provider,
model,
messages,
config: rest,
})
const result = await ai(
{
provider: apiKey,
messages,
config: rest,
},
{
logHandler: (log) => {
queues.defaultQueue.jobs.enqueueCreateProviderLogJob({
...log,
source: LogSources.Playground,
})
},
},
)

for await (const value of streamToGenerator(result.fullStream)) {
stream.update({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,22 @@ import {
} from '@latitude-data/web-ui'
import useProviderApiKeys from '$/stores/providerApiKeys'
import useUsers from '$/stores/users'
import { format, formatDistanceToNow, formatRelative } from 'date-fns'

import NewApiKey from './New'

const HOURS = 1000 * 60 * 60
const DAYS = HOURS * 24
function relativeTime(date: Date | null) {
if (date == null) return 'never'

const now = new Date()
const diff = now.getTime() - date.getTime()
if (diff < 1 * HOURS) return formatDistanceToNow(date, { addSuffix: true })
if (diff < 7 * DAYS) return formatRelative(date, new Date())
return format(date, 'PPpp')
}

export default function ProviderApiKeys() {
const { data: providerApiKeys, destroy } = useProviderApiKeys()
const [open, setOpen] = useState(false)
Expand Down Expand Up @@ -97,7 +110,7 @@ const ProviderApiKeysTable = ({
</TableCell>
<TableCell>
<Text.H4 color='foregroundMuted'>
{apiKey.lastUsedAt?.toISOString() || 'never'}
{relativeTime(apiKey.lastUsedAt)}
</Text.H4>
</TableCell>
<TableCell>
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { setupJobs } from '@latitude-data/jobs'
import env from '$/env'

export default setupJobs({
export const { queues } = setupJobs({
connectionParams: {
host: env.REDIS_HOST,
port: Number(env.REDIS_PORT),
Expand Down
63 changes: 63 additions & 0 deletions packages/core/drizzle/0026_latitude_rocks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
DO $$ BEGIN
CREATE TYPE "latitude"."log_source" AS ENUM('playground', 'api');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS "latitude"."document_logs" (
"id" bigserial PRIMARY KEY NOT NULL,
"uuid" uuid NOT NULL,
"document_uuid" uuid NOT NULL,
"commit_id" bigint NOT NULL,
"resolved_content" text NOT NULL,
"parameters" json NOT NULL,
"custom_identifier" text,
"duration" bigint NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL,
CONSTRAINT "document_logs_uuid_unique" UNIQUE("uuid")
);
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS "latitude"."provider_logs" (
"id" bigserial PRIMARY KEY NOT NULL,
"uuid" uuid NOT NULL,
"provider_id" bigint NOT NULL,
"model" varchar,
"config" json NOT NULL,
"messages" json NOT NULL,
"response_text" text,
"tool_calls" json,
"tokens" bigint NOT NULL,
"duration" bigint NOT NULL,
"document_log_id" bigint,
"source" "latitude"."log_source" NOT NULL,
"apiKeyId" bigint,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL,
CONSTRAINT "provider_logs_uuid_unique" UNIQUE("uuid")
);
--> statement-breakpoint
ALTER TABLE "latitude"."api_keys" ADD COLUMN "last_used_at" timestamp;--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "latitude"."document_logs" ADD CONSTRAINT "document_logs_commit_id_commits_id_fk" FOREIGN KEY ("commit_id") REFERENCES "latitude"."commits"("id") ON DELETE restrict ON UPDATE cascade;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "latitude"."provider_logs" ADD CONSTRAINT "provider_logs_provider_id_provider_api_keys_id_fk" FOREIGN KEY ("provider_id") REFERENCES "latitude"."provider_api_keys"("id") ON DELETE restrict ON UPDATE cascade;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "latitude"."provider_logs" ADD CONSTRAINT "provider_logs_document_log_id_document_logs_id_fk" FOREIGN KEY ("document_log_id") REFERENCES "latitude"."document_logs"("id") ON DELETE restrict ON UPDATE cascade;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "latitude"."provider_logs" ADD CONSTRAINT "provider_logs_apiKeyId_api_keys_id_fk" FOREIGN KEY ("apiKeyId") REFERENCES "latitude"."api_keys"("id") ON DELETE restrict ON UPDATE cascade;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
Loading

0 comments on commit 61615d4

Please sign in to comment.