Skip to content

Commit

Permalink
Limit download concurrency for files within Kibana (elastic#168601)
Browse files Browse the repository at this point in the history
## Summary

Closes elastic#151986

This PR adds a limit to the number of concurrent downloads a user can
initiate from kibana.

P.S.

This PR renames the previous static method `configureConcurrentUpload`
exposed by the `ElasticsearchBlobStorageClient` to
`configureConcurrentTransfers` so that one might use a single static
method to configure the limits for concurrent transfers possible for
upload and download transfers in one go.

The new static method `configureConcurrentTransfers` accepts either a
number or a tuple, when a number is passed it's value is set as the
concurrent limit for both uploads and transfers, when a tuple is passed
the value of the first index sets the concurrent limit for uploads,
whilst the value of the second index sets the concurrent limit for
downloads.


<!--
### Checklist

Delete any items that are not applicable to this PR.

- [ ] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] Any UI touched in this PR is usable by keyboard only (learn more
about [keyboard accessibility](https://webaim.org/techniques/keyboard/))
- [ ] Any UI touched in this PR does not create any new axe failures
(run axe in browser:
[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),
[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))
- [ ] If a plugin configuration key changed, check if it needs to be
allowlisted in the cloud and added to the [docker
list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This renders correctly on smaller devices using a responsive
layout. (You can test this [in your
browser](https://www.browserstack.com/guide/responsive-testing-on-local-server))
- [ ] This was checked for [cross-browser
compatibility](https://www.elastic.co/support/matrix#matrix_browsers)


### Risk Matrix

Delete this section if it is not applicable to this PR.

Before closing this PR, invite QA, stakeholders, and other developers to
identify risks that should be tested prior to the change/feature
release.

When forming the risk matrix, consider some of the following examples
and how they may potentially impact the change:

| Risk | Probability | Severity | Mitigation/Notes |

|---------------------------|-------------|----------|-------------------------|
| Multiple Spaces&mdash;unexpected behavior in non-default Kibana Space.
| Low | High | Integration tests will verify that all features are still
supported in non-default Kibana Space and when user switches between
spaces. |
| Multiple nodes&mdash;Elasticsearch polling might have race conditions
when multiple Kibana nodes are polling for the same tasks. | High | Low
| Tasks are idempotent, so executing them multiple times will not result
in logical error, but will degrade performance. To test for this case we
add plenty of unit tests around this logic and document manual testing
procedure. |
| Code should gracefully handle cases when feature X or plugin Y are
disabled. | Medium | High | Unit tests will verify that any feature flag
or plugin combination still results in our service operational. |
| [See more potential risk
examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) |


### For maintainers

- [ ] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
-->
  • Loading branch information
eokoneyo authored Oct 30, 2023
1 parent e804ef2 commit af88e09
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 18 deletions.
3 changes: 2 additions & 1 deletion packages/kbn-logging-mocks/src/logger.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ const createLoggerMock = (context: string[] = []) => {
isLevelEnabled: jest.fn(),
};
mockLog.get.mockImplementation((...ctx) => ({
ctx,
...mockLog,
context: Array.isArray(context) ? context.concat(ctx) : [context, ...ctx].filter(Boolean),
}));

mockLog.isLevelEnabled.mockReturnValue(true);

return mockLog;
Expand Down
109 changes: 103 additions & 6 deletions src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@
*/

import { Readable } from 'stream';
import { encode } from 'cbor-x';
import { promisify } from 'util';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { Semaphore } from '@kbn/std';
import { errors } from '@elastic/elasticsearch';
import type { GetResponse } from '@elastic/elasticsearch/lib/api/types';

import { ElasticsearchBlobStorageClient } from './es';
import { errors } from '@elastic/elasticsearch';

const setImmediate = promisify(global.setImmediate);

describe('ElasticsearchBlobStorageClient', () => {
let esClient: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
let semaphore: Semaphore;
let uploadSemaphore: Semaphore;
let downloadSemaphore: Semaphore;
let logger: ReturnType<typeof loggingSystemMock.createLogger>;

// Exposed `clearCache()` which resets the cache for the memoized `createIndexIfNotExists()` method
Expand All @@ -38,20 +41,24 @@ describe('ElasticsearchBlobStorageClient', () => {
index,
undefined,
logger,
semaphore,
uploadSemaphore,
downloadSemaphore,
indexIsAlias
);
};

beforeEach(() => {
semaphore = new Semaphore(1);
uploadSemaphore = new Semaphore(1);
downloadSemaphore = new Semaphore(1);
logger = loggingSystemMock.createLogger();
esClient = elasticsearchServiceMock.createElasticsearchClient();

jest.clearAllMocks();
});

test('limits max concurrent uploads', async () => {
const blobStoreClient = createBlobStoreClient();
const acquireSpy = jest.spyOn(semaphore, 'acquire');
const uploadAcquireSpy = jest.spyOn(uploadSemaphore, 'acquire');
esClient.index.mockImplementation(() => {
return new Promise((res, rej) => setTimeout(() => rej('failed'), 100));
});
Expand All @@ -62,7 +69,7 @@ describe('ElasticsearchBlobStorageClient', () => {
blobStoreClient.upload(Readable.from(['test'])).catch(() => {}),
];
await setImmediate();
expect(acquireSpy).toHaveBeenCalledTimes(4);
expect(uploadAcquireSpy).toHaveBeenCalledTimes(4);
await p1;
expect(esClient.index).toHaveBeenCalledTimes(1);
await p2;
Expand All @@ -71,6 +78,96 @@ describe('ElasticsearchBlobStorageClient', () => {
expect(esClient.index).toHaveBeenCalledTimes(4);
});

test('limits max concurrent downloads', async () => {
const index = 'someplace';

const blobStoreClient = createBlobStoreClient(index);
const downloadAcquireSpy = jest.spyOn(downloadSemaphore, 'acquire');

const downloadsToQueueCount = 4;
const documentsChunkCount = 2;

const createDownloadContent = (documentId: number, chunkId: number) => {
return Buffer.concat([
Buffer.from(`download content ${documentId}.${chunkId}`, 'utf8'),
Buffer.alloc(10 * 1028, `chunk ${chunkId}`),
]);
};

const downloadContentMap = Array.from(new Array(downloadsToQueueCount)).map(
(_, documentIdx) => ({
fileContent: Array.from(new Array(documentsChunkCount)).map((__, chunkIdx) =>
createDownloadContent(documentIdx, chunkIdx)
),
})
);

esClient.get.mockImplementation(({ id: headChunkId }) => {
const [documentId, chunkId] = headChunkId.split(/\./);

return new Promise(function (resolve) {
setTimeout(
() =>
resolve(
Readable.from([
encode({
found: true,
_source: {
data: downloadContentMap[Number(documentId)].fileContent[Number(chunkId)],
},
}),
]) as unknown as GetResponse
),
100
);
});
});

const getDownloadStreamContent = async (stream: Readable) => {
const chunks: Buffer[] = [];

for await (const chunk of stream) {
chunks.push(chunk);
}

/**
* we are guaranteed that the chunks for the complete document
* will equal the document chunk count specified within this test suite.
* See {@link ContentStream#isRead}
*/
expect(chunks.length).toBe(documentsChunkCount);

return Buffer.concat(chunks).toString();
};

const [p1, p2, ...rest] = downloadContentMap.map(({ fileContent }, idx) => {
// expected document size will be our returned mock file content
// will be the sum of the lengths of chunks the entire document is split into
const documentSize = fileContent.reduce((total, chunk) => total + chunk.length, 0);

return blobStoreClient.download({
id: String(idx),
size: documentSize,
});
});

await setImmediate();
expect(downloadAcquireSpy).toHaveBeenCalledTimes(downloadsToQueueCount);

const p1DownloadStream = await p1;
const p1DownloadContent = await getDownloadStreamContent(p1DownloadStream);
expect(esClient.get).toHaveBeenCalledTimes(1 * documentsChunkCount);
expect(p1DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s0.*/));

const p2DownloadStream = await p2;
const p2DownloadContent = await getDownloadStreamContent(p2DownloadStream);
expect(esClient.get).toHaveBeenCalledTimes(2 * documentsChunkCount);
expect(p2DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s1.*/));

await Promise.all(rest.map((dp) => dp.then((ds) => getDownloadStreamContent(ds))));
expect(esClient.get).toHaveBeenCalledTimes(downloadsToQueueCount * documentsChunkCount);
});

describe('.createIndexIfNotExists()', () => {
let data: Readable;

Expand Down
39 changes: 31 additions & 8 deletions src/plugins/files/server/blob_storage_service/adapters/es/es.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Semaphore } from '@kbn/std';
import { Readable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { promisify } from 'util';
import { lastValueFrom, defer } from 'rxjs';
import { lastValueFrom, defer, firstValueFrom } from 'rxjs';
import { PerformanceMetricEvent, reportPerformanceMetricEvent } from '@kbn/ebt-tools';
import { memoize } from 'lodash';
import { FilesPlugin } from '../../../plugin';
Expand All @@ -38,14 +38,21 @@ interface UploadOptions {
}

export class ElasticsearchBlobStorageClient implements BlobStorageClient {
private static defaultSemaphore: Semaphore;
private static defaultUploadSemaphore: Semaphore;
private static defaultDownloadSemaphore: Semaphore;

/**
* Call this function once to globally set a concurrent upload limit for
* Call this function once to globally set the concurrent transfer (upload/download) limit for
* all {@link ElasticsearchBlobStorageClient} instances.
*/
public static configureConcurrentUpload(capacity: number) {
this.defaultSemaphore = new Semaphore(capacity);
public static configureConcurrentTransfers(capacity: number | [number, number]) {
if (Array.isArray(capacity)) {
this.defaultUploadSemaphore = new Semaphore(capacity[0]);
this.defaultDownloadSemaphore = new Semaphore(capacity[1]);
} else {
this.defaultUploadSemaphore = new Semaphore(capacity);
this.defaultDownloadSemaphore = new Semaphore(capacity);
}
}

constructor(
Expand All @@ -57,11 +64,23 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
* Override the default concurrent upload limit by passing in a different
* semaphore
*/
private readonly uploadSemaphore = ElasticsearchBlobStorageClient.defaultSemaphore,
private readonly uploadSemaphore = ElasticsearchBlobStorageClient.defaultUploadSemaphore,
/**
* Override the default concurrent download limit by passing in a different
* semaphore
*/
private readonly downloadSemaphore = ElasticsearchBlobStorageClient.defaultDownloadSemaphore,
/** Indicates that the index provided is an alias (changes how content is retrieved internally) */
private readonly indexIsAlias: boolean = false
) {
assert(this.uploadSemaphore, `No default semaphore provided and no semaphore was passed in.`);
assert(
this.uploadSemaphore,
`No default semaphore provided and no semaphore was passed in for uploads.`
);
assert(
this.downloadSemaphore,
`No default semaphore provided and no semaphore was passed in for downloads.`
);
}

/**
Expand Down Expand Up @@ -187,7 +206,11 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
// right after uploading it, we refresh the index before downloading the file.
await this.esClient.indices.refresh({ index: this.index });

return this.getReadableContentStream(id, size);
return firstValueFrom(
defer(() => Promise.resolve(this.getReadableContentStream(id, size))).pipe(
this.downloadSemaphore.acquire()
)
);
}

public async delete(id: string): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('Elasticsearch blob storage', () => {
let esRefreshIndexSpy: jest.SpyInstance;

beforeAll(async () => {
ElasticsearchBlobStorageClient.configureConcurrentUpload(Infinity);
ElasticsearchBlobStorageClient.configureConcurrentTransfers(Infinity);
const { startES, startKibana } = createTestServers({ adjustTimeout: jest.setTimeout });
manageES = await startES();
manageKbn = await startKibana();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ export class BlobStorageService {
*/
private readonly concurrentUploadsToES = 20;

/**
* The number of downloads per Kibana instance that can be running simultaneously
*/
private readonly concurrentDownloadsFromES = 5;

constructor(private readonly esClient: ElasticsearchClient, private readonly logger: Logger) {
ElasticsearchBlobStorageClient.configureConcurrentUpload(this.concurrentUploadsToES);
ElasticsearchBlobStorageClient.configureConcurrentTransfers([
this.concurrentUploadsToES,
this.concurrentDownloadsFromES,
]);
}

private createESBlobStorage({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('When initializing file client via createESFileClient()', () => {
let logger: MockedLogger;

beforeEach(() => {
ElasticsearchBlobStorageClient.configureConcurrentUpload(Infinity);
ElasticsearchBlobStorageClient.configureConcurrentTransfers(Infinity);
esClient = elasticsearchServiceMock.createElasticsearchClient();
logger = loggingSystemMock.createLogger();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export function createEsFileClient(arg: CreateEsFileClientArgs): FileClient {
undefined,
logger,
undefined,
undefined,
indexIsAlias
),
undefined,
Expand Down

0 comments on commit af88e09

Please sign in to comment.