Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set refresh according to stateful vs stateless when indexing alert documents #201209

Merged
merged 16 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions x-pack/plugins/alerting/kibana.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
"usageCollection",
"security",
"monitoringCollection",
"spaces",
"serverless"
"spaces"
],
"extraPublicDirs": [
"common",
"common/parse_duration"
]
}
}
}
82 changes: 66 additions & 16 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ describe('Alerts Client', () => {
rule: alertRuleData,
kibanaVersion: '8.9.0',
spaceId: 'space1',
isServerless: false,
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts }),
};
maintenanceWindowsService.getMaintenanceWindows.mockReturnValue({
Expand Down Expand Up @@ -543,10 +544,58 @@ describe('Alerts Client', () => {
});

describe('persistAlerts()', () => {
test('should index new alerts', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>(
alertsClientParams
);
test('should index new alerts with refresh: wait_for in stateful', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: false,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

// Report 2 new alerts
const alertExecutorService = alertsClient.factory();
alertExecutorService.create('1').scheduleActions('default');
alertExecutorService.create('2').scheduleActions('default');

await alertsClient.processAlerts(processAlertsOpts);
alertsClient.logAlerts(logAlertsOpts);

await alertsClient.persistAlerts();

const { alertsToReturn } = alertsClient.getAlertsToSerialize();
const uuid1 = alertsToReturn['1'].meta?.uuid;
const uuid2 = alertsToReturn['2'].meta?.uuid;

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
create: { _id: uuid1, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid1 }),
{
create: { _id: uuid2, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid2, [ALERT_INSTANCE_ID]: '2' }),
],
});
expect(maintenanceWindowsService.getMaintenanceWindows).toHaveBeenCalledWith({
eventLogger: alertingEventLogger,
request: fakeRequest,
ruleTypeCategory: 'test',
spaceId: 'space1',
});
});

test('should index new alerts with refresh: true in stateless', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: true,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

Expand Down Expand Up @@ -659,7 +708,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -732,7 +781,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -867,7 +916,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -940,7 +989,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1039,7 +1088,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1196,7 +1245,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1314,7 +1363,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1518,7 +1567,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1602,6 +1651,7 @@ describe('Alerts Client', () => {
shouldWrite: false,
},
},
isServerless: false,
request: fakeRequest,
namespace: 'default',
rule: alertRuleData,
Expand Down Expand Up @@ -2451,7 +2501,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2725,7 +2775,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2826,7 +2876,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2923,7 +2973,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export interface AlertsClientParams extends CreateAlertsClientParams {
elasticsearchClientPromise: Promise<ElasticsearchClient>;
kibanaVersion: string;
dataStreamAdapter: DataStreamAdapter;
isServerless: boolean;
}

interface AlertsAffectedByMaintenanceWindows {
Expand Down Expand Up @@ -109,6 +110,7 @@ export class AlertsClient<
private runTimestampString: string | undefined;
private rule: AlertRule;
private ruleType: UntypedNormalizedRuleType;
private readonly isServerless: boolean;

private indexTemplateAndPattern: IIndexPatternString;

Expand Down Expand Up @@ -143,6 +145,7 @@ export class AlertsClient<
this._isUsingDataStreams = this.options.dataStreamAdapter.isUsingDataStreams();
this.ruleInfoMessage = `for ${this.ruleType.id}:${this.options.rule.id} '${this.options.rule.name}'`;
this.logTags = { tags: [this.ruleType.id, this.options.rule.id, 'alerts-client'] };
this.isServerless = options.isServerless;
}

public async initializeExecution(opts: InitializeExecutionOpts) {
Expand Down Expand Up @@ -555,7 +558,9 @@ export class AlertsClient<

try {
const response = await esClient.bulk({
refresh: true,
// On serverless we can force a refresh to we don't wait for the longer refresh interval
// When too many refresh calls are done in a short period of time, they are throttled by stateless Elasticsearch
refresh: this.isServerless ? true : 'wait_for',
index: this.indexTemplateAndPattern.alias,
require_alias: !this.isUsingDataStreams(),
body: bulkBody,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -159,6 +160,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -219,6 +221,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -288,6 +291,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down
Loading