Skip to content

Commit

Permalink
Handle cluster_block_exception during reindexing the TM index
Browse files Browse the repository at this point in the history
  • Loading branch information
ersin-erdal committed Nov 21, 2024
1 parent 7699806 commit 455e9c5
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export class KibanaDiscoveryService {
}

private async scheduleUpsertCurrentNode() {
let retryInterval = this.discoveryInterval;
if (!this.stopped) {
const lastSeenDate = new Date();
const lastSeen = lastSeenDate.toISOString();
Expand All @@ -71,18 +72,21 @@ export class KibanaDiscoveryService {
} catch (e) {
if (!this.started) {
this.logger.error(
`Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}`
`Kibana Discovery Service couldn't be started and will be retried in ${retryInterval}ms, error:${e.message}`
);
} else {
this.logger.error(
`Kibana Discovery Service couldn't update this node's last_seen timestamp. id: ${this.currentNode}, last_seen: ${lastSeen}, error:${e.message}`
);
}
if (e.message.includes('cluster_block_exception')) {
retryInterval = 60000;
}
} finally {
this.timer = setTimeout(
async () => await this.scheduleUpsertCurrentNode(),
// The timeout should not be less than the default timeout of two seconds
Math.max(this.discoveryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
Math.max(retryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
);
}
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_update_error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string |
return (error as BulkUpdateError).type;
}
}

/**
 * Tells whether the given error is an Elasticsearch `cluster_block_exception`.
 *
 * @param error - a plain `Error` or a `BulkUpdateError`
 * @returns `true` only when `getBulkUpdateErrorType` reports the
 *   `cluster_block_exception` type for the error, `false` otherwise
 */
export function isClusterBlockException(error: Error | BulkUpdateError): boolean {
  const errorType = getBulkUpdateErrorType(error);
  return errorType === 'cluster_block_exception';
}
137 changes: 85 additions & 52 deletions x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import { isEsCannotExecuteScriptError } from './identify_es_error';
import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config';
import { TaskCost } from '../task';
import { getMsearchStatusCode } from './msearch_error';
import { getBulkUpdateStatusCode } from './bulk_update_error';
import { getBulkUpdateStatusCode, isClusterBlockException } from './bulk_update_error';

// Sentinel merged into the error stream at a fixed interval to flush the error count.
const FLUSH_MARKER = Symbol('flush');
// 10 seconds (presumably milliseconds, like the other intervals here — confirm against callers).
export const ADJUST_THROUGHPUT_INTERVAL = 10_000;
// 60 seconds: ceiling used when the poll interval is increased after errors,
// unless the starting poll interval is already greater.
export const PREFERRED_MAX_POLL_INTERVAL = 60_000;
// 61 seconds: poll interval applied as soon as a cluster_block_exception is reported.
export const INTERVAL_AFTER_BLOCK_EXCEPTION = 61_000;

// Capacity is measured in number of normal cost tasks that can be run
// At a minimum, we need to be able to run a single task with the greatest cost
Expand Down Expand Up @@ -46,6 +47,11 @@ interface ManagedConfigurationOpts {
logger: Logger;
}

/**
 * Value emitted by the error counter and consumed by the capacity and
 * poll-interval scans.
 */
interface ErrorScanResult {
  /** True when the counted errors included a cluster_block_exception. */
  isBlockException: boolean;
  /** Number of matching errors counted since the counter was last reset. */
  count: number;
}

export interface ManagedConfiguration {
startingCapacity: number;
capacityConfiguration$: Observable<number>;
Expand Down Expand Up @@ -77,7 +83,7 @@ export function createManagedConfiguration({
}

function createCapacityScan(config: TaskManagerConfig, logger: Logger, startingCapacity: number) {
return scan((previousCapacity: number, errorCount: number) => {
return scan((previousCapacity: number, { count: errorCount }: ErrorScanResult) => {
let newCapacity: number;
if (errorCount > 0) {
const minCapacity = getMinCapacity(config);
Expand Down Expand Up @@ -112,52 +118,66 @@ function createCapacityScan(config: TaskManagerConfig, logger: Logger, startingC
}

/**
 * Builds the `scan` accumulator that derives the next poll interval from the
 * previous one and the latest error scan result.
 *
 * - A cluster_block_exception jumps straight to INTERVAL_AFTER_BLOCK_EXCEPTION.
 * - Any other counted error grows the interval by POLL_INTERVAL_INCREASE_PERCENTAGE,
 *   capped at the greater of PREFERRED_MAX_POLL_INTERVAL and the starting interval.
 * - No errors shrinks the interval by POLL_INTERVAL_DECREASE_PERCENTAGE, never
 *   going below the starting interval. If the previous interval was the
 *   block-exception interval, the calculation restarts from the starting interval.
 *
 * @param logger - receives debug/warn messages on changes and errors on bad calculations
 * @param startingPollInterval - seed of the scan and lower bound of the interval
 * @returns the `scan` operator seeded with `startingPollInterval`
 */
function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
  return scan(
    (previousPollInterval: number, { count: errorCount, isBlockException }: ErrorScanResult) => {
      // Interval treated as "previous" for the calculation and the log messages.
      // Kept in a local so the accumulator argument itself is never reassigned.
      let basePollInterval = previousPollInterval;
      let newPollInterval: number;

      if (isBlockException) {
        newPollInterval = INTERVAL_AFTER_BLOCK_EXCEPTION;
      } else if (errorCount > 0) {
        // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
        // make sure the number is different than previous while not being a decimal value.
        // Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval,
        // whichever is greater.
        newPollInterval = Math.min(
          Math.ceil(basePollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE),
          Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval))
        );
        if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
          logger.error(
            `Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${basePollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${basePollInterval})`
          );
          newPollInterval = basePollInterval;
        }
      } else {
        // Coming out of a block exception: restart from the starting interval
        // instead of decaying gradually from INTERVAL_AFTER_BLOCK_EXCEPTION.
        if (basePollInterval === INTERVAL_AFTER_BLOCK_EXCEPTION) {
          basePollInterval = startingPollInterval;
        }
        // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
        // make sure the number is different than previous while not being a decimal value.
        newPollInterval = Math.max(
          startingPollInterval,
          Math.floor(basePollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
        );
        if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
          logger.error(
            `Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${basePollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${basePollInterval})`
          );
          newPollInterval = basePollInterval;
        }
      }

      if (newPollInterval !== basePollInterval) {
        logger.debug(
          `Poll interval configuration changing from ${basePollInterval} to ${newPollInterval} after seeing ${errorCount} "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s)`
        );
        // Only warn on the first step away from the configured starting interval.
        if (basePollInterval === startingPollInterval) {
          logger.warn(
            `Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).`
          );
        }
      }
      return newPollInterval;
    },
    startingPollInterval
  );
}

function countErrors(errors$: Observable<Error>, countInterval: number): Observable<number> {
function countErrors(
errors$: Observable<Error>,
countInterval: number
): Observable<ErrorScanResult> {
return merge(
// Flush error count at fixed interval
interval(countInterval).pipe(map(() => FLUSH_MARKER)),
Expand All @@ -173,43 +193,56 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
getMsearchStatusCode(e) === 503 ||
getBulkUpdateStatusCode(e) === 429 ||
getBulkUpdateStatusCode(e) === 500 ||
getBulkUpdateStatusCode(e) === 503
getBulkUpdateStatusCode(e) === 503 ||
isClusterBlockException(e)
)
)
).pipe(
// When tag is "flush", reset the error counter
// Otherwise increment the error counter
mergeScan(({ count }, next) => {
mergeScan(({ count, isBlockException }, next) => {
return next === FLUSH_MARKER
? of(emitErrorCount(count), resetErrorCount())
: of(incementErrorCount(count));
}, emitErrorCount(0)),
? of(emitErrorCount(count, isBlockException), resetErrorCount())
: of(incrementOrEmitErrorCount(count, isClusterBlockException(next as Error)));
}, emitErrorCount(0, false)),
filter(isEmitEvent),
map(({ count }) => count)
map(({ count, isBlockException }) => {
return { count, isBlockException };
})
);
}

/**
 * Creates the counter state tagged 'emit', i.e. the state that passes the
 * `isEmitEvent` filter.
 *
 * @param count - number of errors counted so far
 * @param isBlockException - whether a cluster_block_exception was seen
 * @returns the state object `{ tag: 'emit', isBlockException, count }`
 */
function emitErrorCount(count: number, isBlockException: boolean) {
  return {
    tag: 'emit',
    isBlockException,
    count,
  };
}

/**
 * Filter predicate for counter states: keeps only those tagged 'emit'.
 *
 * @param event - a counter state carrying its tag, count and block-exception flag
 * @returns `true` when `event.tag` is exactly 'emit'
 */
function isEmitEvent(event: { tag: string; count: number; isBlockException: boolean }) {
  return event.tag === 'emit';
}

/**
 * Adds one error to the running count.
 *
 * A cluster_block_exception yields a state tagged 'emit', so it passes the
 * `isEmitEvent` filter immediately; any other error yields a state tagged
 * 'inc', which only accumulates.
 *
 * @param count - number of errors counted before this one
 * @param isBlockException - whether the incoming error is a cluster_block_exception
 * @returns the next counter state with `count + 1`
 */
function incrementOrEmitErrorCount(count: number, isBlockException: boolean) {
  return {
    tag: isBlockException ? 'emit' : 'inc',
    isBlockException,
    count: count + 1,
  };
}

/**
 * Produces the cleared counter state: tag 'initial', zero errors and no
 * block exception recorded.
 *
 * @returns a fresh state object on every call
 */
function resetErrorCount() {
  const clearedState = { tag: 'initial', isBlockException: false, count: 0 };
  return clearedState;
}
Expand Down

0 comments on commit 455e9c5

Please sign in to comment.