From 88a37d2eccb1c8d8d5a62d401b40f2839bb52640 Mon Sep 17 00:00:00 2001 From: Chris Cowan Date: Thu, 21 Nov 2024 14:35:33 -0700 Subject: [PATCH 1/3] [Streams] Adding the first integration test --- .buildkite/ftr_oblt_stateful_configs.yml | 1 + .../streams/server/routes/streams/delete.ts | 1 + .../api_integration/apis/streams/config.ts | 16 ++ .../api_integration/apis/streams/full_flow.ts | 140 ++++++++++++++++++ .../apis/streams/helpers/cleanup.ts | 26 ++++ .../apis/streams/helpers/requests.ts | 43 ++++++ .../api_integration/apis/streams/index.ts | 14 ++ x-pack/test/tsconfig.json | 3 +- 8 files changed, 243 insertions(+), 1 deletion(-) create mode 100644 x-pack/test/api_integration/apis/streams/config.ts create mode 100644 x-pack/test/api_integration/apis/streams/full_flow.ts create mode 100644 x-pack/test/api_integration/apis/streams/helpers/cleanup.ts create mode 100644 x-pack/test/api_integration/apis/streams/helpers/requests.ts create mode 100644 x-pack/test/api_integration/apis/streams/index.ts diff --git a/.buildkite/ftr_oblt_stateful_configs.yml b/.buildkite/ftr_oblt_stateful_configs.yml index 7655ce6de38cf..c68a35504ca3f 100644 --- a/.buildkite/ftr_oblt_stateful_configs.yml +++ b/.buildkite/ftr_oblt_stateful_configs.yml @@ -32,6 +32,7 @@ enabled: - x-pack/test/api_integration/apis/synthetics/config.ts - x-pack/test/api_integration/apis/uptime/config.ts - x-pack/test/api_integration/apis/entity_manager/config.ts + - x-pack/test/api_integration/apis/streams/config.ts - x-pack/test/apm_api_integration/basic/config.ts - x-pack/test/apm_api_integration/cloud/config.ts - x-pack/test/apm_api_integration/rules/config.ts diff --git a/x-pack/plugins/streams/server/routes/streams/delete.ts b/x-pack/plugins/streams/server/routes/streams/delete.ts index 3820975dbe16a..53d62bc05ef3a 100644 --- a/x-pack/plugins/streams/server/routes/streams/delete.ts +++ b/x-pack/plugins/streams/server/routes/streams/delete.ts @@ -63,6 +63,7 @@ export const deleteStreamRoute = createServerRoute({ e instanceof ForkConditionMissing || e instanceof MalformedStreamId ) { + logger.error(e); return response.customError({ body: e, statusCode: 400 }); } diff --git a/x-pack/test/api_integration/apis/streams/config.ts b/x-pack/test/api_integration/apis/streams/config.ts new file mode 100644 index 0000000000000..c737db9499836 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/config.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FtrConfigProviderContext } from '@kbn/test'; + +export default async function ({ readConfigFile }: FtrConfigProviderContext) { + const baseIntegrationTestsConfig = await readConfigFile(require.resolve('../../config.ts')); + return { + ...baseIntegrationTestsConfig.getAll(), + testFiles: [require.resolve('.')], + }; +} diff --git a/x-pack/test/api_integration/apis/streams/full_flow.ts b/x-pack/test/api_integration/apis/streams/full_flow.ts new file mode 100644 index 0000000000000..fcb46010df8ba --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/full_flow.ts @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { + deleteStream, + enableStreams, + fetchDocument, + forkStream, + indexDocument, +} from './helpers/requests'; +import { FtrProviderContext } from '../../ftr_provider_context'; +import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; +import { cleanUpRootStream } from './helpers/cleanup'; + +export default function ({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const esClient = getService('es'); + const retryService = getService('retry'); + const logger = getService('log'); + + describe('flow', () => { + after(async () => { + await deleteStream(supertest, 'logs.nginx'); + await cleanUpRootStream(esClient); + }); + + // Note: Each step is dependent on the previous + describe('full', () => { + it('Enable streams', async () => { + await enableStreams(supertest); + }); + + it('Index a JSON document to logs, should go to logs', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:00.000Z', + message: JSON.stringify({ + 'log.level': 'info', + 'log.logger': 'nginx', + message: 'test', + }), + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger }); + + const result = await fetchDocument(esClient, 'logs', response._id); + expect(result._index).to.match(/^\.ds\-logs-.*/); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:00.000Z', + message: 'test', + log: { level: 'info', logger: 'nginx' }, + }); + }); + + it('Fork logs to logs.nginx', async () => { + const body = { + stream: { + id: 'logs.nginx', + fields: [], + processing: [], + }, + condition: { + field: 'log.logger', + operator: 'eq', + value: 'nginx', + }, + }; + const response = await forkStream(supertest, 'logs', body); + expect(response).to.have.property('acknowledged', true); + }); + + it('Index an Nginx access log message, should goto logs.nginx', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:10.000Z', + message: JSON.stringify({ + 'log.level': 'info', + 'log.logger': 'nginx', + message: 'test', + }), + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ esClient, indexName: 'logs.nginx', retryService, logger }); + + const result = await fetchDocument(esClient, 'logs.nginx', response._id); + expect(result._index).to.match(/^\.ds\-logs.nginx-.*/); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:10.000Z', + message: 'test', + log: { level: 'info', logger: 'nginx' }, + }); + }); + + it('Fork logs to logs.nginx.access', async () => { + const body = { + stream: { + id: 'logs.nginx.access', + fields: [], + processing: [], + }, + condition: { field: 'log.level', operator: 'eq', value: 'info' }, + }; + const response = await forkStream(supertest, 'logs.nginx', body); + expect(response).to.have.property('acknowledged', true); + }); + + it('Index an Nginx access log message, should goto logs.nginx.access', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:20.000Z', + message: JSON.stringify({ + 'log.level': 'info', + 'log.logger': 'nginx', + message: 'test', + }), + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ + esClient, + indexName: 'logs.nginx.access', + retryService, + logger, + }); + + const result = await fetchDocument(esClient, 'logs.nginx.access', response._id); + expect(result._index).to.match(/^\.ds\-logs.nginx.access-.*/); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:20.000Z', + message: 'test', + log: { level: 'info', logger: 'nginx' }, + }); + }); + }); + }); +} diff --git a/x-pack/test/api_integration/apis/streams/helpers/cleanup.ts b/x-pack/test/api_integration/apis/streams/helpers/cleanup.ts new file mode 100644 index 0000000000000..f1d382031d484 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/helpers/cleanup.ts @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Client } from '@elastic/elasticsearch'; + +/** +DELETE .kibana_streams +DELETE _data_stream/logs +DELETE /_index_template/logs@stream +DELETE /_component_template/logs@stream.layer +DELETE /_ingest/pipeline/logs@json-pipeline +DELETE /_ingest/pipeline/logs@stream.processing +DELETE /_ingest/pipeline/logs@stream.reroutes +*/ + +export async function cleanUpRootStream(esClient: Client) { + await esClient.indices.delete({ index: '.kibana_streams' }); + await esClient.indices.deleteDataStream({ name: 'logs' }); + await esClient.indices.deleteIndexTemplate({ name: 'logs@stream' }); + await esClient.cluster.deleteComponentTemplate({ name: 'logs@stream.layer' }); + await esClient.ingest.deletePipeline({ id: 'logs@stream.*' }); +} diff --git a/x-pack/test/api_integration/apis/streams/helpers/requests.ts b/x-pack/test/api_integration/apis/streams/helpers/requests.ts new file mode 100644 index 0000000000000..d44644e9746b1 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/helpers/requests.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { Client } from '@elastic/elasticsearch'; +import { JsonObject } from '@kbn/utility-types'; +import { Agent } from 'supertest'; +import expect from '@kbn/expect'; +import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; + +export async function enableStreams(supertest: Agent) { + const req = supertest.post('/api/streams/_enable').set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} + +export async function indexDocument(esClient: Client, index: string, document: JsonObject) { + const response = await esClient.index({ index, document }); + return response; +} + +export async function fetchDocument(esClient: Client, index: string, id: string) { + const query = { + ids: { values: [id] }, + }; + const response = await esClient.search({ index, query }); + expect((response.hits.total as SearchTotalHits).value).to.eql(1); + return response.hits.hits[0]; +} + +export async function forkStream(supertest: Agent, root: string, body: JsonObject) { + const req = supertest.post(`/api/streams/${root}/_fork`).set('kbn-xsrf', 'xxx'); + const response = await req.send(body).expect(200); + return response.body; +} + +export async function deleteStream(supertest: Agent, id: string) { + const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} diff --git a/x-pack/test/api_integration/apis/streams/index.ts b/x-pack/test/api_integration/apis/streams/index.ts new file mode 100644 index 0000000000000..0e879fd0b9b64 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/index.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { FtrProviderContext } from '../../ftr_provider_context'; + +export default function ({ loadTestFile }: FtrProviderContext) { + describe('Streams Endpoints', () => { + loadTestFile(require.resolve('./full_flow')); + }); +} diff --git a/x-pack/test/tsconfig.json b/x-pack/test/tsconfig.json index 2ba14ceb1218c..0db49e18d5e4b 100644 --- a/x-pack/test/tsconfig.json +++ b/x-pack/test/tsconfig.json @@ -187,6 +187,7 @@ "@kbn/alerting-types", "@kbn/ai-assistant-common", "@kbn/core-deprecations-common", - "@kbn/usage-collection-plugin" + "@kbn/usage-collection-plugin", + "@kbn/streams-plugin" ] } From 7bf4b318a614e01d9d25c2c95c9dc877bd992a78 Mon Sep 17 00:00:00 2001 From: Chris Cowan Date: Thu, 21 Nov 2024 14:40:46 -0700 Subject: [PATCH 2/3] clean up --- x-pack/plugins/streams/server/routes/streams/delete.ts | 1 - x-pack/test/api_integration/apis/streams/full_flow.ts | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/plugins/streams/server/routes/streams/delete.ts b/x-pack/plugins/streams/server/routes/streams/delete.ts index 53d62bc05ef3a..3820975dbe16a 100644 --- a/x-pack/plugins/streams/server/routes/streams/delete.ts +++ b/x-pack/plugins/streams/server/routes/streams/delete.ts @@ -63,7 +63,6 @@ export const deleteStreamRoute = createServerRoute({ e instanceof ForkConditionMissing || e instanceof MalformedStreamId ) { - logger.error(e); return response.customError({ body: e, statusCode: 400 }); } diff --git a/x-pack/test/api_integration/apis/streams/full_flow.ts b/x-pack/test/api_integration/apis/streams/full_flow.ts index fcb46010df8ba..03c0cc9e0e219 100644 --- a/x-pack/test/api_integration/apis/streams/full_flow.ts +++ b/x-pack/test/api_integration/apis/streams/full_flow.ts @@ -23,14 +23,14 @@ export default function ({ getService }: FtrProviderContext) { const retryService = getService('retry'); const logger = getService('log'); - describe('flow', () => { + describe('Basic functionality', () => { after(async () => { await deleteStream(supertest, 'logs.nginx'); await cleanUpRootStream(esClient); }); // Note: Each step is dependent on the previous - describe('full', () => { + describe('Full flow', () => { it('Enable streams', async () => { await enableStreams(supertest); }); From 45ed6b2ccec29d76078d5247013d11bdc2b70556 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:05:21 +0000 Subject: [PATCH 3/3] [CI] Auto-commit changed files from 'node scripts/lint_ts_projects --fix' --- x-pack/test/tsconfig.json | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/test/tsconfig.json b/x-pack/test/tsconfig.json index 0db49e18d5e4b..9db41aecbb612 100644 --- a/x-pack/test/tsconfig.json +++ b/x-pack/test/tsconfig.json @@ -188,6 +188,5 @@ "@kbn/ai-assistant-common", "@kbn/core-deprecations-common", "@kbn/usage-collection-plugin", - "@kbn/streams-plugin" ] }