Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle cluster_block_exception during reindexing the TM index #201297

Merged
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const CLAIM_STRATEGY_MGET = 'mget';
export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10; // 10 seconds
const MIN_DISCOVERY_INTERVAL_MS = 1000; // 1 second
const MAX_DISCOVERY_INTERVAL_MS = 1000 * 60 * 5; // 5 minutes
export const DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS = 6 * 1000 * 10; // 60 seconds

export const DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION = '30s';
const FIVE_MIN_IN_MS = 5 * 60 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration'
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { coreMock } from '@kbn/core/server/mocks';
import { TaskManagerConfig } from '../config';
import { BulkUpdateError } from '../lib/bulk_update_error';

describe('managed configuration', () => {
let taskManagerStart: TaskManagerStartContract;
Expand Down Expand Up @@ -130,14 +131,41 @@ describe('managed configuration', () => {
clock.tick(ADJUST_THROUGHPUT_INTERVAL);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)'
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});

test('should increase poll interval when Elasticsearch returns a cluster_block_exception error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
new BulkUpdateError({
statusCode: 403,
message: 'index is blocked',
type: 'cluster_block_exception',
})
);

await expect(
taskManagerStart.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"index is blocked"`);
clock.tick(100000);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 61000 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 61000ms');
});

test('should increase poll interval when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
childEsClient.search.mockImplementationOnce(async () => {
Expand All @@ -151,10 +179,10 @@ describe('managed configuration', () => {
clock.tick(ADJUST_THROUGHPUT_INTERVAL);

expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)'
'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';
import {
DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
DEFAULT_DISCOVERY_INTERVAL_MS,
DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS,
} from '../config';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
Expand Down Expand Up @@ -199,6 +203,49 @@ describe('KibanaDiscoveryService', () => {
);
});

it('reschedules discovery job in case of cluster_block_exception', async () => {
savedObjectsRepository.update.mockResolvedValueOnce(
{} as SavedObjectsUpdateResponse<unknown>
);

const kibanaDiscoveryService = new KibanaDiscoveryService({
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

expect(kibanaDiscoveryService.isStarted()).toBe(true);
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

savedObjectsRepository.update.mockRejectedValueOnce(
new Error('failed due to cluster_block_exception, task_manager index')
);

await jest.advanceTimersByTimeAsync(15000);

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenNthCalledWith(
2,
expect.any(Function),
DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS
);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
"Kibana Discovery Service couldn't update this node's last_seen timestamp. id: current-node-id, last_seen: 2024-08-10T10:00:10.000Z, error:failed due to cluster_block_exception, task_manager index"
);
});

it('does not schedule when Kibana is shutting down', async () => {
savedObjectsRepository.update.mockResolvedValueOnce(
{} as SavedObjectsUpdateResponse<unknown>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import type { ISavedObjectsRepository } from '@kbn/core/server';
import { Logger } from '@kbn/core/server';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { TaskManagerConfig } from '../config';
import { DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS, TaskManagerConfig } from '../config';
import { isClusterBlockException } from '../lib/bulk_update_error';

interface DiscoveryServiceParams {
config: TaskManagerConfig['discovery'];
Expand Down Expand Up @@ -59,6 +60,7 @@ export class KibanaDiscoveryService {
}

private async scheduleUpsertCurrentNode() {
let retryInterval = this.discoveryInterval;
if (!this.stopped) {
const lastSeenDate = new Date();
const lastSeen = lastSeenDate.toISOString();
Expand All @@ -69,9 +71,12 @@ export class KibanaDiscoveryService {
this.started = true;
}
} catch (e) {
if (isClusterBlockException(e)) {
retryInterval = DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS;
}
if (!this.started) {
this.logger.error(
`Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}`
`Kibana Discovery Service couldn't be started and will be retried in ${retryInterval}ms, error:${e.message}`
);
} else {
this.logger.error(
Expand All @@ -82,7 +87,7 @@ export class KibanaDiscoveryService {
this.timer = setTimeout(
async () => await this.scheduleUpsertCurrentNode(),
// The timeout should not be less than the default timeout of two seconds
Math.max(this.discoveryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
Math.max(retryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
);
}
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_update_error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string |
return (error as BulkUpdateError).type;
}
}

export function isClusterBlockException(error: Error | BulkUpdateError): boolean {
return (
getBulkUpdateErrorType(error) === 'cluster_block_exception' ||
error.message.includes('cluster_block_exception')
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import {
createManagedConfiguration,
ADJUST_THROUGHPUT_INTERVAL,
INTERVAL_AFTER_BLOCK_EXCEPTION,
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
Expand Down Expand Up @@ -420,12 +421,29 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should increase configuration at the next interval when an error with cluster_block_exception type is emitted, then decreases back to normal', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(
new BulkUpdateError({
statusCode: 403,
message: 'index is blocked',
type: 'cluster_block_exception',
})
);
expect(subscription).toHaveBeenNthCalledWith(1, 100);
// It emits the error with cluster_block_exception type immediately
expect(subscription).toHaveBeenNthCalledWith(2, INTERVAL_AFTER_BLOCK_EXCEPTION);
clock.tick(INTERVAL_AFTER_BLOCK_EXCEPTION);
expect(subscription).toHaveBeenCalledTimes(3);
expect(subscription).toHaveBeenNthCalledWith(3, 100);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).'
);
});

Expand Down
Loading
Loading