Skip to content

Commit

Permalink
[8.x] Handle cluster_block_exception during reindexing the TM index (#…
Browse files Browse the repository at this point in the history
…201297) (#203609)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Handle cluster_block_exception during reindexing the TM index
(#201297)](#201297)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Ersin
Erdal","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-12-10T15:17:27Z","message":"Handle
cluster_block_exception during reindexing the TM index
(#201297)\n\nResolves:
https://github.com/elastic/response-ops-team/issues/249\r\n\r\nThis PR
increases task claiming interval in case of\r\n`cluster_block_exception`
to avoid generating too many error during TM\r\nindex
reindexing.\r\n\r\n## To verify:\r\n\r\n- Run your local Kibana,\r\n-
Create a user with `kibana_system` and `kibana_admin` roles\r\n- Logout
and login with your new user\r\n- Use below request to put a write block
on TM index.\r\n `PUT /.kibana_task_manager_9.0.0_001/_block/write`\r\n-
Observe the error messages and their occurring interval on
your\r\nterminal.\r\n- Use below request on the Kibana console to halt
write block.\r\n```\r\nPUT
/.kibana_task_manager_9.0.0_001/_settings\r\n{\r\n \"index\": {\r\n
\"blocks.write\": false\r\n
}\r\n}\r\n```","sha":"7aa80ce53027df7ac0e5fc01d206ef38ac3f9575","branchLabelMapping":{"^v9.0.0$":"main","^v8.18.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:ResponseOps","v9.0.0","backport:prev-minor"],"title":"Handle
cluster_block_exception during reindexing the TM
index","number":201297,"url":"https://github.com/elastic/kibana/pull/201297","mergeCommit":{"message":"Handle
cluster_block_exception during reindexing the TM index
(#201297)\n\nResolves:
https://github.com/elastic/response-ops-team/issues/249\r\n\r\nThis PR
increases task claiming interval in case of\r\n`cluster_block_exception`
to avoid generating too many error during TM\r\nindex
reindexing.\r\n\r\n## To verify:\r\n\r\n- Run your local Kibana,\r\n-
Create a user with `kibana_system` and `kibana_admin` roles\r\n- Logout
and login with your new user\r\n- Use below request to put a write block
on TM index.\r\n `PUT /.kibana_task_manager_9.0.0_001/_block/write`\r\n-
Observe the error messages and their occurring interval on
your\r\nterminal.\r\n- Use below request on the Kibana console to halt
write block.\r\n```\r\nPUT
/.kibana_task_manager_9.0.0_001/_settings\r\n{\r\n \"index\": {\r\n
\"blocks.write\": false\r\n
}\r\n}\r\n```","sha":"7aa80ce53027df7ac0e5fc01d206ef38ac3f9575"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/201297","number":201297,"mergeCommit":{"message":"Handle
cluster_block_exception during reindexing the TM index
(#201297)\n\nResolves:
https://github.com/elastic/response-ops-team/issues/249\r\n\r\nThis PR
increases task claiming interval in case of\r\n`cluster_block_exception`
to avoid generating too many error during TM\r\nindex
reindexing.\r\n\r\n## To verify:\r\n\r\n- Run your local Kibana,\r\n-
Create a user with `kibana_system` and `kibana_admin` roles\r\n- Logout
and login with your new user\r\n- Use below request to put a write block
on TM index.\r\n `PUT /.kibana_task_manager_9.0.0_001/_block/write`\r\n-
Observe the error messages and their occurring interval on
your\r\nterminal.\r\n- Use below request on the Kibana console to halt
write block.\r\n```\r\nPUT
/.kibana_task_manager_9.0.0_001/_settings\r\n{\r\n \"index\": {\r\n
\"blocks.write\": false\r\n
}\r\n}\r\n```","sha":"7aa80ce53027df7ac0e5fc01d206ef38ac3f9575"}}]}]
BACKPORT-->

Co-authored-by: Ersin Erdal <[email protected]>
  • Loading branch information
kibanamachine and ersin-erdal authored Dec 10, 2024
1 parent ebd2ef3 commit af76299
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 91 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const CLAIM_STRATEGY_MGET = 'mget';
export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10; // 10 seconds
const MIN_DISCOVERY_INTERVAL_MS = 1000; // 1 second
const MAX_DISCOVERY_INTERVAL_MS = 1000 * 60 * 5; // 5 minutes
export const DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS = 6 * 1000 * 10; // 60 seconds

export const DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION = '30s';
const FIVE_MIN_IN_MS = 5 * 60 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration'
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { coreMock } from '@kbn/core/server/mocks';
import { TaskManagerConfig } from '../config';
import { BulkUpdateError } from '../lib/bulk_update_error';

describe('managed configuration', () => {
let taskManagerStart: TaskManagerStartContract;
Expand Down Expand Up @@ -130,14 +131,41 @@ describe('managed configuration', () => {
clock.tick(ADJUST_THROUGHPUT_INTERVAL);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)'
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});

test('should increase poll interval when Elasticsearch returns a cluster_block_exception error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
new BulkUpdateError({
statusCode: 403,
message: 'index is blocked',
type: 'cluster_block_exception',
})
);

await expect(
taskManagerStart.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"index is blocked"`);
clock.tick(100000);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 61000 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 61000ms');
});

test('should increase poll interval when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
childEsClient.search.mockImplementationOnce(async () => {
Expand All @@ -151,10 +179,10 @@ describe('managed configuration', () => {
clock.tick(ADJUST_THROUGHPUT_INTERVAL);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)'
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';
import {
DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
DEFAULT_DISCOVERY_INTERVAL_MS,
DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS,
} from '../config';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
Expand Down Expand Up @@ -199,6 +203,49 @@ describe('KibanaDiscoveryService', () => {
);
});

it('reschedules discovery job in case of cluster_block_exception', async () => {
savedObjectsRepository.update.mockResolvedValueOnce(
{} as SavedObjectsUpdateResponse<unknown>
);

const kibanaDiscoveryService = new KibanaDiscoveryService({
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

expect(kibanaDiscoveryService.isStarted()).toBe(true);
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

savedObjectsRepository.update.mockRejectedValueOnce(
new Error('failed due to cluster_block_exception, task_manager index')
);

await jest.advanceTimersByTimeAsync(15000);

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenNthCalledWith(
2,
expect.any(Function),
DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS
);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
"Kibana Discovery Service couldn't update this node's last_seen timestamp. id: current-node-id, last_seen: 2024-08-10T10:00:10.000Z, error:failed due to cluster_block_exception, task_manager index"
);
});

it('does not schedule when Kibana is shutting down', async () => {
savedObjectsRepository.update.mockResolvedValueOnce(
{} as SavedObjectsUpdateResponse<unknown>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import type { ISavedObjectsRepository } from '@kbn/core/server';
import { Logger } from '@kbn/core/server';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { TaskManagerConfig } from '../config';
import { DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS, TaskManagerConfig } from '../config';
import { isClusterBlockException } from '../lib/bulk_update_error';

interface DiscoveryServiceParams {
config: TaskManagerConfig['discovery'];
Expand Down Expand Up @@ -59,6 +60,7 @@ export class KibanaDiscoveryService {
}

private async scheduleUpsertCurrentNode() {
let retryInterval = this.discoveryInterval;
if (!this.stopped) {
const lastSeenDate = new Date();
const lastSeen = lastSeenDate.toISOString();
Expand All @@ -69,9 +71,12 @@ export class KibanaDiscoveryService {
this.started = true;
}
} catch (e) {
if (isClusterBlockException(e)) {
retryInterval = DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS;
}
if (!this.started) {
this.logger.error(
`Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}`
`Kibana Discovery Service couldn't be started and will be retried in ${retryInterval}ms, error:${e.message}`
);
} else {
this.logger.error(
Expand All @@ -82,7 +87,7 @@ export class KibanaDiscoveryService {
this.timer = setTimeout(
async () => await this.scheduleUpsertCurrentNode(),
// The timeout should not be less than the default timeout of two seconds
Math.max(this.discoveryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
Math.max(retryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
);
}
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_update_error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string |
return (error as BulkUpdateError).type;
}
}

export function isClusterBlockException(error: Error | BulkUpdateError): boolean {
return (
getBulkUpdateErrorType(error) === 'cluster_block_exception' ||
error.message.includes('cluster_block_exception')
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import {
createManagedConfiguration,
ADJUST_THROUGHPUT_INTERVAL,
INTERVAL_AFTER_BLOCK_EXCEPTION,
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
Expand Down Expand Up @@ -420,12 +421,29 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should increase configuration at the next interval when an error with cluster_block_exception type is emitted, then decreases back to normal', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(
new BulkUpdateError({
statusCode: 403,
message: 'index is blocked',
type: 'cluster_block_exception',
})
);
expect(subscription).toHaveBeenNthCalledWith(1, 100);
// It emits the error with cluster_block_exception type immediately
expect(subscription).toHaveBeenNthCalledWith(2, INTERVAL_AFTER_BLOCK_EXCEPTION);
clock.tick(INTERVAL_AFTER_BLOCK_EXCEPTION);
expect(subscription).toHaveBeenCalledTimes(3);
expect(subscription).toHaveBeenNthCalledWith(3, 100);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
});

Expand Down
Loading

0 comments on commit af76299

Please sign in to comment.