Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: allow multipart large file uploads #752

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
"serve-static": "^1.15.0"
},
"engines": {
"node": ">=16.13.0",
"node": ">=18.0.0",
"npm": ">=8.0.0"
},
"lint-staged": {
Expand Down
2 changes: 1 addition & 1 deletion src/migrator/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Result, Tx } from '../database-layer/db';
import type { Resolvable } from '../sbvr-api/common-types';

import { createHash } from 'crypto';
import { createHash } from 'node:crypto';
import { Engines } from '@balena/abstract-sql-compiler';
import _ from 'lodash';
import { TypedError } from 'typed-error';
Expand Down
2 changes: 2 additions & 0 deletions src/server-glue/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import './sbvr-loader';
import * as dbModule from '../database-layer/db';
import * as configLoader from '../config-loader/config-loader';
import * as migrator from '../migrator/sync';
import * as webResourceHandler from '../webresource-handler';
import type * as migratorUtils from '../migrator/utils';

import * as sbvrUtils from '../sbvr-api/sbvr-utils';
Expand Down Expand Up @@ -63,6 +64,7 @@ export const init = async <T extends string>(
await sbvrUtils.setup(app, db);
const cfgLoader = await configLoader.setup(app);
await cfgLoader.loadConfig(migrator.config);
await cfgLoader.loadConfig(webResourceHandler.config);

const promises: Array<Promise<void>> = [];
if (process.env.SBVR_SERVER_ENABLED) {
Expand Down
21 changes: 21 additions & 0 deletions src/webresource-handler/handlers/NoopHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import type { WebResourceType as WebResource } from '@balena/sbvr-types';
import type { IncomingFile, UploadResponse, WebResourceHandler } from '..';
import type {
BeginUploadHandlerResponse,
BeginUploadPayload,
CommitUploadHandlerPayload,
} from '../multipartUpload';

export class NoopHandler implements WebResourceHandler {
public async handleFile(resource: IncomingFile): Promise<UploadResponse> {
Expand All @@ -18,4 +23,20 @@ export class NoopHandler implements WebResourceHandler {
public async onPreRespond(webResource: WebResource): Promise<WebResource> {
return webResource;
}

public async beginUpload(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_fieldName: string,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_payload: BeginUploadPayload,
): Promise<BeginUploadHandlerResponse> {
return { fileKey: 'noop', uploadId: 'noop', uploadUrls: [] };
}

public async commitUpload(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_payload: CommitUploadHandlerPayload,
): Promise<WebResource> {
return { filename: 'noop', href: 'noop' };
}
}
134 changes: 130 additions & 4 deletions src/webresource-handler/handlers/S3Handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,27 @@ import {
WebResourceError,
type WebResourceHandler,
} from '..';
import type {
BeginUploadHandlerResponse,
BeginUploadPayload,
CommitUploadHandlerPayload,
UploadUrl,
} from '../multipartUpload';
import {
S3Client,
type S3ClientConfig,
DeleteObjectCommand,
type PutObjectCommandInput,
GetObjectCommand,
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
HeadObjectCommand,
} from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

import { randomUUID } from 'crypto';
import { randomUUID } from 'node:crypto';
import type { WebResourceType as WebResource } from '@balena/sbvr-types';
import memoize from 'memoizee';

Expand Down Expand Up @@ -71,9 +81,7 @@ export class S3Handler implements WebResourceHandler {

public async handleFile(resource: IncomingFile): Promise<UploadResponse> {
let size = 0;
const key = `${resource.fieldname}_${randomUUID()}_${
resource.originalname
}`;
const key = this.getFileKey(resource.fieldname, resource.originalname);
const params: PutObjectCommandInput = {
Bucket: this.bucket,
Key: key,
Expand Down Expand Up @@ -122,6 +130,62 @@ export class S3Handler implements WebResourceHandler {
return webResource;
}

public async beginUpload(
fieldName: string,
payload: BeginUploadPayload,
): Promise<BeginUploadHandlerResponse> {
const fileKey = this.getFileKey(fieldName, payload.filename);

const createMultiPartResponse = await this.client.send(
new CreateMultipartUploadCommand({
Bucket: this.bucket,
Key: fileKey,
ContentType: payload.content_type,
}),
);

if (createMultiPartResponse.UploadId == null) {
throw new WebResourceError('Failed to create multipart upload.');
}

const uploadUrls = await this.getPartUploadUrls(
fileKey,
createMultiPartResponse.UploadId,
payload,
);
return { fileKey, uploadId: createMultiPartResponse.UploadId, uploadUrls };
}

public async commitUpload({
fileKey,
uploadId,
filename,
multipartUploadChecksums,
}: CommitUploadHandlerPayload): Promise<WebResource> {
await this.client.send(
new CompleteMultipartUploadCommand({
Bucket: this.bucket,
Key: fileKey,
UploadId: uploadId,
MultipartUpload: multipartUploadChecksums,
}),
);

const headResult = await this.client.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: fileKey,
}),
);

return {
href: this.getS3URL(fileKey),
filename: filename,
size: headResult.ContentLength,
content_type: headResult.ContentType,
};
}

private s3SignUrl(fileKey: string): Promise<string> {
const command = new GetObjectCommand({
Bucket: this.bucket,
Expand All @@ -136,8 +200,70 @@ export class S3Handler implements WebResourceHandler {
return `${this.config.endpoint}/${this.bucket}/${key}`;
}

private getFileKey(fieldName: string, fileName: string) {
return `${fieldName}_${randomUUID()}_${fileName}`;
}

private getKeyFromHref(href: string): string {
const hrefWithoutParams = normalizeHref(href);
return hrefWithoutParams.substring(hrefWithoutParams.lastIndexOf('/') + 1);
}

private async getPartUploadUrls(
fileKey: string,
uploadId: string,
payload: BeginUploadPayload,
): Promise<UploadUrl[]> {
const chunkSizesWithParts = await this.getChunkSizesWithParts(
payload.size,
payload.chunk_size,
);
return Promise.all(
chunkSizesWithParts.map(async ({ chunkSize, partNumber }) => ({
chunkSize,
partNumber,
url: await this.getPartUploadUrl(
fileKey,
uploadId,
partNumber,
chunkSize,
),
})),
);
}

private async getPartUploadUrl(
fileKey: string,
uploadId: string,
partNumber: number,
partSize: number,
): Promise<string> {
const command = new UploadPartCommand({
Bucket: this.bucket,
Key: fileKey,
UploadId: uploadId,
PartNumber: partNumber,
ContentLength: partSize,
});

return getSignedUrl(this.client, command, {
expiresIn: this.signedUrlExpireTimeSeconds,
});
}

private async getChunkSizesWithParts(
size: number,
chunkSize: number,
): Promise<Array<Pick<UploadUrl, 'chunkSize' | 'partNumber'>>> {
const chunkSizesWithParts = [];
let partNumber = 1;
let remainingSize = size;
while (remainingSize > 0) {
const currentChunkSize = Math.min(remainingSize, chunkSize);
chunkSizesWithParts.push({ chunkSize: currentChunkSize, partNumber });
remainingSize -= currentChunkSize;
partNumber += 1;
}
return chunkSizesWithParts;
}
}
57 changes: 46 additions & 11 deletions src/webresource-handler/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import type * as Express from 'express';
import busboy from 'busboy';
import type * as stream from 'node:stream';
import * as uriParser from '../sbvr-api/uri-parser';
import * as sbvrUtils from '../sbvr-api/sbvr-utils';
import type { HookArgs } from '../sbvr-api/hooks';
import { getApiRoot, getModel } from '../sbvr-api/sbvr-utils';
import { checkPermissions } from '../sbvr-api/permissions';
import { NoopHandler } from './handlers/NoopHandler';
import {
odataNameToSqlName,
sqlNameToODataName,
} from '@balena/odata-to-abstract-sql';
import { errors, permissions } from '../server-glue/module';
import type { WebResourceType as WebResource } from '@balena/sbvr-types';
import busboy from 'busboy';
import type * as Express from 'express';
import type * as stream from 'node:stream';
import { TypedError } from 'typed-error';
import type { HookArgs } from '../sbvr-api/hooks';
import { checkPermissions } from '../sbvr-api/permissions';
import * as sbvrUtils from '../sbvr-api/sbvr-utils';
import { getApiRoot, getModel } from '../sbvr-api/sbvr-utils';
import * as uriParser from '../sbvr-api/uri-parser';
import { errors, permissions } from '../server-glue/module';
import { NoopHandler } from './handlers/NoopHandler';
import type {
BeginUploadHandlerResponse,
BeginUploadPayload,
CommitUploadHandlerPayload,
} from './multipartUpload';
import { multipartUploadHooks } from './multipartUpload';

export * from './handlers';

Expand All @@ -34,6 +40,14 @@ export interface WebResourceHandler {
handleFile: (resource: IncomingFile) => Promise<UploadResponse>;
removeFile: (fileReference: string) => Promise<void>;
onPreRespond: (webResource: WebResource) => Promise<WebResource>;

beginUpload: (
fieldName: string,
payload: BeginUploadPayload,
) => Promise<BeginUploadHandlerResponse>;
commitUpload: (
commitInfo: CommitUploadHandlerPayload,
) => Promise<WebResource>;
}

export class WebResourceError extends TypedError {}
Expand Down Expand Up @@ -216,7 +230,7 @@ export const getUploaderMiddlware = (
};
};

const getWebResourceFields = (
export const getWebResourceFields = (
request: uriParser.ODataRequest,
useTranslations = true,
): string[] => {
Expand Down Expand Up @@ -249,6 +263,8 @@ const throwIfWebresourceNotInMultipart = (
{ req, request }: HookArgs,
) => {
if (
request.custom.isAction !== 'beginUpload' &&
request.custom.isAction !== 'commitUpload' &&
!req.is?.('multipart') &&
webResourceFields.some((field) => request.values[field] != null)
) {
Expand Down Expand Up @@ -447,4 +463,23 @@ export const setupUploadHooks = (
resourceName,
getCreateWebResourceHooks(handler),
);

sbvrUtils.addPureHook(
'POST',
apiRoot,
resourceName,
multipartUploadHooks(handler),
);
};

// eslint-disable-next-line @typescript-eslint/no-var-requires
const webresourceModel: string = require('./webresource.sbvr');
export const config = {
models: [
{
apiRoot: 'webresource',
modelText: webresourceModel,
modelName: 'webresource',
},
] as sbvrUtils.ExecutableModel[],
};
Loading
Loading