Skip to content

Commit

Permalink
[Entity Store] [FTR Tests] Fix flakiness + poll for engine started on…
Browse files Browse the repository at this point in the history
… setup (#196564)

## Summary

Closes #196546
Closes #196526

Unskips flaky entity store tests after fixes. 

Entity store tests were not polling for the engine to be started before
asserting the assets were present.

I have also added some retries to the asset checks as some assets are
not immediately queryable after creation.
  • Loading branch information
hop-dev authored Oct 21, 2024
1 parent 10ec204 commit 0e1b2a3
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,36 @@ export const executeFieldRetentionEnrichPolicy = async ({
export const deleteFieldRetentionEnrichPolicy = async ({
unitedDefinition,
esClient,
logger,
attempts = 5,
delayMs = 2000,
}: {
esClient: ElasticsearchClient;
unitedDefinition: DefinitionMetadata;
esClient: ElasticsearchClient;
logger: Logger;
attempts?: number;
delayMs?: number;
}) => {
const name = getFieldRetentionEnrichPolicyName(unitedDefinition);
return esClient.enrich.deletePolicy({ name }, { ignore: [404] });
let currentAttempt = 1;
while (currentAttempt <= attempts) {
try {
await esClient.enrich.deletePolicy({ name }, { ignore: [404] });
return;
} catch (e) {
// a 429 status code indicates that the enrich policy is being executed
if (currentAttempt === attempts || e.statusCode !== 429) {
logger.error(
`Error deleting enrich policy ${name}: ${e.message} after ${currentAttempt} attempts`
);
throw e;
}

logger.info(
`Enrich policy ${name} is being executed, waiting for it to finish before deleting`
);
await new Promise((resolve) => setTimeout(resolve, delayMs));
currentAttempt++;
}
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ export class EntityStoreDataClient {
logger,
taskManager,
});
logger.info(`Entity store initialized`);
logger.info(`Entity store initialized for ${entityType}`);

return updated;
} catch (err) {
Expand Down Expand Up @@ -362,6 +362,7 @@ export class EntityStoreDataClient {
await deleteFieldRetentionEnrichPolicy({
unitedDefinition,
esClient: this.esClient,
logger,
});

if (deleteData) {
Expand Down Expand Up @@ -450,7 +451,7 @@ export class EntityStoreDataClient {
originalStatus === ENGINE_STATUS.UPDATING
) {
throw new Error(
`Error updating entity store: There is an changes already in progress for engine ${id}`
`Error updating entity store: There are changes already in progress for engine ${id}`
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export default ({ getService }: FtrProviderContext) => {
const supertest = getService('supertest');

const utils = EntityStoreUtils(getService);
// Failing: See https://github.com/elastic/kibana/issues/196526
describe.skip('@ess @skipInServerlessMKI Entity Store Engine APIs', () => {
describe('@ess @skipInServerlessMKI Entity Store Engine APIs', () => {
const dataView = dataViewRouteHelpersFactory(supertest);

before(async () => {
Expand All @@ -33,22 +32,19 @@ export default ({ getService }: FtrProviderContext) => {
});

it('should have installed the expected user resources', async () => {
await utils.initEntityEngineForEntityType('user');
await utils.initEntityEngineForEntityTypesAndWait(['user']);
await utils.expectEngineAssetsExist('user');
});

it('should have installed the expected host resources', async () => {
await utils.initEntityEngineForEntityType('host');
await utils.initEntityEngineForEntityTypesAndWait(['host']);
await utils.expectEngineAssetsExist('host');
});
});

describe('get and list', () => {
before(async () => {
await Promise.all([
utils.initEntityEngineForEntityType('host'),
utils.initEntityEngineForEntityType('user'),
]);
await utils.initEntityEngineForEntityTypesAndWait(['host', 'user']);
});

after(async () => {
Expand Down Expand Up @@ -118,7 +114,7 @@ export default ({ getService }: FtrProviderContext) => {

describe('start and stop', () => {
before(async () => {
await utils.initEntityEngineForEntityType('host');
await utils.initEntityEngineForEntityTypesAndWait(['host']);
});

after(async () => {
Expand Down Expand Up @@ -160,7 +156,7 @@ export default ({ getService }: FtrProviderContext) => {

describe('delete', () => {
it('should delete the host entity engine', async () => {
await utils.initEntityEngineForEntityType('host');
await utils.initEntityEngineForEntityTypesAndWait(['host']);

await api
.deleteEntityEngine({
Expand All @@ -173,7 +169,7 @@ export default ({ getService }: FtrProviderContext) => {
});

it('should delete the user entity engine', async () => {
await utils.initEntityEngineForEntityType('user');
await utils.initEntityEngineForEntityTypesAndWait(['user']);

await api
.deleteEntityEngine({
Expand All @@ -188,7 +184,7 @@ export default ({ getService }: FtrProviderContext) => {

describe('apply_dataview_indices', () => {
before(async () => {
await utils.initEntityEngineForEntityType('host');
await utils.initEntityEngineForEntityTypesAndWait(['host']);
});

after(async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ export default ({ getService }: FtrProviderContextWithSpaces) => {
const supertest = getService('supertest');
const utils = EntityStoreUtils(getService, namespace);

// Failing: See https://github.com/elastic/kibana/issues/196546
describe.skip('@ess Entity Store Engine APIs in non-default space', () => {
describe('@ess Entity Store Engine APIs in non-default space', () => {
const dataView = dataViewRouteHelpersFactory(supertest, namespace);

before(async () => {
Expand All @@ -43,22 +42,19 @@ export default ({ getService }: FtrProviderContextWithSpaces) => {
});

it('should have installed the expected user resources', async () => {
await utils.initEntityEngineForEntityType('user');
await utils.initEntityEngineForEntityTypesAndWait(['user']);
await utils.expectEngineAssetsExist('user');
});

it('should have installed the expected host resources', async () => {
await utils.initEntityEngineForEntityType('host');
await utils.initEntityEngineForEntityTypesAndWait(['host']);
await utils.expectEngineAssetsExist('host');
});
});

describe('get and list', () => {
before(async () => {
await Promise.all([
utils.initEntityEngineForEntityType('host'),
utils.initEntityEngineForEntityType('user'),
]);
await utils.initEntityEngineForEntityTypesAndWait(['host', 'user']);
});

after(async () => {
Expand Down Expand Up @@ -134,7 +130,7 @@ export default ({ getService }: FtrProviderContextWithSpaces) => {

describe('start and stop', () => {
before(async () => {
await utils.initEntityEngineForEntityType('host');
await utils.initEntityEngineForEntityTypesAndWait(['host']);
});

after(async () => {
Expand Down Expand Up @@ -188,7 +184,7 @@ export default ({ getService }: FtrProviderContextWithSpaces) => {

describe('delete', () => {
it('should delete the host entity engine', async () => {
await utils.initEntityEngineForEntityType('host');
await utils.initEntityEngineForEntityTypesAndWait(['host']);

await api
.deleteEntityEngine(
Expand All @@ -204,7 +200,7 @@ export default ({ getService }: FtrProviderContextWithSpaces) => {
});

it('should delete the user entity engine', async () => {
await utils.initEntityEngineForEntityType('user');
await utils.initEntityEngineForEntityTypesAndWait(['user']);

await api
.deleteEntityEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { FtrProviderContext } from '@kbn/ftr-common-functional-services';

export const elasticAssetCheckerFactory = (getService: FtrProviderContext['getService']) => {
const es = getService('es');
const retry = getService('retry');
const log = getService('log');

const expectTransformExists = async (transformId: string) => {
return expectTransformStatus(transformId, true);
Expand All @@ -18,45 +20,43 @@ export const elasticAssetCheckerFactory = (getService: FtrProviderContext['getSe
return expectTransformStatus(transformId, false);
};

const expectTransformStatus = async (
transformId: string,
exists: boolean,
attempts: number = 5,
delayMs: number = 2000
) => {
let currentAttempt = 1;
while (currentAttempt <= attempts) {
try {
await es.transform.getTransform({ transform_id: transformId });
if (!exists) {
throw new Error(`Expected transform ${transformId} to not exist, but it does`);
const expectTransformStatus = async (transformId: string, exists: boolean) => {
await retry.waitForWithTimeout(
`transform ${transformId} to ${exists ? 'exist' : 'not exist'}`,
10_000,
async () => {
try {
await es.transform.getTransform({ transform_id: transformId });
return exists;
} catch (e) {
log.debug(`Transform ${transformId} not found: ${e}`);
return !exists;
}
return; // Transform exists, exit the loop
} catch (e) {
if (currentAttempt === attempts) {
if (exists) {
throw new Error(`Expected transform ${transformId} to exist, but it does not: ${e}`);
} else {
return; // Transform does not exist, exit the loop
}
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
currentAttempt++;
}
}
);
};

const expectEnrichPolicyStatus = async (policyId: string, exists: boolean) => {
try {
await es.enrich.getPolicy({ name: policyId });
if (!exists) {
throw new Error(`Expected enrich policy ${policyId} to not exist, but it does`);
}
} catch (e) {
if (exists) {
throw new Error(`Expected enrich policy ${policyId} to exist, but it does not: ${e}`);
await retry.waitForWithTimeout(
`enrich policy ${policyId} to ${exists ? 'exist' : 'not exist'}`,
20_000,
async () => {
try {
const res = await es.enrich.getPolicy({ name: policyId });
const policy = res.policies?.[0];
if (policy) {
log.debug(`Enrich policy ${policyId} found: ${JSON.stringify(res)}`);
return exists;
} else {
log.debug(`Enrich policy ${policyId} not found: ${JSON.stringify(res)}`);
return !exists;
}
} catch (e) {
log.debug(`Enrich policy ${policyId} not found: ${e}`);
return !exists;
}
}
}
);
};

const expectEnrichPolicyExists = async (policyId: string) =>
Expand All @@ -66,18 +66,19 @@ export const elasticAssetCheckerFactory = (getService: FtrProviderContext['getSe
expectEnrichPolicyStatus(policyId, false);

const expectComponentTemplatStatus = async (templateName: string, exists: boolean) => {
try {
await es.cluster.getComponentTemplate({ name: templateName });
if (!exists) {
throw new Error(`Expected component template ${templateName} to not exist, but it does`);
}
} catch (e) {
if (exists) {
throw new Error(
`Expected component template ${templateName} to exist, but it does not: ${e}`
);
await retry.waitForWithTimeout(
`component template ${templateName} to ${exists ? 'exist' : 'not exist'}`,
10_000,
async () => {
try {
await es.cluster.getComponentTemplate({ name: templateName });
return exists; // Component template exists
} catch (e) {
log.debug(`Component template ${templateName} not found: ${e}`);
return !exists; // Component template does not exist
}
}
}
);
};

const expectComponentTemplateExists = async (templateName: string) =>
Expand All @@ -87,23 +88,45 @@ export const elasticAssetCheckerFactory = (getService: FtrProviderContext['getSe
expectComponentTemplatStatus(templateName, false);

const expectIngestPipelineStatus = async (pipelineId: string, exists: boolean) => {
await retry.waitForWithTimeout(
`ingest pipeline ${pipelineId} to ${exists ? 'exist' : 'not exist'}`,
10_000,
async () => {
try {
await es.ingest.getPipeline({ id: pipelineId });
return exists; // Ingest pipeline exists
} catch (e) {
log.debug(`Ingest pipeline ${pipelineId} not found: ${e}`);
return !exists; // Ingest pipeline does not exist
}
}
);
};

const expectIngestPipelineExists = async (pipelineId: string) =>
expectIngestPipelineStatus(pipelineId, true);

const expectIngestPipelineNotFound = async (pipelineId: string) =>
expectIngestPipelineStatus(pipelineId, false);

const expectIndexStatus = async (indexName: string, exists: boolean) => {
try {
await es.ingest.getPipeline({ id: pipelineId });
await es.indices.get({ index: indexName });
if (!exists) {
throw new Error(`Expected ingest pipeline ${pipelineId} to not exist, but it does`);
throw new Error(`Expected index ${indexName} to not exist, but it does`);
}
} catch (e) {
if (exists) {
throw new Error(`Expected ingest pipeline ${pipelineId} to exist, but it does not: ${e}`);
throw new Error(`Expected index ${indexName} to exist, but it does not: ${e}`);
}
}
};

const expectIngestPipelineExists = async (pipelineId: string) =>
expectIngestPipelineStatus(pipelineId, true);
const expectEntitiesIndexExists = async (entityType: string, namespace: string) =>
expectIndexStatus(`.entities.v1.latest.security_${entityType}_${namespace}`, true);

const expectIngestPipelineNotFound = async (pipelineId: string) =>
expectIngestPipelineStatus(pipelineId, false);
const expectEntitiesIndexNotFound = async (entityType: string, namespace: string) =>
expectIndexStatus(`.entities.v1.latest.security_${entityType}_${namespace}`, false);

return {
expectComponentTemplateExists,
Expand All @@ -112,6 +135,8 @@ export const elasticAssetCheckerFactory = (getService: FtrProviderContext['getSe
expectEnrichPolicyNotFound,
expectIngestPipelineExists,
expectIngestPipelineNotFound,
expectEntitiesIndexExists,
expectEntitiesIndexNotFound,
expectTransformExists,
expectTransformNotFound,
};
Expand Down
Loading

0 comments on commit 0e1b2a3

Please sign in to comment.