Skip to content

Commit

Permalink
[Fleet] Update Fleet's custom ingest pipeline names to avoid collisio…
Browse files Browse the repository at this point in the history
…ns + add descriptions to each pipeline (elastic#175448)

## Summary

Closes elastic#175254
Ref elastic#168019
Ref elastic#170270

In 8.12.0, Fleet unintentionally shipped a breaking change in
elastic#170270 for APM users who make use
of a custom `traces-apm` data stream. If a user had previously defined
this ingest pipeline to customize documents ingested for the
`traces-apm` data stream (defined
[here](https://github.com/elastic/integrations/blob/9a36183f0bd12e39a957d2f7bd65f3de4ee685b1/packages/apm/data_stream/traces/manifest.yml#L2-L3),
then they would unexpectedly see that pipeline called when documents
were ingested to the `traces-apm.rum` and `traces-apm.sampled`
datastreams as well.

This PR addresses this collision by adding a `.package` suffix to the
"package level" ingest pipeline introduced in 8.12.0.

So, in 8.12.0 a processor would be defined as such on the
`traces-apm.rum` or `traces-apm.sampled` ingest pipeline

```
{
  "pipeline": {
    "name": "traces-apm@custom",
    "ignore_missing_pipeline": true,
  }
},
```

This PR replaces the pipeline with one that looks as follows:

```
{
  "pipeline": {
    "name": "traces-apm.package@custom",
    "ignore_missing_pipeline": true,
    "description": "[Fleet] Pipeline for all data streams of type `traces` defined by the `apm` integration"
  }
},
```

**To be clear: this is a breaking change if you have defined the
`traces-apm@custom` integration on 8.12. In 8.12.1, it will no longer be
called for documents ingested to the `traces-apm`, `traces-apm.rum`, or
`traces-apm.sampled` data streams. You will need to rename your pipeline
to `traces-apm.package@custom` to preserve this behavior.**

This change also applies to `logs-elastic_agent.*` ingest pipelines. See
[this
comment](elastic#175254 (comment))
for more information.

There is still technically room for a collision, though it's unlikely,
if the data stream name is `package`. This will be handled by a package
spec validation proposed in
elastic/package-spec#699.

---------

Co-authored-by: Kibana Machine <[email protected]>
(cherry picked from commit 9fe5a66)
  • Loading branch information
kpollich committed Jan 25, 2024
1 parent 836d11c commit 1ad41db
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ processors:
});

expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"global@custom\\",\\"ignore_missing_pipeline\\":true}},{\\"pipeline\\":{\\"name\\":\\"logs@custom\\",\\"ignore_missing_pipeline\\":true}},{\\"pipeline\\":{\\"name\\":\\"logs-test@custom\\",\\"ignore_missing_pipeline\\":true}}]}"`
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"global@custom\\",\\"ignore_missing_pipeline\\":true,\\"description\\":\\"[Fleet] Global pipeline for all data streams\\"}},{\\"pipeline\\":{\\"name\\":\\"logs@custom\\",\\"ignore_missing_pipeline\\":true,\\"description\\":\\"[Fleet] Pipeline for all data streams of type \`logs\`\\"}},{\\"pipeline\\":{\\"name\\":\\"logs-test@custom\\",\\"ignore_missing_pipeline\\":true,\\"description\\":\\"[Fleet] Pipeline for the \`test\` dataset\\"}}]}"`
);
});

Expand Down Expand Up @@ -231,12 +231,15 @@ processors:
- pipeline:
name: global@custom
ignore_missing_pipeline: true
description: '[Fleet] Global pipeline for all data streams'
- pipeline:
name: logs@custom
ignore_missing_pipeline: true
description: '[Fleet] Pipeline for all data streams of type \`logs\`'
- pipeline:
name: logs-test.access@custom
ignore_missing_pipeline: true
description: '[Fleet] Pipeline for the \`test.access\` dataset'
- reroute:
tag: test.access
dataset: test.reroute
Expand Down Expand Up @@ -280,7 +283,7 @@ processors:
});

expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"global@custom\\",\\"ignore_missing_pipeline\\":true}},{\\"pipeline\\":{\\"name\\":\\"logs@custom\\",\\"ignore_missing_pipeline\\":true}},{\\"pipeline\\":{\\"name\\":\\"logs-test.access@custom\\",\\"ignore_missing_pipeline\\":true}},{\\"reroute\\":{\\"tag\\":\\"test.access\\",\\"dataset\\":\\"test.reroute\\",\\"namespace\\":\\"default\\",\\"if\\":\\"true == true\\"}}]}"`
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"global@custom\\",\\"ignore_missing_pipeline\\":true,\\"description\\":\\"[Fleet] Global pipeline for all data streams\\"}},{\\"pipeline\\":{\\"name\\":\\"logs@custom\\",\\"ignore_missing_pipeline\\":true,\\"description\\":\\"[Fleet] Pipeline for all data streams of type \`logs\`\\"}},{\\"pipeline\\":{\\"name\\":\\"logs-test.access@custom\\",\\"ignore_missing_pipeline\\":true,\\"description\\":\\"[Fleet] Pipeline for the \`test.access\` dataset\\"}},{\\"reroute\\":{\\"tag\\":\\"test.access\\",\\"dataset\\":\\"test.reroute\\",\\"namespace\\":\\"default\\",\\"if\\":\\"true == true\\"}}]}"`
);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,25 +80,30 @@ export function addCustomPipelineAndLocalRoutingRulesProcessor(
pipeline.dataStream?.routing_rules?.find(
(rule) => rule.source_dataset === pipeline.dataStream?.dataset
)?.rules ?? [];

const customPipelineProcessors = [
{
pipeline: {
name: 'global@custom',
ignore_missing_pipeline: true,
description: '[Fleet] Global pipeline for all data streams',
},
},
{
pipeline: {
name: `${pipeline.dataStream.type}@custom`,
ignore_missing_pipeline: true,
description: `[Fleet] Pipeline for all data streams of type \`${pipeline.dataStream.type}\``,
},
},
...(pipeline.dataStream.package
? [
{
pipeline: {
name: `${pipeline.dataStream.type}-${pipeline.dataStream.package}@custom`,
// This pipeline name gets the `.integration` suffix to avoid conflicts with the pipeline name for the dataset below
name: `${pipeline.dataStream.type}-${pipeline.dataStream.package}.integration@custom`,
ignore_missing_pipeline: true,
description: `[Fleet] Pipeline for all data streams of type \`${pipeline.dataStream.type}\` defined by the \`${pipeline.dataStream.package}\` integration`,
},
},
]
Expand All @@ -107,6 +112,7 @@ export function addCustomPipelineAndLocalRoutingRulesProcessor(
pipeline: {
name: `${pipeline.dataStream.type}-${pipeline.dataStream.dataset}@custom`,
ignore_missing_pipeline: true,
description: `[Fleet] Pipeline for the \`${pipeline.dataStream.dataset}\` dataset`,
},
},
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ export default function (providerContext: FtrProviderContext) {
});

await es.ingest.putPipeline({
id: `logs-log@custom`,
id: `logs-log.integration@custom`,
processors: [
{
append: {
field: 'test',
value: ['logs-log'],
value: ['logs-log.integration'],
},
},
],
Expand Down Expand Up @@ -138,7 +138,7 @@ export default function (providerContext: FtrProviderContext) {
id: 'logs@custom',
}),
es.ingest.deletePipeline({
id: 'logs-log@custom',
id: 'logs-log.integration@custom',
}),
es.ingest.deletePipeline({
id: CUSTOM_PIPELINE,
Expand All @@ -158,7 +158,12 @@ export default function (providerContext: FtrProviderContext) {
id: res._id,
index: res._index,
});
expect(doc._source?.test).be.eql(['global', 'logs', 'logs-log', 'logs-log.log']);
expect(doc._source?.test).be.eql([
'global',
'logs',
'logs-log.integration',
'logs-log.log',
]);
});
});
});
Expand Down

0 comments on commit 1ad41db

Please sign in to comment.