From 12597675986b9bad17fad9aa8e75a0dd8f6995f7 Mon Sep 17 00:00:00 2001 From: Ersin Erdal <92688503+ersin-erdal@users.noreply.github.com> Date: Tue, 10 Dec 2024 16:17:27 +0100 Subject: [PATCH] Handle cluster_block_exception during reindexing the TM index (#201297) Resolves: https://github.com/elastic/response-ops-team/issues/249 This PR increases the task claiming interval in case of a `cluster_block_exception` to avoid generating too many errors during TM index reindexing. ## To verify: - Run your local Kibana. - Create a user with the `kibana_system` and `kibana_admin` roles. - Log out and log in with your new user. - Use the request below to put a write block on the TM index. `PUT /.kibana_task_manager_9.0.0_001/_block/write` - Observe the error messages and the interval at which they occur on your terminal. - Use the request below on the Kibana console to remove the write block. ``` PUT /.kibana_task_manager_9.0.0_001/_settings { "index": { "blocks.write": false } } ``` --- x-pack/plugins/task_manager/server/config.ts | 1 + .../managed_configuration.test.ts | 36 ++- .../kibana_discovery_service.test.ts | 49 ++++- .../kibana_discovery_service.ts | 11 +- .../server/lib/bulk_update_error.ts | 7 + .../lib/create_managed_configuration.test.ts | 20 +- .../lib/create_managed_configuration.ts | 208 +++++++++++------- 7 files changed, 241 insertions(+), 91 deletions(-) diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index 002f18380a747..92d7429501cd9 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -36,6 +36,7 @@ export const CLAIM_STRATEGY_MGET = 'mget'; export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10; // 10 seconds const MIN_DISCOVERY_INTERVAL_MS = 1000; // 1 second const MAX_DISCOVERY_INTERVAL_MS = 1000 * 60 * 5; // 5 minutes +export const DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS = 6 * 1000 * 10; // 60 seconds export const DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION = '30s'; const
FIVE_MIN_IN_MS = 5 * 60 * 1000; diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index ab1d1bc0498fd..d048cc4063fa2 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -13,6 +13,7 @@ import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration' import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin'; import { coreMock } from '@kbn/core/server/mocks'; import { TaskManagerConfig } from '../config'; +import { BulkUpdateError } from '../lib/bulk_update_error'; describe('managed configuration', () => { let taskManagerStart: TaskManagerStartContract; @@ -130,14 +131,41 @@ describe('managed configuration', () => { clock.tick(ADJUST_THROUGHPUT_INTERVAL); expect(logger.warn).toHaveBeenCalledWith( - 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).' + 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).' ); expect(logger.debug).toHaveBeenCalledWith( - 'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)' + 'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).' 
); expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms'); }); + test('should increase poll interval when Elasticsearch returns a cluster_block_exception error', async () => { + savedObjectsClient.create.mockRejectedValueOnce( + new BulkUpdateError({ + statusCode: 403, + message: 'index is blocked', + type: 'cluster_block_exception', + }) + ); + + await expect( + taskManagerStart.schedule({ + taskType: 'foo', + state: {}, + params: {}, + }) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"index is blocked"`); + clock.tick(100000); + + expect(logger.warn).toHaveBeenCalledWith( + 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).' + ); + expect(logger.debug).toHaveBeenCalledWith( + 'Poll interval configuration changing from 3000 to 61000 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).' + ); + expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 61000ms'); + }); + test('should increase poll interval when Elasticsearch returns "cannot execute [inline] scripts" error', async () => { const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked; childEsClient.search.mockImplementationOnce(async () => { @@ -151,10 +179,10 @@ describe('managed configuration', () => { clock.tick(ADJUST_THROUGHPUT_INTERVAL); expect(logger.warn).toHaveBeenCalledWith( - 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).' + 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).' 
); expect(logger.debug).toHaveBeenCalledWith( - 'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" error(s)' + 'Poll interval configuration changing from 3000 to 3600 after seeing 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).' ); expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms'); }); diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts index 3a4870b6a4763..beb686c8ea4ba 100644 --- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts +++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts @@ -10,7 +10,11 @@ import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects'; import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server'; import { createFindResponse, createFindSO } from './mock_kibana_discovery_service'; -import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config'; +import { + DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, + DEFAULT_DISCOVERY_INTERVAL_MS, + DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS, +} from '../config'; const currentNode = 'current-node-id'; const now = '2024-08-10T10:00:00.000Z'; @@ -199,6 +203,49 @@ describe('KibanaDiscoveryService', () => { ); }); + it('reschedules discovery job in case of cluster_block_exception', async () => { + savedObjectsRepository.update.mockResolvedValueOnce( + {} as SavedObjectsUpdateResponse + ); + + const kibanaDiscoveryService = new KibanaDiscoveryService({ + savedObjectsRepository, + logger, + currentNode, + config: { + active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, + interval: DEFAULT_DISCOVERY_INTERVAL_MS, + }, + }); + await 
kibanaDiscoveryService.start(); + + expect(kibanaDiscoveryService.isStarted()).toBe(true); + expect(setTimeout).toHaveBeenCalledTimes(1); + expect(setTimeout).toHaveBeenNthCalledWith( + 1, + expect.any(Function), + DEFAULT_DISCOVERY_INTERVAL_MS + ); + + savedObjectsRepository.update.mockRejectedValueOnce( + new Error('failed due to cluster_block_exception, task_manager index') + ); + + await jest.advanceTimersByTimeAsync(15000); + + expect(savedObjectsRepository.update).toHaveBeenCalledTimes(2); + expect(setTimeout).toHaveBeenCalledTimes(2); + expect(setTimeout).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS + ); + expect(logger.error).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith( + "Kibana Discovery Service couldn't update this node's last_seen timestamp. id: current-node-id, last_seen: 2024-08-10T10:00:10.000Z, error:failed due to cluster_block_exception, task_manager index" + ); + }); + it('does not schedule when Kibana is shutting down', async () => { savedObjectsRepository.update.mockResolvedValueOnce( {} as SavedObjectsUpdateResponse diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts index 1c4fcb00981a0..fbd797355d1e8 100644 --- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts +++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts @@ -9,7 +9,8 @@ import type { ISavedObjectsRepository } from '@kbn/core/server'; import { Logger } from '@kbn/core/server'; import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects'; import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node'; -import { TaskManagerConfig } from '../config'; +import { DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS, TaskManagerConfig } from '../config'; +import { 
isClusterBlockException } from '../lib/bulk_update_error'; interface DiscoveryServiceParams { config: TaskManagerConfig['discovery']; @@ -59,6 +60,7 @@ export class KibanaDiscoveryService { } private async scheduleUpsertCurrentNode() { + let retryInterval = this.discoveryInterval; if (!this.stopped) { const lastSeenDate = new Date(); const lastSeen = lastSeenDate.toISOString(); @@ -69,9 +71,12 @@ export class KibanaDiscoveryService { this.started = true; } } catch (e) { + if (isClusterBlockException(e)) { + retryInterval = DISCOVERY_INTERVAL_AFTER_BLOCK_EXCEPTION_MS; + } if (!this.started) { this.logger.error( - `Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}` + `Kibana Discovery Service couldn't be started and will be retried in ${retryInterval}ms, error:${e.message}` ); } else { this.logger.error( @@ -82,7 +87,7 @@ export class KibanaDiscoveryService { this.timer = setTimeout( async () => await this.scheduleUpsertCurrentNode(), // The timeout should not be less than the default timeout of two seconds - Math.max(this.discoveryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT) + Math.max(retryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT) ); } } diff --git a/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts b/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts index f7e0552e5a738..822980f4f9466 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts @@ -43,3 +43,10 @@ export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string | return (error as BulkUpdateError).type; } } + +export function isClusterBlockException(error: Error | BulkUpdateError): boolean { + return ( + getBulkUpdateErrorType(error) === 'cluster_block_exception' || + error.message.includes('cluster_block_exception') + ); +} diff --git 
a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts index d453edc8e7003..cd13ac20026ed 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts @@ -11,6 +11,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import { createManagedConfiguration, ADJUST_THROUGHPUT_INTERVAL, + INTERVAL_AFTER_BLOCK_EXCEPTION, } from './create_managed_configuration'; import { mockLogger } from '../test_utils'; import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config'; @@ -420,12 +421,29 @@ describe('createManagedConfiguration()', () => { expect(subscription).toHaveBeenNthCalledWith(2, 120); }); + test('should increase configuration at the next interval when an error with cluster_block_exception type is emitted, then decreases back to normal', async () => { + const { subscription, errors$ } = setupScenario(100); + errors$.next( + new BulkUpdateError({ + statusCode: 403, + message: 'index is blocked', + type: 'cluster_block_exception', + }) + ); + expect(subscription).toHaveBeenNthCalledWith(1, 100); + // It emits the error with cluster_block_exception type immediately + expect(subscription).toHaveBeenNthCalledWith(2, INTERVAL_AFTER_BLOCK_EXCEPTION); + clock.tick(INTERVAL_AFTER_BLOCK_EXCEPTION); + expect(subscription).toHaveBeenCalledTimes(3); + expect(subscription).toHaveBeenNthCalledWith(3, 100); + }); + test('should log a warning when the configuration changes from the starting value', async () => { const { errors$ } = setupScenario(100); errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); clock.tick(ADJUST_THROUGHPUT_INTERVAL); expect(logger.warn).toHaveBeenCalledWith( - 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute 
[inline] script" error(s).' + 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).' ); }); diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts index 00736f2c36cdb..2105f29e9c617 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts @@ -13,11 +13,12 @@ import { isEsCannotExecuteScriptError } from './identify_es_error'; import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config'; import { TaskCost } from '../task'; import { getMsearchStatusCode } from './msearch_error'; -import { getBulkUpdateStatusCode } from './bulk_update_error'; +import { getBulkUpdateStatusCode, isClusterBlockException } from './bulk_update_error'; const FLUSH_MARKER = Symbol('flush'); export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000; export const PREFERRED_MAX_POLL_INTERVAL = 60 * 1000; +export const INTERVAL_AFTER_BLOCK_EXCEPTION = 61 * 1000; // Capacity is measured in number of normal cost tasks that can be run // At a minimum, we need to be able to run a single task with the greatest cost @@ -46,6 +47,11 @@ interface ManagedConfigurationOpts { logger: Logger; } +interface ErrorScanResult { + count: number; + isBlockException: boolean; +} + export interface ManagedConfiguration { startingCapacity: number; capacityConfiguration$: Observable; @@ -77,87 +83,112 @@ export function createManagedConfiguration({ } function createCapacityScan(config: TaskManagerConfig, logger: Logger, startingCapacity: number) { - return scan((previousCapacity: number, errorCount: number) => { - let newCapacity: number; - if (errorCount > 0) { - const minCapacity = getMinCapacity(config); - // Decrease capacity by CAPACITY_DECREASE_PERCENTAGE 
while making sure it doesn't go lower than minCapacity. - // Using Math.floor to make sure the number is different than previous while not being a decimal value. - newCapacity = Math.max( - Math.floor(previousCapacity * CAPACITY_DECREASE_PERCENTAGE), - minCapacity - ); - } else { - // Increase capacity by CAPACITY_INCREASE_PERCENTAGE while making sure it doesn't go - // higher than the starting value. Using Math.ceil to make sure the number is different than - // previous while not being a decimal value - newCapacity = Math.min( - startingCapacity, - Math.ceil(previousCapacity * CAPACITY_INCREASE_PERCENTAGE) - ); - } - - if (newCapacity !== previousCapacity) { - logger.debug( - `Capacity configuration changing from ${previousCapacity} to ${newCapacity} after seeing ${errorCount} "too many request" and/or "execute [inline] script" error(s)` - ); - if (previousCapacity === startingCapacity) { - logger.warn( - `Capacity configuration is temporarily reduced after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script" error(s).` + return scan( + (previousCapacity: number, { count: errorCount, isBlockException }: ErrorScanResult) => { + let newCapacity: number; + if (isBlockException) { + newCapacity = previousCapacity; + } else { + if (errorCount > 0) { + const minCapacity = getMinCapacity(config); + // Decrease capacity by CAPACITY_DECREASE_PERCENTAGE while making sure it doesn't go lower than minCapacity. + // Using Math.floor to make sure the number is different than previous while not being a decimal value. + newCapacity = Math.max( + Math.floor(previousCapacity * CAPACITY_DECREASE_PERCENTAGE), + minCapacity + ); + } else { + // Increase capacity by CAPACITY_INCREASE_PERCENTAGE while making sure it doesn't go + // higher than the starting value. 
Using Math.ceil to make sure the number is different than + // previous while not being a decimal value + newCapacity = Math.min( + startingCapacity, + Math.ceil(previousCapacity * CAPACITY_INCREASE_PERCENTAGE) + ); + } + } + + if (newCapacity !== previousCapacity) { + logger.debug( + `Capacity configuration changing from ${previousCapacity} to ${newCapacity} after seeing ${errorCount} "too many request" and/or "execute [inline] script" error(s)` ); + if (previousCapacity === startingCapacity) { + logger.warn( + `Capacity configuration is temporarily reduced after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script" error(s).` + ); + } } - } - return newCapacity; - }, startingCapacity); + return newCapacity; + }, + startingCapacity + ); } function createPollIntervalScan(logger: Logger, startingPollInterval: number) { - return scan((previousPollInterval: number, errorCount: number) => { - let newPollInterval: number; - if (errorCount > 0) { - // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to - // make sure the number is different than previous while not being a decimal value. - // Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval, - // whichever is greater. 
- newPollInterval = Math.min( - Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE), - Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval)) - ); - if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { - logger.error( - `Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` - ); - newPollInterval = previousPollInterval; - } - } else { - // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to - // make sure the number is different than previous while not being a decimal value. - newPollInterval = Math.max( - startingPollInterval, - Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE) - ); - if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { - logger.error( - `Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${previousPollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` - ); - newPollInterval = previousPollInterval; + return scan( + (previousPollInterval: number, { count: errorCount, isBlockException }: ErrorScanResult) => { + let newPollInterval: number; + if (isBlockException) { + newPollInterval = INTERVAL_AFTER_BLOCK_EXCEPTION; + } else { + if (errorCount > 0) { + // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to + // make sure the number is different than previous while not being a decimal value. + // Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval, + // whichever is greater. 
+ newPollInterval = Math.min( + Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE), + Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval)) + ); + if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { + logger.error( + `Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` + ); + newPollInterval = previousPollInterval; + } + } else { + if (previousPollInterval === INTERVAL_AFTER_BLOCK_EXCEPTION) { + newPollInterval = startingPollInterval; + } else { + // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to + // make sure the number is different than previous while not being a decimal value. + newPollInterval = Math.max( + startingPollInterval, + Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE) + ); + } + + if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { + logger.error( + `Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${previousPollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` + ); + newPollInterval = previousPollInterval; + } + } } - } - if (newPollInterval !== previousPollInterval) { - logger.debug( - `Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} "too many request" and/or "execute [inline] script" error(s)` - ); - if (previousPollInterval === startingPollInterval) { - logger.warn( - `Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script" error(s).` - ); + + if 
(newPollInterval !== previousPollInterval) { + if (previousPollInterval !== INTERVAL_AFTER_BLOCK_EXCEPTION) { + logger.debug( + `Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).` + ); + } + if (previousPollInterval === startingPollInterval) { + logger.warn( + `Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s).` + ); + } } - } - return newPollInterval; - }, startingPollInterval); + return newPollInterval; + }, + startingPollInterval + ); } -function countErrors(errors$: Observable, countInterval: number): Observable { +function countErrors( + errors$: Observable, + countInterval: number +): Observable { return merge( // Flush error count at fixed interval interval(countInterval).pipe(map(() => FLUSH_MARKER)), @@ -173,36 +204,48 @@ function countErrors(errors$: Observable, countInterval: number): Observa getMsearchStatusCode(e) === 503 || getBulkUpdateStatusCode(e) === 429 || getBulkUpdateStatusCode(e) === 500 || - getBulkUpdateStatusCode(e) === 503 + getBulkUpdateStatusCode(e) === 503 || + isClusterBlockException(e) ) ) ).pipe( // When tag is "flush", reset the error counter // Otherwise increment the error counter - mergeScan(({ count }, next) => { + mergeScan(({ count, isBlockException }, next) => { return next === FLUSH_MARKER - ? of(emitErrorCount(count), resetErrorCount()) - : of(incementErrorCount(count)); - }, emitErrorCount(0)), + ? 
of(emitErrorCount(count, isBlockException), resetErrorCount()) + : of(incrementOrEmitErrorCount(count, isClusterBlockException(next as Error))); + }, emitErrorCount(0, false)), filter(isEmitEvent), - map(({ count }) => count) + map(({ count, isBlockException }) => { + return { count, isBlockException }; + }) ); } -function emitErrorCount(count: number) { +function emitErrorCount(count: number, isBlockException: boolean) { return { tag: 'emit', + isBlockException, count, }; } -function isEmitEvent(event: { tag: string; count: number }) { +function isEmitEvent(event: { tag: string; count: number; isBlockException: boolean }) { return event.tag === 'emit'; } -function incementErrorCount(count: number) { +function incrementOrEmitErrorCount(count: number, isBlockException: boolean) { + if (isBlockException) { + return { + tag: 'emit', + isBlockException, + count: count + 1, + }; + } return { tag: 'inc', + isBlockException, count: count + 1, }; } @@ -210,6 +253,7 @@ function incementErrorCount(count: number) { function resetErrorCount() { return { tag: 'initial', + isBlockException: false, count: 0, }; }