Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Logs Explorer] Add support for log generation in synthtrace #170107

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
30860d6
Add support for log generation in synthtrace
achyutjhunjhunwala Oct 30, 2023
c47db39
Adds Synthtrace support to serverless tests
achyutjhunjhunwala Oct 30, 2023
cc507c3
Fix lint issue
achyutjhunjhunwala Oct 30, 2023
d940554
[CI] Auto-commit changed files from 'node scripts/lint_ts_projects --…
kibanamachine Oct 30, 2023
c39a4fd
Add support to index log data via CLI
achyutjhunjhunwala Oct 30, 2023
cff58f1
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Oct 30, 2023
3a2ef9a
Fix unrelated checktype issue
achyutjhunjhunwala Oct 30, 2023
f5d3313
Fix datastream regex
achyutjhunjhunwala Oct 31, 2023
0331a59
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Oct 31, 2023
1bc3995
Update synthtrace documentation
achyutjhunjhunwala Oct 31, 2023
aa36d3e
Add Logs UX as the co-owner of the package
achyutjhunjhunwala Oct 31, 2023
83964f8
[CI] Auto-commit changed files from 'node scripts/generate codeowners'
kibanamachine Oct 31, 2023
616f224
IMPROVE log instance creation logic and renamed client
achyutjhunjhunwala Nov 1, 2023
4b527f7
Make scenario files aware of client
achyutjhunjhunwala Nov 1, 2023
b6eb187
Update README.md
achyutjhunjhunwala Nov 1, 2023
ac35eb1
Update README.md
achyutjhunjhunwala Nov 1, 2023
57d2d80
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Nov 1, 2023
8690722
Update Synthtrace scenario with robust data
achyutjhunjhunwala Nov 1, 2023
2f99579
Fix tests
achyutjhunjhunwala Nov 1, 2023
840cfb3
Fix generateId logic to save CPU
achyutjhunjhunwala Nov 3, 2023
20dae5a
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Nov 13, 2023
dcfafe2
Add logic to generate multiple type of data using synthtrace
achyutjhunjhunwala Nov 14, 2023
617941b
Fix all scenario files
achyutjhunjhunwala Nov 14, 2023
7cf6fa1
Fix typo
achyutjhunjhunwala Nov 14, 2023
ebfc3f9
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Nov 14, 2023
135cfe1
Remove code comment
achyutjhunjhunwala Nov 14, 2023
c773422
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Nov 14, 2023
7702861
Merge branch 'main' into add-support-for-logs-in-synthtrace
mistic Nov 14, 2023
4b9553a
Update packages/kbn-apm-synthtrace/README.md
achyutjhunjhunwala Nov 15, 2023
6134699
Fix final review comments
achyutjhunjhunwala Nov 15, 2023
fd786ec
Fix README.md
achyutjhunjhunwala Nov 15, 2023
c28c48c
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Nov 16, 2023
0f099c9
reusing streams rather than creating them on every batch
achyutjhunjhunwala Nov 16, 2023
009bdda
Merge branch 'main' into add-support-for-logs-in-synthtrace
achyutjhunjhunwala Nov 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/kbn-apm-synthtrace-client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ export { dedot } from './src/lib/utils/dedot';
export { generateLongId, generateShortId } from './src/lib/utils/generate_id';
export { appendHash, hashKeysOf } from './src/lib/utils/hash';
export type { ESDocumentWithOperation, SynthtraceESAction, SynthtraceGenerator } from './src/types';
export { log, type LogDocument } from './src/lib/logs';
68 changes: 68 additions & 0 deletions packages/kbn-apm-synthtrace-client/src/lib/logs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { Fields } from '../entity';
import { Serializable } from '../serializable';

export type LogDocument = Fields &
Partial<{
achyutjhunjhunwala marked this conversation as resolved.
Show resolved Hide resolved
'input.type': string;
'log.file.path'?: string;
'service.name': string;
'data_stream.namespace': string;
'data_stream.type': string;
'data_stream.dataset': string;
message?: string;
'event.dataset': string;
'log.level'?: string;
'host.name'?: string;
}>;

class Log extends Serializable<LogDocument> {
service(name: string) {
this.fields['service.name'] = name;
return this;
}

namespace(value: string) {
this.fields['data_stream.namespace'] = value;
return this;
}

dataset(value: string) {
this.fields['data_stream.dataset'] = value;
this.fields['event.dataset'] = value;
return this;
}

logLevel(level: string) {
this.fields['log.level'] = level;
return this;
}

message(message: string) {
this.fields.message = message;
return this;
}
}

function service(serviceName: string = 'synth-service'): Log {
return new Log({
achyutjhunjhunwala marked this conversation as resolved.
Show resolved Hide resolved
'input.type': 'logs',
'service.name': serviceName,
'data_stream.namespace': 'default',
'data_stream.type': 'logs',
'data_stream.dataset': 'synth',
'event.dataset': 'synth',
'host.name': 'synth-host',
});
}

export const log = {
service,
};
2 changes: 2 additions & 0 deletions packages/kbn-apm-synthtrace/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export { AssetsSynthtraceEsClient } from './src/lib/assets/assets_synthtrace_es_

export { MonitoringSynthtraceEsClient } from './src/lib/monitoring/monitoring_synthtrace_es_client';

export { LogsSynthtraceEsClient } from './src/lib/logs/logs_synthtrace_es_client';

export {
addObserverVersionTransform,
deleteSummaryFieldTransform,
Expand Down
6 changes: 6 additions & 0 deletions packages/kbn-apm-synthtrace/src/cli/run_synthtrace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ function options(y: Argv) {
return arg as Record<string, any> | undefined;
},
})
.option('type', {
describe:
'Type of data to be generated, defaults to APM events, can be logs for generating logs',
default: 'apm',
string: true,
})
.showHelpOnFail(false);
}

Expand Down
9 changes: 9 additions & 0 deletions packages/kbn-apm-synthtrace/src/cli/utils/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import { createLogger } from '../../lib/utils/create_logger';
import { getEsClient } from './get_es_client';
import { getLogsEsClient } from './get_logs_es_client';
import { getKibanaClient } from './get_kibana_client';
import { getServiceUrls } from './get_service_urls';
import { RunOptions } from './parse_run_cli_flags';
Expand All @@ -33,15 +34,23 @@ export async function bootstrap(runOptions: RunOptions) {
version,
});

const logsEsClient = getLogsEsClient({
target: esUrl,
logger,
concurrency: runOptions.concurrency,
});

await kibanaClient.installApmPackage(latestPackageVersion);

if (runOptions.clean) {
await apmEsClient.clean();
await logsEsClient.clean();
}

return {
logger,
apmEsClient,
logsEsClient,
version,
kibanaUrl,
esUrl,
Expand Down
31 changes: 31 additions & 0 deletions packages/kbn-apm-synthtrace/src/cli/utils/get_logs_es_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { Client } from '@elastic/elasticsearch';
import { LogsSynthtraceEsClient } from '../../lib/logs/logs_synthtrace_es_client';
import { Logger } from '../../lib/utils/create_logger';
import { RunOptions } from './parse_run_cli_flags';

export function getLogsEsClient({
achyutjhunjhunwala marked this conversation as resolved.
Show resolved Hide resolved
target,
logger,
concurrency,
}: Pick<RunOptions, 'concurrency'> & {
target: string;
logger: Logger;
}) {
const client = new Client({
node: target,
});

return new LogsSynthtraceEsClient({
client,
logger,
concurrency,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export function parseRunCliFlags(flags: RunCliFlags) {
'kibana',
'concurrency',
'versionOverride',
'clean'
'clean',
'type'
),
logLevel: parsedLogLevel,
file: parsedFile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { timerange } from '@kbn/apm-synthtrace-client';
import { castArray } from 'lodash';
import { PassThrough, Readable, Writable } from 'stream';
import { isGeneratorObject } from 'util/types';
import { ApmSynthtraceEsClient, LogsSynthtraceEsClient } from '../../..';
import { awaitStream } from '../../lib/utils/wait_until_stream_finished';
import { bootstrap } from './bootstrap';
import { getScenario } from './get_scenario';
Expand All @@ -23,9 +24,14 @@ export async function startLiveDataUpload({
start: Date;
}) {
const file = runOptions.file;
const clientType = runOptions.type;

const { logger, apmEsClient } = await bootstrap(runOptions);
const { logger, apmEsClient, logsEsClient } = await bootstrap(runOptions);

let client: ApmSynthtraceEsClient | LogsSynthtraceEsClient = apmEsClient;
if (clientType === 'log') {
client = logsEsClient;
}
const scenario = await getScenario({ file, logger });
const { generate } = await scenario({ ...runOptions, logger });

Expand All @@ -36,7 +42,7 @@ export async function startLiveDataUpload({
objectMode: true,
});

apmEsClient.index(stream);
client.index(stream);

function closeStream() {
stream.end(() => {
Expand Down Expand Up @@ -74,7 +80,7 @@ export async function startLiveDataUpload({

await awaitStream(concatenatedStream);

await apmEsClient.refresh();
await client.refresh();

requestedUntil = bucketTo;
}
Expand All @@ -85,6 +91,7 @@ export async function startLiveDataUpload({
await delay(bucketSizeInMs);
} while (true);
}

async function delay(ms: number) {
return await new Promise((resolve) => setTimeout(resolve, ms));
}
19 changes: 17 additions & 2 deletions packages/kbn-apm-synthtrace/src/cli/utils/synthtrace_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { getEsClient } from './get_es_client';
import { getScenario } from './get_scenario';
import { loggerProxy } from './logger_proxy';
import { RunOptions } from './parse_run_cli_flags';
import { ApmSynthtraceEsClient, LogsSynthtraceEsClient } from '../../..';
import { getLogsEsClient } from './get_logs_es_client';

export interface WorkerData {
bucketFrom: Date;
Expand All @@ -34,8 +36,21 @@ async function start() {
version,
});

const logsEsClient = getLogsEsClient({
concurrency: runOptions.concurrency,
target: esUrl,
logger,
});

const clientType = runOptions.type;

const file = runOptions.file;

let client: ApmSynthtraceEsClient | LogsSynthtraceEsClient = apmEsClient;
if (clientType === 'log') {
client = logsEsClient;
}

const scenario = await logger.perf('get_scenario', () => getScenario({ file, logger }));

logger.info(`Running scenario from ${bucketFrom.toISOString()} to ${bucketTo.toISOString()}`);
Expand Down Expand Up @@ -65,8 +80,8 @@ async function start() {
}, 5000);

await logger.perf('index_scenario', async () => {
await apmEsClient.index(generators);
await apmEsClient.refresh();
await client.index(generators);
await client.refresh();
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { Client } from '@elastic/elasticsearch';
import { dedot, ESDocumentWithOperation } from '@kbn/apm-synthtrace-client';
import { pipeline, Readable, Transform } from 'stream';
import { LogDocument } from '@kbn/apm-synthtrace-client/src/lib/logs';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';

export type LogsSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;

export class LogsSynthtraceEsClient extends SynthtraceEsClient<LogDocument> {
constructor(options: { client: Client; logger: Logger } & LogsSynthtraceEsClientOptions) {
super({
...options,
pipeline: logsPipeline(),
});
this.dataStreams = ['logs-*'];
achyutjhunjhunwala marked this conversation as resolved.
Show resolved Hide resolved
}
}

function logsPipeline() {
return (base: Readable) => {
return pipeline(
base,
getSerializeTransform<LogDocument>(),
getRoutingTransform(),
getDedotTransform(),
achyutjhunjhunwala marked this conversation as resolved.
Show resolved Hide resolved
(err: unknown) => {
if (err) {
throw err;
}
}
);
};
}

function getRoutingTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<LogDocument>, encoding, callback) {
if (
'data_stream.type' in document &&
'data_stream.dataset' in document &&
'data_stream.namespace' in document
yngrdyn marked this conversation as resolved.
Show resolved Hide resolved
) {
document._index = `${document['data_stream.type']}-${document['data_stream.dataset']}-${document['data_stream.namespace']}`;
} else {
throw new Error('Cannot determine index for event');
}

callback(null, document);
},
});
}

function getDedotTransform() {
return new Transform({
objectMode: true,
transform(document: LogDocument, encoding, callback) {
const target: Record<string, any> = dedot(document, {});
delete target.meta;
target['@timestamp'] = new Date(target['@timestamp']!).toISOString();

callback(null, target);
},
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import { ApmFields, Serializable } from '@kbn/apm-synthtrace-client';
import { Transform } from 'stream';

export function getSerializeTransform() {
const buffer: ApmFields[] = [];
export function getSerializeTransform<TFields = ApmFields>() {
const buffer: TFields[] = [];

let cb: (() => void) | undefined;

function push(stream: Transform, events: ApmFields[], callback?: () => void) {
let event: ApmFields | undefined;
function push(stream: Transform, events: TFields[], callback?: () => void) {
let event: TFields | undefined;
while ((event = events.shift())) {
if (!stream.push(event)) {
buffer.push(...events);
Expand All @@ -37,7 +37,7 @@ export function getSerializeTransform() {
push(this, nextEvents, nextCallback);
}
},
write(chunk: Serializable<ApmFields>, encoding, callback) {
write(chunk: Serializable<TFields>, encoding, callback) {
push(this, chunk.serialize(), callback);
},
});
Expand Down
Loading