Make the concatener a Readable and upload to google chunk by chunk #90

Closed
37 changes: 32 additions & 5 deletions loadtest/publish.py
@@ -6,6 +6,7 @@
import json
import os
import base64
import gzip
from molotov import scenario, setup, global_setup, teardown, global_teardown
from molotov import set_var, get_var

@@ -26,6 +27,12 @@
# These are the various file sizes we'll generate in the global setup.
_FILE_SIZES = (512, 1024, 5 * 1024, 20 * 1024)

# These are gzip-compressed chunks of JSON that we'll concatenate later to
# generate our payload. This takes advantage of the fact that a gzip stream
# can be made of several concatenated gzip members.
_COMPRESSED_JSON_PREFIX = gzip.compress(b'{"foo":"')
_COMPRESSED_JSON_SUFFIX = gzip.compress(b'"}')
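
To illustrate the trick these constants rely on, here is a small standalone sketch (not part of the patch): a gzip stream may consist of several members, and decompressing the concatenation yields the concatenated payloads, so the pieces can be glued together without recompressing anything.

```python
import gzip

# Illustrative check of the gzip-member concatenation trick used above.
prefix = gzip.compress(b'{"foo":"')
middle = gzip.compress(b"abcd", compresslevel=0)  # stored, not compressed
suffix = gzip.compress(b'"}')

payload = prefix + middle + suffix
# gzip.decompress handles multi-member streams, so this yields the full JSON.
assert gzip.decompress(payload) == b'{"foo":"abcd"}'
```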


def setup_api_endpoint():
"""Sets up the _API global that we use in all scenarii.
Expand Down Expand Up @@ -62,7 +69,11 @@ def test_starts(args):
* we generate the various files to be sent in the tests.
"""
setup_api_endpoint()
files = {x: os.urandom(x * 1024) for x in _FILE_SIZES}
# "512" instead of "1024" because writing in hexadecimal takes 2 bytes.
files = {x: gzip.compress(
os.urandom(x * 512).hex().encode(),
compresslevel=0)
for x in _FILE_SIZES}
set_var("files", files)
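
To make the size comment above concrete, a quick illustrative check (again not part of the patch): hex-encoding doubles the byte count, and compresslevel=0 only wraps the data in gzip framing instead of shrinking it.

```python
import gzip
import os

raw = os.urandom(512)           # 512 random bytes
hex_bytes = raw.hex().encode()  # 2 hex characters per byte -> 1024 bytes
assert len(hex_bytes) == 1024

stored = gzip.compress(hex_bytes, compresslevel=0)
# With compresslevel=0 the data is only wrapped in gzip framing, not shrunk,
# so the output is slightly larger than the input (header, trailer, block headers).
assert len(stored) > len(hex_bytes)
```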


@@ -131,6 +142,25 @@ def jwt_base64_decode(payload):
    return decoded_str


def payload_from_raw_data(raw_data):
"""Returns a data suitable to publish, that's accepted by the profiler server.

This concatenates separate pre-created gzip-compressed chunks, because we
want that we do as less work as possible at runtime. Here at runtime we
only compress a very small chunk and otherwise concatenate everything.
"""
# By adding some random bytes, the content will change for each test and
# therefore the filename too. This prevents google from erroring while we
# stress test.
unique_data = gzip.compress(os.urandom(10).hex().encode(), compresslevel=0)
return (
_COMPRESSED_JSON_PREFIX +
raw_data +
unique_data +
_COMPRESSED_JSON_SUFFIX
)
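
A sketch of the end-to-end property this helper provides (illustrative only, assuming the constants and payload_from_raw_data above are in scope): the assembled payload decompresses to a single valid JSON document, which is what the /compressed-store endpoint checks for.

```python
import gzip
import json
import os

# Build a raw chunk the same way the global setup does, then wrap it.
raw_data = gzip.compress(os.urandom(16).hex().encode(), compresslevel=0)
payload = payload_from_raw_data(raw_data)

decoded = json.loads(gzip.decompress(payload))
# The value is the 32-character hex string plus the 20 random hex characters.
assert set(decoded) == {"foo"}
assert len(decoded["foo"]) == 32 + 20
```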


async def publish(session, data_size):
"""Publishes a profile with the passed data size
"""
Expand All @@ -143,10 +173,7 @@ async def publish(session, data_size):
)

data = get_var('files')[data_size]
# By adding some random bytes, the content will change for each test and
# therefore the filename too. This prevents google from erroring while we
# stress test.
data = data + os.urandom(10)
data = payload_from_raw_data(data)

async with session.post(_API + '/compressed-store', data=data) as resp:
assert resp.status == 200
43 changes: 38 additions & 5 deletions loadtest/publish_short_requests.py
@@ -7,6 +7,7 @@
import json
import os
import base64
import gzip
from molotov import scenario, setup, global_setup, teardown, global_teardown
from molotov import set_var, get_var

@@ -27,6 +28,12 @@
# These are the various file sizes we'll generate in the global setup.
_FILE_SIZES = (1, 10, 50)

# These are gzip-compressed chunks of JSON that we'll concatenate later to
# generate our payload. This takes advantage of the fact that a gzip stream
# can be made of several concatenated gzip members.
_COMPRESSED_JSON_PREFIX = gzip.compress(b'{"foo":"')
_COMPRESSED_JSON_SUFFIX = gzip.compress(b'"}')


def setup_api_endpoint():
"""Sets up the _API global that we use in all scenarii.
Expand Down Expand Up @@ -63,7 +70,11 @@ def test_starts(args):
* we generate the various files to be sent in the tests.
"""
setup_api_endpoint()
files = {x: os.urandom(x * 1024) for x in _FILE_SIZES}
# "512" instead of "1024" because writing in hexadecimal takes 2 bytes.
files = {x: gzip.compress(
os.urandom(x * 512).hex().encode(),
compresslevel=0)
for x in _FILE_SIZES}
set_var("files", files)


@@ -132,6 +143,25 @@ def jwt_base64_decode(payload):
    return decoded_str


def payload_from_raw_data(raw_data):
"""Returns a data suitable to publish, that's accepted by the profiler server.

This concatenates separate pre-created gzip-compressed chunks, because we
want that we do as less work as possible at runtime. Here at runtime we
only compress a very small chunk and otherwise concatenate everything.
"""
# By adding some random bytes, the content will change for each test and
# therefore the filename too. This prevents google from erroring while we
# stress test.
unique_data = gzip.compress(os.urandom(10).hex().encode(), compresslevel=0)
return (
_COMPRESSED_JSON_PREFIX +
raw_data +
unique_data +
_COMPRESSED_JSON_SUFFIX
)


async def publish(session, data_size):
"""Publishes a profile with the passed data size
"""
Expand All @@ -144,10 +174,7 @@ async def publish(session, data_size):
)

data = get_var('files')[data_size]
# By adding some random bytes, the content will change for each test and
# therefore the filename too. This prevents google from erroring while we
# stress test.
data = data + os.urandom(10)
data = payload_from_raw_data(data)

async with session.post(_API + '/compressed-store', data=data) as resp:
assert resp.status == 200
@@ -173,6 +200,12 @@ async def delete(session, jwt_token):

# Each scenario has a weight. Molotov uses it to determine how often the
# scenario is picked.
@scenario(1)
async def publish_and_delete(session):
    jwt_token = await publish(session=session, data_size=10)
    await delete(session=session, jwt_token=jwt_token)


@scenario(2)
async def publish_1k(session):
    await publish(session=session, data_size=1)
23 changes: 18 additions & 5 deletions src/routes/publish.js
@@ -22,7 +22,7 @@ import {
  forwardErrors,
  GunzipWrapper,
} from '../utils/streams';
import { PayloadTooLargeError } from '../utils/errors';
import { PayloadTooLargeError, BadRequestError } from '../utils/errors';

const MAX_BODY_LENGTH = 50 * 1024 * 1024; // 50MB

Expand Down Expand Up @@ -118,21 +118,34 @@ export function publishRoutes() {
    // Koa which will expose it appropriately to the caller.
    await Promise.all([pipelinePromise, jsonCheckerPromise]);

    // Step 2: Upload the data to Google Cloud Storage.
    // All data chunks have been stored in the concatener, which we'll read
    // from to feed the google write stream.
    const storage = gcsStorageCreate(config);
    const hash = hasherTransform.sha1();

    const googleStorageStream = storage.getWriteStreamForFile(hash);

    // We can't use Stream.finished here because of a problem with Google's
    // We don't use `pipeline` because we want to cleanly unpipe and clean up
    // when the request is aborted.
    concatener.pipe(googleStorageStream);

    ctx.req.on('aborted', () => {
      log.debug('request-aborted', 'The request has been aborted!');
      concatener.unpipe(googleStorageStream);
      const error = new BadRequestError('The request has been aborted.');
      googleStorageStream.destroy(error);
      concatener.destroy(error);
    });
Review comment from the author on lines +132 to +138:
From my test this doesn't seem to make a big difference. But this does no harm 🤷


    // We can't use Stream.finished here either because of a problem with Google's
    // library. For more information, you can see
    // https://github.com/googleapis/nodejs-storage/issues/937
    await new Promise((resolve, reject) => {
      const fullContent = concatener.transferContents();
      googleStorageStream.once('error', reject);
      googleStorageStream.once('finish', resolve);
      googleStorageStream.end(fullContent);
    });
    googleStorageStream.destroy();
    concatener.destroy();

    const jwtToken = Jwt.generateToken({ profileToken: hash });

33 changes: 18 additions & 15 deletions src/utils/streams.js
@@ -5,7 +5,7 @@

// This file holds various utilities about streams.

import { Transform, Writable, Readable } from 'stream';
import { Transform, Duplex, Writable, Readable } from 'stream';
import crypto from 'crypto';
import { StringDecoder } from 'string_decoder';
import { createGunzip, type Gunzip } from 'zlib';
@@ -89,7 +89,7 @@ export class LengthCheckerPassThrough extends Transform {
* This writable stream keeps all received chunks until the stream is closed.
* Then the chunks are concatenated into a unique Buffer that can be retrieved.
*/
export class Concatenator extends Writable {
export class Concatenator extends Duplex {
  log: Logger = getLogger('Concatenator');
  chunks: Buffer[] = [];
  contents: Buffer | null = null;
@@ -112,10 +112,23 @@
    callback();
  }

  _read() {
    let continueReading = true;
    while (continueReading) {
      if (this.chunks.length) {
        // While we have chunks we push them until the subsystem says "stop!".
        continueReading = this.push(this.chunks.shift());
      } else {
        // This is the end of this stream! Pushing null signals it.
        this.push(null);
        break;
      }
    }
  }

  _destroy(err: ?Error, callback: (error?: Error) => mixed) {
    this.log.trace('_destroy()');
    this.chunks.length = 0;
    this.contents = null;

    // Passthrough the error information, if present.
    // This line is needed because of the slightly inconsistent
@@ -124,20 +137,10 @@
    callback(err);
  }

  _final(callback: (error?: Error) => mixed) {
    this.log.trace('_final()');
    this.contents = Buffer.concat(this.chunks);
    this.chunks.length = 0;
    callback();
  }

  transferContents(): Buffer {
Review comment from the author:
I kept this one for tests, as it's still useful in that context. But we shouldn't use it for the production code.

I wonder if I should add a check for NODE_ENV==='test'... That wouldn't work when running tests obviously but we'd see it when running integration tests. Thoughts?

    this.log.trace('transferContents()');
    const contents = this.contents;
    if (contents === null) {
      throw new Error(`Can't transfer before the stream has been closed.`);
    }
    this.contents = null;
    const contents = Buffer.concat(this.chunks);
    this.chunks.length = 0;
    return contents;
  }
}