From c1c35463bbee62997e541a3a1a13405deaa458b6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 6 Jun 2024 18:20:05 +0100 Subject: [PATCH] api: Livestream recording customization (#2196) * api/schema: Add recordingSpec field to stream and session * api/stream: Propagate recordingSpec on child stream and session * api/webhook: Send recording profiles on upload task * api/stream: Fix build I love that ts trick I made * api/stream: Add test for recordingSpec propagation * api/webhooks: Add tests for recording.waiting processing --- packages/api/package.json | 2 +- packages/api/src/controllers/stream.test.ts | 42 +++++ packages/api/src/controllers/stream.ts | 49 +++--- packages/api/src/schema/api-schema.yaml | 19 ++ packages/api/src/test-server.ts | 5 +- packages/api/src/webhooks/cannon.test.ts | 185 ++++++++++++++++++-- packages/api/src/webhooks/cannon.ts | 1 + 7 files changed, 266 insertions(+), 37 deletions(-) diff --git a/packages/api/package.json b/packages/api/package.json index 1c3e2fdc25..603e7462b9 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -33,7 +33,7 @@ "go-livepeer:broadcaster": "bin/livepeer -broadcaster -datadir ./bin/broadcaster -orchAddr 127.0.0.1:3086 -rtmpAddr 0.0.0.0:3035 -httpAddr :3085 -cliAddr :3075 -v 6 -authWebhookUrl http://127.0.0.1:3004/api/stream/hook -orchWebhookUrl http://127.0.0.1:3004/api/orchestrator", "go-livepeer:orchestrator": "bin/livepeer -orchestrator -datadir ./bin/orchestrator -transcoder -serviceAddr 127.0.0.1:3086 -cliAddr :3076 -v 6", "test": "POSTGRES_CONNECT_TIMEOUT=120000 jest -i --silent \"${PWD}/src\"", - "test-single": "POSTGRES_CONNECT_TIMEOUT=120000 jest -i --silent \"${PWD}/src/controllers/+$filename.+\"", + "test-single": "POSTGRES_CONNECT_TIMEOUT=120000 jest -i --silent \"${PWD}/src/.*$filename.*\"", "test:dev": "jest \"${PWD}/src\" -i --silent --watch", "test:build": "parcel build --no-autoinstall --no-minify --bundle-node-modules -t browser --out-dir ../dist-worker ../src/worker.js", "coverage": "yarn run test --coverage", diff --git a/packages/api/src/controllers/stream.test.ts b/packages/api/src/controllers/stream.test.ts index 885071d671..4dc5fc8b82 100644 --- a/packages/api/src/controllers/stream.test.ts +++ b/packages/api/src/controllers/stream.test.ts @@ -2350,6 +2350,48 @@ describe("controllers/stream", () => { "http://example-public/playback_id/output.mp4" ); }); + + it("should propagate stream configs to child stream and session", async () => { + // create parent stream + const configs = { + record: true, + recordingSpec: { + profiles: [ + { + name: "720p", + bitrate: 2000000, + fps: 30, + width: 1280, + height: 720, + }, + ], + }, + }; + let res = await client.post(`/stream`, { + ...smallStream, + ...configs, + }); + expect(res.status).toBe(201); + const parent = await res.json(); + expect(parent).toMatchObject(configs); + + // call transcoding hook + const sessionId = uuid(); + res = await client.post( + `/stream/${parent.id}/stream?sessionId=${sessionId}`, + { + name: "session1", + } + ); + expect(res.status).toBe(201); + const childStream = await res.json(); + expect(childStream.parentId).toEqual(parent.id); + expect(childStream.sessionId).toEqual(sessionId); + expect(childStream).toMatchObject(configs); + + const session = await db.session.get(sessionId); + expect(session).toMatchObject(configs); + }); }); }); diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index c329c8c1d7..5c1c4912cc 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -9,8 +9,8 @@ import logger from "../logger"; import { authorizer, geolocateMiddleware, - validatePost, hasAccessToResource, + validatePost, } from "../middleware"; import { CliArgs } from "../parse-cli"; import { @@ -20,9 +20,9 @@ import { StreamPatchPayload, StreamSetActivePayload, User, - Project, } from "../schema/types"; import { db, jobsDb } from "../store"; +import { cache } from "../store/cache"; import { DB } from "../store/db"; import { BadRequestError, @@ -60,7 +60,6 @@ import { } from "./helpers"; import { toExternalSession } from "./session"; import wowzaHydrate from "./wowza-hydrate"; -import { cache } from "../store/cache"; type Profile = DBStream["profiles"][number]; type MultistreamOptions = DBStream["multistream"]; @@ -91,6 +90,7 @@ const EMPTY_NEW_STREAM_PAYLOAD: Required< multistream: undefined, pull: undefined, record: undefined, + recordingSpec: undefined, userTags: undefined, creatorId: undefined, playbackPolicy: undefined, @@ -922,31 +922,39 @@ app.post( const id = stream.playbackId.slice(0, 4) + uuid().slice(4); const createdAt = Date.now(); - const record = stream.record; - const recordObjectStoreId = - stream.recordObjectStoreId || - (record ? req.config.recordObjectStoreId : undefined); + const { + id: parentId, + playbackId, + userId, + projectId, + objectStoreId, + record, + recordingSpec, + recordObjectStoreId = record ? req.config.recordObjectStoreId : undefined, + } = stream; + const profiles = hackMistSettings( + req, + useParentProfiles ? stream.profiles : req.body.profiles + ); const childStream: DBStream = wowzaHydrate({ ...req.body, kind: "stream", - userId: stream.userId, - projectId: stream.projectId, + userId, + projectId, renditions: {}, - objectStoreId: stream.objectStoreId, + profiles, + objectStoreId, record, + recordingSpec, recordObjectStoreId, sessionId, id, createdAt, - parentId: stream.id, + parentId, region, lastSeen: 0, isActive: true, }); - childStream.profiles = hackMistSettings( - req, - useParentProfiles ? stream.profiles : childStream.profiles - ); const existingSession = await db.session.get(sessionId); if (existingSession) { @@ -956,10 +964,10 @@ app.post( } else { const session: DBSession = { id: sessionId, - parentId: stream.id, - playbackId: stream.playbackId, - userId: stream.userId, - projectId: stream.projectId, + parentId, + playbackId, + userId, + projectId, kind: "session", version: "v2", name: req.body.name, @@ -974,8 +982,9 @@ app.post( ingestRate: 0, outgoingRate: 0, deleted: false, - profiles: childStream.profiles, + profiles, record, + recordingSpec, recordObjectStoreId, recordingStatus: record ? "waiting" : undefined, }; diff --git a/packages/api/src/schema/api-schema.yaml b/packages/api/src/schema/api-schema.yaml index b4fc9b3797..c973404484 100644 --- a/packages/api/src/schema/api-schema.yaml +++ b/packages/api/src/schema/api-schema.yaml @@ -637,6 +637,21 @@ components: customization, create and configure an object store. type: boolean example: false + recordingSpec: + type: object + description: | + Configuration for recording the stream. This can only be set if + `record` is true. + additionalProperties: false + properties: + profiles: + type: array + items: + $ref: "#/components/schemas/ffmpeg-profile" + description: | + Profiles to record the stream in. If not specified, the stream + will be recorded in the same profiles as the stream itself. Keep + in mind that the source rendition will always be recorded. multistream: type: object additionalProperties: false @@ -690,6 +705,8 @@ components: $ref: "#/components/schemas/ffmpeg-profile" record: $ref: "#/components/schemas/stream/properties/record" + recordingSpec: + $ref: "#/components/schemas/stream/properties/recordingSpec" multistream: $ref: "#/components/schemas/stream/properties/multistream" userTags: @@ -923,6 +940,8 @@ components: type: array items: $ref: "#/components/schemas/ffmpeg-profile" + recordingSpec: + $ref: "#/components/schemas/stream/properties/recordingSpec" error: type: object properties: diff --git a/packages/api/src/test-server.ts b/packages/api/src/test-server.ts index 7e71244113..e0d8c1f1b8 100644 --- a/packages/api/src/test-server.ts +++ b/packages/api/src/test-server.ts @@ -2,9 +2,9 @@ * This file is imported from all the integration tests. It boots up a server based on the provided argv. */ import fs from "fs-extra"; -import { v4 as uuid } from "uuid"; -import path from "path"; import os from "os"; +import path from "path"; +import { v4 as uuid } from "uuid"; import makeApp, { AppServer } from "./index"; import argParser from "./parse-cli"; @@ -38,6 +38,7 @@ params.sendgridTemplateId = sendgridTemplateId; params.sendgridApiKey = sendgridApiKey; params.postgresUrl = `postgresql://postgres@127.0.0.1/${testId}`; params.recordObjectStoreId = "mock_store"; +params.recordCatalystObjectStoreId = "mock_store"; params.vodObjectStoreId = "mock_vod_store"; params.trustedIpfsGateways = [ "https://ipfs.example.com/ipfs/", diff --git a/packages/api/src/webhooks/cannon.test.ts b/packages/api/src/webhooks/cannon.test.ts index d31d9aeef9..171034330e 100644 --- a/packages/api/src/webhooks/cannon.test.ts +++ b/packages/api/src/webhooks/cannon.test.ts @@ -1,13 +1,21 @@ import fetch from "node-fetch"; -import serverPromise, { TestServer } from "../test-server"; +import { v4 as uuid } from "uuid"; + +import { sign } from "../controllers/helpers"; +import { USER_SESSION_TIMEOUT } from "../controllers/stream"; +import { ObjectStore, Stream, User, Webhook } from "../schema/types"; +import { db } from "../store"; +import { DBSession } from "../store/session-table"; +import { DBStream } from "../store/stream-table"; +import { WithID } from "../store/types"; import { + AuxTestServer, TestClient, clearDatabase, startAuxTestServer, - AuxTestServer, } from "../test-helpers"; +import serverPromise, { TestServer } from "../test-server"; import { semaphore, sleep } from "../util"; -import { sign } from "../controllers/helpers"; const bodyParser = require("body-parser"); jest.setTimeout(15000); @@ -17,10 +25,11 @@ describe("webhook cannon", () => { let webhookServer: AuxTestServer; let testHost; - let mockAdminUser; - let mockNonAdminUser; - let postMockStream; - let mockWebhook; + let mockAdminUser: User; + let mockNonAdminUser: User; + let postMockStream: Stream; + let mockStore: WithID; + let mockWebhook: Webhook; let client, adminUser, adminToken, nonAdminUser, nonAdminToken; async function setupUsers(server) { @@ -108,7 +117,13 @@ describe("webhook cannon", () => { webhookServer = await startAuxTestServer(30000); testHost = `http://127.0.0.1:${webhookServer.port}`; - console.log("beforeALL done"); + + mockStore = { + id: "mock_store", + url: `s3+http://localhost:${webhookServer.port}/bucket-name`, + publicUrl: `http://localhost:${webhookServer.port}/bucket-name`, + userId: mockAdminUser.id, + }; }); afterAll(async () => { @@ -118,6 +133,8 @@ describe("webhook cannon", () => { beforeEach(async () => { ({ client, adminUser, adminToken, nonAdminUser, nonAdminToken } = await setupUsers(server)); + + await db.objectStore.create(mockStore); }); afterEach(async () => { @@ -159,12 +176,13 @@ describe("webhook cannon", () => { beforeAll(() => { webhookServer.app.use(bodyParser.json()); webhookServer.app.use((req, res, next) => { - console.log("WEBHOOK WORKS , body", req.body); - const signatureHeader = String(req.headers["livepeer-signature"]); - const signature: string = signatureHeader.split(",")[1].split("=")[1]; - expect(signature).toEqual( - sign(JSON.stringify(req.body), mockWebhook.sharedSecret) - ); + if (req.path.startsWith("/webhook")) { + const signatureHeader = String(req.headers["livepeer-signature"]); + const signature: string = signatureHeader.split(",")[1].split("=")[1]; + expect(signature).toEqual( + sign(JSON.stringify(req.body), mockWebhook.sharedSecret) + ); + } next(); }); webhookServer.app.post("/webhook", (req, res) => { @@ -395,6 +413,145 @@ describe("webhook cannon", () => { await Promise.all(sems.map((s) => s.wait(3000))); expect(calledCounts).toEqual([4, 2]); }); + + describe("recording.waiting handling", () => { + let parentStream: DBStream; + let childStream: DBStream; + let session: DBSession; + + beforeEach(async () => { + // create parent stream + let res = await client.post(`/stream`, { + ...postMockStream, + record: true, + recordingSpec: { + profiles: [ + { + name: "720p", + bitrate: 2000000, + fps: 30, + width: 1280, + height: 720, + }, + ], + }, + }); + expect(res.status).toBe(201); + parentStream = await res.json(); + + // create child stream and session + const sessionId = uuid(); + res = await client.post( + `/stream/${parentStream.id}/stream?sessionId=${sessionId}`, + { + name: "session1", + } + ); + expect(res.status).toBe(201); + childStream = await res.json(); + expect(childStream).toMatchObject({ + sessionId, + parentId: parentStream.id, + isActive: true, + }); + + session = await db.session.get(sessionId); + expect(session).toMatchObject({ id: sessionId }); + + const lastSeen = Date.now() - 2 * USER_SESSION_TIMEOUT; + await db.stream.update(childStream.id, { lastSeen }); + await db.session.update(session.id, { + lastSeen, + sourceSegments: 1, + }); + + webhookServer.app.all("/bucket-name/*", (req, res) => { + console.log("req.url", req.url); + res.end("a good file"); + }); + }); + + it("should create asset from recording.waiting event", async () => { + const res = await client.post("/webhook", { + ...mockWebhook, + name: "test-recording-waiting", + events: ["recording.waiting"], + }); + expect(res.status).toBe(201); + const webhook = await res.json(); + expect(webhook.userId).toEqual(nonAdminUser.id); + + const sem = semaphore(); + let called = false; + webhookCallback = () => { + called = true; + sem.release(); + }; + + await server.queue.publishWebhook("events.recording.waiting", { + type: "webhook_event", + id: "webhook_test_12", + timestamp: Date.now(), + streamId: parentStream.id, + sessionId: session.id, + event: "recording.waiting", + userId: nonAdminUser.id, + }); + + await sem.wait(3000); + expect(called).toBe(true); + + const asset = await db.asset.get(session.id); + expect(asset).toMatchObject({ + id: session.id, + userId: nonAdminUser.id, + source: { + type: "recording", + sessionId: session.id, + }, + status: { + phase: "waiting", + }, + }); + }); + + it("should propagate recording spec to upload task", async () => { + const res = await client.post("/webhook", { + ...mockWebhook, + name: "test-recording-waiting", + events: ["recording.waiting"], + }); + expect(res.status).toBe(201); + const webhook = await res.json(); + expect(webhook.userId).toEqual(nonAdminUser.id); + + const sem = semaphore(); + let called = false; + webhookCallback = () => { + called = true; + sem.release(); + }; + + await server.queue.publishWebhook("events.recording.waiting", { + type: "webhook_event", + id: "webhook_test_12", + timestamp: Date.now(), + streamId: parentStream.id, + sessionId: session.id, + event: "recording.waiting", + userId: nonAdminUser.id, + }); + + await sem.wait(3000); + expect(called).toBe(true); + + const [tasks] = await db.task.find({ outputAssetId: session.id }); + expect(tasks).toHaveLength(1); + expect(tasks[0].params?.upload).toMatchObject({ + profiles: parentStream.recordingSpec.profiles, + }); + }); + }); }); describe("local IP check", () => { diff --git a/packages/api/src/webhooks/cannon.ts b/packages/api/src/webhooks/cannon.ts index c5f0e1e90d..e9869fc47a 100644 --- a/packages/api/src/webhooks/cannon.ts +++ b/packages/api/src/webhooks/cannon.ts @@ -585,6 +585,7 @@ export default class WebhookCannon { { upload: { url: url, + profiles: session.recordingSpec?.profiles, thumbnails: !(await isExperimentSubject( "vod-thumbs-off", session.userId