Skip to content

Commit

Permalink
[Index Management] Fix bug with long http requests coming from index …
Browse files Browse the repository at this point in the history
…actions (#171735)
  • Loading branch information
sabarasaba authored Nov 23, 2023
1 parent 6905a0f commit 0e3351c
Show file tree
Hide file tree
Showing 21 changed files with 171 additions and 44 deletions.
4 changes: 4 additions & 0 deletions x-pack/plugins/index_management/common/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export { API_BASE_PATH, INTERNAL_API_BASE_PATH } from './api_base_path';
export { INVALID_INDEX_PATTERN_CHARS, INVALID_TEMPLATE_NAME_CHARS } from './invalid_characters';
export * from './index_statuses';

// Since each index can have a max length or 255 characters and the max length of
// the request is 4096 bytes we can fit a max of 16 indices in a single request.
export const MAX_INDICES_PER_REQUEST = 16;

export {
UIM_APP_NAME,
UIM_APP_LOAD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ export const clearCacheIndices =
dispatch(reloadIndices(indexNames));
notificationService.showSuccessToast(
i18n.translate('xpack.idxMgmt.clearCacheIndicesAction.successMessage', {
defaultMessage: 'Successfully cleared cache: [{indexNames}]',
values: { indexNames: indexNames.join(', ') },
defaultMessage:
'Successfully cleared cache for {count, plural, one {# index} other {# indices} }',
values: { count: indexNames.length },
})
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export const closeIndices =
dispatch(reloadIndices(indexNames));
notificationService.showSuccessToast(
i18n.translate('xpack.idxMgmt.closeIndicesAction.successfullyClosedIndicesMessage', {
defaultMessage: 'Successfully closed: [{indexNames}]',
values: { indexNames: indexNames.join(', ') },
defaultMessage: 'Successfully closed {count, plural, one {# index} other {# indices} }',
values: { count: indexNames.length },
})
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ export const deleteIndices =
}
notificationService.showSuccessToast(
i18n.translate('xpack.idxMgmt.deleteIndicesAction.successfullyDeletedIndicesMessage', {
defaultMessage: 'Successfully deleted: [{indexNames}]',
values: { indexNames: indexNames.join(', ') },
defaultMessage: 'Successfully deleted {count, plural, one {# index} other {# indices} }',
values: { count: indexNames.length },
})
);
dispatch(deleteIndicesSuccess({ indexNames }));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export const flushIndices =
dispatch(reloadIndices(indexNames));
notificationService.showSuccessToast(
i18n.translate('xpack.idxMgmt.flushIndicesAction.successfullyFlushedIndicesMessage', {
defaultMessage: 'Successfully flushed: [{indexNames}]',
values: { indexNames: indexNames.join(', ') },
defaultMessage: 'Successfully flushed {count, plural, one {# index} other {# indices} }',
values: { count: indexNames.length },
})
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ export const forcemergeIndices =
i18n.translate(
'xpack.idxMgmt.forceMergeIndicesAction.successfullyForceMergedIndicesMessage',
{
defaultMessage: 'Successfully force merged: [{indexNames}]',
values: { indexNames: indexNames.join(', ') },
defaultMessage:
'Successfully force merged {count, plural, one {# index} other {# indices} }',
values: { count: indexNames.length },
}
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export const openIndices =
dispatch(reloadIndices(indexNames));
notificationService.showSuccessToast(
i18n.translate('xpack.idxMgmt.openIndicesAction.successfullyOpenedIndicesMessage', {
defaultMessage: 'Successfully opened: [{indexNames}]',
values: { indexNames: indexNames.join(', ') },
defaultMessage: 'Successfully opened {count, plural, one {# index} other {# indices} }',
values: { count: indexNames.length },
})
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export const refreshIndices =
dispatch(reloadIndices(indexNames));
notificationService.showSuccessToast(
i18n.translate('xpack.idxMgmt.refreshIndicesAction.successfullyRefreshedIndicesMessage', {
defaultMessage: 'Successfully refreshed: [{indexNames}]',
values: { indexNames: indexNames.join(', ') },
defaultMessage: 'Successfully refreshed {count, plural, one {# index} other {# indices} }',
values: { count: indexNames.length },
})
);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { IScopedClusterClient } from '@kbn/core/server';

import { executeAsyncByChunks } from './helpers';

const generateIndices = (count: number) => {
const indices = [];

for (let i = 0; i < count; i++) {
indices.push(`index-${i}`);
}

return indices;
};

const mockClient = {
asCurrentUser: {
indices: {
delete: jest.fn(),
},
},
} as unknown as IScopedClusterClient;

describe('executeAsyncByChunks', () => {
beforeEach(() => {
jest.clearAllMocks();
});

it('should make just one request for one index', async () => {
const params = {
index: generateIndices(1),
};

await executeAsyncByChunks(params, mockClient, 'delete');

expect(mockClient.asCurrentUser.indices.delete).toHaveBeenCalledTimes(1);
});

it('should make 2 requests for 32 indices', async () => {
const params = {
index: generateIndices(32),
};

await executeAsyncByChunks(params, mockClient, 'delete');

expect(mockClient.asCurrentUser.indices.delete).toHaveBeenCalledTimes(2);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { chunk } from 'lodash';

import type { IScopedClusterClient } from '@kbn/core/server';
import { MAX_INDICES_PER_REQUEST } from '../../../../common/constants';

// To avoid having to to match method signatures with the client
// type, we use a generic CallableFn type.
type CallableFn = (args: Record<any, any>) => Promise<any>;

export async function executeAsyncByChunks<T>(
// Since we are using a key to access the index method, we need
// to use a generic type.
params: {
index: T[];
format?: string;
expand_wildcards?: string;
max_num_segments?: number;
},
dataClient: IScopedClusterClient,
methodName: keyof IScopedClusterClient['asCurrentUser']['indices']
) {
const { index: indices, ...commonParams } = params;

// When the number of indices is small, we can execute in a single request
//
// Otherwise we need to split the indices into chunks and execute them in multiple requests because
// if we try to execute an action with too many indices that account for a long string in the request
// ES will throw an error saying that the HTTP line is too large.
if (indices.length <= MAX_INDICES_PER_REQUEST) {
await (dataClient.asCurrentUser.indices[methodName] as CallableFn)({
...commonParams,
index: indices,
});
} else {
const chunks = chunk(indices, MAX_INDICES_PER_REQUEST);

await Promise.all(
chunks.map((chunkOfIndices) =>
(dataClient.asCurrentUser.indices[methodName] as CallableFn)({
...commonParams,
index: chunkOfIndices,
})
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema } from '@kbn/config-schema';

import { RouteDependencies } from '../../../types';
import { addBasePath } from '..';
import { executeAsyncByChunks } from './helpers';

const bodySchema = schema.object({
indices: schema.arrayOf(schema.string()),
Expand All @@ -28,7 +29,8 @@ export function registerClearCacheRoute({ router, lib: { handleEsError } }: Rout
};

try {
await client.asCurrentUser.indices.clearCache(params);
await executeAsyncByChunks(params, client, 'clearCache');

return response.ok();
} catch (error) {
return handleEsError({ error, response });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema } from '@kbn/config-schema';

import { RouteDependencies } from '../../../types';
import { addBasePath } from '..';
import { executeAsyncByChunks } from './helpers';

const bodySchema = schema.object({
indices: schema.arrayOf(schema.string()),
Expand All @@ -28,7 +29,7 @@ export function registerCloseRoute({ router, lib: { handleEsError } }: RouteDepe
};

try {
await client.asCurrentUser.indices.close(params);
await executeAsyncByChunks(params, client, 'close');
return response.ok();
} catch (error) {
return handleEsError({ error, response });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema } from '@kbn/config-schema';

import { RouteDependencies } from '../../../types';
import { addBasePath } from '..';
import { executeAsyncByChunks } from './helpers';

const bodySchema = schema.object({
indices: schema.arrayOf(schema.string()),
Expand All @@ -22,13 +23,14 @@ export function registerDeleteRoute({ router, lib: { handleEsError } }: RouteDep
const { indices = [] } = request.body as typeof bodySchema.type;

const params = {
expand_wildcards: 'none' as const,
format: 'json',
expand_wildcards: 'none' as const,
index: indices,
};

try {
await client.asCurrentUser.indices.delete(params);
await executeAsyncByChunks(params, client, 'delete');

return response.ok();
} catch (error) {
return handleEsError({ error, response });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema } from '@kbn/config-schema';

import { RouteDependencies } from '../../../types';
import { addBasePath } from '..';
import { executeAsyncByChunks } from './helpers';

const bodySchema = schema.object({
indices: schema.arrayOf(schema.string()),
Expand All @@ -28,7 +29,8 @@ export function registerFlushRoute({ router, lib: { handleEsError } }: RouteDepe
};

try {
await client.asCurrentUser.indices.flush(params);
await executeAsyncByChunks(params, client, 'flush');

return response.ok();
} catch (error) {
return handleEsError({ error, response });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema } from '@kbn/config-schema';

import { RouteDependencies } from '../../../types';
import { addBasePath } from '..';
import { executeAsyncByChunks } from './helpers';

const bodySchema = schema.object({
indices: schema.arrayOf(schema.string()),
Expand Down Expand Up @@ -36,7 +37,8 @@ export function registerForcemergeRoute({ router, lib: { handleEsError } }: Rout
}

try {
await client.asCurrentUser.indices.forcemerge(params);
await executeAsyncByChunks(params, client, 'forcemerge');

return response.ok();
} catch (error) {
return handleEsError({ error, response });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema } from '@kbn/config-schema';

import { RouteDependencies } from '../../../types';
import { addBasePath } from '..';
import { executeAsyncByChunks } from './helpers';

const bodySchema = schema.object({
indices: schema.arrayOf(schema.string()),
Expand All @@ -28,7 +29,8 @@ export function registerOpenRoute({ router, lib: { handleEsError } }: RouteDepen
};

try {
await client.asCurrentUser.indices.open(params);
await executeAsyncByChunks(params, client, 'open');

return response.ok();
} catch (error) {
return handleEsError({ error, response });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema } from '@kbn/config-schema';

import { RouteDependencies } from '../../../types';
import { addBasePath } from '..';
import { executeAsyncByChunks } from './helpers';

const bodySchema = schema.object({
indices: schema.arrayOf(schema.string()),
Expand All @@ -28,7 +29,8 @@ export function registerRefreshRoute({ router, lib: { handleEsError } }: RouteDe
};

try {
await client.asCurrentUser.indices.refresh(params);
await executeAsyncByChunks(params, client, 'refresh');

return response.ok();
} catch (error) {
return handleEsError({ error, response });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
* 2.0.
*/

import { chunk } from 'lodash';
import { schema } from '@kbn/config-schema';

import { MAX_INDICES_PER_REQUEST } from '../../../../common/constants';
import { RouteDependencies } from '../../../types';
import { fetchIndices } from '../../../lib/fetch_indices';
import { addBasePath } from '..';
Expand All @@ -30,7 +32,27 @@ export function registerReloadRoute({
const { indexNames = [] } = (request.body as typeof bodySchema.type) ?? {};

try {
const indices = await fetchIndices({ client, indexDataEnricher, config, indexNames });
let indices;

// When the number of indices is small, we can execute in a single request
//
// Otherwise we need to split the indices into chunks and execute them in multiple requests because
// if we try to execute an action with too many indices that account for a long string in the request
// ES will throw an error saying that the HTTP line is too large.
if (indexNames.length <= MAX_INDICES_PER_REQUEST) {
indices = await fetchIndices({ client, indexDataEnricher, config, indexNames });
} else {
const chunks = chunk(indexNames, MAX_INDICES_PER_REQUEST);

indices = (
await Promise.all(
chunks.map((indexNamesChunk) =>
fetchIndices({ client, indexDataEnricher, config, indexNames: indexNamesChunk })
)
)
).flat();
}

return response.ok({ body: indices });
} catch (error) {
return handleEsError({ error, response });
Expand Down
Loading

0 comments on commit 0e3351c

Please sign in to comment.