Skip to content

Commit

Permalink
🌊 Play nice with ES (elastic#200253)
Browse files Browse the repository at this point in the history
This PR implements two changes:
* When syncing a stream, try to PUT the current mappings to the data
stream - if this fails with `illegal_argument_exception`, do a rollover
instead. This is similar to how fleet handles this situation
* Before accessing streams, check whether the current user can read the
current data stream and only return it if this is the case. Users with
partial read access will only see a partial tree. This doesn't apply to
writing changes as the user needs to be able to change index templates,
pipelines and so on which requires admin privileges anyway

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
2 people authored and CAWilson94 committed Dec 12, 2024
1 parent c408cef commit e925464
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
import { retryTransientEsErrors } from '../helpers/retry';

interface DataStreamManagementOptions {
Expand All @@ -23,6 +24,7 @@ interface DeleteDataStreamOptions {
interface RolloverDataStreamOptions {
esClient: ElasticsearchClient;
name: string;
mappings: MappingTypeMapping['properties'] | undefined;
logger: Logger;
}

Expand Down Expand Up @@ -56,38 +58,37 @@ export async function rolloverDataStreamIfNecessary({
esClient,
name,
logger,
mappings,
}: RolloverDataStreamOptions) {
const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` });
for (const dataStream of dataStreams.data_streams) {
const currentMappings =
Object.values(
await esClient.indices.getMapping({
index: dataStream.indices.at(-1)?.index_name,
})
)[0].mappings.properties || {};
const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name });
const simulatedMappings = simulatedIndex.template.mappings.properties || {};

// check whether the same fields and same types are listed (don't check for other mapping attributes)
const isDifferent =
Object.values(simulatedMappings).length !== Object.values(currentMappings).length ||
Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => {
const currentType = currentMappings[fieldName]?.type;
return currentType !== type;
});

if (!isDifferent) {
const writeIndex = dataStream.indices.at(-1);
if (!writeIndex) {
continue;
}

try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
await retryTransientEsErrors(
() => esClient.indices.putMapping({ index: writeIndex.index_name, properties: mappings }),
{
logger,
}
);
} catch (error: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
if (
typeof error.message !== 'string' ||
!error.message.includes('illegal_argument_exception')
) {
throw error;
}
try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
} catch (rolloverError: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
}
}
}
}
27 changes: 26 additions & 1 deletion x-pack/plugins/streams/server/lib/streams/stream_crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export async function listStreams({

interface ReadStreamParams extends BaseParams {
id: string;
skipAccessCheck?: boolean;
}

export interface ReadStreamResponse {
Expand All @@ -121,13 +122,20 @@ export interface ReadStreamResponse {
export async function readStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams): Promise<ReadStreamResponse> {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
index: STREAMS_INDEX,
});
const definition = response._source as StreamDefinition;
if (!skipAccessCheck) {
const hasAccess = await checkReadAccess({ id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return {
definition,
};
Expand Down Expand Up @@ -249,6 +257,21 @@ export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamP
}
}

interface CheckReadAccessParams extends BaseParams {
id: string;
}

export async function checkReadAccess({
id,
scopedClusterClient,
}: CheckReadAccessParams): Promise<boolean> {
try {
return await scopedClusterClient.asCurrentUser.indices.exists({ index: id });
} catch (e) {
return false;
}
}

interface SyncStreamParams {
scopedClusterClient: IScopedClusterClient;
definition: StreamDefinition;
Expand All @@ -262,10 +285,11 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
const componentTemplate = generateLayer(definition.id, definition);
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,
logger,
component: generateLayer(definition.id, definition),
component: componentTemplate,
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
Expand Down Expand Up @@ -308,5 +332,6 @@ export async function syncStream({
esClient: scopedClusterClient.asCurrentUser,
name: definition.id,
logger,
mappings: componentTemplate.template.mappings?.properties,
});
}
37 changes: 15 additions & 22 deletions x-pack/plugins/streams/server/routes/streams/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,25 @@ export interface StreamTree {
children: StreamTree[];
}

function asTrees(definitions: StreamDefinition[]): StreamTree[] {
const nodes = new Map<string, StreamTree>();
function asTrees(definitions: Array<{ id: string }>) {
const trees: StreamTree[] = [];
const ids = definitions.map((definition) => definition.id);

const rootNodes = new Set<StreamTree>();
ids.sort((a, b) => a.split('.').length - b.split('.').length);

function getNode(id: string) {
let node = nodes.get(id);
if (!node) {
node = { id, children: [] };
nodes.set(id, node);
ids.forEach((id) => {
let currentTree = trees;
let existingNode: StreamTree | undefined;
// traverse the tree following the prefix of the current id.
// once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth
while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) {
currentTree = existingNode.children;
}
return node;
}

definitions.forEach((definition) => {
const path = definition.id.split('.');
const parentId = path.slice(0, path.length - 1).join('.');
const parentNode = parentId.length ? getNode(parentId) : undefined;
const selfNode = getNode(definition.id);

if (parentNode) {
parentNode.children.push(selfNode);
} else {
rootNodes.add(selfNode);
if (!existingNode) {
const newNode = { id, children: [] };
currentTree.push(newNode);
}
});

return Array.from(rootNodes.values());
return trees;
}

0 comments on commit e925464

Please sign in to comment.