Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Automatic Import] Fix Non-ecs compatible fields in grok processor #194727

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ export const KV_HEADER_EXAMPLE_ANSWER = {
rfc: 'RFC2454',
regex:
'/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/',
grok_pattern: '%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
grok_pattern:
'%{WORD:cisco.audit.key1}:%{WORD:cisco.audit.value1};%{WORD:cisco.audit.key2}:%{WORD:cisco.audit.value2}:%{GREEDYDATA:message}',
};

export const KV_HEADER_ERROR_EXAMPLE_ANSWER = {
grok_pattern:
'%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
'%{TIMESTAMP:cisco.audit.timestamp}:%{WORD:cisco.audit.value1};%{WORD:cisco.audit.key2}:%{WORD:cisco.audit.value2}:%{GREEDYDATA:message}',
};

export const onFailure = {
Expand All @@ -33,6 +34,8 @@ export const onFailure = {
},
};

/**
 * Ingest `remove` processor definition that targets the `message` field.
 * `ignore_missing` is set so the definition also applies to documents without that field.
 */
export const removeProcessor = {
  remove: {
    field: 'message',
    ignore_missing: true,
  },
};

export const COMMON_ERRORS = [
{
error: 'field [message] does not contain value_split [=]',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export async function handleHeaderError({
const currentPattern = state.grokPattern;

const pattern = await kvHeaderErrorGraph.invoke({
packageName: state.packageName,
dataStreamName: state.dataStreamName,
current_pattern: JSON.stringify(currentPattern, null, 2),
errors: JSON.stringify(state.errors, null, 2),
ex_answer: JSON.stringify(KV_HEADER_ERROR_EXAMPLE_ANSWER, null, 2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export async function handleHeader({

const pattern = await kvHeaderGraph.invoke({
samples: state.logSamples,
packageName: state.packageName,
dataStreamName: state.dataStreamName,
ex_answer: JSON.stringify(KV_HEADER_EXAMPLE_ANSWER, null, 2),
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ Follow these steps to identify the header pattern:

You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Do not parse the message part in the regex. Just the header part should be in regex nad grok_pattern.
- Do not parse the message part in the regex. Just the header part should be in regex and grok_pattern.
- Make sure to map the remaining message body to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>

Expand Down Expand Up @@ -121,8 +122,9 @@ Follow these steps to fix the errors in the header pattern:
4. Make sure the regex and grok pattern matches all the header information. Only the structured message body should be under GREEDYDATA in grok pattern.
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Do not parse the message part in the regex. Just the header part should be in regex nad grok_pattern.
- Do not parse the message part in the regex. Just the header part should be in regex and grok_pattern.
- Make sure to map the remaining message body to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>

Expand Down
51 changes: 30 additions & 21 deletions x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { ESProcessorItem } from '../../../common';
import type { KVState } from '../../types';
import type { HandleKVNodeParams } from './types';
import { testPipeline } from '../../util';
import { onFailure } from './constants';
import { onFailure, removeProcessor } from './constants';
import { createGrokProcessor } from '../../util/processors';

interface KVResult {
interface StructuredLogResult {
[packageName: string]: { [dataStreamName: string]: unknown };
}

Expand All @@ -32,25 +32,24 @@ export async function handleKVValidate({

// Pick logSamples if there was no header detected.
const samples = state.header ? state.kvLogMessages : state.logSamples;

const { pipelineResults: kvOutputSamples, errors } = (await createJSONInput(
kvProcessor,
samples,
client,
state
)) as { pipelineResults: KVResult[]; errors: object[] };

const { errors } = await verifyKVProcessor(kvProcessor, samples, client);
if (errors.length > 0) {
return { errors, lastExecutedChain: 'kvValidate' };
}

// Converts JSON Object into a string and parses it as a array of JSON strings
const jsonSamples = kvOutputSamples
const additionalProcessors = state.additionalProcessors;
additionalProcessors.push(kvProcessor[0]);
const samplesObject: StructuredLogResult[] = await buildJSONSamples(
state.logSamples,
additionalProcessors,
client
);

const jsonSamples = samplesObject
.map((log) => log[packageName])
.map((log) => log[dataStreamName])
.map((log) => JSON.stringify(log));
const additionalProcessors = state.additionalProcessors;
additionalProcessors.push(kvProcessor[0]);

return {
jsonSamples,
Expand Down Expand Up @@ -89,15 +88,25 @@ export async function handleHeaderValidate({
};
}

async function createJSONInput(
async function verifyKVProcessor(
kvProcessor: ESProcessorItem,
formattedSamples: string[],
client: IScopedClusterClient,
state: KVState
): Promise<{ pipelineResults: object[]; errors: object[] }> {
// This processor removes the original message field in the JSON output
const removeProcessor = { remove: { field: 'message', ignore_missing: true } };
client: IScopedClusterClient
): Promise<{ errors: object[] }> {
// This processor removes the original message field in the output
const pipeline = { processors: [kvProcessor[0], removeProcessor], on_failure: [onFailure] };
const { pipelineResults, errors } = await testPipeline(formattedSamples, pipeline, client);
return { pipelineResults, errors };
const { errors } = await testPipeline(formattedSamples, pipeline, client);
return { errors };
}

/**
 * Passes the samples to `testPipeline` with a pipeline made of the given
 * processors followed by `removeProcessor`, and returns the `pipelineResults`
 * reported by that call.
 *
 * @param samples - Log samples handed to `testPipeline`.
 * @param processors - Processors placed ahead of `removeProcessor` in the pipeline.
 * @param client - Scoped cluster client forwarded to `testPipeline`.
 * @returns The `pipelineResults` from `testPipeline`, typed as `StructuredLogResult[]`.
 */
async function buildJSONSamples(
  samples: string[],
  processors: object[],
  client: IScopedClusterClient
): Promise<StructuredLogResult[]> {
  // The caller's processors run first; `removeProcessor` is always the last step.
  const pipelineDefinition = {
    processors: [...processors, removeProcessor],
    on_failure: [onFailure],
  };
  const simulation = (await testPipeline(samples, pipelineDefinition, client)) as {
    pipelineResults: StructuredLogResult[];
  };
  return simulation.pipelineResults;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ For each pipeline result you find matching values that would fit any of the rela

You ALWAYS follow these guidelines when writing your response:
<guidelines>
- The \`message\` field may not be part of related fields.
- You can use as many processor objects as needed to map all relevant pipeline result fields to any of the ECS related fields.
- If no relevant fields or values are found that could be mapped confidently to any of the related fields, then respond with an empty array [] as valid JSON enclosed with 3 backticks (\`).
- Do not respond with anything except the array of processors as a valid JSON objects enclosed with 3 backticks (\`), see example response below.
Expand Down Expand Up @@ -79,6 +80,7 @@ Follow these steps to help resolve the current ingest pipeline issues:

You ALWAYS follow these guidelines when writing your response:
<guidelines>
- The \`message\` field may not be part of related fields.
- Never use "split" in template values, only use the field name inside the triple brackets. If the error mentions "Improperly closed variable in query-template" then check each "value" field for any special characters and remove them.
- If solving an error means removing the last processor in the list, then return an empty array [] as valid JSON enclosed with 3 backticks (\`).
- Do not respond with anything except the complete updated array of processors as a valid JSON object enclosed with 3 backticks (\`), see example response below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export async function handleUnstructuredError({
const currentPatterns = state.grokPatterns;

const pattern = await grokErrorGraph.invoke({
packageName: state.packageName,
dataStreamName: state.dataStreamName,
current_pattern: JSON.stringify(currentPatterns, null, 2),
errors: JSON.stringify(state.errors, null, 2),
ex_answer: JSON.stringify(GROK_ERROR_EXAMPLE_ANSWER, null, 2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const GROK_MAIN_PROMPT = ChatPromptTemplate.fromMessages([
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Make sure to map the remaining message part to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>

Expand Down Expand Up @@ -89,6 +90,7 @@ Follow these steps to help improve the grok patterns and apply it step by step:
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Make sure to map the remaining message part to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ export interface HandleUnstructuredNodeParams extends UnstructuredNodeParams {
}

export interface GrokResult {
[key: string]: unknown;
grok_patterns: string[];
message: string;
}

/**
 * Two-level map: the outer key is a package name, the inner key is a data
 * stream name, and the value held under the data stream is left untyped.
 */
export interface LogResult {
  [packageName: string]: Record<string, unknown>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export async function handleUnstructured({
const samples = state.logSamples;

const pattern = (await grokMainGraph.invoke({
packageName: state.packageName,
dataStreamName: state.dataStreamName,
samples: samples[0],
ex_answer: JSON.stringify(GROK_EXAMPLE_ANSWER, null, 2),
})) as GrokResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ describe('Testing unstructured validation without errors', () => {
const client = {
asCurrentUser: {
ingest: {
simulate: jest
.fn()
.mockReturnValue({ docs: [{ doc: { _source: { message: 'dummy data' } } }] }),
simulate: jest.fn().mockReturnValue({
docs: [
{
doc: {
_source: { testPackage: { testDatastream: { message: 'dummy data' } } },
},
},
],
}),
},
},
} as unknown as IScopedClusterClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import type { UnstructuredLogState } from '../../types';
import type { GrokResult, HandleUnstructuredNodeParams } from './types';
import type { HandleUnstructuredNodeParams, LogResult } from './types';
import { testPipeline } from '../../util';
import { onFailure } from './constants';
import { createGrokProcessor } from '../../util/processors';
Expand All @@ -18,17 +18,22 @@ export async function handleUnstructuredValidate({
const grokPatterns = state.grokPatterns;
const grokProcessor = createGrokProcessor(grokPatterns);
const pipeline = { processors: grokProcessor, on_failure: [onFailure] };
const packageName = state.packageName;
const dataStreamName = state.dataStreamName;

const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
pipelineResults: GrokResult[];
pipelineResults: LogResult[];
errors: object[];
};

if (errors.length > 0) {
return { errors, lastExecutedChain: 'unstructuredValidate' };
}

const jsonSamples: string[] = pipelineResults.map((entry) => JSON.stringify(entry));
const jsonSamples = pipelineResults
.map((log) => log[packageName])
.map((log) => log[dataStreamName])
.map((log) => JSON.stringify(log));
const additionalProcessors = state.additionalProcessors;
additionalProcessors.push(grokProcessor[0]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ processors:
field: originalMessage
ignore_missing: true
tag: remove_copied_message
if: 'ctx.event?.original != null'{% if log_format != 'unstructured' %}
if: 'ctx.event?.original != null'
- remove:
field: message
ignore_missing: true
tag: remove_message{% endif %}{% if (log_format == 'json') or (log_format == 'ndjson') %}
tag: remove_message{% if (log_format == 'json') or (log_format == 'ndjson') %}
- json:
field: event.original
tag: json_original
Expand Down