Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set refresh according to stateful vs stateless when indexing alert documents #201209

Merged
merged 16 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions x-pack/plugins/alerting/kibana.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
"usageCollection",
"security",
"monitoringCollection",
"spaces",
"serverless"
"spaces"
],
"extraPublicDirs": [
"common",
"common/parse_duration"
]
}
}
}
82 changes: 66 additions & 16 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ describe('Alerts Client', () => {
rule: alertRuleData,
kibanaVersion: '8.9.0',
spaceId: 'space1',
isServerless: false,
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts }),
};
maintenanceWindowsService.getMaintenanceWindows.mockReturnValue({
Expand Down Expand Up @@ -543,10 +544,58 @@ describe('Alerts Client', () => {
});

describe('persistAlerts()', () => {
test('should index new alerts', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>(
alertsClientParams
);
test('should index new alerts with refresh: wait_for in stateful', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: false,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

// Report 2 new alerts
const alertExecutorService = alertsClient.factory();
alertExecutorService.create('1').scheduleActions('default');
alertExecutorService.create('2').scheduleActions('default');

await alertsClient.processAlerts(processAlertsOpts);
alertsClient.logAlerts(logAlertsOpts);

await alertsClient.persistAlerts();

const { alertsToReturn } = alertsClient.getAlertsToSerialize();
const uuid1 = alertsToReturn['1'].meta?.uuid;
const uuid2 = alertsToReturn['2'].meta?.uuid;

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
create: { _id: uuid1, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid1 }),
{
create: { _id: uuid2, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid2, [ALERT_INSTANCE_ID]: '2' }),
],
});
expect(maintenanceWindowsService.getMaintenanceWindows).toHaveBeenCalledWith({
eventLogger: alertingEventLogger,
request: fakeRequest,
ruleTypeCategory: 'test',
spaceId: 'space1',
});
});

test('should index new alerts with refresh: true in stateless', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: true,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

Expand Down Expand Up @@ -659,7 +708,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -732,7 +781,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -867,7 +916,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -940,7 +989,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1039,7 +1088,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1196,7 +1245,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1314,7 +1363,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1518,7 +1567,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -1602,6 +1651,7 @@ describe('Alerts Client', () => {
shouldWrite: false,
},
},
isServerless: false,
request: fakeRequest,
namespace: 'default',
rule: alertRuleData,
Expand Down Expand Up @@ -2451,7 +2501,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2725,7 +2775,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2826,7 +2876,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down Expand Up @@ -2923,7 +2973,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
Expand Down
12 changes: 11 additions & 1 deletion x-pack/plugins/alerting/server/alerts_client/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export interface AlertsClientParams extends CreateAlertsClientParams {
elasticsearchClientPromise: Promise<ElasticsearchClient>;
kibanaVersion: string;
dataStreamAdapter: DataStreamAdapter;
isServerless: boolean;
}

interface AlertsAffectedByMaintenanceWindows {
Expand Down Expand Up @@ -109,6 +110,7 @@ export class AlertsClient<
private runTimestampString: string | undefined;
private rule: AlertRule;
private ruleType: UntypedNormalizedRuleType;
private readonly isServerless: boolean;

private indexTemplateAndPattern: IIndexPatternString;

Expand Down Expand Up @@ -143,6 +145,7 @@ export class AlertsClient<
this._isUsingDataStreams = this.options.dataStreamAdapter.isUsingDataStreams();
this.ruleInfoMessage = `for ${this.ruleType.id}:${this.options.rule.id} '${this.options.rule.name}'`;
this.logTags = { tags: [this.ruleType.id, this.options.rule.id, 'alerts-client'] };
this.isServerless = options.isServerless;
}

public async initializeExecution(opts: InitializeExecutionOpts) {
Expand Down Expand Up @@ -554,8 +557,15 @@ export class AlertsClient<
);

try {
// eslint-disable-next-line no-console
console.log(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove this console log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there's two places it should be removed. I plan to revert 7c19b45 before merging but left it there in case someone wanted to test using the PR deployments / projects. Some serverless projects are still being deployed atm.

'*** Refresh value when indexing alerts:',
this.isServerless ? true : 'wait_for'
);
const response = await esClient.bulk({
refresh: true,
// On serverless we can force a refresh to we don't wait for the longer refresh interval
// When too many refresh calls are done in a short period of time, they are throttled by stateless Elasticsearch
refresh: this.isServerless ? true : 'wait_for',
index: this.indexTemplateAndPattern.alias,
require_alias: !this.isUsingDataStreams(),
body: bulkBody,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -159,6 +160,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -219,6 +221,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down Expand Up @@ -288,6 +291,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
Expand Down
Loading