Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Automatic Import] Add support for handling unstructured syslog samples #192817

Merged
merged 5 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export const unstructuredLogState = {
lastExecutedChain: 'testchain',
packageName: 'testPackage',
dataStreamName: 'testDatastream',
grokPatterns: ['%{GREEDYDATA:message}'],
logSamples: ['dummy data'],
jsonSamples: ['{"message":"dummy data"}'],
finalized: false,
ecsVersion: 'testVersion',
errors: { test: 'testerror' },
additionalProcessors: [],
};

export const unstructuredLogResponse = {
grok_patterns: [
'####<%{MONTH} %{MONTHDAY}, %{YEAR} %{TIME} (?:AM|PM) %{WORD:timezone}> <%{WORD:log_level}> <%{WORD:component}> <%{DATA:hostname}> <%{DATA:server_name}> <%{DATA:thread_info}> <%{DATA:user}> <%{DATA:empty_field}> <%{DATA:empty_field2}> <%{NUMBER:timestamp}> <%{DATA:message_id}> <%{GREEDYDATA:message}>',
],
};
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export async function handleKVError({

return {
kvProcessor,
lastExecutedChain: 'kv_error',
lastExecutedChain: 'kvError',
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ describe('Testing kv header', () => {
expect(response.grokPattern).toStrictEqual(
'<%{NUMBER:priority}>%{NUMBER:version} %{GREEDYDATA:message}'
);
expect(response.lastExecutedChain).toBe('kv_header');
expect(response.lastExecutedChain).toBe('kvHeader');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ export async function handleHeader({

return {
grokPattern: pattern.grok_pattern,
lastExecutedChain: 'kv_header',
lastExecutedChain: 'kvHeader',
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export async function handleKVValidate({
)) as { pipelineResults: KVResult[]; errors: object[] };

if (errors.length > 0) {
return { errors, lastExecutedChain: 'kv_validate' };
return { errors, lastExecutedChain: 'kvValidate' };
}

// Converts JSON Object into a string and parses it as a array of JSON strings
Expand All @@ -56,7 +56,7 @@ export async function handleKVValidate({
jsonSamples,
additionalProcessors,
errors: [],
lastExecutedChain: 'kv_validate',
lastExecutedChain: 'kvValidate',
};
}

Expand All @@ -65,7 +65,7 @@ export async function handleHeaderValidate({
client,
}: HandleKVNodeParams): Promise<Partial<KVState>> {
const grokPattern = state.grokPattern;
const grokProcessor = createGrokProcessor(grokPattern);
const grokProcessor = createGrokProcessor([grokPattern]);
const pipeline = { processors: grokProcessor, on_failure: [onFailure] };

const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { ESProcessorItem, SamplesFormat } from '../../../common';
import { getKVGraph } from '../kv/graph';
import { LogDetectionGraphParams, LogDetectionBaseNodeParams } from './types';
import { LogFormat } from '../../constants';
import { getUnstructuredGraph } from '../unstructured/graph';

const graphState: StateGraphArgs<LogFormatDetectionState>['channels'] = {
lastExecutedChain: {
Expand Down Expand Up @@ -90,9 +91,9 @@ function logFormatRouter({ state }: LogDetectionBaseNodeParams): string {
if (state.samplesFormat.name === LogFormat.STRUCTURED) {
return 'structured';
}
// if (state.samplesFormat === LogFormat.UNSTRUCTURED) {
// return 'unstructured';
// }
if (state.samplesFormat.name === LogFormat.UNSTRUCTURED) {
return 'unstructured';
}
// if (state.samplesFormat === LogFormat.CSV) {
// return 'csv';
// }
Expand All @@ -109,18 +110,19 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection
handleLogFormatDetection({ state, model })
)
.addNode('handleKVGraph', await getKVGraph({ model, client }))
// .addNode('handleUnstructuredGraph', (state: LogFormatDetectionState) => getCompiledUnstructuredGraph({state, model}))
.addNode('handleUnstructuredGraph', await getUnstructuredGraph({ model, client }))
// .addNode('handleCsvGraph', (state: LogFormatDetectionState) => getCompiledCsvGraph({state, model}))
.addEdge(START, 'modelInput')
.addEdge('modelInput', 'handleLogFormatDetection')
.addEdge('handleKVGraph', 'modelOutput')
.addEdge('handleUnstructuredGraph', 'modelOutput')
.addEdge('modelOutput', END)
.addConditionalEdges(
'handleLogFormatDetection',
(state: LogFormatDetectionState) => logFormatRouter({ state }),
{
structured: 'handleKVGraph',
// unstructured: 'handleUnstructuredGraph',
unstructured: 'handleUnstructuredGraph',
// csv: 'handleCsvGraph',
unsupported: 'modelOutput',
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export const GROK_EXAMPLE_ANSWER = {
rfc: 'RFC2454',
regex:
'/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/',
grok_patterns: ['%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}'],
};

export const GROK_ERROR_EXAMPLE_ANSWER = {
grok_patterns: [
'%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
],
};

export const onFailure = {
append: {
field: 'error.message',
value:
'{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}',
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import type { UnstructuredLogState } from '../../types';
import type { HandleUnstructuredNodeParams } from './types';
import { GROK_ERROR_PROMPT } from './prompts';
import { GROK_ERROR_EXAMPLE_ANSWER } from './constants';

export async function handleUnstructuredError({
state,
model,
}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
const outputParser = new JsonOutputParser();
const grokErrorGraph = GROK_ERROR_PROMPT.pipe(model).pipe(outputParser);
const currentPatterns = state.grokPatterns;

const pattern = await grokErrorGraph.invoke({
current_pattern: JSON.stringify(currentPatterns, null, 2),
errors: JSON.stringify(state.errors, null, 2),
ex_answer: JSON.stringify(GROK_ERROR_EXAMPLE_ANSWER, null, 2),
});

return {
grokPatterns: pattern.grok_patterns,
lastExecutedChain: 'unstructuredError',
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { FakeLLM } from '@langchain/core/utils/testing';
import { handleUnstructuredError } from './error';
import type { UnstructuredLogState } from '../../types';
import {
unstructuredLogState,
unstructuredLogResponse,
} from '../../../__jest__/fixtures/unstructured';
import {
ActionsClientChatOpenAI,
ActionsClientSimpleChatModel,
} from '@kbn/langchain/server/language_models';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';

const model = new FakeLLM({
response: JSON.stringify(unstructuredLogResponse, null, 2),
}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel;

const state: UnstructuredLogState = unstructuredLogState;

describe('Testing unstructured error handling node', () => {
const client = {
asCurrentUser: {
ingest: {
simulate: jest.fn(),
},
},
} as unknown as IScopedClusterClient;
it('handleUnstructuredError()', async () => {
const response = await handleUnstructuredError({ state, model, client });
expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns);
expect(response.lastExecutedChain).toBe('unstructuredError');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
ActionsClientChatOpenAI,
ActionsClientSimpleChatModel,
} from '@kbn/langchain/server/language_models';
import { FakeLLM } from '@langchain/core/utils/testing';
import { getUnstructuredGraph } from './graph';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';

const model = new FakeLLM({
response: '{"log_type": "structured"}',
}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel;

describe('UnstructuredGraph', () => {
const client = {
asCurrentUser: {
ingest: {
simulate: jest.fn(),
},
},
} as unknown as IScopedClusterClient;
describe('Compiling and Running', () => {
it('Ensures that the graph compiles', async () => {
// When getUnstructuredGraph runs, langgraph compiles the graph it will error if the graph has any issues.
// Common issues for example detecting a node has no next step, or there is a infinite loop between them.
try {
await getUnstructuredGraph({ model, client });
} catch (error) {
fail(`getUnstructuredGraph threw an error: ${error}`);
}
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { StateGraphArgs } from '@langchain/langgraph';
import { StateGraph, END, START } from '@langchain/langgraph';
import type { UnstructuredLogState } from '../../types';
import { handleUnstructured } from './unstructured';
import type { UnstructuredGraphParams, UnstructuredBaseNodeParams } from './types';
import { handleUnstructuredError } from './error';
import { handleUnstructuredValidate } from './validate';

const graphState: StateGraphArgs<UnstructuredLogState>['channels'] = {
lastExecutedChain: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
packageName: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
dataStreamName: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
logSamples: {
value: (x: string[], y?: string[]) => y ?? x,
default: () => [],
},
grokPatterns: {
value: (x: string[], y?: string[]) => y ?? x,
default: () => [],
},
jsonSamples: {
value: (x: string[], y?: string[]) => y ?? x,
default: () => [],
},
finalized: {
value: (x: boolean, y?: boolean) => y ?? x,
default: () => false,
},
errors: {
value: (x: object, y?: object) => y ?? x,
default: () => [],
},
additionalProcessors: {
value: (x: object[], y?: object[]) => y ?? x,
default: () => [],
},
ecsVersion: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
};

function modelInput({ state }: UnstructuredBaseNodeParams): Partial<UnstructuredLogState> {
return {
finalized: false,
lastExecutedChain: 'modelInput',
};
}

function modelOutput({ state }: UnstructuredBaseNodeParams): Partial<UnstructuredLogState> {
return {
finalized: true,
additionalProcessors: state.additionalProcessors,
lastExecutedChain: 'modelOutput',
};
}

function validationRouter({ state }: UnstructuredBaseNodeParams): string {
if (Object.keys(state.errors).length === 0) {
return 'modelOutput';
}
return 'handleUnstructuredError';
}

export async function getUnstructuredGraph({ model, client }: UnstructuredGraphParams) {
const workflow = new StateGraph({
channels: graphState,
})
.addNode('modelInput', (state: UnstructuredLogState) => modelInput({ state }))
.addNode('modelOutput', (state: UnstructuredLogState) => modelOutput({ state }))
.addNode('handleUnstructuredError', (state: UnstructuredLogState) =>
handleUnstructuredError({ state, model, client })
)
.addNode('handleUnstructured', (state: UnstructuredLogState) =>
handleUnstructured({ state, model, client })
)
.addNode('handleUnstructuredValidate', (state: UnstructuredLogState) =>
handleUnstructuredValidate({ state, model, client })
)
.addEdge(START, 'modelInput')
.addEdge('modelInput', 'handleUnstructured')
.addEdge('handleUnstructured', 'handleUnstructuredValidate')
.addConditionalEdges(
'handleUnstructuredValidate',
(state: UnstructuredLogState) => validationRouter({ state }),
{
handleUnstructuredError: 'handleUnstructuredError',
modelOutput: 'modelOutput',
}
)
.addEdge('handleUnstructuredError', 'handleUnstructuredValidate')
.addEdge('modelOutput', END);

const compiledUnstructuredGraph = workflow.compile();
return compiledUnstructuredGraph;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { getUnstructuredGraph } from './graph';
Loading