Skip to content

Commit

Permalink
[Synthtrace] Support LogsDb Mode (elastic#190286)
Browse files Browse the repository at this point in the history
closes [elastic#3757
](elastic/observability-dev#3757)


## 📝  Summary

This PR adds support of `LogsDb` to all current Logs scenarios.

To be able to use the newly added flag from CLI:

`node scripts/synthtrace degraded_logs.ts --scenarioOpts.logsdb=true`

This creates a new `Logsdb` Index template that mimics the default
`Logs` one but sets the `mode=logsdb` and matches on index pattern
`logs-logsdb.*-*`.

## 🎥 Demo


https://github.com/user-attachments/assets/378be9ac-215a-40ca-b57c-3bb9751292b2
  • Loading branch information
mohamedhamed-ahmed authored Aug 13, 2024
1 parent 6589cd3 commit 2de0dd6
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 44 deletions.
40 changes: 29 additions & 11 deletions packages/kbn-apm-synthtrace-client/src/lib/logs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@
import { Fields } from '../entity';
import { Serializable } from '../serializable';

const LOGSDB_DATASET_PREFIX = 'logsdb.';

interface LogsOptions {
isLogsDb: boolean;
}

const defaultLogsOptions: LogsOptions = {
isLogsDb: false,
};

export type LogDocument = Fields &
Partial<{
'input.type': string;
Expand Down Expand Up @@ -48,6 +58,12 @@ export type LogDocument = Fields &
}>;

class Log extends Serializable<LogDocument> {
constructor(fields: LogDocument, private logsOptions: LogsOptions) {
super({
...fields,
});
}

service(name: string) {
this.fields['service.name'] = name;
return this;
Expand All @@ -69,8 +85,9 @@ class Log extends Serializable<LogDocument> {
}

dataset(value: string) {
this.fields['data_stream.dataset'] = value;
this.fields['event.dataset'] = value;
const dataset = `${this.logsOptions.isLogsDb ? LOGSDB_DATASET_PREFIX : ''}${value}`;
this.fields['data_stream.dataset'] = dataset;
this.fields['event.dataset'] = dataset;
return this;
}

Expand All @@ -85,15 +102,16 @@ class Log extends Serializable<LogDocument> {
}
}

function create(): Log {
return new Log({
'input.type': 'logs',
'data_stream.namespace': 'default',
'data_stream.type': 'logs',
'data_stream.dataset': 'synth',
'event.dataset': 'synth',
'host.name': 'synth-host',
});
function create(logsOptions: LogsOptions = defaultLogsOptions): Log {
return new Log(
{
'input.type': 'logs',
'data_stream.namespace': 'default',
'data_stream.type': 'logs',
'host.name': 'synth-host',
},
logsOptions
).dataset('synth');
}

export const log = {
Expand Down
21 changes: 9 additions & 12 deletions packages/kbn-apm-synthtrace/src/cli/scenario.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,19 @@ import { Logger } from '../lib/utils/create_logger';
import { ScenarioReturnType } from '../lib/utils/with_client';
import { RunOptions } from './utils/parse_run_cli_flags';

interface EsClients {
apmEsClient: ApmSynthtraceEsClient;
logsEsClient: LogsSynthtraceEsClient;
infraEsClient: InfraSynthtraceEsClient;
assetsEsClient: AssetsSynthtraceEsClient;
}

type Generate<TFields> = (options: {
range: Timerange;
clients: {
apmEsClient: ApmSynthtraceEsClient;
logsEsClient: LogsSynthtraceEsClient;
infraEsClient: InfraSynthtraceEsClient;
assetsEsClient: AssetsSynthtraceEsClient;
};
clients: EsClients;
}) => ScenarioReturnType<TFields> | Array<ScenarioReturnType<TFields>>;

export type Scenario<TFields> = (options: RunOptions & { logger: Logger }) => Promise<{
bootstrap?: (options: {
apmEsClient: ApmSynthtraceEsClient;
logsEsClient: LogsSynthtraceEsClient;
infraEsClient: InfraSynthtraceEsClient;
assetsEsClient: AssetsSynthtraceEsClient;
}) => Promise<void>;
bootstrap?: (options: EsClients) => Promise<void>;
generate: Generate<TFields>;
}>;
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import { timerange } from '@kbn/apm-synthtrace-client';
import { castArray } from 'lodash';
import { PassThrough, Readable, Writable } from 'stream';
import { isGeneratorObject } from 'util/types';
import { SynthtraceEsClient } from '../../lib/shared/base_client';
import { awaitStream } from '../../lib/utils/wait_until_stream_finished';
import { bootstrap } from './bootstrap';
import { getScenario } from './get_scenario';
import { RunOptions } from './parse_run_cli_flags';
import { SynthtraceEsClient } from '../../lib/utils/with_client';

export async function startLiveDataUpload({
runOptions,
Expand All @@ -36,6 +36,7 @@ export async function startLiveDataUpload({
let requestedUntil = start;

let currentStreams: PassThrough[] = [];
// @ts-expect-error upgrade typescript v4.9.5
const cachedStreams: WeakMap<SynthtraceEsClient, PassThrough> = new WeakMap();

process.on('SIGINT', () => closeStreams());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { IndicesPutIndexTemplateRequest } from '@elastic/elasticsearch/lib/api/types';

export enum IndexTemplateName {
LogsDb = 'logsdb',
}

export const indexTemplates: {
[key in IndexTemplateName]: IndicesPutIndexTemplateRequest;
} = {
[IndexTemplateName.LogsDb]: {
name: IndexTemplateName.LogsDb,
_meta: {
managed: false,
description: 'custom logsdb template created by synthtrace tool.',
},
template: {
settings: {
mode: 'logsdb',
},
},
priority: 500,
index_patterns: ['logs-logsdb.*-*'],
composed_of: ['logs@mappings', 'logs@settings', 'ecs@mappings'],
allow_auto_create: true,
data_stream: {
hidden: false,
},
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { LogDocument } from '@kbn/apm-synthtrace-client/src/lib/logs';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';
import { indexTemplates, IndexTemplateName } from './custom_logsdb_index_templates';

export type LogsSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;

Expand All @@ -24,6 +25,21 @@ export class LogsSynthtraceEsClient extends SynthtraceEsClient<LogDocument> {
});
this.dataStreams = ['logs-*-*'];
}

async createIndexTemplate(name: IndexTemplateName) {
const isTemplateExisting = await this.client.indices.existsIndexTemplate({ name });

if (isTemplateExisting) return this.logger.info(`Index template already exists: ${name}`);

const template = indexTemplates[name];

try {
await this.client.indices.putIndexTemplate(template);
this.logger.info(`Index template successfully created: ${name}`);
} catch (err) {
this.logger.error(`Index template creation failed: ${name} - ${err.message}`);
}
}
}

function logsPipeline() {
Expand Down
4 changes: 2 additions & 2 deletions packages/kbn-apm-synthtrace/src/lib/utils/with_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

import { SynthtraceGenerator } from '@kbn/apm-synthtrace-client';
import { Readable } from 'stream';
import { ApmSynthtraceEsClient, LogsSynthtraceEsClient } from '../../..';
import { SynthtraceEsClient } from '../shared/base_client';

export type SynthtraceEsClient = ApmSynthtraceEsClient | LogsSynthtraceEsClient;
export type SynthGenerator<TFields> =
// @ts-expect-error upgrade typescript v4.9.5
| SynthtraceGenerator<TFields>
Expand All @@ -19,6 +18,7 @@ export type SynthGenerator<TFields> =
| Readable;

export const withClient = <TFields>(
// @ts-expect-error upgrade typescript v4.9.5
client: SynthtraceEsClient,
generator: SynthGenerator<TFields>
) => {
Expand Down
12 changes: 9 additions & 3 deletions packages/kbn-apm-synthtrace/src/scenarios/degraded_logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
*/
import { LogDocument, log, generateShortId, generateLongId } from '@kbn/apm-synthtrace-client';
import { Scenario } from '../cli/scenario';
import { IndexTemplateName } from '../lib/logs/custom_logsdb_index_templates';
import { withClient } from '../lib/utils/with_client';
import { parseLogsScenarioOpts } from './helpers/logs_scenario_opts_parser';

const MORE_THAN_1024_CHARS =
'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?';

const scenario: Scenario<LogDocument> = async (runOptions) => {
const { isLogsDb } = parseLogsScenarioOpts(runOptions.scenarioOpts);
return {
bootstrap: async ({ logsEsClient }) => {
if (isLogsDb) await logsEsClient.createIndexTemplate(IndexTemplateName.LogsDb);
},
generate: ({ range, clients: { logsEsClient } }) => {
const { logger } = runOptions;

Expand Down Expand Up @@ -42,7 +48,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
const datasetSynth1Logs = (timestamp: number) => {
const index = Math.floor(Math.random() * 3);
return log
.create()
.create({ isLogsDb })
.dataset('synth.1')
.message(MESSAGE_LOG_LEVELS[index].message as string)
.logLevel(MESSAGE_LOG_LEVELS[index].level)
Expand All @@ -67,7 +73,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
const index = Math.floor(Math.random() * 3);
const isMalformed = i % 60 === 0;
return log
.create()
.create({ isLogsDb })
.dataset('synth.2')
.message(MESSAGE_LOG_LEVELS[index].message as string)
.logLevel(isMalformed ? MORE_THAN_1024_CHARS : MESSAGE_LOG_LEVELS[index].level) // "ignore_above": 1024 in mapping
Expand All @@ -92,7 +98,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
const index = Math.floor(Math.random() * 3);
const isMalformed = i % 10 === 0;
return log
.create()
.create({ isLogsDb })
.dataset('synth.3')
.message(MESSAGE_LOG_LEVELS[index].message as string)
.logLevel(isMalformed ? MORE_THAN_1024_CHARS : MESSAGE_LOG_LEVELS[index].level) // "ignore_above": 1024 in mapping
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export const parseStringToBoolean = (value: string, defaultValue?: boolean): boolean => {
if (!value) return defaultValue ?? false;

switch (value.trim().toLowerCase()) {
case 'true':
return true;
case 'false':
return false;
default:
return defaultValue ?? /true/i.test(value);
}
};

export interface LogsScenarioOpts {
isLogsDb: boolean;
}

export const parseLogsScenarioOpts = (
scenarioOpts: Record<string, any> | undefined
): LogsScenarioOpts => {
const isLogsDb = parseStringToBoolean(scenarioOpts?.logsdb);

return {
isLogsDb,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ import {
import { Scenario } from '../cli/scenario';
import { withClient } from '../lib/utils/with_client';
import { getSynthtraceEnvironment } from '../lib/utils/get_synthtrace_environment';
import { parseLogsScenarioOpts } from './helpers/logs_scenario_opts_parser';
import { IndexTemplateName } from '../lib/logs/custom_logsdb_index_templates';

const ENVIRONMENT = getSynthtraceEnvironment(__filename);

const scenario: Scenario<LogDocument> = async (runOptions) => {
const { isLogsDb } = parseLogsScenarioOpts(runOptions.scenarioOpts);

return {
bootstrap: async ({ logsEsClient }) => {
if (isLogsDb) await logsEsClient.createIndexTemplate(IndexTemplateName.LogsDb);
},
generate: ({ range, clients: { logsEsClient, apmEsClient } }) => {
const { numServices = 3 } = runOptions.scenarioOpts || {};
const { logger } = runOptions;
Expand Down Expand Up @@ -53,7 +60,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
.map(() => {
const index = Math.floor(Math.random() * 3);
return log
.create()
.create({ isLogsDb })
.message(MESSAGE_LOG_LEVELS[index].message)
.logLevel(MESSAGE_LOG_LEVELS[index].level)
.service(SERVICE_NAMES[index])
Expand Down
18 changes: 15 additions & 3 deletions packages/kbn-apm-synthtrace/src/scenarios/logs_traces_hosts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { Scenario } from '../cli/scenario';
import { Logger } from '../lib/utils/create_logger';
import { withClient } from '../lib/utils/with_client';
import { getSynthtraceEnvironment } from '../lib/utils/get_synthtrace_environment';
import { parseLogsScenarioOpts, parseStringToBoolean } from './helpers/logs_scenario_opts_parser';
import { IndexTemplateName } from '../lib/logs/custom_logsdb_index_templates';

const ENVIRONMENT = getSynthtraceEnvironment(__filename);

Expand All @@ -38,10 +40,16 @@ const DEFAULT_SCENARIO_OPTS = {
logsRate: 1,
ingestHosts: true,
ingestTraces: true,
logsdb: false,
};

const scenario: Scenario<LogDocument | InfraDocument | ApmFields> = async (runOptions) => {
const { isLogsDb } = parseLogsScenarioOpts(runOptions.scenarioOpts);

return {
bootstrap: async ({ logsEsClient }) => {
if (isLogsDb) await logsEsClient.createIndexTemplate(IndexTemplateName.LogsDb);
},
generate: ({ range, clients: { logsEsClient, infraEsClient, apmEsClient } }) => {
const {
numSpaces,
Expand All @@ -58,6 +66,10 @@ const scenario: Scenario<LogDocument | InfraDocument | ApmFields> = async (runOp
ingestHosts,
ingestTraces,
} = { ...DEFAULT_SCENARIO_OPTS, ...(runOptions.scenarioOpts || {}) };

const parsedIngestHosts = parseStringToBoolean(`${ingestHosts}`);
const parsedIngestTraces = parseStringToBoolean(`${ingestTraces}`);

const { logger } = runOptions;

killIfUnknownScenarioOptions(logger, runOptions.scenarioOpts || {});
Expand Down Expand Up @@ -189,7 +201,7 @@ const scenario: Scenario<LogDocument | InfraDocument | ApmFields> = async (runOp
const customFields = getExtraFields(numCustomFields, isMalformed, customFieldPrefix);

return log
.create()
.create({ isLogsDb })
.dataset(dataset)
.message(message)
.logLevel(logLevel)
Expand Down Expand Up @@ -220,15 +232,15 @@ const scenario: Scenario<LogDocument | InfraDocument | ApmFields> = async (runOp
});

return [
...(ingestHosts
...(parsedIngestHosts
? [
withClient(
infraEsClient,
logger.perf('generating_infra_hosts', () => hosts)
),
]
: []),
...(ingestTraces
...(parsedIngestTraces
? [
withClient(
apmEsClient,
Expand Down
Loading

0 comments on commit 2de0dd6

Please sign in to comment.