Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ResponseOps] implement task claiming strategy mget #180485

Merged
merged 30 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
61c616f
[ResponseOps] implement task claiming stragegy mget
pmuellr Apr 10, 2024
d413be0
add logging, remove hack to delete version property
pmuellr May 7, 2024
d4b127e
make mget the default claim strategy, for testing
pmuellr May 8, 2024
612950a
add a tool to collect event logs
pmuellr May 10, 2024
50f2c1a
add preIdle and duration values to event log tool
pmuellr May 13, 2024
e2a9b4a
set default claim strategy to default (for testing), move APM transac…
pmuellr May 13, 2024
caa444f
rename mget config value to unsafe_mget, lazily validate and fallback…
pmuellr May 13, 2024
04baedb
apply changes suggested in PR by mikec
pmuellr May 13, 2024
11b028c
add support for REMOVED_TYPES
pmuellr May 14, 2024
1476a2c
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine May 15, 2024
0c5c83c
clean up summary log, make debug
pmuellr May 20, 2024
be6f1f9
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine May 20, 2024
de82421
adding tests, add some changes from Mike's PR comments
pmuellr May 23, 2024
872c378
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine May 28, 2024
5e5bbaf
adds FT task_manager_claimer_mget, copy of TM tests from plugin_api_i…
pmuellr May 29, 2024
5a6a740
[CI] Auto-commit changed files from 'node scripts/generate codeowners'
kibanamachine May 29, 2024
de62c5b
attempt to fix FT health report error, unsuccessfully
pmuellr May 30, 2024
7506d71
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine May 30, 2024
cb07c64
skip some flaky tests, clean up before review
pmuellr May 31, 2024
239cae6
merge main, fix conflict in .buildkite/ftr_configs.yml
pmuellr Jun 5, 2024
45b652e
changes driven from PR review
pmuellr Jun 6, 2024
1003255
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine Jun 6, 2024
bc73654
one of the failing FT tests now runs - tm health report limping ...
pmuellr Jun 6, 2024
cc9f8c9
[CI] Auto-commit changed files from 'node scripts/eslint --no-cache -…
kibanamachine Jun 6, 2024
a2e7720
updates from PR comments
pmuellr Jun 6, 2024
dd7482e
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine Jun 6, 2024
36ba3d2
remove FTR test from FTR list, that mistakenly was re-added
pmuellr Jun 10, 2024
47f2a4d
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine Jun 10, 2024
a57d147
merge main, fix conflicts
pmuellr Jun 12, 2024
dcf0c2f
Merge branch 'main' into 155770-tm-poll-mget
kibanamachine Jun 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .buildkite/ftr_configs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ enabled:
- x-pack/test/spaces_api_integration/security_and_spaces/config_trial.ts
- x-pack/test/spaces_api_integration/security_and_spaces/copy_to_space_config_trial.ts
- x-pack/test/spaces_api_integration/spaces_only/config.ts
- x-pack/test/task_manager_claimer_mget/config.ts
- x-pack/test/timeline/security_and_spaces/config_trial.ts
- x-pack/test/ui_capabilities/security_and_spaces/config.ts
- x-pack/test/ui_capabilities/spaces_only/config.ts
Expand Down
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ x-pack/plugins/runtime_fields @elastic/kibana-management
packages/kbn-safer-lodash-set @elastic/kibana-security
x-pack/test/security_api_integration/plugins/saml_provider @elastic/kibana-security
x-pack/test/plugin_api_integration/plugins/sample_task_plugin @elastic/response-ops
x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget @elastic/response-ops
mikecote marked this conversation as resolved.
Show resolved Hide resolved
test/plugin_functional/plugins/saved_object_export_transforms @elastic/kibana-core
test/plugin_functional/plugins/saved_object_import_warnings @elastic/kibana-core
x-pack/test/saved_object_api_integration/common/plugins/saved_object_test_plugin @elastic/kibana-security
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@
"@kbn/safer-lodash-set": "link:packages/kbn-safer-lodash-set",
"@kbn/saml-provider-plugin": "link:x-pack/test/security_api_integration/plugins/saml_provider",
"@kbn/sample-task-plugin": "link:x-pack/test/plugin_api_integration/plugins/sample_task_plugin",
"@kbn/sample-task-plugin-mget": "link:x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget",
"@kbn/saved-object-export-transforms-plugin": "link:test/plugin_functional/plugins/saved_object_export_transforms",
"@kbn/saved-object-import-warnings-plugin": "link:test/plugin_functional/plugins/saved_object_import_warnings",
"@kbn/saved-object-test-plugin": "link:x-pack/test/saved_object_api_integration/common/plugins/saved_object_test_plugin",
Expand Down
2 changes: 2 additions & 0 deletions tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,8 @@
"@kbn/saml-provider-plugin/*": ["x-pack/test/security_api_integration/plugins/saml_provider/*"],
"@kbn/sample-task-plugin": ["x-pack/test/plugin_api_integration/plugins/sample_task_plugin"],
"@kbn/sample-task-plugin/*": ["x-pack/test/plugin_api_integration/plugins/sample_task_plugin/*"],
"@kbn/sample-task-plugin-mget": ["x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget"],
"@kbn/sample-task-plugin-mget/*": ["x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/*"],
"@kbn/saved-object-export-transforms-plugin": ["test/plugin_functional/plugins/saved_object_export_transforms"],
"@kbn/saved-object-export-transforms-plugin/*": ["test/plugin_functional/plugins/saved_object_export_transforms/*"],
"@kbn/saved-object-import-warnings-plugin": ["test/plugin_functional/plugins/saved_object_import_warnings"],
Expand Down
9 changes: 2 additions & 7 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,7 @@ describe('config validation', () => {
}).not.toThrowError();
});

test('the claim strategy is validated', () => {
const config = { claim_strategy: 'invalid-strategy' };
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"The claim strategy is invalid: Unknown task claiming strategy (invalid-strategy)"`
);
test('any claim strategy is valid', () => {
configSchema.validate({ claim_strategy: 'anything!' });
});
});
7 changes: 1 addition & 6 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import { schema, TypeOf } from '@kbn/config-schema';
import { getTaskClaimer } from './task_claimers';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_MAX_WORKERS = 10;
Expand All @@ -27,6 +26,7 @@ export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;

export const CLAIM_STRATEGY_DEFAULT = 'default';
export const CLAIM_STRATEGY_MGET = 'unsafe_mget';

export const taskExecutionFailureThresholdSchema = schema.object(
{
Expand Down Expand Up @@ -165,11 +165,6 @@ export const configSchema = schema.object(
) {
return `The specified monitored_stats_required_freshness (${config.monitored_stats_required_freshness}) is invalid, as it is below the poll_interval (${config.poll_interval})`;
}
try {
getTaskClaimer(config.claim_strategy);
} catch (err) {
return `The claim strategy is invalid: ${err.message}`;
}
},
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

const DOCS_TO_FETCH = 10000;

// Get the event logs from multiple clusters, focusing on rule runs
// as they test recurring activity easily, and augmenting with other
// bits, producing a single .ndjson file for all clusters.
main();

async function main() {
// get urls and their host names
const urls = process.argv.slice(2);
const urlNoCreds = urls.map((url) => new URL(url)).map((url) => url?.origin || 'unknown');
const urlHosts = urls
.map((url) => new URL(url))
.map((url) => url?.host || 'unknown')
.map((url) => url.split('.')[0]);

if (urls.length === 0) return help();

// get the event logs
const docPromises = urls.map(getRuleRunEventDocs);
const docResults = await Promise.allSettled(docPromises);

/** @type { any[][] } */
const serverDocs = [];

// log errors, and add urls to event logs
for (let i = 0; i < urls.length; i++) {
const url = urls[i];
const docResult = docResults[i];
if (docResult.status === 'rejected') {
console.error(`Failed to get docs from ${url}: ${docResult.reason}`);
} else {
for (const doc of docResult.value) {
if (!doc.kibana) doc.kibana = {};
// add/remove some bits - remove to save space
doc.kibana.url = urlNoCreds[i];
doc.kibana.host = urlHosts[i];
delete doc.kibana.saved_objects;
delete doc.kibana.space_ids;

if (!doc.event) doc.event = {};
if (doc.event.start) doc.event.startMs = new Date(doc.event.start).valueOf();
if (doc.event.end) doc.event.endMs = new Date(doc.event.end).valueOf();
if (doc.event.endMs && doc.event.startMs)
doc.event.durationMs = doc.event.endMs - doc.event.startMs;
}
serverDocs.push(docResult.value);
}
}

// for each server's docs, apply a worker id
for (const docs of serverDocs) {
// sort ascending by timestamp
docs.sort((a, b) => a.event.startMs - b.event.startMs);

assignWorkerIds(docs);

for (const doc of docs) {
console.log(JSON.stringify(doc));
}
}
}

class Worker {
/** @param { string } id */
constructor(id) {
this.id = id;
/** @type { number | undefined } */
this.nextEnd = undefined;
/** @type { number | undefined } */
this.lastEnd = undefined;
}

/** @type { (currentDate: number) => void } */
update(currentDate) {
if (currentDate >= this.nextEnd) {
this.lastEnd = this.nextEnd;
this.nextEnd = undefined;
}
}

/** @type { () => boolean } */
isAvailable() {
return this.nextEnd === undefined;
}

/** @type { (end: number) => void } */
claimTill(end) {
this.nextEnd = end;
}
}

class Workers {
constructor() {
/** @type { Map<string, Worker[]> } */
this.workersByServer = new Map();

/** @type { Map<string, string> } */
this.serverMap = new Map();
}

/** @type { (doc: any) => string } */
getServerId(doc) {
const { server_uuid: serverUuid } = doc?.kibana || {};
return this.serverMap.get(serverUuid) || 'unknown';
}

/** @type { (doc: any) => Worker } */
getAvailableWorker(doc) {
const { startMs, endMs } = doc?.event || {};
const { server_uuid: serverUuid } = doc?.kibana || {};
if (!this.serverMap.has(serverUuid)) {
this.serverMap.set(serverUuid, `${this.serverMap.size + 1}`);
}

const workers = this.getWorkersForServer(serverUuid);

for (const worker of workers) {
worker.update(startMs);
if (worker.isAvailable()) {
worker.claimTill(endMs);
return worker;
}
}
const worker = new Worker(workers.length + 1);
worker.claimTill(endMs);
workers.push(worker);

return worker;
}

/** @type { (serverUuid) => Worker[] } */
getWorkersForServer(serverUuid) {
let workers = this.workersByServer.get(serverUuid);
if (workers !== undefined) return workers;

workers = [];
this.workersByServer.set(serverUuid, workers);
return workers;
}
}

/** @type { (docs: any[]) => void } */
function assignWorkerIds(docs) {
const workers = new Workers();
for (const doc of docs) {
const worker = workers.getAvailableWorker(doc);
const serverId = workers.getServerId(doc).padStart(3, '0');
const workerId = `${worker.id}`.padStart(3, '0');
doc.kibana.worker = `${serverId}-${workerId}`;
doc.event.preIdleMs = worker.lastEnd ? doc.event.startMs - worker.lastEnd : 0;
}
}

/** @type { (url: string) => Promise<any[]>} */
async function getRuleRunEventDocs(url) {
const parsedUrl = new URL(url);
const indices = `.kibana-event-log,.kibana-event-log-ds`;
const options = `expand_wildcards=all&ignore_unavailable=true`;
const searchUrl = `${parsedUrl.origin}/${indices}/_search?${options}`;
const query = getQuery();
const authHeader = getAuthHeader(parsedUrl.username, parsedUrl.password);
const headers = {
'Content-Type': 'application/json',
...(authHeader ? { Authorization: authHeader } : {}),
};
const fetchResult = await fetch(searchUrl, {
method: 'POST',
headers,
body: JSON.stringify(query),
});

if (!fetchResult.ok) {
const text = await fetchResult.text();
throw new Error(`Failed to fetch from ${searchUrl}: ${fetchResult.statusText}\n${text}`);
}

const result = await fetchResult.json();
const sources = result.hits.hits.map((hit) => hit._source);

return sources;
}

/** @type { (username: string, password: string) => string | undefined } */
function getAuthHeader(username, password) {
if (!username || !password) return undefined;
if (username.toUpperCase() === 'APIKEY') return `ApiKey ${password}`;
const encoded = Buffer.from(`${username}:${password}`).toString('base64');
return `Basic ${encoded}`;
}

/** @type { (size: number) => any} */
function getQuery() {
return {
size: DOCS_TO_FETCH,
query: {
bool: {
filter: [
{ term: { 'event.provider': 'alerting' } },
{ term: { 'event.action': 'execute' } },
],
},
},
sort: [{ '@timestamp': { order: 'desc' } }],
};
}

function help() {
console.error(`
usage: [this-command] <es-url1> <es-url2> ... <es-urlN>

Will fetch rule execution event logs from each url, and augment them:
- adds event.startMs - event.start as an epoch number
- adds event.endMs - event.end as an epoch number
- adds event.durationMs - event.end as an epoch number
- adds event.preIdleMs - time worker was idle before this
- adds kibana.url - the URL passed in (which is actually ES)
- adds kibana.host - just the host name from that URL
- adds kibana.worker - worker in form of nodeId-workerId (unique only by url)
- deletes kibana.saved_objects - not needed and confusing
- deletes kibana.space_ids - not needed

The output is a single .ndjson file with all the docs.
`);
}
14 changes: 8 additions & 6 deletions x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ interface Opts<H> {
logger: Logger;
initialPollInterval: number;
pollInterval$: Observable<number>;
pollIntervalDelay$: Observable<number>;
pollIntervalDelay$?: Observable<number>;
getCapacity: () => number;
work: WorkFn<H>;
}
Expand Down Expand Up @@ -99,11 +99,13 @@ export function createTaskPoller<T, H>({
pollInterval = interval;
logger.debug(`Task poller now using interval of ${interval}ms`);
});
pollIntervalDelay$.subscribe((delay) => {
pollIntervalDelay = delay;
logger.debug(`Task poller now delaying emission by ${delay}ms`);
});
hasSubscribed = true;
if (pollIntervalDelay$) {
pollIntervalDelay$.subscribe((delay) => {
pollIntervalDelay = delay;
logger.debug(`Task poller now delaying emission by ${delay}ms`);
});
hasSubscribed = true;
mikecote marked this conversation as resolved.
Show resolved Hide resolved
}
}

return {
Expand Down
21 changes: 12 additions & 9 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import type { Logger, ExecutionContextStart } from '@kbn/core/server';

import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig } from './config';
import { TaskManagerConfig, CLAIM_STRATEGY_DEFAULT } from './config';

import {
TaskMarkRunning,
Expand Down Expand Up @@ -154,15 +154,18 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
// pipe taskClaiming events into the lifecycle event stream
this.taskClaiming.events.subscribe(emitEvent);

const { poll_interval: pollInterval } = config;
const { poll_interval: pollInterval, claim_strategy: claimStrategy } = config;

const pollIntervalDelay$ = delayOnClaimConflicts(
maxWorkersConfiguration$,
pollIntervalConfiguration$,
this.events$,
config.version_conflict_threshold,
config.monitored_stats_running_average_window
).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)))));
let pollIntervalDelay$: Observable<number> | undefined;
if (claimStrategy === CLAIM_STRATEGY_DEFAULT) {
pollIntervalDelay$ = delayOnClaimConflicts(
maxWorkersConfiguration$,
pollIntervalConfiguration$,
this.events$,
config.version_conflict_threshold,
config.monitored_stats_running_average_window
).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)))));
}

const poller = createTaskPoller<string, TimedFillPoolResult>({
logger,
Expand Down
Loading