Skip to content

Commit

Permalink
[8.x] 🌊 Play nice with ES (#200253) (#202516)
Browse files Browse the repository at this point in the history
# Backport

This will backport the following commits from `main` to `8.x`:
- [🌊 Play nice with ES
(#200253)](#200253)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Joe
Reuter","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-11-26T06:46:36Z","message":"🌊
Play nice with ES (#200253)\n\nThis PR implements two changes:\r\n* When
syncing a stream, try to PUT the current mappings to the data\r\nstream
- if this fails with `illegal_argument_exception`, do a
rollover\r\ninstead. This is similar to how fleet handles this
situation\r\n* Before accessing streams, check whether the current user
can read the\r\ncurrent data stream and only return it if this is the
case. Users with\r\npartial read access will only see a partial tree.
This doesn't apply to\r\nwriting changes as the user needs to be able to
change index templates,\r\npipelines and so on which requires admin
privileges anyway\r\n\r\n---------\r\n\r\nCo-authored-by: kibanamachine
<[email protected]>","sha":"8e671728614012a5f5c5efc68975d9c0139ccfb7","branchLabelMapping":{"^v9.0.0$":"main","^v8.18.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","v9.0.0","backport:prev-minor","v8.18.0"],"title":"🌊
Play nice with
ES","number":200253,"url":"https://github.com/elastic/kibana/pull/200253","mergeCommit":{"message":"🌊
Play nice with ES (#200253)\n\nThis PR implements two changes:\r\n* When
syncing a stream, try to PUT the current mappings to the data\r\nstream
- if this fails with `illegal_argument_exception`, do a
rollover\r\ninstead. This is similar to how fleet handles this
situation\r\n* Before accessing streams, check whether the current user
can read the\r\ncurrent data stream and only return it if this is the
case. Users with\r\npartial read access will only see a partial tree.
This doesn't apply to\r\nwriting changes as the user needs to be able to
change index templates,\r\npipelines and so on which requires admin
privileges anyway\r\n\r\n---------\r\n\r\nCo-authored-by: kibanamachine
<[email protected]>","sha":"8e671728614012a5f5c5efc68975d9c0139ccfb7"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/200253","number":200253,"mergeCommit":{"message":"🌊
Play nice with ES (#200253)\n\nThis PR implements two changes:\r\n* When
syncing a stream, try to PUT the current mappings to the data\r\nstream
- if this fails with `illegal_argument_exception`, do a
rollover\r\ninstead. This is similar to how fleet handles this
situation\r\n* Before accessing streams, check whether the current user
can read the\r\ncurrent data stream and only return it if this is the
case. Users with\r\npartial read access will only see a partial tree.
This doesn't apply to\r\nwriting changes as the user needs to be able to
change index templates,\r\npipelines and so on which requires admin
privileges anyway\r\n\r\n---------\r\n\r\nCo-authored-by: kibanamachine
<[email protected]>","sha":"8e671728614012a5f5c5efc68975d9c0139ccfb7"}},{"branch":"8.x","label":"v8.18.0","branchLabelMappingKey":"^v8.18.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Joe Reuter <[email protected]>
  • Loading branch information
kibanamachine and flash1293 authored Dec 2, 2024
1 parent a07583b commit 8e036ab
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
import { retryTransientEsErrors } from '../helpers/retry';

interface DataStreamManagementOptions {
Expand All @@ -23,6 +24,7 @@ interface DeleteDataStreamOptions {
/**
 * Arguments for `rolloverDataStreamIfNecessary`.
 */
interface RolloverDataStreamOptions {
  /** Elasticsearch client used to inspect and update the data stream. */
  esClient: ElasticsearchClient;
  /** Name of the data stream to operate on. */
  name: string;
  /**
   * Field mappings to apply to the data stream's write index;
   * `undefined` when the component template defines no mappings.
   */
  mappings: MappingTypeMapping['properties'] | undefined;
  /** Logger for debug and error reporting. */
  logger: Logger;
}

Expand Down Expand Up @@ -56,38 +58,37 @@ export async function rolloverDataStreamIfNecessary({
esClient,
name,
logger,
mappings,
}: RolloverDataStreamOptions) {
const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` });
for (const dataStream of dataStreams.data_streams) {
const currentMappings =
Object.values(
await esClient.indices.getMapping({
index: dataStream.indices.at(-1)?.index_name,
})
)[0].mappings.properties || {};
const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name });
const simulatedMappings = simulatedIndex.template.mappings.properties || {};

// check whether the same fields and same types are listed (don't check for other mapping attributes)
const isDifferent =
Object.values(simulatedMappings).length !== Object.values(currentMappings).length ||
Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => {
const currentType = currentMappings[fieldName]?.type;
return currentType !== type;
});

if (!isDifferent) {
const writeIndex = dataStream.indices.at(-1);
if (!writeIndex) {
continue;
}

try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
await retryTransientEsErrors(
() => esClient.indices.putMapping({ index: writeIndex.index_name, properties: mappings }),
{
logger,
}
);
} catch (error: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
if (
typeof error.message !== 'string' ||
!error.message.includes('illegal_argument_exception')
) {
throw error;
}
try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
} catch (rolloverError: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
}
}
}
}
27 changes: 26 additions & 1 deletion x-pack/plugins/streams/server/lib/streams/stream_crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export async function listStreams({

interface ReadStreamParams extends BaseParams {
id: string;
skipAccessCheck?: boolean;
}

export interface ReadStreamResponse {
Expand All @@ -121,13 +122,20 @@ export interface ReadStreamResponse {
export async function readStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams): Promise<ReadStreamResponse> {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
index: STREAMS_INDEX,
});
const definition = response._source as StreamDefinition;
if (!skipAccessCheck) {
const hasAccess = await checkReadAccess({ id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return {
definition,
};
Expand Down Expand Up @@ -249,6 +257,21 @@ export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamP
}
}

/** Arguments for `checkReadAccess`. */
interface CheckReadAccessParams extends BaseParams {
  /** Index / data stream name whose readability should be checked. */
  id: string;
}

/**
 * Determines whether the current user can read the given stream.
 *
 * Access is approximated with an `indices.exists` call issued as the current
 * user: if the index is not visible to that user — or the call fails for any
 * reason (e.g. a security exception) — the stream is treated as unreadable.
 *
 * @returns `true` when the index is visible to the current user, `false` otherwise.
 */
export async function checkReadAccess({
  id,
  scopedClusterClient,
}: CheckReadAccessParams): Promise<boolean> {
  try {
    const visible = await scopedClusterClient.asCurrentUser.indices.exists({ index: id });
    return visible;
  } catch {
    // Any failure (missing index, insufficient privileges, transport error) means "no access".
    return false;
  }
}

interface SyncStreamParams {
scopedClusterClient: IScopedClusterClient;
definition: StreamDefinition;
Expand All @@ -262,10 +285,11 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
const componentTemplate = generateLayer(definition.id, definition);
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,
logger,
component: generateLayer(definition.id, definition),
component: componentTemplate,
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
Expand Down Expand Up @@ -308,5 +332,6 @@ export async function syncStream({
esClient: scopedClusterClient.asCurrentUser,
name: definition.id,
logger,
mappings: componentTemplate.template.mappings?.properties,
});
}
37 changes: 15 additions & 22 deletions x-pack/plugins/streams/server/routes/streams/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,25 @@ export interface StreamTree {
children: StreamTree[];
}

function asTrees(definitions: StreamDefinition[]): StreamTree[] {
const nodes = new Map<string, StreamTree>();
function asTrees(definitions: Array<{ id: string }>) {
const trees: StreamTree[] = [];
const ids = definitions.map((definition) => definition.id);

const rootNodes = new Set<StreamTree>();
ids.sort((a, b) => a.split('.').length - b.split('.').length);

function getNode(id: string) {
let node = nodes.get(id);
if (!node) {
node = { id, children: [] };
nodes.set(id, node);
ids.forEach((id) => {
let currentTree = trees;
let existingNode: StreamTree | undefined;
// traverse the tree following the prefix of the current id.
// once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth
while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) {
currentTree = existingNode.children;
}
return node;
}

definitions.forEach((definition) => {
const path = definition.id.split('.');
const parentId = path.slice(0, path.length - 1).join('.');
const parentNode = parentId.length ? getNode(parentId) : undefined;
const selfNode = getNode(definition.id);

if (parentNode) {
parentNode.children.push(selfNode);
} else {
rootNodes.add(selfNode);
if (!existingNode) {
const newNode = { id, children: [] };
currentTree.push(newNode);
}
});

return Array.from(rootNodes.values());
return trees;
}

0 comments on commit 8e036ab

Please sign in to comment.