Skip to content

Commit

Permalink
Handle cluster_block_exception during reindexing the TM index (#201297)
Browse files Browse the repository at this point in the history
Resolves: elastic/response-ops-team#249

This PR increases task claiming interval in case of
`cluster_block_exception` to avoid generating too many error during TM
index reindexing.

## To verify:

- Run your local Kibana,
- Create a user with `kibana_system` and `kibana_admin` roles
- Logout and login with your new user
- Use below request to put a write block on TM index.
   `PUT /.kibana_task_manager_9.0.0_001/_block/write`
- Observe the error messages and their occurring interval on your
terminal.
- Use below request on the Kibana console to halt write block.
```
PUT /.kibana_task_manager_9.0.0_001/_settings
{
  "index": {
    "blocks.write": false
  }
}
```
  • Loading branch information
ersin-erdal authored Dec 10, 2024
1 parent bd70664 commit 7aa80ce
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 91 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const CLAIM_STRATEGY_MGET = 'mget';
export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10; // 10 seconds
const MIN_DISCOVERY_INTERVAL_MS = 1000; // 1 second
const MAX_DISCOVERY_INTERVAL_MS = 1000 * 60 * 5; // 5 minutes
export const DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS = 6 * 1000 * 10; // 60 seconds

export const DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION = '30s';
const FIVE_MIN_IN_MS = 5 * 60 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration'
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { coreMock } from '@kbn/core/server/mocks';
import { TaskManagerConfig } from '../config';
import { BulkUpdateError } from '../lib/bulk_update_error';

describe('managed configuration', () => {
let taskManagerStart: TaskManagerStartContract;
Expand Down Expand Up @@ -130,14 +131,41 @@ describe('managed configuration', () => {
clock.tick(ADJUST_THROUGHPUT_INTERVAL);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)'
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});

test('should increase poll interval when Elasticsearch returns a cluster_block_exception error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
new BulkUpdateError({
statusCode: 403,
message: 'index is blocked',
type: 'cluster_block_exception',
})
);

await expect(
taskManagerStart.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"index is blocked"`);
clock.tick(100000);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 61000 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 61000ms');
});

test('should increase poll interval when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
childEsClient.search.mockImplementationOnce(async () => {
Expand All @@ -151,10 +179,10 @@ describe('managed configuration', () => {
clock.tick(ADJUST_THROUGHPUT_INTERVAL);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)'
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';
import {
DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
DEFAULT_DISCOVERY_INTERVAL_MS,
DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS,
} from '../config';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
Expand Down Expand Up @@ -199,6 +203,49 @@ describe('KibanaDiscoveryService', () => {
);
});

it('reschedules discovery job in case of cluster_block_exception', async () => {
savedObjectsRepository.update.mockResolvedValueOnce(
{} as SavedObjectsUpdateResponse<unknown>
);

const kibanaDiscoveryService = new KibanaDiscoveryService({
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

expect(kibanaDiscoveryService.isStarted()).toBe(true);
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

savedObjectsRepository.update.mockRejectedValueOnce(
new Error('failed due to cluster_block_exception, task_manager index')
);

await jest.advanceTimersByTimeAsync(15000);

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenNthCalledWith(
2,
expect.any(Function),
DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS
);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
"Kibana Discovery Service couldn't update this node's last_seen timestamp. id: current-node-id, last_seen: 2024-08-10T10:00:10.000Z, error:failed due to cluster_block_exception, task_manager index"
);
});

it('does not schedule when Kibana is shutting down', async () => {
savedObjectsRepository.update.mockResolvedValueOnce(
{} as SavedObjectsUpdateResponse<unknown>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import type { ISavedObjectsRepository } from '@kbn/core/server';
import { Logger } from '@kbn/core/server';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { TaskManagerConfig } from '../config';
import { DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS, TaskManagerConfig } from '../config';
import { isClusterBlockException } from '../lib/bulk_update_error';

interface DiscoveryServiceParams {
config: TaskManagerConfig['discovery'];
Expand Down Expand Up @@ -59,6 +60,7 @@ export class KibanaDiscoveryService {
}

private async scheduleUpsertCurrentNode() {
let retryInterval = this.discoveryInterval;
if (!this.stopped) {
const lastSeenDate = new Date();
const lastSeen = lastSeenDate.toISOString();
Expand All @@ -69,9 +71,12 @@ export class KibanaDiscoveryService {
this.started = true;
}
} catch (e) {
if (isClusterBlockException(e)) {
retryInterval = DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS;
}
if (!this.started) {
this.logger.error(
`Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}`
`Kibana Discovery Service couldn't be started and will be retried in ${retryInterval}ms, error:${e.message}`
);
} else {
this.logger.error(
Expand All @@ -82,7 +87,7 @@ export class KibanaDiscoveryService {
this.timer = setTimeout(
async () => await this.scheduleUpsertCurrentNode(),
// The timeout should not be less than the default timeout of two seconds
Math.max(this.discoveryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
Math.max(retryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
);
}
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_update_error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string |
return (error as BulkUpdateError).type;
}
}

export function isClusterBlockException(error: Error | BulkUpdateError): boolean {
return (
getBulkUpdateErrorType(error) === 'cluster_block_exception' ||
error.message.includes('cluster_block_exception')
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import {
createManagedConfiguration,
ADJUST_THROUGHPUT_INTERVAL,
INTERVAL_AFTER_BLOCK_EXCEPTION,
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
Expand Down Expand Up @@ -420,12 +421,29 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should increase configuration at the next interval when an error with cluster_block_exception type is emitted, then decreases back to normal', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(
new BulkUpdateError({
statusCode: 403,
message: 'index is blocked',
type: 'cluster_block_exception',
})
);
expect(subscription).toHaveBeenNthCalledWith(1, 100);
// It emits the error with cluster_block_exception type immediately
expect(subscription).toHaveBeenNthCalledWith(2, INTERVAL_AFTER_BLOCK_EXCEPTION);
clock.tick(INTERVAL_AFTER_BLOCK_EXCEPTION);
expect(subscription).toHaveBeenCalledTimes(3);
expect(subscription).toHaveBeenNthCalledWith(3, 100);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
});

Expand Down
Loading

0 comments on commit 7aa80ce

Please sign in to comment.