api: Allow patching a stream recordingSpec (#2220)
* api: Fix the profiles fields

- Add documentation to stream's and asset's profiles
- Reference doc from payloads
- Use right type for recordingSpec.profiles
- Propagate profiles on upload APIs as well (didn't know they already were configurable there)
- Avoid some repetition by cross-referencing payloads

* api: Repeat SDK workaround for recordingSpec.profiles

* api: Support recordingSpec on stream patch as well

* api: Add a couple tests
victorges authored Jun 18, 2024
1 parent 701ddd5 commit 66a2165
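
For context, a minimal client-side sketch of what this commit enables is shown below. It assumes the usual Livepeer Studio REST route (https://livepeer.studio/api/stream/{id}) and a bearer API token, neither of which is part of this diff; the payload shape mirrors the new tests added here.

// Sketch only: patch an existing stream's recordingSpec.
// recordingSpec is rejected with a 400 unless record=true (new validation below),
// and a null recordingSpec.profiles is accepted but dropped before being stored.
async function setRecordingSpec(streamId: string, apiToken: string): Promise<number> {
  const res = await fetch(`https://livepeer.studio/api/stream/${streamId}`, {
    method: "PATCH",
    headers: {
      authorization: `Bearer ${apiToken}`,
      "content-type": "application/json",
    },
    body: JSON.stringify({
      record: true,
      recordingSpec: { profiles: [{ bitrate: 3000000 }] },
    }),
  });
  return res.status; // 204 No Content on success, per the new stream.test.ts cases
}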
Showing 4 changed files with 186 additions and 73 deletions.
100 changes: 53 additions & 47 deletions packages/api/src/controllers/asset.ts
@@ -1,42 +1,20 @@
import { authorizer } from "../middleware";
import { validatePost } from "../middleware";
import { Request, RequestHandler, Router, Response } from "express";
import jwt, { JwtPayload } from "jsonwebtoken";
import { v4 as uuid } from "uuid";
import { FileStore as TusFileStore } from "@tus/file-store";
import { S3Store as TusS3Store } from "@tus/s3-store";
import {
Server as TusServer,
EVENTS as TUS_EVENTS,
Server as TusServer,
Upload as TusUpload,
} from "@tus/server";
import { S3Store as TusS3Store } from "@tus/s3-store";
import { FileStore as TusFileStore } from "@tus/file-store";
import { Request, Response, Router } from "express";
import mung from "express-mung";
import httpProxy from "http-proxy";
import jwt, { JwtPayload } from "jsonwebtoken";
import _ from "lodash";
import {
makeNextHREF,
parseFilters,
parseOrder,
getS3PresignedUrl,
toStringValues,
pathJoin,
getObjectStoreS3Config,
reqUseReplica,
isValidBase64,
mapInputCreatorId,
addDefaultProjectId,
} from "./helpers";
import { db } from "../store";
import os from "os";
import sql from "sql-template-strings";
import {
ForbiddenError,
UnprocessableEntityError,
NotFoundError,
BadRequestError,
InternalServerError,
UnauthorizedError,
NotImplementedError,
} from "../store/errors";
import httpProxy from "http-proxy";
import { generateUniquePlaybackId } from "./generate-keys";
import { v4 as uuid } from "uuid";
import { authorizer, validatePost } from "../middleware";
import { CliArgs } from "../parse-cli";
import {
Asset,
AssetPatchPayload,
@@ -46,20 +24,40 @@ import {
NewAssetPayload,
ObjectStore,
PlaybackPolicy,
Project,
Task,
} from "../schema/types";
import { WithID } from "../store/types";
import Queue from "../store/queue";
import { taskScheduler, ensureQueueCapacity } from "../task/scheduler";
import os from "os";
import { db } from "../store";
import {
BadRequestError,
ForbiddenError,
InternalServerError,
NotFoundError,
NotImplementedError,
UnauthorizedError,
UnprocessableEntityError,
} from "../store/errors";
import {
ensureExperimentSubject,
isExperimentSubject,
} from "../store/experiment-table";
import { CliArgs } from "../parse-cli";
import mung from "express-mung";
import Queue from "../store/queue";
import { WithID } from "../store/types";
import { ensureQueueCapacity, taskScheduler } from "../task/scheduler";
import { getClips } from "./clip";
import { generateUniquePlaybackId } from "./generate-keys";
import {
addDefaultProjectId,
getObjectStoreS3Config,
getS3PresignedUrl,
isValidBase64,
makeNextHREF,
mapInputCreatorId,
parseFilters,
parseOrder,
pathJoin,
reqUseReplica,
toStringValues,
} from "./helpers";

// 7 Days
const DELETE_ASSET_DELAY = 7 * 24 * 60 * 60 * 1000;
@@ -197,15 +195,22 @@ export async function validateAssetPayload(
const userId = req.user.id;
const payload = req.body as NewAssetPayload;

if (payload.objectStoreId) {
const os = await getActiveObjectStore(payload.objectStoreId);
const { name, profiles, staticMp4, creatorId, objectStoreId, storage } =
payload;

if (objectStoreId) {
const os = await getActiveObjectStore(objectStoreId);
if (os.userId !== userId) {
throw new ForbiddenError(
`the provided object store is not owned by user`,
);
}
}

if (profiles && !profiles.length) {
throw new BadRequestError("assets must have at least one profile");
}

// Validate playbackPolicy on creation to generate resourceId & check if unifiedAccessControlConditions is present when using lit_signing_condition
const playbackPolicy = await validateAssetPlaybackPolicy(
payload,
@@ -237,14 +242,15 @@ export async function validateAssetPayload(
phase: source.type === "directUpload" ? "uploading" : "waiting",
updatedAt: createdAt,
},
name: payload.name,
name,
source,
staticMp4: payload.staticMp4,
...(profiles ? { profiles } : null), // avoid serializing null profiles on the asset,
staticMp4,
projectId: req.project?.id,
creatorId: mapInputCreatorId(payload.creatorId),
creatorId: mapInputCreatorId(creatorId),
playbackPolicy,
objectStoreId: payload.objectStoreId || (await defaultObjectStoreId(req)),
storage: storageInputToState(payload.storage),
objectStoreId: objectStoreId || (await defaultObjectStoreId(req)),
storage: storageInputToState(storage),
};
}

64 changes: 58 additions & 6 deletions packages/api/src/controllers/stream.test.ts
@@ -17,11 +17,10 @@ import {
AuxTestServer,
TestClient,
clearDatabase,
createApiToken,
createProject,
setupUsers,
startAuxTestServer,
createProject,
createApiToken,
useApiTokenWithProject,
} from "../test-helpers";
import serverPromise, { TestServer } from "../test-server";
import { semaphore, sleep } from "../util";
@@ -1266,6 +1265,35 @@ describe("controllers/stream", () => {
expect(json.errors[0]).toContain("additionalProperties");
});

it("should disallow adding recordingSpec without record=true", async () => {
const res = await client.patch(patchPath, {
recordingSpec: { profiles: [{ bitrate: 1600000 }] },
});
expect(res.status).toBe(400);
const json = await res.json();
expect(json.errors[0]).toContain(
"recordingSpec is only supported with record=true",
);
});

it("should allow patching recordingSpec with record=true", async () => {
const res = await client.patch(patchPath, {
record: true,
recordingSpec: { profiles: [{ bitrate: 3000000 }] },
});
expect(res.status).toBe(204);
});

it("should remove null recordingSpec.profiles", async () => {
const res = await client.patch(patchPath, {
record: true,
recordingSpec: { profiles: null },
});
expect(res.status).toBe(204);
const updated = await db.stream.get(stream.id);
expect(updated.recordingSpec).toEqual({});
});

it("should validate field types", async () => {
const testTypeErr = async (payload: any) => {
let res = await client.patch(patchPath, payload);
@@ -1425,13 +1453,37 @@ describe("controllers/stream", () => {
});

it("should not accept additional properties for creating a stream", async () => {
const postMockLivepeerStream = JSON.parse(JSON.stringify(postMockStream));
postMockLivepeerStream.livepeer = "livepeer";
const res = await client.post("/stream", { ...postMockLivepeerStream });
const res = await client.post("/stream", {
...postMockStream,
livepeer: "livepeer",
});
expect(res.status).toBe(422);
const stream = await res.json();
expect(stream.id).toBeUndefined();
});

it("should not accept recordingSpec without record", async () => {
const res = await client.post("/stream", {
...postMockStream,
recordingSpec: { profiles: [{ bitrate: 1600000 }] },
});
expect(res.status).toBe(400);
const json = await res.json();
expect(json.errors[0]).toContain(
"recordingSpec is only supported with record=true",
);
});

it("should remove null profiles from recordingSpec", async () => {
const res = await client.post("/stream", {
...postMockStream,
record: true,
recordingSpec: { profiles: null },
});
expect(res.status).toBe(201);
const stream = await res.json();
expect(stream.recordingSpec).toEqual({});
});
});

describe("stream endpoint with api key", () => {
36 changes: 34 additions & 2 deletions packages/api/src/controllers/stream.ts
@@ -1405,7 +1405,7 @@ async function handleCreateStream(req: Request, payload: NewStreamPayload) {
playbackId += "-test";
}

const { objectStoreId } = payload;
let { objectStoreId, recordingSpec } = payload;
if (objectStoreId) {
const store = await db.objectStore.get(objectStoreId);
if (!store || store.deleted || store.disabled) {
@@ -1415,10 +1415,25 @@
}
}

if (recordingSpec) {
if (!payload.record) {
throw new BadRequestError(
`recordingSpec is only supported with record=true`,
);
}
if (!recordingSpec.profiles) {
// remove null profiles from the recordingSpec. it's only supported on the
// input as an SDK workaround but we want to avoid serializing them as null.
const { profiles, ...rest } = recordingSpec;
recordingSpec = rest;
}
}

let doc: DBStream = {
...payload,
profiles: payload.profiles || req.config.defaultStreamProfiles,
kind: "stream",
profiles: payload.profiles || req.config.defaultStreamProfiles,
recordingSpec,
userId: req.user.id,
creatorId: mapInputCreatorId(payload.creatorId),
renditions: {},
@@ -1932,6 +1947,7 @@ app.patch(
userTags,
creatorId,
profiles,
recordingSpec,
} = payload;
if (record != undefined && stream.isActive && stream.record != record) {
res.status(400);
Expand All @@ -1953,6 +1969,22 @@ app.patch(
creatorId: mapInputCreatorId(creatorId),
};

if (recordingSpec) {
const { record } = typeof payload.record === "boolean" ? payload : stream;
if (!record) {
throw new BadRequestError(
`recordingSpec is only supported with record=true`,
);
}
if (!recordingSpec.profiles) {
// remove null profiles from the recordingSpec. it's only supported on the
// input as an SDK workaround but we want to avoid serializing them as null.
const { profiles, ...rest } = recordingSpec;
recordingSpec = rest;
}
patch = { ...patch, recordingSpec };
}

if (multistream) {
multistream = await validateMultistreamOpts(
req.user.id,
