Skip to content

Commit

Permalink
Set refresh according to stateful vs stateless when indexing alert do…
Browse files Browse the repository at this point in the history
…cuments (elastic#201209)

In this PR, I'm making the change so when Kibana is running with
Elasticsearch stateful we set refresh to `wait_for` (instead of `true`)
so we are not putting too much pressure on the Elasticsearch indices
when under load.

## To verify

Very using the Cloud deployment and Serverless project created from this
PR

1. Create an always firing ES Query rule
2. Create an always firing security detection rule w/ alert suppression
3. Verify the ECH cluster logs and observe `*** Refresh value when
indexing alerts: wait_for` and `*** Rule registry - refresh value when
indexing alerts: wait_for` messages
4. Verify the serverless project logs on QA overview and observe `***
Refresh value when indexing alerts: true` and `*** Rule registry -
refresh value when indexing alerts: true` messages

## To-Do

- [x] Revert commit
elastic@7c19b45
that was added for testing purposes

---------

Co-authored-by: kibanamachine <[email protected]>
(cherry picked from commit a4cb330)
  • Loading branch information
mikecote committed Nov 28, 2024
1 parent d45b628 commit c94b166
Show file tree
Hide file tree
Showing 30 changed files with 188 additions and 37 deletions.
5 changes: 2 additions & 3 deletions x-pack/plugins/alerting/kibana.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
"usageCollection",
"security",
"monitoringCollection",
"spaces",
"serverless"
"spaces"
],
"extraPublicDirs": [
"common",
"common/parse_duration"
]
}
}
}
82 changes: 66 additions & 16 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ describe('Alerts Client', () => {
rule: alertRuleData,
kibanaVersion: '8.9.0',
spaceId: 'space1',
isServerless: false,
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts }),
};
maintenanceWindowsService.getMaintenanceWindows.mockReturnValue({
Expand Down Expand Up @@ -543,10 +544,58 @@ describe('Alerts Client', () => {
});

describe('persistAlerts()', () => {
test('should index new alerts', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>(
alertsClientParams
);
test('should index new alerts with refresh: wait_for in stateful', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: false,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

// Report 2 new alerts
const alertExecutorService = alertsClient.factory();
alertExecutorService.create('1').scheduleActions('default');
alertExecutorService.create('2').scheduleActions('default');

await alertsClient.processAlerts(processAlertsOpts);
alertsClient.logAlerts(logAlertsOpts);

await alertsClient.persistAlerts();

const { alertsToReturn } = alertsClient.getAlertsToSerialize();
const uuid1 = alertsToReturn['1'].meta?.uuid;
const uuid2 = alertsToReturn['2'].meta?.uuid;

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
create: { _id: uuid1, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid1 }),
{
create: { _id: uuid2, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid2, [ALERT_INSTANCE_ID]: '2' }),
],
});
expect(maintenanceWindowsService.getMaintenanceWindows).toHaveBeenCalledWith({
eventLogger: alertingEventLogger,
request: fakeRequest,
ruleTypeCategory: 'test',
spaceId: 'space1',
});
});

test('should index new alerts with refresh: true in stateless', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: true,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

Expand Down Expand Up @@ -659,7 +708,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -732,7 +781,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -867,7 +916,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -940,7 +989,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1039,7 +1088,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1196,7 +1245,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1314,7 +1363,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1518,7 +1567,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1602,6 +1651,7 @@ describe('Alerts Client', () => {
shouldWrite: false,
},
},
isServerless: false,
request: fakeRequest,
namespace: 'default',
rule: alertRuleData,
Expand Down Expand Up @@ -2451,7 +2501,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2725,7 +2775,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2826,7 +2876,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2923,7 +2973,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export interface AlertsClientParams extends CreateAlertsClientParams {
elasticsearchClientPromise: Promise<ElasticsearchClient>;
kibanaVersion: string;
dataStreamAdapter: DataStreamAdapter;
isServerless: boolean;
}

interface AlertsAffectedByMaintenanceWindows {
Expand Down Expand Up @@ -109,6 +110,7 @@ export class AlertsClient<
private runTimestampString: string | undefined;
private rule: AlertRule;
private ruleType: UntypedNormalizedRuleType;
private readonly isServerless: boolean;

private indexTemplateAndPattern: IIndexPatternString;

Expand Down Expand Up @@ -143,6 +145,7 @@ export class AlertsClient<
this._isUsingDataStreams = this.options.dataStreamAdapter.isUsingDataStreams();
this.ruleInfoMessage = `for ${this.ruleType.id}:${this.options.rule.id} '${this.options.rule.name}'`;
this.logTags = { tags: [this.ruleType.id, this.options.rule.id, 'alerts-client'] };
this.isServerless = options.isServerless;
}

public async initializeExecution(opts: InitializeExecutionOpts) {
Expand Down Expand Up @@ -555,7 +558,9 @@ export class AlertsClient<

try {
const response = await esClient.bulk({
refresh: true,
// On serverless we can force a refresh to we don't wait for the longer refresh interval
// When too many refresh calls are done in a short period of time, they are throttled by stateless Elasticsearch
refresh: this.isServerless ? true : 'wait_for',
index: this.indexTemplateAndPattern.alias,
require_alias: !this.isUsingDataStreams(),
body: bulkBody,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -159,6 +160,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -219,6 +221,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -288,6 +291,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down
Loading

0 comments on commit c94b166

Please sign in to comment.