Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc backfill actions #199218

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions x-pack/plugins/actions/server/create_execute_function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
*/

import { SavedObjectsBulkResponse, SavedObjectsClientContract, Logger } from '@kbn/core/server';
import { RunNowResult, TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import {
RunNowResult,
TaskManagerStartContract,
TaskPriority,
} from '@kbn/task-manager-plugin/server';
import {
RawAction,
ActionTypeRegistryContract,
Expand Down Expand Up @@ -36,6 +40,7 @@ export interface ExecuteOptions
apiKey: string | null;
executionId: string;
actionTypeId: string;
priority?: TaskPriority;
}

interface ActionTaskParams
Expand Down Expand Up @@ -179,7 +184,7 @@ export function createBulkExecutionEnqueuerFunction({
const actionTaskParamsRecords: SavedObjectsBulkResponse<ActionTaskParams> =
await unsecuredSavedObjectsClient.bulkCreate(actions, { refresh: false });

const taskInstances = actionTaskParamsRecords.saved_objects.map((so) => {
const taskInstances = actionTaskParamsRecords.saved_objects.map((so, index) => {
const actionId = so.attributes.actionId;
return {
taskType: `actions:${actionTypeIds[actionId]}`,
Expand All @@ -189,6 +194,7 @@ export function createBulkExecutionEnqueuerFunction({
},
state: {},
scope: ['actions'],
...(runnableActions[index].priority ? { priority: runnableActions[index].priority } : {}),
};
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const scheduleBodySchema = schema.arrayOf(
rule_id: schema.string(),
start: schema.string(),
end: schema.maybe(schema.string()),
run_actions: schema.boolean({ defaultValue: true }),
},
{
validate({ start, end }) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ export async function findBackfill(
...(params.sortOrder ? { sortOrder: params.sortOrder } : {}),
});

const actionsClient = await context.getActionsClient();

const transformedData: Backfill[] = data.map((so: SavedObject<AdHocRunSO>) => {
context.auditLogger?.log(
adHocRunAuditEvent({
Expand All @@ -110,7 +112,7 @@ export async function findBackfill(
})
);

return transformAdHocRunToBackfillResult(so) as Backfill;
return transformAdHocRunToBackfillResult(actionsClient, so) as Backfill;
});

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ export async function getBackfill(context: RulesClientContext, id: string): Prom
})
);

return transformAdHocRunToBackfillResult(result) as Backfill;
const actionsClient = await context.getActionsClient();
return transformAdHocRunToBackfillResult(actionsClient, result) as Backfill;
} catch (err) {
const errorMessage = `Failed to get backfill by id: ${id}`;
context.logger.error(`${errorMessage} - ${err}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ export async function scheduleBackfill(

const actionsClient = await context.getActionsClient();
return await context.backfillClient.bulkQueue({
actionsClient,
auditLogger: context.auditLogger,
params,
rules: rulesToSchedule.map(({ id, attributes, references }) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const scheduleBackfillParamSchema = schema.object(
ruleId: schema.string(),
start: schema.string(),
end: schema.maybe(schema.string()),
runActions: schema.boolean({ defaultValue: true }),
},
{
validate({ start, end }) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import { schema } from '@kbn/config-schema';
import { ruleParamsSchema } from '@kbn/response-ops-rule-params';
import { adHocRunStatus } from '../../../../../common/constants';
import { actionSchema as ruleActionSchema } from '../../../rule/schemas/action_schemas';

export const statusSchema = schema.oneOf([
schema.literal(adHocRunStatus.COMPLETE),
Expand All @@ -32,6 +33,7 @@ export const backfillSchema = schema.object({
id: schema.string(),
name: schema.string(),
tags: schema.arrayOf(schema.string()),
actions: schema.arrayOf(ruleActionSchema),
alertTypeId: schema.string(),
params: ruleParamsSchema,
apiKeyOwner: schema.nullable(schema.string()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
*/

import { SavedObject, SavedObjectsBulkCreateObject } from '@kbn/core/server';
import { ActionsClient } from '@kbn/actions-plugin/server';
import { AdHocRunSO } from '../../../data/ad_hoc_run/types';
import { createBackfillError } from '../../../backfill_client/lib';
import { ScheduleBackfillResult } from '../methods/schedule/types';
import { transformRawActionsToDomainActions } from '../../rule/transforms';

export const transformAdHocRunToBackfillResult = (
actionsClient: ActionsClient,
{ id, attributes, references, error }: SavedObject<AdHocRunSO>,
originalSO?: SavedObjectsBulkCreateObject<AdHocRunSO>
): ScheduleBackfillResult => {
Expand Down Expand Up @@ -55,6 +58,13 @@ export const transformAdHocRunToBackfillResult = (
rule: {
...attributes.rule,
id: references[0].id,
actions: transformRawActionsToDomainActions({
ruleId: id,
actions: attributes.rule.actions,
references,
isSystemAction: (connectorId: string) => actionsClient.isSystemAction(connectorId),
omitGeneratedValues: true,
}),
},
spaceId: attributes.spaceId,
start: attributes.start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { isString } from 'lodash';
import { DenormalizedAction } from '../../../rules_client';
import { AdHocRunSO } from '../../../data/ad_hoc_run/types';
import { calculateSchedule } from '../../../backfill_client/lib';
import { adHocRunStatus } from '../../../../common/constants';
Expand All @@ -15,6 +16,7 @@ import { ScheduleBackfillParam } from '../methods/schedule/types';
export const transformBackfillParamToAdHocRun = (
param: ScheduleBackfillParam,
rule: RuleDomain,
actions: DenormalizedAction[],
spaceId: string
): AdHocRunSO => {
const schedule = calculateSchedule(param.start, rule.schedule.interval, param.end);
Expand All @@ -32,6 +34,7 @@ export const transformBackfillParamToAdHocRun = (
params: rule.params,
apiKeyOwner: rule.apiKeyOwner,
apiKeyCreatedByUser: rule.apiKeyCreatedByUser,
actions: param.runActions ? (actions as unknown as AdHocRunSO['rule']['actions']) : [],
consumer: rule.consumer,
enabled: rule.enabled,
schedule: rule.schedule,
Expand Down
37 changes: 30 additions & 7 deletions x-pack/plugins/alerting/server/backfill_client/backfill_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {
TaskPriority,
} from '@kbn/task-manager-plugin/server';
import { isNumber } from 'lodash';
import { ActionsClient } from '@kbn/actions-plugin/server';
import { asyncForEach } from '@kbn/std';
import {
ScheduleBackfillError,
ScheduleBackfillParam,
Expand All @@ -42,6 +44,8 @@ import { AD_HOC_RUN_SAVED_OBJECT_TYPE, RULE_SAVED_OBJECT_TYPE } from '../saved_o
import { TaskRunnerFactory } from '../task_runner';
import { RuleTypeRegistry } from '../types';
import { createBackfillError } from './lib';
import { denormalizeActions } from '../rules_client/lib/denormalize_actions';
import { NormalizedAlertActionWithGeneratedValues } from '../rules_client';

export const BACKFILL_TASK_TYPE = 'ad_hoc_run-backfill';

Expand All @@ -53,6 +57,7 @@ interface ConstructorOpts {
}

interface BulkQueueOpts {
actionsClient: ActionsClient;
auditLogger?: AuditLogger;
params: ScheduleBackfillParams;
rules: RuleDomain[];
Expand Down Expand Up @@ -86,6 +91,7 @@ export class BackfillClient {
}

public async bulkQueue({
actionsClient,
auditLogger,
params,
rules,
Expand Down Expand Up @@ -118,7 +124,7 @@ export class BackfillClient {

const soToCreateIndexOrErrorMap: Map<number, number | ScheduleBackfillError> = new Map();

params.forEach((param: ScheduleBackfillParam, ndx: number) => {
await asyncForEach(params, async (param: ScheduleBackfillParam, ndx: number) => {
// For this schedule request, look up the rule or return error
const { rule, error } = getRuleOrError(param.ruleId, rules, ruleTypeRegistry);
if (rule) {
Expand All @@ -129,19 +135,25 @@ export class BackfillClient {
name: `rule`,
type: RULE_SAVED_OBJECT_TYPE,
};
const allActions = [
...rule.actions,
...(rule.systemActions ?? []),
] as NormalizedAlertActionWithGeneratedValues[];
const { references: actionReferences, actions } = await denormalizeActions(
() => Promise.resolve(actionsClient),
allActions
);
adHocSOsToCreate.push({
type: AD_HOC_RUN_SAVED_OBJECT_TYPE,
attributes: transformBackfillParamToAdHocRun(param, rule, spaceId),
references: [reference],
attributes: transformBackfillParamToAdHocRun(param, rule, actions, spaceId),
references: [reference, ...actionReferences],
});
} else if (error) {
// keep track of the error encountered for this request by index so
// we can return it in order
soToCreateIndexOrErrorMap.set(ndx, error);
this.logger.warn(
`No rule found for ruleId ${param.ruleId} - not scheduling backfill for ${JSON.stringify(
param
)}`
`Error for ruleId ${param.ruleId} - not scheduling backfill for ${JSON.stringify(param)}`
);
}
});
Expand Down Expand Up @@ -175,7 +187,7 @@ export class BackfillClient {
})
);
}
return transformAdHocRunToBackfillResult(so, adHocSOsToCreate?.[index]);
return transformAdHocRunToBackfillResult(actionsClient, so, adHocSOsToCreate?.[index]);
}
);

Expand Down Expand Up @@ -343,5 +355,16 @@ function getRuleOrError(
};
}

// validate that we can run all the actions that are configured
const hasUnsupportedActions = rule.actions.some(
(action) => action.frequency?.notifyWhen !== 'onActiveAlert'
);

if (hasUnsupportedActions) {
return {
error: createBackfillError(`Rule ${ruleId} has unsupported actions`, ruleId, rule.name),
};
}

return { rule };
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/

import { RawRule } from '../../../types';
import { RuleDomain } from '../../../application/rule/types';
import { AdHocRunStatus } from '../../../../common/constants';

Expand All @@ -23,10 +24,11 @@ export interface AdHocRunSchedule extends Record<string, unknown> {
// the backfill job was scheduled. if there are updates to the rule configuration
// after the backfill is scheduled, they will not be reflected during the backfill run.
type AdHocRunSORule = Pick<
RuleDomain,
RawRule,
| 'name'
| 'tags'
| 'alertTypeId'
| 'actions'
| 'params'
| 'apiKeyOwner'
| 'apiKeyCreatedByUser'
Expand All @@ -43,7 +45,7 @@ type AdHocRunSORule = Pick<

// This is the rule information after loaded from persistence with the
// rule ID injected from the SO references array
type AdHocRunRule = AdHocRunSORule & Pick<RuleDomain, 'id'>;
type AdHocRunRule = Omit<AdHocRunSORule, 'actions'> & Pick<RuleDomain, 'id' | 'actions'>;

export interface AdHocRunSO extends Record<string, unknown> {
apiKeyId: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ import { ScheduleBackfillRequestBodyV1 } from '../../../../../../../common/route
import { ScheduleBackfillParams } from '../../../../../../application/backfill/methods/schedule/types';

export const transformRequest = (request: ScheduleBackfillRequestBodyV1): ScheduleBackfillParams =>
request.map(({ rule_id, start, end }) => ({ ruleId: rule_id, start, end }));
request.map(({ rule_id, start, end, run_actions }) => ({
ruleId: rule_id,
start,
end,
runActions: run_actions,
}));
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,22 @@
* 2.0.
*/
import { SavedObjectReference } from '@kbn/core/server';
import { ActionsClient } from '@kbn/actions-plugin/server';
import {
preconfiguredConnectorActionRefPrefix,
systemConnectorActionRefPrefix,
} from '../common/constants';
import {
DenormalizedAction,
NormalizedAlertActionWithGeneratedValues,
RulesClientContext,
} from '../types';
import { DenormalizedAction, NormalizedAlertActionWithGeneratedValues } from '../types';

export async function denormalizeActions(
context: RulesClientContext,
getActionsClient: () => Promise<ActionsClient>,
alertActions: NormalizedAlertActionWithGeneratedValues[]
): Promise<{ actions: DenormalizedAction[]; references: SavedObjectReference[] }> {
const references: SavedObjectReference[] = [];
const actions: DenormalizedAction[] = [];

if (alertActions.length) {
const actionsClient = await context.getActionsClient();
const actionsClient = await getActionsClient();
const actionIds = [...new Set(alertActions.map((alertAction) => alertAction.id))];

const actionResults = await actionsClient.getBulk({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ export async function extractReferences<
params: ExtractedParams;
references: SavedObjectReference[];
}> {
const { references: actionReferences, actions } = await denormalizeActions(context, ruleActions);
const { references: actionReferences, actions } = await denormalizeActions(
context.getActionsClient,
ruleActions
);

// Extracts any references using configured reference extractor if available
const extractedRefsAndParams = ruleType?.useSavedObjectReferences?.extractReferences
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
*/

import { SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server';
import { rawAdHocRunParamsSchemaV1 } from '../schemas/raw_ad_hoc_run_params';
import {
rawAdHocRunParamsSchemaV1,
rawAdHocRunParamsSchemaV2,
} from '../schemas/raw_ad_hoc_run_params';

export const adHocRunParamsModelVersions: SavedObjectsModelVersionMap = {
'1': {
Expand All @@ -16,4 +19,11 @@ export const adHocRunParamsModelVersions: SavedObjectsModelVersionMap = {
create: rawAdHocRunParamsSchemaV1,
},
},
'2': {
changes: [],
schemas: {
forwardCompatibility: rawAdHocRunParamsSchemaV2.extends({}, { unknowns: 'ignore' }),
create: rawAdHocRunParamsSchemaV2,
},
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
*/

export { rawAdHocRunParamsSchema as rawAdHocRunParamsSchemaV1 } from './v1';
export { rawAdHocRunParamsSchema as rawAdHocRunParamsSchemaV2 } from './v2';
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { TypeOf } from '@kbn/config-schema';
import { rawAdHocRunParamsSchema } from './v2';

export type RawAdHocRunParams = TypeOf<typeof rawAdHocRunParamsSchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const rawAdHocRunSchedule = schema.object({
runAt: schema.string(),
});

const rawAdHocRunParamsRuleSchema = schema.object({
export const rawAdHocRunParamsRuleSchema = schema.object({
name: schema.string(),
tags: schema.arrayOf(schema.string()),
alertTypeId: schema.string(),
Expand Down
Loading