Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qstash publish receive #7

Merged
merged 1 commit into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions src/app/api/batches/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Prisma } from '@prisma/client'
import { prisma } from '@/lib/prisma'
import { randomUUID } from 'crypto'
import { NextResponse } from 'next/server'
import { z } from 'zod'
import { publishMessage } from '@/lib/qstash'

const createBatchSchema = z.object({
files: z
Expand All @@ -24,24 +26,42 @@ export async function POST(request: Request) {
const batchId = randomUUID()

try {
await prisma.uploadBatch.create({
data: {
id: batchId,
videos: {
createMany: {
data: files.map((file, index) => {
return {
uploadOrder: index + 1,
title: file.title,
storageKey: `inputs/${file.id}.mp4`,
audioStorageKey: `inputs/${file.id}.mp3`,
sizeInBytes: file.sizeInBytes,
duration: file.duration,
}
}),
await prisma.$transaction(async (tx) => {
const videos: Prisma.VideoCreateManyUploadBatchInput[] = files.map(
(file, index) => {
return {
id: randomUUID(),
uploadOrder: index + 1,
title: file.title,
storageKey: `inputs/${file.id}.mp4`,
audioStorageKey: `inputs/${file.id}.mp3`,
sizeInBytes: file.sizeInBytes,
duration: file.duration,
}
},
)

await tx.uploadBatch.create({
data: {
id: batchId,
videos: {
createMany: {
data: videos,
},
},
},
},
})

await Promise.all(
videos.map(async (video) => {
await publishMessage({
topic: 'jupiter.upload-created',
body: {
videoId: video.id,
},
})
}),
)
})

return NextResponse.json({ batchId })
Expand Down
16 changes: 12 additions & 4 deletions src/app/api/webhooks/create-video-transcription/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import FormData from 'form-data'
import axios from 'axios'
import { z } from 'zod'
import { env } from '@/env'
import { validateQStashSignature } from '@/lib/qstash'

const createTranscriptionBodySchema = z.object({
videoId: z.string().uuid(),
Expand All @@ -20,9 +21,11 @@ interface OpenAITranscriptionResponse {
}

export async function POST(request: Request) {
const { videoId } = createTranscriptionBodySchema.parse(await request.json())

try {
const { bodyAsJSON } = await validateQStashSignature({ request })

const { videoId } = createTranscriptionBodySchema.parse(bodyAsJSON)

const video = await prisma.video.findUniqueOrThrow({
where: {
id: videoId,
Expand Down Expand Up @@ -107,7 +110,12 @@ export async function POST(request: Request) {
})

return new Response()
} catch (err) {
console.log(err)
} catch (err: any) {
console.error(err)

return NextResponse.json(
{ message: 'Error processing video.', error: err?.message || '' },
{ status: 401 },
)
}
}
66 changes: 20 additions & 46 deletions src/app/api/webhooks/process-video/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,29 @@ import { r2 } from '@/lib/cloudflare-r2'
import { prisma } from '@/lib/prisma'
import { CopyObjectCommand } from '@aws-sdk/client-s3'
import { NextResponse } from 'next/server'
import { Receiver } from '@upstash/qstash/nodejs'
import { z } from 'zod'
import { publishMessage, validateQStashSignature } from '@/lib/qstash'

const processVideoBodySchema = z.object({
videoId: z.string().uuid(),
})

export async function POST(request: Request) {
if (env.NODE_ENV === 'production') {
const signature = request.headers.get('upstash-signature')

const receiver = new Receiver({
currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY,
})

if (!signature) {
return NextResponse.json(
{ message: 'QStash signature not found.' },
{ status: 401 },
)
}

const isValid = await receiver
.verify({
signature,
body: await request.text(),
})
.catch((err) => {
console.error(err)
return false
})

if (!isValid) {
return NextResponse.json(
{ message: 'QStash signature is invalid.' },
{ status: 401 },
)
}
}
try {
const { bodyAsJSON } = await validateQStashSignature({ request })

const { videoId } = processVideoBodySchema.parse(await request.json())
const { videoId } = processVideoBodySchema.parse(bodyAsJSON)

try {
const video = await prisma.video.findUniqueOrThrow({
where: {
id: videoId,
},
})

if (video.processedAt) {
return NextResponse.json(
{ message: 'Video has already been processed.' },
{
status: 409,
},
)
return NextResponse.json({
message: 'Video has already been processed.',
})
}

const bucket = env.CLOUDFLARE_BUCKET_NAME
Expand Down Expand Up @@ -97,12 +63,20 @@ export async function POST(request: Request) {
},
})

/**
* TODO: Add transcription and external provider upload to QStash
*/
await publishMessage({
topic: 'jupiter.upload-processed',
body: {
videoId,
},
})

return new Response()
} catch (err) {
console.log(err)
} catch (err: any) {
console.error(err)

return NextResponse.json(
{ message: 'Error processing video.', error: err?.message || '' },
{ status: 401 },
)
}
}
9 changes: 3 additions & 6 deletions src/app/api/webhooks/update-external-provider-status/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { z } from 'zod'
const pandaWebhookBodySchema = z.object({
action: z.enum(['video.changeStatus']),
video_id: z.string().uuid(),
folder_id: z.string().uuid().optional(),
status: z.enum(['DRAFT', 'CONVERTING', 'CONVERTED', 'FAILED']),
video_external_id: z.string().uuid(),
})
Expand All @@ -20,13 +19,11 @@ export async function POST(request: Request) {
},
})

if (video.externalProviderId) {
if (!video || video.externalProviderId) {
/**
* TODO: It would be cool to store the external provider status instead
* of just storing the external ID.
*
* Here we return a success response as the webhook can be called with
* videos that were not stored on jupiter.
* videos that were not stored on jupiter or the video could already had
* been updated.
*/
return new Response()
}
Expand Down
30 changes: 12 additions & 18 deletions src/app/api/webhooks/upload-to-external-provider/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ import { r2 } from '@/lib/cloudflare-r2'
import { prisma } from '@/lib/prisma'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { NextResponse } from 'next/server'
import FormData from 'form-data'
import axios from 'axios'
import { z } from 'zod'
import { env } from '@/env'
import { validateQStashSignature } from '@/lib/qstash'

const createTranscriptionBodySchema = z.object({
videoId: z.string().uuid(),
})

export async function POST(request: Request) {
const { videoId } = createTranscriptionBodySchema.parse(await request.json())

try {
const { bodyAsJSON } = await validateQStashSignature({ request })

const { videoId } = createTranscriptionBodySchema.parse(bodyAsJSON)

const video = await prisma.video.findUniqueOrThrow({
where: {
id: videoId,
Expand Down Expand Up @@ -50,19 +52,6 @@ export async function POST(request: Request) {
return
}

// const formData = new FormData()

// formData.append('file', videoFile.Body, {
// contentType: videoFile.ContentType,
// knownLength: videoFile.ContentLength,
// filename: video.audioStorageKey,
// })

// formData.append('model', 'whisper-1')
// formData.append('response_format', 'json')
// formData.append('temperature', '0')
// formData.append('language', 'pt')

const response = await axios.post(
'https://uploader-us01.pandavideo.com.br/files',
videoFile.Body,
Expand All @@ -81,7 +70,12 @@ export async function POST(request: Request) {
)

return NextResponse.json({ data: response.data })
} catch (err) {
console.log(err)
} catch (err: any) {
console.error(err)

return NextResponse.json(
{ message: 'Error uploading video.', error: err?.message || '' },
{ status: 401 },
)
}
}
1 change: 1 addition & 0 deletions src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export const env = createEnv({
NEXTAUTH_SECRET: z.string().min(1),
GOOGLE_CLIENT_ID: z.string().min(1),
GOOGLE_CLIENT_SECRET: z.string().min(1),
QSTASH_TOKEN: z.string().refine(requiredOnEnv('production')),
QSTASH_CURRENT_SIGNING_KEY: z.string().refine(requiredOnEnv('production')),
QSTASH_NEXT_SIGNING_KEY: z.string().refine(requiredOnEnv('production')),
},
Expand Down
73 changes: 73 additions & 0 deletions src/lib/qstash.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { env } from '@/env'
import { Receiver, Client } from '@upstash/qstash/nodejs'

const qstash = new Client({
token: env.QSTASH_TOKEN,
})

export async function publishMessage<T = any>({
topic,
body,
runInDev = false,
}: {
topic: string
body: T
runInDev?: boolean
}) {
if (env.NODE_ENV === 'development' && runInDev === false) {
console.log(`[Skipped] Publish to "${topic}: ${JSON.stringify(body)}"`)

return
}

await qstash.publishJSON({
topic,
contentBasedDeduplication: true,
body,
})
}

export async function validateQStashSignature({
request,
runInDev = false,
}: {
request: Request
runInDev?: boolean
}) {
const requestBodyAsText = await request.text()

if (env.NODE_ENV === 'development' && runInDev === false) {
return {
bodyAsJSON: JSON.parse(requestBodyAsText),
}
}

const signature = request.headers.get('upstash-signature')

const receiver = new Receiver({
currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY,
})

if (!signature) {
throw new Error('QStash signature not found.')
}

const isValid = await receiver
.verify({
signature,
body: requestBodyAsText,
})
.catch((err) => {
console.error(err)
return false
})

if (!isValid) {
throw new Error('QStash signature is invalid.')
}

return {
bodyAsJSON: JSON.parse(requestBodyAsText),
}
}
Loading