🌊 Data generation and fixes (#200783)

This PR adds a synthtrace scenario for the main logs streams endpoint
called `slash_logs`. It can be invoked like this:
```
node scripts/synthtrace.js slash_logs --live --kibana=http://elastic:changeme@localhost:5601 --target=http://elastic:changeme@localhost:9200 --liveBucketSize=1000
```

## Changes on synthtrace

I had to adapt a couple of things:
* Add a `--liveBucketSize` flag, because it's really annoying to wait a full
minute to see whether data gets processed correctly
* Extend the existing log mock module (also used by the unstructured logs
scenario) in a couple of ways
* Add the ability to route directly to an index by adding `_index` to a
document; otherwise the data stream naming scheme (DSNS) is used. A usage
sketch follows this list.
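
As a usage sketch (not part of the diff), this is roughly how a scenario can route documents straight to a concrete index via the new `createForIndex` helper; the index name `logs` and the sample message are made up:

```ts
import { log } from '@kbn/apm-synthtrace-client';

// `createForIndex` stamps an explicit `_index` onto the document, so the
// routing transform ships it straight to that index instead of deriving a
// `<type>-<dataset>-<namespace>` data stream name.
const doc = log
  .createForIndex('logs')
  .message('GET /api/status 200')
  .timestamp(Date.now());
```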

## Changes on streams

I fixed a couple of things I realized were broken:
* Check whether a field exists before accessing it in the routing
condition; otherwise the document is rejected. An example follows this
list.
* Update ES objects in the right order: if the routing is put in place
before the receiving child data stream is ready, Elasticsearch
auto-creates an index with the wrong mapping
* Allow omitting the condition on a child; in that case nothing is routed
to that child
* Set `subobjects: true` for now; otherwise fields are not indexed
properly. I think this is an Elasticsearch bug and will follow up on it
with the Elasticsearch team
* Set `dynamic: false`, which somehow got lost in refactorings
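
As an illustration (not part of the diff), here is roughly how the patched `conditionToPainless` behaves; the input mirrors a case from the updated unit tests further down:

```ts
import { conditionToPainless } from './condition_to_painless';

// Operators that call a method on the field value (startsWith, endsWith,
// contains) used to throw a null pointer error in Painless when the field
// was absent, which rejected the document. Each filter is now wrapped in
// an existence check.
conditionToPainless({ field: 'log.logger', operator: 'startsWith' as const, value: 'nginx' });
// => '(ctx.log?.logger !== null && ctx.log?.logger.startsWith("nginx"))'
```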
flash1293 authored Nov 25, 2024
1 parent c6a278b commit 493bd47
Showing 15 changed files with 582 additions and 63 deletions.
20 changes: 20 additions & 0 deletions packages/kbn-apm-synthtrace-client/src/lib/logs/index.ts
```diff
@@ -24,6 +24,7 @@ const defaultLogsOptions: LogsOptions = {
 
 export type LogDocument = Fields &
   Partial<{
+    _index?: string;
     'input.type': string;
     'log.file.path'?: string;
     'service.name'?: string;
@@ -74,6 +75,14 @@ export type LogDocument = Fields &
     svc: string;
     hostname: string;
     [LONG_FIELD_NAME]: string;
+    'http.status_code'?: number;
+    'http.request.method'?: string;
+    'url.path'?: string;
+    'process.name'?: string;
+    'kubernetes.namespace'?: string;
+    'kubernetes.pod.name'?: string;
+    'kubernetes.container.name'?: string;
+    'orchestrator.resource.name'?: string;
   }>;
 
 class Log extends Serializable<LogDocument> {
@@ -155,6 +164,16 @@ function create(logsOptions: LogsOptions = defaultLogsOptions): Log {
   ).dataset('synth');
 }
 
+function createForIndex(index: string): Log {
+  return new Log(
+    {
+      'input.type': 'logs',
+      _index: index,
+    },
+    defaultLogsOptions
+  );
+}
+
 function createMinimal({
   dataset = 'synth',
   namespace = 'default',
@@ -176,6 +195,7 @@ function createMinimal({
 
 export const log = {
   create,
+  createForIndex,
   createMinimal,
 };
 
```
5 changes: 5 additions & 0 deletions packages/kbn-apm-synthtrace/src/cli/run_synthtrace.ts
```diff
@@ -48,6 +48,11 @@ function options(y: Argv) {
       description: 'Generate and index data continuously',
       boolean: true,
     })
+    .option('liveBucketSize', {
+      description: 'Bucket size in ms for live streaming',
+      default: 1000,
+      number: true,
+    })
     .option('clean', {
       describe: 'Clean APM indices before indexing new data',
       default: false,
```
```diff
@@ -74,7 +74,8 @@ export function parseRunCliFlags(flags: RunCliFlags) {
       'concurrency',
       'versionOverride',
       'clean',
-      'assume-package-version'
+      'assume-package-version',
+      'liveBucketSize'
     ),
     logLevel: parsedLogLevel,
     file: parsedFile,
```
```diff
@@ -52,7 +52,7 @@ export async function startLiveDataUpload({
     });
   }
 
-  const bucketSizeInMs = 1000 * 60;
+  const bucketSizeInMs = runOptions.liveBucketSize;
   let requestedUntil = start;
 
   let currentStreams: PassThrough[] = [];
```
```diff
@@ -16,7 +16,7 @@ export function getRoutingTransform<T extends Fields>(dataStreamType: string) {
     transform(document: ESDocumentWithOperation<T>, encoding, callback) {
       if ('data_stream.dataset' in document && 'data_stream.namespace' in document) {
         document._index = `${dataStreamType}-${document['data_stream.dataset']}-${document['data_stream.namespace']}`;
-      } else {
+      } else if (!('_index' in document)) {
         throw new Error('Cannot determine index for event');
       }
 
```
372 changes: 361 additions & 11 deletions packages/kbn-apm-synthtrace/src/scenarios/helpers/logs_mock_data.ts

Large diffs are not rendered by default.

137 changes: 137 additions & 0 deletions packages/kbn-apm-synthtrace/src/scenarios/slash_logs.ts
```ts
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

import { LogDocument, generateShortId, log } from '@kbn/apm-synthtrace-client';
import { Scenario } from '../cli/scenario';
import { withClient } from '../lib/utils/with_client';
import {
  getAgentName,
  getCloudProvider,
  getCloudRegion,
  getIpAddress,
  getJavaLogs,
  getServiceName,
  getWebLogs,
  getKubernetesMessages,
  getLinuxMessages,
  KUBERNETES_SERVICES,
  getStableShortId,
  getRandomRange,
} from './helpers/logs_mock_data';
import { getAtIndexOrRandom } from './helpers/get_at_index_or_random';

const LINUX_PROCESSES = ['cron', 'sshd', 'systemd', 'nginx', 'apache2'];

const scenario: Scenario<LogDocument> = async (runOptions) => {
  const constructCommonMetadata = () => ({
    'agent.name': getAgentName(),
    'cloud.provider': getCloudProvider(),
    'cloud.region': getCloudRegion(Math.floor(Math.random() * 3)),
    'cloud.availability_zone': `${getCloudRegion(0)}a`,
    'cloud.instance.id': generateShortId(),
    'cloud.project.id': generateShortId(),
  });

  const generateNginxLogs = (timestamp: number) => {
    return getWebLogs().map((message) => {
      return log
        .createForIndex('logs')
        .setHostIp(getIpAddress())
        .message(message)
        .defaults({
          ...constructCommonMetadata(),
          'log.file.path': `/var/log/nginx/access-${getStableShortId()}.log`,
        })
        .timestamp(timestamp);
    });
  };

  const generateSyslogData = (timestamp: number) => {
    const messages: Record<string, string[]> = getLinuxMessages();

    return getRandomRange().map(() => {
      const processName = getAtIndexOrRandom(LINUX_PROCESSES);
      const message = getAtIndexOrRandom(messages[processName]);
      return log
        .createForIndex('logs')
        .message(message)
        .setHostIp(getIpAddress())
        .defaults({
          ...constructCommonMetadata(),
          'process.name': processName,
          'log.file.path': `/var/log/${processName}.log`,
        })
        .timestamp(timestamp);
    });
  };

  const generateKubernetesLogs = (timestamp: number) => {
    const messages: Record<string, string[]> = getKubernetesMessages();

    return getRandomRange().map(() => {
      const service = getAtIndexOrRandom(KUBERNETES_SERVICES);
      const isStringifiedJSON = Math.random() > 0.5;
      const message = isStringifiedJSON
        ? JSON.stringify({
            serviceName: service,
            message: getAtIndexOrRandom(messages[service]),
          })
        : getAtIndexOrRandom(messages[service]);
      return log
        .createForIndex('logs')
        .message(message)
        .setHostIp(getIpAddress())
        .defaults({
          ...constructCommonMetadata(),
          'kubernetes.namespace': 'default',
          'kubernetes.pod.name': `${service}-pod-${getStableShortId()}`,
          'kubernetes.container.name': `${service}-container`,
          'orchestrator.resource.name': service,
        })
        .timestamp(timestamp);
    });
  };

  const generateUnparsedJavaLogs = (timestamp: number) => {
    return getJavaLogs().map((message) => {
      const serviceName = getServiceName(Math.floor(Math.random() * 3));
      return log
        .createForIndex('logs')
        .message(message)
        .defaults({
          ...constructCommonMetadata(),
          'service.name': serviceName,
        })
        .timestamp(timestamp);
    });
  };

  return {
    generate: ({ range, clients: { logsEsClient } }) => {
      const { logger } = runOptions;

      const nginxLogs = range.interval('1m').generator(generateNginxLogs);
      const syslogData = range.interval('1m').generator(generateSyslogData);
      const kubernetesLogs = range.interval('1m').generator(generateKubernetesLogs);
      const unparsedJavaLogs = range.interval('1m').generator(generateUnparsedJavaLogs);

      return withClient(
        logsEsClient,
        logger.perf('generating_messy_logs', () => [
          nginxLogs,
          syslogData,
          kubernetesLogs,
          unparsedJavaLogs,
        ])
      );
    },
  };
};

export default scenario;
```
19 changes: 9 additions & 10 deletions packages/kbn-apm-synthtrace/src/scenarios/unstructured_logs.ts
```diff
@@ -12,7 +12,7 @@ import { Scenario } from '../cli/scenario';
 import { IndexTemplateName } from '../lib/logs/custom_logsdb_index_templates';
 import { withClient } from '../lib/utils/with_client';
 import { parseLogsScenarioOpts } from './helpers/logs_scenario_opts_parser';
-import { getJavaLog, getWebLog } from './helpers/logs_mock_data';
+import { getJavaLogs, getWebLogs } from './helpers/logs_mock_data';
 
 const scenario: Scenario<LogDocument> = async (runOptions) => {
   const { isLogsDb } = parseLogsScenarioOpts(runOptions.scenarioOpts);
@@ -23,19 +23,18 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
     generate: ({ range, clients: { logsEsClient } }) => {
       const { logger } = runOptions;
 
-      const datasetJavaLogs = (timestamp: number) =>
-        log.create({ isLogsDb }).dataset('java').message(getJavaLog()).timestamp(timestamp);
-
-      const datasetWebLogs = (timestamp: number) =>
-        log.create({ isLogsDb }).dataset('web').message(getWebLog()).timestamp(timestamp);
-
       const logs = range
         .interval('1m')
         .rate(1)
         .generator((timestamp) => {
-          return Array(200)
-            .fill(0)
-            .flatMap((_, index) => [datasetJavaLogs(timestamp), datasetWebLogs(timestamp)]);
+          return [
+            ...getJavaLogs().map((message) =>
+              log.create({ isLogsDb }).dataset('java').message(message).timestamp(timestamp)
+            ),
+            ...getWebLogs().map((message) =>
+              log.create({ isLogsDb }).dataset('web').message(message).timestamp(timestamp)
+            ),
+          ];
         });
 
       return withClient(
```
2 changes: 1 addition & 1 deletion x-pack/plugins/streams/common/types.ts
```diff
@@ -72,7 +72,7 @@ export const streamWithoutIdDefinitonSchema = z.object({
     .array(
       z.object({
         id: z.string(),
-        condition: conditionSchema,
+        condition: z.optional(conditionSchema),
       })
     )
     .default([]),
```
```diff
@@ -30,7 +30,8 @@ export function generateLayer(
     template: {
       settings: isRoot(definition.id) ? logsSettings : {},
       mappings: {
-        subobjects: false,
+        subobjects: true, // TODO set to false once this works on Elasticsearch side - right now fields are not properly indexed.
+        dynamic: false,
         properties,
       },
     },
```
```diff
@@ -7,48 +7,48 @@
 
 import { conditionToPainless } from './condition_to_painless';
 
-const operatorConditionAndResutls = [
+const operatorConditionAndResults = [
   {
     condition: { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
-    result: 'ctx.log?.logger == "nginx_proxy"',
+    result: '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy")',
   },
   {
     condition: { field: 'log.logger', operator: 'neq' as const, value: 'nginx_proxy' },
-    result: 'ctx.log?.logger != "nginx_proxy"',
+    result: '(ctx.log?.logger !== null && ctx.log?.logger != "nginx_proxy")',
   },
   {
     condition: { field: 'http.response.status_code', operator: 'lt' as const, value: 500 },
-    result: 'ctx.http?.response?.status_code < 500',
+    result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code < 500)',
   },
   {
     condition: { field: 'http.response.status_code', operator: 'lte' as const, value: 500 },
-    result: 'ctx.http?.response?.status_code <= 500',
+    result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code <= 500)',
   },
   {
     condition: { field: 'http.response.status_code', operator: 'gt' as const, value: 500 },
-    result: 'ctx.http?.response?.status_code > 500',
+    result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code > 500)',
   },
   {
     condition: { field: 'http.response.status_code', operator: 'gte' as const, value: 500 },
-    result: 'ctx.http?.response?.status_code >= 500',
+    result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code >= 500)',
   },
   {
     condition: { field: 'log.logger', operator: 'startsWith' as const, value: 'nginx' },
-    result: 'ctx.log?.logger.startsWith("nginx")',
+    result: '(ctx.log?.logger !== null && ctx.log?.logger.startsWith("nginx"))',
   },
   {
     condition: { field: 'log.logger', operator: 'endsWith' as const, value: 'proxy' },
-    result: 'ctx.log?.logger.endsWith("proxy")',
+    result: '(ctx.log?.logger !== null && ctx.log?.logger.endsWith("proxy"))',
   },
   {
     condition: { field: 'log.logger', operator: 'contains' as const, value: 'proxy' },
-    result: 'ctx.log?.logger.contains("proxy")',
+    result: '(ctx.log?.logger !== null && ctx.log?.logger.contains("proxy"))',
   },
 ];
 
 describe('conditionToPainless', () => {
   describe('operators', () => {
-    operatorConditionAndResutls.forEach((setup) => {
+    operatorConditionAndResults.forEach((setup) => {
       test(`${setup.condition.operator}`, () => {
         expect(conditionToPainless(setup.condition)).toEqual(setup.result);
       });
@@ -65,7 +65,7 @@ describe('conditionToPainless', () => {
       };
       expect(
         expect(conditionToPainless(condition)).toEqual(
-          'ctx.log?.logger == "nginx_proxy" && ctx.log?.level == "error"'
+          '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") && (ctx.log?.level !== null && ctx.log?.level == "error")'
         )
       );
     });
@@ -81,7 +81,7 @@ describe('conditionToPainless', () => {
       };
       expect(
         expect(conditionToPainless(condition)).toEqual(
-          'ctx.log?.logger == "nginx_proxy" || ctx.log?.level == "error"'
+          '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") || (ctx.log?.level !== null && ctx.log?.level == "error")'
         )
       );
     });
@@ -102,7 +102,7 @@ describe('conditionToPainless', () => {
       };
      expect(
        expect(conditionToPainless(condition)).toEqual(
-          'ctx.log?.logger == "nginx_proxy" && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")'
+          '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") && ((ctx.log?.level !== null && ctx.log?.level == "error") || (ctx.log?.level !== null && ctx.log?.level == "ERROR"))'
        )
      );
    });
@@ -125,7 +125,7 @@ describe('conditionToPainless', () => {
      };
      expect(
        expect(conditionToPainless(condition)).toEqual(
-          '(ctx.log?.logger == "nginx_proxy" || ctx.service?.name == "nginx") && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")'
+          '((ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") || (ctx.service?.name !== null && ctx.service?.name == "nginx")) && ((ctx.log?.level !== null && ctx.log?.level == "error") || (ctx.log?.level !== null && ctx.log?.level == "ERROR"))'
        )
      );
    });
```
```diff
@@ -69,7 +69,7 @@ function toPainless(condition: FilterCondition) {
 
 export function conditionToPainless(condition: Condition, nested = false): string {
   if (isFilterCondition(condition)) {
-    return toPainless(condition);
+    return `(${safePainlessField(condition)} !== null && ${toPainless(condition)})`;
   }
   if (isAndCondition(condition)) {
     const and = condition.and.map((filter) => conditionToPainless(filter, true)).join(' && ');
```
1 change: 1 addition & 0 deletions x-pack/plugins/streams/server/routes/streams/delete.ts
```diff
@@ -52,6 +52,7 @@ export const deleteStreamRoute = createServerRoute({
       throw new MalformedStreamId('Cannot delete root stream');
     }
 
+    // need to update parent first to cut off documents streaming down
     await updateParentStream(scopedClusterClient, params.path.id, parentId, logger);
 
     await deleteStream(scopedClusterClient, params.path.id, logger);
```