From 3b8cf1236b1b6ba67862f35f47fcb250d88ac4c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Mon, 21 Oct 2024 09:02:59 -0400 Subject: [PATCH] Improve task manager functional tests in preparation for mget task claimer being the default (#196399) Resolves https://github.com/elastic/kibana/issues/184942 Resolves https://github.com/elastic/kibana/issues/192023 Resolves https://github.com/elastic/kibana/issues/195573 In this PR, I'm reducing the flakiness found in our functional tests in preparation for mget being the default task claimer that all these tests run with (https://github.com/elastic/kibana/issues/194625). Because the mget task claimer works differently and also polls more frequently, we end up in situations where tasks run faster than they were with update_by_query, creating more race conditions that are now fixed in this PR. Issues were surfaced via https://github.com/elastic/kibana/pull/190148 where I set `mget` as the default task claiming strategy. Flaky test runs (some of these failed on other tests that are flaky): - https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7151 - https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7169 - https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7172 - https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7175 - https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7176 - https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7185 (for https://github.com/elastic/kibana/pull/196399/commits/0fcf1ae68927277a8f544278903edbf5912a1649) --- .../actions/server/lib/retry_if_conflicts.ts | 2 +- .../alerting/server/lib/retry_if_conflicts.ts | 2 +- .../server/task_claimers/strategy_mget.ts | 5 +- .../plugins/alerts/server/action_types.ts | 1 + .../packages/helpers/es_test_index_tool.ts | 17 ++++- .../group1/tests/alerting/backfill/api_key.ts | 2 +- .../group4/tests/alerting/alerts.ts | 8 +-- 
.../tests/alerting/group1/event_log.ts | 72 ++++++++++++++----- .../alerts_as_data/alerts_as_data_flapping.ts | 12 +++- .../builtin_alert_types/long_running/rule.ts | 12 ++-- .../tests/alerting/group4/notify_when.ts | 6 +- .../test_suites/task_manager/metrics_route.ts | 27 ++++--- .../task_manager/task_management.ts | 2 +- .../task_management_removed_types.ts | 14 +++- .../server/init_routes.ts | 1 + .../test_suites/task_manager/health_route.ts | 14 ++-- .../task_manager/task_management.ts | 15 ++-- .../task_management_removed_types.ts | 22 +++++- 18 files changed, 174 insertions(+), 60 deletions(-) diff --git a/x-pack/plugins/actions/server/lib/retry_if_conflicts.ts b/x-pack/plugins/actions/server/lib/retry_if_conflicts.ts index 4778e1ced1013..bae6bd95a682f 100644 --- a/x-pack/plugins/actions/server/lib/retry_if_conflicts.ts +++ b/x-pack/plugins/actions/server/lib/retry_if_conflicts.ts @@ -20,7 +20,7 @@ export const RetryForConflictsAttempts = 2; // note: we considered making this random, to help avoid a stampede, but // with 1 retry it probably doesn't matter, and adding randomness could // make it harder to diagnose issues -const RetryForConflictsDelay = 250; +const RetryForConflictsDelay = 100; // retry an operation if it runs into 409 Conflict's, up to a limit export async function retryIfConflicts( diff --git a/x-pack/plugins/alerting/server/lib/retry_if_conflicts.ts b/x-pack/plugins/alerting/server/lib/retry_if_conflicts.ts index 8b7a3b5f76c8a..0f9377e83ee6c 100644 --- a/x-pack/plugins/alerting/server/lib/retry_if_conflicts.ts +++ b/x-pack/plugins/alerting/server/lib/retry_if_conflicts.ts @@ -22,7 +22,7 @@ export const RetryForConflictsAttempts = 2; // note: we considered making this random, to help avoid a stampede, but // with 1 retry it probably doesn't matter, and adding randomness could // make it harder to diagnose issues -const RetryForConflictsDelay = 250; +const RetryForConflictsDelay = 100; // retry an operation if it runs into 409 Conflict's, up 
to a limit export async function retryIfConflicts( diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index aa69742998c74..407cf6b90dd6c 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -357,7 +357,10 @@ async function searchAvailableTasks({ // Task must be enabled EnabledTask, // a task type that's not excluded (may be removed or not) - OneOfTaskTypes('task.taskType', claimPartitions.unlimitedTypes), + OneOfTaskTypes( + 'task.taskType', + claimPartitions.unlimitedTypes.concat(Array.from(removedTypes)) + ), // Either a task with idle status and runAt <= now or // status running or claiming with a retryAt <= now. shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/action_types.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/action_types.ts index 8d5caf79a4c89..bf69461b0382a 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/action_types.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/action_types.ts @@ -132,6 +132,7 @@ function getIndexRecordActionType() { secrets, reference: params.reference, source: 'action:test.index-record', + '@timestamp': new Date(), }, }); return { status: 'ok', actionId }; diff --git a/x-pack/test/alerting_api_integration/packages/helpers/es_test_index_tool.ts b/x-pack/test/alerting_api_integration/packages/helpers/es_test_index_tool.ts index 1a84915a5c935..b0de8872e177d 100644 --- a/x-pack/test/alerting_api_integration/packages/helpers/es_test_index_tool.ts +++ b/x-pack/test/alerting_api_integration/packages/helpers/es_test_index_tool.ts @@ -4,6 +4,7 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. 
*/ +import { omit } from 'lodash'; import type { Client } from '@elastic/elasticsearch'; import { DeleteByQueryRequest } from '@elastic/elasticsearch/lib/api/types'; @@ -61,6 +62,9 @@ export class ESTestIndexTool { group: { type: 'keyword', }, + '@timestamp': { + type: 'date', + }, host: { properties: { hostname: { @@ -109,6 +113,7 @@ export class ESTestIndexTool { async search(source: string, reference?: string) { const body = reference ? { + sort: [{ '@timestamp': 'asc' }], query: { bool: { must: [ @@ -127,6 +132,7 @@ export class ESTestIndexTool { }, } : { + sort: [{ '@timestamp': 'asc' }], query: { term: { source, @@ -138,7 +144,16 @@ export class ESTestIndexTool { size: 1000, body, }; - return await this.es.search(params, { meta: true }); + const result = await this.es.search(params, { meta: true }); + result.body.hits.hits = result.body.hits.hits.map((hit) => { + return { + ...hit, + // Easier to remove @timestamp than to have all the downstream code ignore it + // in their assertions + _source: omit(hit._source as Record, '@timestamp'), + }; + }); + return result; } async getAll(size: number = 10) { diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/backfill/api_key.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/backfill/api_key.ts index bbb97281b82b1..46a92d176bab0 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/backfill/api_key.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/backfill/api_key.ts @@ -125,7 +125,7 @@ export default function apiKeyBackfillTests({ getService }: FtrProviderContext) } it('should wait to invalidate API key until backfill for rule is complete', async () => { - const start = moment().utc().startOf('day').subtract(7, 'days').toISOString(); + const start = moment().utc().startOf('day').subtract(13, 'days').toISOString(); const end = moment().utc().startOf('day').subtract(4, 
'day').toISOString(); const spaceId = SuperuserAtSpace1.space.id; diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group4/tests/alerting/alerts.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group4/tests/alerting/alerts.ts index 9d3cac9ef9a6d..78213729efdf8 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group4/tests/alerting/alerts.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group4/tests/alerting/alerts.ts @@ -1184,7 +1184,7 @@ instanceStateValue: true reference, overwrites: { enabled: false, - schedule: { interval: '1s' }, + schedule: { interval: '1m' }, }, }); @@ -1288,7 +1288,7 @@ instanceStateValue: true ); // @ts-expect-error doesnt handle total: number - expect(searchResult.body.hits.total.value).to.eql(1); + expect(searchResult.body.hits.total.value).to.be.greaterThan(0); // @ts-expect-error _source: unknown expect(searchResult.body.hits.hits[0]._source.params.message).to.eql( 'Alerts, all:2, new:2 IDs:[1,2,], ongoing:0 IDs:[], recovered:0 IDs:[]' @@ -1304,7 +1304,7 @@ instanceStateValue: true const response = await alertUtils.createAlwaysFiringRuleWithSummaryAction({ reference, overwrites: { - schedule: { interval: '1s' }, + schedule: { interval: '1h' }, }, notifyWhen: 'onActiveAlert', throttle: null, @@ -1435,7 +1435,7 @@ instanceStateValue: true const response = await alertUtils.createAlwaysFiringRuleWithSummaryAction({ reference, overwrites: { - schedule: { interval: '1s' }, + schedule: { interval: '3s' }, }, notifyWhen: 'onThrottleInterval', throttle: '10s', diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/event_log.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/event_log.ts index e25d64e509101..e3023a0d6c8f7 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/event_log.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/event_log.ts @@ -82,8 +82,8 @@ 
export default function eventLogTests({ getService }: FtrProviderContext) { .send( getTestRuleData({ rule_type_id: 'test.patternFiring', - schedule: { interval: '1s' }, - throttle: null, + schedule: { interval: '2s' }, + throttle: '1s', params: { pattern, }, @@ -665,6 +665,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -763,6 +767,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -871,6 +879,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -964,6 +976,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -1067,6 +1083,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 5, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest 
.post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -1166,6 +1186,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -1192,7 +1216,8 @@ export default function eventLogTests({ getService }: FtrProviderContext) { .send( getTestRuleData({ rule_type_id: 'test.patternFiring', - schedule: { interval: '1s' }, + schedule: { interval: '2s' }, + notify_when: RuleNotifyWhen.THROTTLE, throttle: '1s', params: { pattern, @@ -1263,6 +1288,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -1289,7 +1318,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) { .send( getTestRuleData({ rule_type_id: 'test.patternFiring', - schedule: { interval: '1s' }, + schedule: { interval: '2s' }, throttle: null, notify_when: null, params: { @@ -1302,8 +1331,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) { params: {}, frequency: { summary: false, - throttle: '1s', - notify_when: RuleNotifyWhen.THROTTLE, + notify_when: RuleNotifyWhen.ACTIVE, }, }, { @@ -1312,8 +1340,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) { params: {}, frequency: { summary: false, - throttle: '1s', - notify_when: RuleNotifyWhen.THROTTLE, + notify_when: RuleNotifyWhen.ACTIVE, }, }, ], @@ -1371,6 +1398,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // 
wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -1396,7 +1427,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) { .send( getTestRuleData({ rule_type_id: 'test.patternFiring', - schedule: { interval: '1s' }, + schedule: { interval: '2s' }, throttle: '1s', params: { pattern, @@ -1463,6 +1494,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) { status_change_threshold: 4, }) .expect(200); + + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + const { body: createdAction } = await supertest .post(`${getUrlPrefix(space.id)}/api/actions/connector`) .set('kbn-xsrf', 'foo') @@ -1488,7 +1523,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) { .send( getTestRuleData({ rule_type_id: 'test.patternFiring', - schedule: { interval: '1s' }, + schedule: { interval: '2s' }, throttle: null, notify_when: null, params: { @@ -1501,8 +1536,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) { params: {}, frequency: { summary: false, - throttle: '1s', - notify_when: RuleNotifyWhen.THROTTLE, + notify_when: RuleNotifyWhen.ACTIVE, }, }, { @@ -1511,8 +1545,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) { params: {}, frequency: { summary: false, - throttle: '1s', - notify_when: RuleNotifyWhen.THROTTLE, + notify_when: RuleNotifyWhen.ACTIVE, }, }, ], @@ -1567,6 +1600,9 @@ export default function eventLogTests({ getService }: FtrProviderContext) { }) .expect(200); + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + // flap and then recover, then active again const instance = [true, false, true, false, true].concat( ...new Array(6).fill(false), @@ -1709,8 +1745,8 @@ export default function eventLogTests({ getService }: FtrProviderContext) 
{ .send( getTestRuleData({ rule_type_id: 'test.patternFiring', - schedule: { interval: '1s' }, - throttle: null, + schedule: { interval: '2s' }, + throttle: '1s', params: { pattern, }, @@ -1942,8 +1978,8 @@ export default function eventLogTests({ getService }: FtrProviderContext) { provider: 'alerting', actions: new Map([ // make sure the counts of the # of events per type are as expected - ['execute-start', { equal: 6 }], - ['execute', { equal: 6 }], + ['execute-start', { gte: 6 }], + ['execute', { gte: 6 }], ['new-instance', { equal: 1 }], ['active-instance', { equal: 2 }], ['recovered-instance', { equal: 1 }], diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_flapping.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_flapping.ts index 4ee2ea9e18c3c..1035ba1902dfe 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_flapping.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_flapping.ts @@ -35,9 +35,7 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid const alertsAsDataIndex = '.alerts-test.patternfiring.alerts-default'; - // FLAKY: https://github.com/elastic/kibana/issues/195573 - // Failing: See https://github.com/elastic/kibana/issues/195573 - describe.skip('alerts as data flapping', function () { + describe('alerts as data flapping', function () { this.tags('skipFIPS'); beforeEach(async () => { await es.deleteByQuery({ @@ -712,6 +710,9 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid }) .expect(200); + // wait so cache expires + await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME); + // Wait for the rule to run once let run = 1; let runWhichItFlapped = 0; @@ -754,6 +755,11 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid 
const searchResult = await es.search({ index: alertsAsDataIndex, body: { + sort: [ + { + '@timestamp': 'desc', + }, + ], query: { bool: { must: { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/builtin_alert_types/long_running/rule.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/builtin_alert_types/long_running/rule.ts index 7ea429f1be92f..49ad3abeb063b 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/builtin_alert_types/long_running/rule.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/builtin_alert_types/long_running/rule.ts @@ -110,11 +110,13 @@ export default function ruleTests({ getService }: FtrProviderContext) { }); }); - const { status, body: rule } = await supertest.get( - `${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}` - ); - expect(status).to.eql(200); - expect(rule.execution_status.status).to.eql('active'); + await retry.try(async () => { + const { status, body: rule } = await supertest.get( + `${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}` + ); + expect(status).to.eql(200); + expect(rule.execution_status.status).to.eql('active'); + }); }); it('still logs alert docs when rule exceeds timeout when cancelAlertsOnRuleTimeout is false on rule type', async () => { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/notify_when.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/notify_when.ts index e32813934e4c2..5d7b6fc29264b 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/notify_when.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/notify_when.ts @@ -92,7 +92,11 @@ export default function createNotifyWhenTests({ getService }: FtrProviderContext }); }); - const executeActionEvents = getEventsByAction(events, 'execute-action'); + // Slice in case the rule ran more times than we 
are asserting on + const executeActionEvents = getEventsByAction(events, 'execute-action').slice( + 0, + expectedActionGroupBasedOnPattern.length + ); const executeActionEventsActionGroup = executeActionEvents.map( (event) => event?.kibana?.alerting?.action_group_id ); diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts index fb8ee402fcc88..50568fe1c206c 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts @@ -49,10 +49,18 @@ export default function ({ getService }: FtrProviderContext) { describe('task manager metrics', () => { describe('task claim', () => { it('should increment task claim success/total counters', async () => { - // counters are reset every 30 seconds, so wait until the start of a - // fresh counter cycle to make sure values are incrementing + // reset metrics counter + await getMetrics(true); + const metricsResetTime = Date.now(); + // we've resetted the metrics and have 30 seconds before they reset again + // wait for the first set of metrics to be returned after the reset const initialMetrics = ( - await getMetrics(false, (metrics) => metrics?.metrics?.task_claim?.value.total === 1) + await getMetrics( + false, + (metrics) => + !!metrics?.metrics?.task_claim?.timestamp && + new Date(metrics?.metrics?.task_claim?.timestamp).getTime() > metricsResetTime + ) ).metrics; expect(initialMetrics).not.to.be(null); expect(initialMetrics?.task_claim).not.to.be(null); @@ -92,7 +100,7 @@ export default function ({ getService }: FtrProviderContext) { const initialMetrics = ( await getMetrics( false, - (metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue + (metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue ) ).metrics; expect(initialMetrics).not.to.be(null); @@ -101,7 
+109,10 @@ export default function ({ getService }: FtrProviderContext) { // retry until counter value resets const resetMetrics = ( - await getMetrics(false, (m: NodeMetrics) => m?.metrics?.task_claim?.value.total === 1) + await getMetrics( + false, + (m: NodeMetrics) => (m?.metrics?.task_claim?.value.total || 0) >= 1 + ) ).metrics; expect(resetMetrics).not.to.be(null); expect(resetMetrics?.task_claim).not.to.be(null); @@ -113,7 +124,7 @@ export default function ({ getService }: FtrProviderContext) { const initialMetrics = ( await getMetrics( false, - (metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue + (metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue ) ).metrics; expect(initialMetrics).not.to.be(null); @@ -133,8 +144,8 @@ export default function ({ getService }: FtrProviderContext) { expect(metrics?.task_claim).not.to.be(null); expect(metrics?.task_claim?.value).not.to.be(null); - expect(metrics?.task_claim?.value.success).to.equal(1); - expect(metrics?.task_claim?.value.total).to.equal(1); + expect(metrics?.task_claim?.value.success).to.be.greaterThan(0); + expect(metrics?.task_claim?.value.total).to.be.greaterThan(0); previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!; diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index c7e9dc4536fb0..defb7763d89ce 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -797,7 +797,7 @@ export default function ({ getService }: FtrProviderContext) { await retry.try(async () => { const [scheduledTask] = (await currentTasks()).docs; expect(scheduledTask.id).to.eql(task.id); - expect(scheduledTask.status).to.eql('claiming'); + expect(['claiming', 'running'].includes(scheduledTask.status)).to.be(true); 
expect(scheduledTask.attempts).to.be.greaterThan(3); }); }); diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts index e35c2c4730815..5c7ef55577861 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts @@ -77,12 +77,16 @@ export default function ({ getService }: FtrProviderContext) { } it('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => { + const testStart = new Date(); const scheduledTask = await scheduleTask({ taskType: 'sampleTask', schedule: { interval: `1s` }, params: {}, }); + let scheduledTaskRuns = 0; + let scheduledTaskInstanceRunAt = scheduledTask.runAt; + await retry.try(async () => { const tasks = (await currentTasks()).docs; expect(tasks.length).to.eql(3); @@ -98,8 +102,16 @@ export default function ({ getService }: FtrProviderContext) { ); const removedTaskInstance = tasks.find((task) => task.id === REMOVED_TASK_TYPE_ID); - expect(scheduledTaskInstance?.status).to.eql('claiming'); + if (scheduledTaskInstance && scheduledTaskInstance.runAt !== scheduledTaskInstanceRunAt) { + scheduledTaskRuns++; + scheduledTaskInstanceRunAt = scheduledTaskInstance.runAt; + } + + expect(scheduledTaskRuns).to.be.greaterThan(2); expect(unregisteredTaskInstance?.status).to.eql('idle'); + expect(new Date(unregisteredTaskInstance?.runAt || testStart).getTime()).to.be.lessThan( + testStart.getTime() + ); expect(removedTaskInstance?.status).to.eql('unrecognized'); }); }); diff --git a/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts b/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts index 
1c346584beaf2..f1e697399fe09 100644 --- a/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts +++ b/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts @@ -371,6 +371,7 @@ export function initRoutes( do { const { docs: tasks } = await taskManager.fetch({ query: taskManagerQuery, + size: 1000, }); tasksFound = tasks.length; await Promise.all(tasks.map((task) => taskManager.remove(task.id))); diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts index 241bd8adcd40d..23e387061830a 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts @@ -6,9 +6,7 @@ */ import expect from '@kbn/expect'; -import url from 'url'; import { keyBy, mapValues } from 'lodash'; -import supertest from 'supertest'; import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server'; import { FtrProviderContext } from '../../ftr_provider_context'; @@ -82,14 +80,13 @@ interface MonitoringStats { } export default function ({ getService }: FtrProviderContext) { - const config = getService('config'); const retry = getService('retry'); - const request = supertest(url.format(config.get('servers.kibana'))); + const supertest = getService('supertest'); const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); function getHealthRequest() { - return request.get('/api/task_manager/_health').set('kbn-xsrf', 'foo'); + return supertest.get('/api/task_manager/_health').set('kbn-xsrf', 'foo'); } function getHealth(): Promise { @@ -114,7 +111,7 @@ export default function ({ getService }: FtrProviderContext) { } function scheduleTask(task: Partial): Promise { - return request + return supertest .post('/api/sample_tasks/schedule') .set('kbn-xsrf', 'xxx') .send({ task }) @@ 
-125,6 +122,11 @@ export default function ({ getService }: FtrProviderContext) { const monitoredAggregatedStatsRefreshRate = 5000; describe('health', () => { + afterEach(async () => { + // clean up after each test + return await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200); + }); + it('should return basic configuration of task manager', async () => { const health = await getHealth(); expect(health.status).to.eql('OK'); diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts index 6323cef329ed6..f03023fb10ee8 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts @@ -469,8 +469,7 @@ export default function ({ getService }: FtrProviderContext) { }); }); - // always failing - it.skip('should only run as many instances of a task as its maxConcurrency will allow', async () => { + it('should only run as many instances of a task as its maxConcurrency will allow', async () => { // should run as there's only one and maxConcurrency on this TaskType is 1 const firstWithSingleConcurrency = await scheduleTask({ taskType: 'sampleTaskWithSingleConcurrency', @@ -762,18 +761,24 @@ export default function ({ getService }: FtrProviderContext) { }); }); - // flaky - it.skip('should continue claiming recurring task even if maxAttempts has been reached', async () => { + it('should continue claiming recurring task even if maxAttempts has been reached', async () => { const task = await scheduleTask({ taskType: 'sampleRecurringTaskTimingOut', schedule: { interval: '1s' }, params: {}, }); + let taskRuns = 0; + let taskRetryAt = task.retryAt; + await retry.try(async () => { const [scheduledTask] = (await currentTasks()).docs; expect(scheduledTask.id).to.eql(task.id); - expect(scheduledTask.status).to.eql('claiming'); 
+ if (scheduledTask.retryAt !== taskRetryAt) { + taskRuns++; + taskRetryAt = scheduledTask.retryAt; + } + expect(taskRuns).to.be.greaterThan(3); expect(scheduledTask.attempts).to.be.greaterThan(3); }); }); diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management_removed_types.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management_removed_types.ts index e13615cceab0c..aae90a52572c7 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management_removed_types.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management_removed_types.ts @@ -56,6 +56,11 @@ export default function ({ getService }: FtrProviderContext) { await esArchiver.unload('x-pack/test/functional/es_archives/task_manager_removed_types'); }); + afterEach(async () => { + // clean up after each test + return await request.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200); + }); + function scheduleTask( task: Partial ): Promise { @@ -76,14 +81,17 @@ export default function ({ getService }: FtrProviderContext) { .then((response) => response.body); } - // flaky - it.skip('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => { + it('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => { + const testStart = new Date(); const scheduledTask = await scheduleTask({ taskType: 'sampleTask', schedule: { interval: `1s` }, params: {}, }); + let scheduledTaskRuns = 0; + let scheduledTaskInstanceRunAt = scheduledTask.runAt; + await retry.try(async () => { const tasks = (await currentTasks()).docs; expect(tasks.length).to.eql(3); @@ -99,8 +107,16 @@ export default function ({ getService }: FtrProviderContext) { ); const removedTaskInstance = tasks.find((task) => task.id === REMOVED_TASK_TYPE_ID); - 
expect(scheduledTaskInstance?.status).to.eql('claiming'); + if (scheduledTaskInstance && scheduledTaskInstance.runAt !== scheduledTaskInstanceRunAt) { + scheduledTaskRuns++; + scheduledTaskInstanceRunAt = scheduledTaskInstance.runAt; + } + + expect(scheduledTaskRuns).to.be.greaterThan(2); expect(unregisteredTaskInstance?.status).to.eql('idle'); + expect(new Date(unregisteredTaskInstance?.runAt || testStart).getTime()).to.be.lessThan( + testStart.getTime() + ); expect(removedTaskInstance?.status).to.eql('unrecognized'); }); });