diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts new file mode 100644 index 0000000000000..113ef4d37c073 --- /dev/null +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/unstructured.ts @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const unstructuredLogState = { + lastExecutedChain: 'testchain', + packageName: 'testPackage', + dataStreamName: 'testDatastream', + grokPatterns: ['%{GREEDYDATA:message}'], + logSamples: ['dummy data'], + jsonSamples: ['{"message":"dummy data"}'], + finalized: false, + ecsVersion: 'testVersion', + errors: { test: 'testerror' }, + additionalProcessors: [], +}; + +export const unstructuredLogResponse = { + grok_patterns: [ + '####<%{MONTH} %{MONTHDAY}, %{YEAR} %{TIME} (?:AM|PM) %{WORD:timezone}> <%{WORD:log_level}> <%{WORD:component}> <%{DATA:hostname}> <%{DATA:server_name}> <%{DATA:thread_info}> <%{DATA:user}> <%{DATA:empty_field}> <%{DATA:empty_field2}> <%{NUMBER:timestamp}> <%{DATA:message_id}> <%{GREEDYDATA:message}>', + ], +}; diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts index b6d64ee4f615d..b1b7c12a68d5a 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts @@ -33,7 +33,7 @@ export async function handleKVError({ return { kvProcessor, - lastExecutedChain: 'kv_error', + lastExecutedChain: 'kvError', }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts index 7991484024713..353384361d2da 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts @@ -43,6 +43,6 @@ describe('Testing kv header', () => { expect(response.grokPattern).toStrictEqual( '<%{NUMBER:priority}>%{NUMBER:version} %{GREEDYDATA:message}' ); - expect(response.lastExecutedChain).toBe('kv_header'); + expect(response.lastExecutedChain).toBe('kvHeader'); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts index 473eae1516112..36d8968ab9e67 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts @@ -26,6 +26,6 @@ export async function handleHeader({ return { grokPattern: pattern.grok_pattern, - lastExecutedChain: 'kv_header', + lastExecutedChain: 'kvHeader', }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts index 0bca2ac3fd5e4..b0601de74aa5e 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts @@ -41,7 +41,7 @@ export async function handleKVValidate({ )) as { pipelineResults: KVResult[]; errors: object[] }; if (errors.length > 0) { - return { errors, lastExecutedChain: 'kv_validate' }; + return { errors, lastExecutedChain: 'kvValidate' }; } // Converts JSON Object into a string and parses it as a array of JSON strings @@ -56,7 +56,7 @@ export async function handleKVValidate({ jsonSamples, additionalProcessors, errors: [], - lastExecutedChain: 'kv_validate', + lastExecutedChain: 'kvValidate', }; } @@ -65,7 +65,7 @@ export async function handleHeaderValidate({ client, }: HandleKVNodeParams): Promise> { const grokPattern = state.grokPattern; - const grokProcessor = createGrokProcessor(grokPattern); + const grokProcessor = createGrokProcessor([grokPattern]); const pipeline = { processors: grokProcessor, on_failure: [onFailure] }; const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as { diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts index 5ef894bf64a20..b1cdecd39fe69 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts @@ -14,6 +14,7 @@ import { ESProcessorItem, SamplesFormat } from '../../../common'; import { getKVGraph } from '../kv/graph'; import { LogDetectionGraphParams, LogDetectionBaseNodeParams } from './types'; import { LogFormat } from '../../constants'; +import { getUnstructuredGraph } from '../unstructured/graph'; const graphState: StateGraphArgs['channels'] = { lastExecutedChain: { @@ -90,9 +91,9 @@ function logFormatRouter({ state }: LogDetectionBaseNodeParams): string { if (state.samplesFormat.name === LogFormat.STRUCTURED) { return 'structured'; } - // if (state.samplesFormat === LogFormat.UNSTRUCTURED) { - // return 'unstructured'; - // } + if (state.samplesFormat.name === LogFormat.UNSTRUCTURED) { + return 'unstructured'; + } // if (state.samplesFormat === LogFormat.CSV) { // return 'csv'; // } @@ -109,18 +110,19 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection handleLogFormatDetection({ state, model }) ) .addNode('handleKVGraph', await getKVGraph({ model, client })) - // .addNode('handleUnstructuredGraph', (state: LogFormatDetectionState) => getCompiledUnstructuredGraph({state, model})) + .addNode('handleUnstructuredGraph', await getUnstructuredGraph({ model, client })) // .addNode('handleCsvGraph', (state: LogFormatDetectionState) => getCompiledCsvGraph({state, model})) .addEdge(START, 'modelInput') .addEdge('modelInput', 'handleLogFormatDetection') .addEdge('handleKVGraph', 'modelOutput') + .addEdge('handleUnstructuredGraph', 'modelOutput') .addEdge('modelOutput', END) .addConditionalEdges( 'handleLogFormatDetection', (state: LogFormatDetectionState) => logFormatRouter({ state }), { structured: 'handleKVGraph', - // unstructured: 'handleUnstructuredGraph', + unstructured: 'handleUnstructuredGraph', // csv: 'handleCsvGraph', unsupported: 'modelOutput', } diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts new file mode 100644 index 0000000000000..b0e36de9be85d --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const GROK_EXAMPLE_ANSWER = { + rfc: 'RFC2454', + regex: + '/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/', + grok_patterns: ['%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}'], +}; + +export const GROK_ERROR_EXAMPLE_ANSWER = { + grok_patterns: [ + '%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}', + ], +}; + +export const onFailure = { + append: { + field: 'error.message', + value: + '{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}', + }, +}; diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts new file mode 100644 index 0000000000000..d002dd19d5439 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/error.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { JsonOutputParser } from '@langchain/core/output_parsers'; +import type { UnstructuredLogState } from '../../types'; +import type { HandleUnstructuredNodeParams } from './types'; +import { GROK_ERROR_PROMPT } from './prompts'; +import { GROK_ERROR_EXAMPLE_ANSWER } from './constants'; + +export async function handleUnstructuredError({ + state, + model, +}: HandleUnstructuredNodeParams): Promise> { + const outputParser = new JsonOutputParser(); + const grokErrorGraph = GROK_ERROR_PROMPT.pipe(model).pipe(outputParser); + const currentPatterns = state.grokPatterns; + + const pattern = await grokErrorGraph.invoke({ + current_pattern: JSON.stringify(currentPatterns, null, 2), + errors: JSON.stringify(state.errors, null, 2), + ex_answer: JSON.stringify(GROK_ERROR_EXAMPLE_ANSWER, null, 2), + }); + + return { + grokPatterns: pattern.grok_patterns, + lastExecutedChain: 'unstructuredError', + }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts new file mode 100644 index 0000000000000..212b4b6255be2 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/errors.test.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FakeLLM } from '@langchain/core/utils/testing'; +import { handleUnstructuredError } from './error'; +import type { UnstructuredLogState } from '../../types'; +import { + unstructuredLogState, + unstructuredLogResponse, +} from '../../../__jest__/fixtures/unstructured'; +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: JSON.stringify(unstructuredLogResponse, null, 2), +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +const state: UnstructuredLogState = unstructuredLogState; + +describe('Testing unstructured error handling node', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; + it('handleUnstructuredError()', async () => { + const response = await handleUnstructuredError({ state, model, client }); + expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns); + expect(response.lastExecutedChain).toBe('unstructuredError'); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts new file mode 100644 index 0000000000000..60a9bdc4329de --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { FakeLLM } from '@langchain/core/utils/testing'; +import { getUnstructuredGraph } from './graph'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: '{"log_type": "structured"}', +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +describe('UnstructuredGraph', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; + describe('Compiling and Running', () => { + it('Ensures that the graph compiles', async () => { + // When getUnstructuredGraph runs, langgraph compiles the graph it will error if the graph has any issues. + // Common issues for example detecting a node has no next step, or there is a infinite loop between them. + try { + await getUnstructuredGraph({ model, client }); + } catch (error) { + fail(`getUnstructuredGraph threw an error: ${error}`); + } + }); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts new file mode 100644 index 0000000000000..6048404728bfb --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.ts @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { StateGraphArgs } from '@langchain/langgraph'; +import { StateGraph, END, START } from '@langchain/langgraph'; +import type { UnstructuredLogState } from '../../types'; +import { handleUnstructured } from './unstructured'; +import type { UnstructuredGraphParams, UnstructuredBaseNodeParams } from './types'; +import { handleUnstructuredError } from './error'; +import { handleUnstructuredValidate } from './validate'; + +const graphState: StateGraphArgs['channels'] = { + lastExecutedChain: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + packageName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + dataStreamName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + logSamples: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + grokPatterns: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + jsonSamples: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + finalized: { + value: (x: boolean, y?: boolean) => y ?? x, + default: () => false, + }, + errors: { + value: (x: object, y?: object) => y ?? x, + default: () => [], + }, + additionalProcessors: { + value: (x: object[], y?: object[]) => y ?? x, + default: () => [], + }, + ecsVersion: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, +}; + +function modelInput({ state }: UnstructuredBaseNodeParams): Partial { + return { + finalized: false, + lastExecutedChain: 'modelInput', + }; +} + +function modelOutput({ state }: UnstructuredBaseNodeParams): Partial { + return { + finalized: true, + additionalProcessors: state.additionalProcessors, + lastExecutedChain: 'modelOutput', + }; +} + +function validationRouter({ state }: UnstructuredBaseNodeParams): string { + if (Object.keys(state.errors).length === 0) { + return 'modelOutput'; + } + return 'handleUnstructuredError'; +} + +export async function getUnstructuredGraph({ model, client }: UnstructuredGraphParams) { + const workflow = new StateGraph({ + channels: graphState, + }) + .addNode('modelInput', (state: UnstructuredLogState) => modelInput({ state })) + .addNode('modelOutput', (state: UnstructuredLogState) => modelOutput({ state })) + .addNode('handleUnstructuredError', (state: UnstructuredLogState) => + handleUnstructuredError({ state, model, client }) + ) + .addNode('handleUnstructured', (state: UnstructuredLogState) => + handleUnstructured({ state, model, client }) + ) + .addNode('handleUnstructuredValidate', (state: UnstructuredLogState) => + handleUnstructuredValidate({ state, model, client }) + ) + .addEdge(START, 'modelInput') + .addEdge('modelInput', 'handleUnstructured') + .addEdge('handleUnstructured', 'handleUnstructuredValidate') + .addConditionalEdges( + 'handleUnstructuredValidate', + (state: UnstructuredLogState) => validationRouter({ state }), + { + handleUnstructuredError: 'handleUnstructuredError', + modelOutput: 'modelOutput', + } + ) + .addEdge('handleUnstructuredError', 'handleUnstructuredValidate') + .addEdge('modelOutput', END); + + const compiledUnstructuredGraph = workflow.compile(); + return compiledUnstructuredGraph; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/index.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/index.ts new file mode 100644 index 0000000000000..8fa7bb99744ed --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +export { getUnstructuredGraph } from './graph'; diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/prompts.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/prompts.ts new file mode 100644 index 0000000000000..5cf5c67135d53 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/prompts.ts @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { ChatPromptTemplate } from '@langchain/core/prompts'; + +export const GROK_MAIN_PROMPT = ChatPromptTemplate.fromMessages([ + [ + 'system', + `You are an expert in Syslogs and identifying the headers and structured body in syslog messages. Here is some context for you to reference for your task, read it carefully as you will get questions about it later: + + + {samples} + + `, + ], + [ + 'human', + `Looking at the multiple syslog samples provided in the context, You are tasked with identifying the appropriate regex and Grok pattern for a set of syslog samples. + Your goal is to accurately extract key components such as timestamps, hostnames, priority levels, process names, events, VLAN information, MAC addresses, IP addresses, STP roles, port statuses, messages and more. + + Follow these steps to help improve the grok patterns and apply it step by step: + 1. Familiarize yourself with various syslog message formats. + 2. PRI (Priority Level): Encoded in angle brackets, e.g., <134>, indicating the facility and severity. + 3. Timestamp: Use \`SYSLOGTIMESTAMP\` for RFC 3164 timestamps (e.g., Aug 10 16:34:02). Use \`TIMESTAMP_ISO8601\` for ISO 8601 (RFC 5424) timestamps. For epoch time, use \`NUMBER\`. + 4. If the timestamp could not be categorized into a predefined format, extract the date time fields separately and combine them with the format identified in the grok pattern. + 5. Make sure to identify the timezone component in the timestamp. + 6. Hostname/IP Address: The system or device that generated the message, which could be an IP address or fully qualified domain name + 7. Process Name and PID: Often included with brackets, such as sshd[1234]. + 8. VLAN information: Usually in the format of VLAN: 1234. + 9. MAC Address: The network interface MAC address. + 10. Port number: The port number on the device. + 11. Look for status codes ,interface ,log type, source ,User action, destination, protocol, etc. + 12. message: This is the free-form message text that varies widely across log entries. + + + You ALWAYS follow these guidelines when writing your response: + + - Make sure to map the remaining message part to \'message\' in grok pattern. + - Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format. + + + You are required to provide the output in the following example response format: + + + A: Please find the JSON object below: + \`\`\`json + {ex_answer} + \`\`\` + `, + ], + ['ai', 'Please find the JSON object below:'], +]); + +export const GROK_ERROR_PROMPT = ChatPromptTemplate.fromMessages([ + [ + 'system', + `You are an expert in Syslogs and identifying the headers and structured body in syslog messages. Here is some context for you to reference for your task, read it carefully as you will get questions about it later: + + +{current_pattern} + +`, + ], + [ + 'human', + `Please go through each error below, carefully review the provided current grok pattern, and resolve the most likely cause to the supplied error by returning an updated version of the current_pattern. + + +{errors} + + +Follow these steps to help improve the grok patterns and apply it step by step: + 1. Familiarize yourself with various syslog message formats. + 2. PRI (Priority Level): Encoded in angle brackets, e.g., <134>, indicating the facility and severity. + 3. Timestamp: Use \`SYSLOGTIMESTAMP\` for RFC 3164 timestamps (e.g., Aug 10 16:34:02). Use \`TIMESTAMP_ISO8601\` for ISO 8601 (RFC 5424) timestamps. For epoch time, use \`NUMBER\`. + 4. If the timestamp could not be categorized into a predefined format, extract the date time fields separately and combine them with the format identified in the grok pattern. + 5. Make sure to identify the timezone component in the timestamp. + 6. Hostname/IP Address: The system or device that generated the message, which could be an IP address or fully qualified domain name + 7. Process Name and PID: Often included with brackets, such as sshd[1234]. + 8. VLAN information: Usually in the format of VLAN: 1234. + 9. MAC Address: The network interface MAC address. + 10. Port number: The port number on the device. + 11. Look for status codes ,interface ,log type, source ,User action, destination, protocol, etc. + 12. message: This is the free-form message text that varies widely across log entries. + + You ALWAYS follow these guidelines when writing your response: + + - Make sure to map the remaining message part to \'message\' in grok pattern. + - Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format. + + + You are required to provide the output in the following example response format: + + + A: Please find the JSON object below: + \`\`\`json + {ex_answer} + \`\`\` + `, + ], + ['ai', 'Please find the JSON object below:'], +]); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/types.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/types.ts new file mode 100644 index 0000000000000..218d3856cb661 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/types.ts @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import type { UnstructuredLogState, ChatModels } from '../../types'; + +export interface UnstructuredBaseNodeParams { + state: UnstructuredLogState; +} + +export interface UnstructuredNodeParams extends UnstructuredBaseNodeParams { + model: ChatModels; +} + +export interface UnstructuredGraphParams { + client: IScopedClusterClient; + model: ChatModels; +} + +export interface HandleUnstructuredNodeParams extends UnstructuredNodeParams { + client: IScopedClusterClient; +} + +export interface GrokResult { + [key: string]: unknown; + grok_patterns: string[]; + message: string; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.test.ts new file mode 100644 index 0000000000000..11d7107be13c0 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.test.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FakeLLM } from '@langchain/core/utils/testing'; +import { handleUnstructured } from './unstructured'; +import type { UnstructuredLogState } from '../../types'; +import { + unstructuredLogState, + unstructuredLogResponse, +} from '../../../__jest__/fixtures/unstructured'; +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: JSON.stringify(unstructuredLogResponse, null, 2), +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +const state: UnstructuredLogState = unstructuredLogState; + +describe('Testing unstructured log handling node', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; + it('handleUnstructured()', async () => { + const response = await handleUnstructured({ state, model, client }); + expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns); + expect(response.lastExecutedChain).toBe('handleUnstructured'); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.ts new file mode 100644 index 0000000000000..42186e796275f --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/unstructured.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { JsonOutputParser } from '@langchain/core/output_parsers'; +import type { UnstructuredLogState } from '../../types'; +import { GROK_MAIN_PROMPT } from './prompts'; +import { GrokResult, HandleUnstructuredNodeParams } from './types'; +import { GROK_EXAMPLE_ANSWER } from './constants'; + +export async function handleUnstructured({ + state, + model, + client, +}: HandleUnstructuredNodeParams): Promise> { + const grokMainGraph = GROK_MAIN_PROMPT.pipe(model).pipe(new JsonOutputParser()); + + // Pick logSamples if there was no header detected. + const samples = state.logSamples; + + const pattern = (await grokMainGraph.invoke({ + samples: samples[0], + ex_answer: JSON.stringify(GROK_EXAMPLE_ANSWER, null, 2), + })) as GrokResult; + + return { + grokPatterns: pattern.grok_patterns, + lastExecutedChain: 'handleUnstructured', + }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts new file mode 100644 index 0000000000000..493834e3220f9 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.test.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FakeLLM } from '@langchain/core/utils/testing'; +import { handleUnstructuredValidate } from './validate'; +import type { UnstructuredLogState } from '../../types'; +import { + unstructuredLogState, + unstructuredLogResponse, +} from '../../../__jest__/fixtures/unstructured'; +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: JSON.stringify(unstructuredLogResponse, null, 2), +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +const state: UnstructuredLogState = unstructuredLogState; + +describe('Testing unstructured validation without errors', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest + .fn() + .mockReturnValue({ docs: [{ doc: { _source: { message: 'dummy data' } } }] }), + }, + }, + } as unknown as IScopedClusterClient; + + it('handleUnstructuredValidate() without errors', async () => { + const response = await handleUnstructuredValidate({ state, model, client }); + expect(response.jsonSamples).toStrictEqual(unstructuredLogState.jsonSamples); + expect(response.additionalProcessors).toStrictEqual([ + { + grok: { + field: 'message', + patterns: unstructuredLogState.grokPatterns, + tag: 'grok_header_pattern', + }, + }, + ]); + expect(response.errors).toStrictEqual([]); + expect(response.lastExecutedChain).toBe('unstructuredValidate'); + }); +}); + +describe('Testing unstructured validation errors', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest + .fn() + .mockReturnValue({ docs: [{ doc: { _source: { error: 'some error' } } }] }), + }, + }, + } as unknown as IScopedClusterClient; + + it('handleUnstructuredValidate() errors', async () => { + const response = await handleUnstructuredValidate({ state, model, client }); + expect(response.errors).toStrictEqual(['some error']); + expect(response.lastExecutedChain).toBe('unstructuredValidate'); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts new file mode 100644 index 0000000000000..043e38be0983f --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { UnstructuredLogState } from '../../types'; +import type { GrokResult, HandleUnstructuredNodeParams } from './types'; +import { testPipeline } from '../../util'; +import { onFailure } from './constants'; +import { createGrokProcessor } from '../../util/processors'; + +export async function handleUnstructuredValidate({ + state, + client, +}: HandleUnstructuredNodeParams): Promise> { + const grokPatterns = state.grokPatterns; + const grokProcessor = createGrokProcessor(grokPatterns); + const pipeline = { processors: grokProcessor, on_failure: [onFailure] }; + + const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as { + pipelineResults: GrokResult[]; + errors: object[]; + }; + + if (errors.length > 0) { + return { errors, lastExecutedChain: 'unstructuredValidate' }; + } + + const jsonSamples: string[] = pipelineResults.map((entry) => JSON.stringify(entry)); + const additionalProcessors = state.additionalProcessors; + additionalProcessors.push(grokProcessor[0]); + + return { + jsonSamples, + additionalProcessors, + errors: [], + lastExecutedChain: 'unstructuredValidate', + }; +} diff --git a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts index c0a81193a465b..29a68c4395a7c 100644 --- a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts @@ -81,7 +81,7 @@ export function registerAnalyzeLogsRoutes( const graph = await getLogFormatDetectionGraph({ model, client }); const graphResults = await graph.invoke(logFormatParameters, options); const graphLogFormat = graphResults.results.samplesFormat.name; - if (graphLogFormat === 'unsupported') { + if (graphLogFormat === 'unsupported' || graphLogFormat === 'csv') { return res.customError({ statusCode: 501, body: { message: `Unsupported log samples format` }, diff --git a/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk b/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk index 53ce913df0515..9b0456b134e34 100644 --- a/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk +++ b/x-pack/plugins/integration_assistant/server/templates/processors/grok.yml.njk @@ -1,5 +1,6 @@ - grok: field: message patterns: - - '{{ grokPattern }}' + {% for grokPattern in grokPatterns %} + - '{{ grokPattern }}'{% endfor %} tag: 'grok_header_pattern' diff --git a/x-pack/plugins/integration_assistant/server/types.ts b/x-pack/plugins/integration_assistant/server/types.ts index 0fb68b4e04572..454370a02c366 100644 --- a/x-pack/plugins/integration_assistant/server/types.ts +++ b/x-pack/plugins/integration_assistant/server/types.ts @@ -122,6 +122,19 @@ export interface KVState { ecsVersion: string; } +export interface UnstructuredLogState { + lastExecutedChain: string; + packageName: string; + dataStreamName: string; + grokPatterns: string[]; + logSamples: string[]; + jsonSamples: string[]; + finalized: boolean; + errors: object; + additionalProcessors: object[]; + ecsVersion: string; +} + export interface RelatedState { rawSamples: string[]; samples: string[]; diff --git a/x-pack/plugins/integration_assistant/server/util/processors.ts b/x-pack/plugins/integration_assistant/server/util/processors.ts index 12200f9d32db9..b2e6b1683482a 100644 --- a/x-pack/plugins/integration_assistant/server/util/processors.ts +++ b/x-pack/plugins/integration_assistant/server/util/processors.ts @@ -50,13 +50,13 @@ function createAppendProcessors(processors: SimplifiedProcessors): ESProcessorIt // The kv graph returns a simplified grok processor for header // This function takes in the grok pattern string and creates the grok processor -export function createGrokProcessor(grokPattern: string): ESProcessorItem { +export function createGrokProcessor(grokPatterns: string[]): ESProcessorItem { const templatesPath = joinPath(__dirname, '../templates/processors'); const env = new Environment(new FileSystemLoader(templatesPath), { autoescape: false, }); const template = env.getTemplate('grok.yml.njk'); - const renderedTemplate = template.render({ grokPattern }); + const renderedTemplate = template.render({ grokPatterns }); const grokProcessor = safeLoad(renderedTemplate) as ESProcessorItem; return grokProcessor; }