Skip to content

Commit

Permalink
[Streams] Adding the first integration test (elastic#201293)
Browse files Browse the repository at this point in the history
## Summary

This PR introduces the first integration test for the Streams project.
This test covers the following basic functionality:

- Enable streams
- Index a document to `logs`
- Create a `logs.nginx` for that reroutes based on `log.logger ==
'nginx'`
- Index a document to `logs.nginx`
- Create a `logs.nginx.access` that reroutes based on `log.level ==
'info'`
- Index a document to `log.nginx.access`

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
Co-authored-by: Joe Reuter <[email protected]>
(cherry picked from commit aebd13e)
  • Loading branch information
simianhacker committed Dec 3, 2024
1 parent 5716092 commit 97a702a
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 0 deletions.
1 change: 1 addition & 0 deletions .buildkite/ftr_oblt_stateful_configs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ enabled:
- x-pack/test/api_integration/apis/synthetics/config.ts
- x-pack/test/api_integration/apis/uptime/config.ts
- x-pack/test/api_integration/apis/entity_manager/config.ts
- x-pack/test/api_integration/apis/streams/config.ts
- x-pack/test/apm_api_integration/basic/config.ts
- x-pack/test/apm_api_integration/cloud/config.ts
- x-pack/test/apm_api_integration/rules/config.ts
Expand Down
16 changes: 16 additions & 0 deletions x-pack/test/api_integration/apis/streams/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { FtrConfigProviderContext } from '@kbn/test';

export default async function ({ readConfigFile }: FtrConfigProviderContext) {
const baseIntegrationTestsConfig = await readConfigFile(require.resolve('../../config.ts'));
return {
...baseIntegrationTestsConfig.getAll(),
testFiles: [require.resolve('.')],
};
}
140 changes: 140 additions & 0 deletions x-pack/test/api_integration/apis/streams/full_flow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import expect from '@kbn/expect';
import {
deleteStream,
enableStreams,
fetchDocument,
forkStream,
indexDocument,
} from './helpers/requests';
import { FtrProviderContext } from '../../ftr_provider_context';
import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers';
import { cleanUpRootStream } from './helpers/cleanup';

export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esClient = getService('es');
const retryService = getService('retry');
const logger = getService('log');

describe('Basic functionality', () => {
after(async () => {
await deleteStream(supertest, 'logs.nginx');
await cleanUpRootStream(esClient);
});

// Note: Each step is dependent on the previous
describe('Full flow', () => {
it('Enable streams', async () => {
await enableStreams(supertest);
});

it('Index a JSON document to logs, should go to logs', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:00.000Z',
message: JSON.stringify({
'log.level': 'info',
'log.logger': 'nginx',
message: 'test',
}),
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger });

const result = await fetchDocument(esClient, 'logs', response._id);
expect(result._index).to.match(/^\.ds\-logs-.*/);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:00.000Z',
message: 'test',
log: { level: 'info', logger: 'nginx' },
});
});

it('Fork logs to logs.nginx', async () => {
const body = {
stream: {
id: 'logs.nginx',
fields: [],
processing: [],
},
condition: {
field: 'log.logger',
operator: 'eq',
value: 'nginx',
},
};
const response = await forkStream(supertest, 'logs', body);
expect(response).to.have.property('acknowledged', true);
});

it('Index an Nginx access log message, should goto logs.nginx', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:10.000Z',
message: JSON.stringify({
'log.level': 'info',
'log.logger': 'nginx',
message: 'test',
}),
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({ esClient, indexName: 'logs.nginx', retryService, logger });

const result = await fetchDocument(esClient, 'logs.nginx', response._id);
expect(result._index).to.match(/^\.ds\-logs.nginx-.*/);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:10.000Z',
message: 'test',
log: { level: 'info', logger: 'nginx' },
});
});

it('Fork logs to logs.nginx.access', async () => {
const body = {
stream: {
id: 'logs.nginx.access',
fields: [],
processing: [],
},
condition: { field: 'log.level', operator: 'eq', value: 'info' },
};
const response = await forkStream(supertest, 'logs.nginx', body);
expect(response).to.have.property('acknowledged', true);
});

it('Index an Nginx access log message, should goto logs.nginx.access', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:20.000Z',
message: JSON.stringify({
'log.level': 'info',
'log.logger': 'nginx',
message: 'test',
}),
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({
esClient,
indexName: 'logs.nginx.access',
retryService,
logger,
});

const result = await fetchDocument(esClient, 'logs.nginx.access', response._id);
expect(result._index).to.match(/^\.ds\-logs.nginx.access-.*/);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:20.000Z',
message: 'test',
log: { level: 'info', logger: 'nginx' },
});
});
});
});
}
26 changes: 26 additions & 0 deletions x-pack/test/api_integration/apis/streams/helpers/cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Client } from '@elastic/elasticsearch';

/**
DELETE .kibana_streams
DELETE _data_stream/logs
DELETE /_index_template/logs@stream
DELETE /_component_template/[email protected]
DELETE /_ingest/pipeline/logs@json-pipeline
DELETE /_ingest/pipeline/[email protected]
DELETE /_ingest/pipeline/[email protected]
*/

export async function cleanUpRootStream(esClient: Client) {
await esClient.indices.delete({ index: '.kibana_streams' });
await esClient.indices.deleteDataStream({ name: 'logs' });
await esClient.indices.deleteIndexTemplate({ name: 'logs@stream' });
await esClient.cluster.deleteComponentTemplate({ name: '[email protected]' });
await esClient.ingest.deletePipeline({ id: 'logs@stream.*' });
}
43 changes: 43 additions & 0 deletions x-pack/test/api_integration/apis/streams/helpers/requests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
import { JsonObject } from '@kbn/utility-types';
import { Agent } from 'supertest';
import expect from '@kbn/expect';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

export async function enableStreams(supertest: Agent) {
const req = supertest.post('/api/streams/_enable').set('kbn-xsrf', 'xxx');
const response = await req.send().expect(200);
return response.body;
}

export async function indexDocument(esClient: Client, index: string, document: JsonObject) {
const response = await esClient.index({ index, document });
return response;
}

export async function fetchDocument(esClient: Client, index: string, id: string) {
const query = {
ids: { values: [id] },
};
const response = await esClient.search({ index, query });
expect((response.hits.total as SearchTotalHits).value).to.eql(1);
return response.hits.hits[0];
}

export async function forkStream(supertest: Agent, root: string, body: JsonObject) {
const req = supertest.post(`/api/streams/${root}/_fork`).set('kbn-xsrf', 'xxx');
const response = await req.send(body).expect(200);
return response.body;
}

export async function deleteStream(supertest: Agent, id: string) {
const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx');
const response = await req.send().expect(200);
return response.body;
}
14 changes: 14 additions & 0 deletions x-pack/test/api_integration/apis/streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { FtrProviderContext } from '../../ftr_provider_context';

export default function ({ loadTestFile }: FtrProviderContext) {
describe('Streams Endpoints', () => {
loadTestFile(require.resolve('./full_flow'));
});
}

0 comments on commit 97a702a

Please sign in to comment.