diff --git a/packages/kbn-apm-synthtrace-client/src/lib/logs/index.ts b/packages/kbn-apm-synthtrace-client/src/lib/logs/index.ts index 8b3ed0cda1072..8fa7a5997f4f7 100644 --- a/packages/kbn-apm-synthtrace-client/src/lib/logs/index.ts +++ b/packages/kbn-apm-synthtrace-client/src/lib/logs/index.ts @@ -24,6 +24,7 @@ const defaultLogsOptions: LogsOptions = { export type LogDocument = Fields & Partial<{ + _index?: string; 'input.type': string; 'log.file.path'?: string; 'service.name'?: string; @@ -74,6 +75,14 @@ export type LogDocument = Fields & svc: string; hostname: string; [LONG_FIELD_NAME]: string; + 'http.status_code'?: number; + 'http.request.method'?: string; + 'url.path'?: string; + 'process.name'?: string; + 'kubernetes.namespace'?: string; + 'kubernetes.pod.name'?: string; + 'kubernetes.container.name'?: string; + 'orchestrator.resource.name'?: string; }>; class Log extends Serializable { @@ -155,6 +164,16 @@ function create(logsOptions: LogsOptions = defaultLogsOptions): Log { ).dataset('synth'); } +function createForIndex(index: string): Log { + return new Log( + { + 'input.type': 'logs', + _index: index, + }, + defaultLogsOptions + ); +} + function createMinimal({ dataset = 'synth', namespace = 'default', @@ -176,6 +195,7 @@ function createMinimal({ export const log = { create, + createForIndex, createMinimal, }; diff --git a/packages/kbn-apm-synthtrace/src/cli/run_synthtrace.ts b/packages/kbn-apm-synthtrace/src/cli/run_synthtrace.ts index d6f899b2084ac..f4646de82d19f 100644 --- a/packages/kbn-apm-synthtrace/src/cli/run_synthtrace.ts +++ b/packages/kbn-apm-synthtrace/src/cli/run_synthtrace.ts @@ -48,6 +48,11 @@ function options(y: Argv) { description: 'Generate and index data continuously', boolean: true, }) + .option('liveBucketSize', { + description: 'Bucket size in ms for live streaming', + default: 1000, + number: true, + }) .option('clean', { describe: 'Clean APM indices before indexing new data', default: false, diff --git a/packages/kbn-apm-synthtrace/src/cli/utils/parse_run_cli_flags.ts b/packages/kbn-apm-synthtrace/src/cli/utils/parse_run_cli_flags.ts index d069f89b168a2..1a7b82dcd364b 100644 --- a/packages/kbn-apm-synthtrace/src/cli/utils/parse_run_cli_flags.ts +++ b/packages/kbn-apm-synthtrace/src/cli/utils/parse_run_cli_flags.ts @@ -74,7 +74,8 @@ export function parseRunCliFlags(flags: RunCliFlags) { 'concurrency', 'versionOverride', 'clean', - 'assume-package-version' + 'assume-package-version', + 'liveBucketSize' ), logLevel: parsedLogLevel, file: parsedFile, diff --git a/packages/kbn-apm-synthtrace/src/cli/utils/start_live_data_upload.ts b/packages/kbn-apm-synthtrace/src/cli/utils/start_live_data_upload.ts index 38404be151612..9478ae8f26af2 100644 --- a/packages/kbn-apm-synthtrace/src/cli/utils/start_live_data_upload.ts +++ b/packages/kbn-apm-synthtrace/src/cli/utils/start_live_data_upload.ts @@ -52,7 +52,7 @@ export async function startLiveDataUpload({ }); } - const bucketSizeInMs = 1000 * 60; + const bucketSizeInMs = runOptions.liveBucketSize; let requestedUntil = start; let currentStreams: PassThrough[] = []; diff --git a/packages/kbn-apm-synthtrace/src/lib/shared/data_stream_get_routing_transform.ts b/packages/kbn-apm-synthtrace/src/lib/shared/data_stream_get_routing_transform.ts index 40d1b05878c04..daa631a5ff111 100644 --- a/packages/kbn-apm-synthtrace/src/lib/shared/data_stream_get_routing_transform.ts +++ b/packages/kbn-apm-synthtrace/src/lib/shared/data_stream_get_routing_transform.ts @@ -16,7 +16,7 @@ export function getRoutingTransform(dataStreamType: string) { transform(document: ESDocumentWithOperation, encoding, callback) { if ('data_stream.dataset' in document && 'data_stream.namespace' in document) { document._index = `${dataStreamType}-${document['data_stream.dataset']}-${document['data_stream.namespace']}`; - } else { + } else if (!('_index' in document)) { throw new Error('Cannot determine index for event'); } diff --git a/packages/kbn-apm-synthtrace/src/scenarios/helpers/logs_mock_data.ts b/packages/kbn-apm-synthtrace/src/scenarios/helpers/logs_mock_data.ts index 5f3cbd5f054dd..9a8e90f295c61 100644 --- a/packages/kbn-apm-synthtrace/src/scenarios/helpers/logs_mock_data.ts +++ b/packages/kbn-apm-synthtrace/src/scenarios/helpers/logs_mock_data.ts @@ -22,12 +22,351 @@ const { // Arrays for data const LOG_LEVELS: string[] = ['FATAL', 'ERROR', 'WARN', 'INFO', 'DEBUG', 'TRACE']; -const JAVA_LOG_MESSAGES = [ +export const LINUX_PROCESSES = ['cron', 'sshd', 'systemd', 'nginx', 'apache2']; + +// generate 20 short ids to cycle through +const shortIds = Array.from({ length: 20 }, (_, i) => generateShortId()); + +export function getStableShortId() { + return shortIds[Math.floor(Math.random() * shortIds.length)]; +} + +export const getLinuxMessages = () => + ({ + cron: [ + `(${moment().toISOString()}) INFO: (CRON) User ran command: '/usr/bin/backup.sh'.`, + `(${moment().toISOString()}) WARN: (CRON) Missing crontab entry for user .`, + `(${moment().toISOString()}) ERROR: (CRON) Failed to execute '/usr/bin/backup.sh'.`, + `(${moment().toISOString()}) INFO: (CRON) New cron job added for user .`, + `(${moment().toISOString()}) DEBUG: (CRON) Skipping execution of disabled job 'jobID-${getStableShortId()}'.`, + `(${moment().toISOString()}) INFO: (CRON) Daily backup completed successfully in ${Math.floor( + Math.random() * 300 + )} seconds.`, + `(${moment().toISOString()}) ERROR: (CRON) Syntax error in crontab file for user .`, + `(${moment().toISOString()}) INFO: (CRON) Purged old log files during job 'jobID-${getStableShortId()}'.`, + `(${moment().toISOString()}) WARN: (CRON) Job 'jobID-${getStableShortId()}' exceeded timeout of ${Math.floor( + Math.random() * 3600 + )} seconds.`, + `(${moment().toISOString()}) INFO: (CRON) Executing job 'jobID-${getStableShortId()}' as user .`, + ], + sshd: [ + `${moment().toISOString()} INFO: sshd[${Math.floor( + Math.random() * 10000 + )}]: Accepted password for user from ${getIpAddress()} port ${ + 1024 + Math.floor(Math.random() * 50000) + }.`, + `${moment().toISOString()} WARN: sshd[${Math.floor( + Math.random() * 10000 + )}]: Failed password attempt for user from ${getIpAddress()} port ${ + 1024 + Math.floor(Math.random() * 50000) + }.`, + `${moment().toISOString()} INFO: sshd[${Math.floor( + Math.random() * 10000 + )}]: Connection closed by ${getIpAddress()} port ${ + 1024 + Math.floor(Math.random() * 50000) + }.`, + `${moment().toISOString()} ERROR: sshd[${Math.floor( + Math.random() * 10000 + )}]: Invalid public key for user .`, + `${moment().toISOString()} INFO: sshd[${Math.floor( + Math.random() * 10000 + )}]: Starting session for user .`, + `${moment().toISOString()} WARN: sshd[${Math.floor( + Math.random() * 10000 + )}]: Too many authentication failures from ${getIpAddress()}.`, + `${moment().toISOString()} INFO: sshd[${Math.floor( + Math.random() * 10000 + )}]: User disconnected.`, + `${moment().toISOString()} ERROR: sshd[${Math.floor( + Math.random() * 10000 + )}]: Attempt to use forbidden user .`, + `${moment().toISOString()} INFO: sshd[${Math.floor( + Math.random() * 10000 + )}]: Received SIGHUP signal. Reloading configuration.`, + `${moment().toISOString()} DEBUG: sshd[${Math.floor( + Math.random() * 10000 + )}]: Monitoring connections on port 22.`, + ], + systemd: [ + `${moment().toISOString()} INFO: systemd[${Math.floor( + Math.random() * 10000 + )}]: Started service .`, + `${moment().toISOString()} ERROR: systemd[${Math.floor( + Math.random() * 10000 + )}]: Failed to start service .`, + `${moment().toISOString()} INFO: systemd[${Math.floor( + Math.random() * 10000 + )}]: Stopped service .`, + `${moment().toISOString()} DEBUG: systemd[${Math.floor( + Math.random() * 10000 + )}]: Reloading daemon configuration.`, + `${moment().toISOString()} WARN: systemd[${Math.floor( + Math.random() * 10000 + )}]: Service restarted too many times.`, + `${moment().toISOString()} INFO: systemd[${Math.floor( + Math.random() * 10000 + )}]: Mounted .`, + `${moment().toISOString()} ERROR: systemd[${Math.floor( + Math.random() * 10000 + )}]: Unit entered failed state.`, + `${moment().toISOString()} INFO: systemd[${Math.floor( + Math.random() * 10000 + )}]: Timer triggered.`, + `${moment().toISOString()} WARN: systemd[${Math.floor( + Math.random() * 10000 + )}]: Service is inactive.`, + `${moment().toISOString()} DEBUG: systemd[${Math.floor( + Math.random() * 10000 + )}]: Service received SIGTERM.`, + ], + nginx: [ + `${moment().toISOString()} INFO: nginx[${Math.floor( + Math.random() * 10000 + )}]: Access log: ${getIpAddress()} - - [${moment().format( + 'DD/MMM/YYYY:HH:mm:ss Z' + )}] "GET /path-${getStableShortId()} HTTP/1.1" ${ + 200 + Math.floor(Math.random() * 100) + } ${Math.floor(Math.random() * 10000)}.`, + `${moment().toISOString()} ERROR: nginx[${Math.floor( + Math.random() * 10000 + )}]: 502 Bad Gateway for request to /path-${getStableShortId()}.`, + `${moment().toISOString()} WARN: nginx[${Math.floor( + Math.random() * 10000 + )}]: Upstream server timed out on /path-${getStableShortId()}.`, + `${moment().toISOString()} INFO: nginx[${Math.floor( + Math.random() * 10000 + )}]: Server restarted successfully.`, + `${moment().toISOString()} DEBUG: nginx[${Math.floor( + Math.random() * 10000 + )}]: Cache hit for /path-${getStableShortId()}.`, + ], + apache2: [ + `${moment().toISOString()} INFO: apache2[${Math.floor( + Math.random() * 10000 + )}]: GET /path-${getStableShortId()} HTTP/1.1" ${ + 200 + Math.floor(Math.random() * 100) + } ${Math.floor(Math.random() * 10000)} bytes.`, + `${moment().toISOString()} ERROR: apache2[${Math.floor( + Math.random() * 10000 + )}]: 500 Internal Server Error for request /path-${getStableShortId()}.`, + `${moment().toISOString()} WARN: apache2[${Math.floor( + Math.random() * 10000 + )}]: Worker process terminated unexpectedly.`, + `${moment().toISOString()} INFO: apache2[${Math.floor( + Math.random() * 10000 + )}]: Server restarted.`, + `${moment().toISOString()} DEBUG: apache2[${Math.floor( + Math.random() * 10000 + )}]: Keep-alive timeout on connection ${Math.floor(Math.random() * 10000)}.`, + ], + } as Record); + +export const KUBERNETES_SERVICES = [ + 'auth-service', + 'payment-service', + 'inventory-service', + 'ui-service', + 'notification-service', +]; + +export const getKubernetesMessages = () => + ({ + 'auth-service': [ + `User authenticated successfully at ${moment().toISOString()}.`, + `Failed login attempt for user at ${moment().toISOString()}.`, + `Token expired for user .`, + `Session started for user .`, + `Password reset requested by user .`, + `Invalid JWT token provided for user .`, + `New user registered at ${moment().toISOString()}.`, + `MFA challenge triggered for user .`, + `MFA challenge succeeded for user .`, + `MFA challenge failed for user .`, + `Access revoked for user .`, + `User deleted their account.`, + `Permission granted for resource .`, + `Permission denied for resource .`, + `Role updated for user .`, + `User logged out.`, + `Invalid credentials provided for user .`, + `Security alert triggered at ${moment().toISOString()} for user .`, + `Session expired for user .`, + `Password changed successfully for user .`, + ], + 'payment-service': [ + `Payment of $${(Math.random() * 1000).toFixed( + 2 + )} processed successfully at ${moment().toISOString()}.`, + `Card declined for transaction .`, + `Refund initiated for transaction .`, + `Refund of $${(Math.random() * 500).toFixed(2)} processed successfully.`, + `Payment gateway timeout during transaction .`, + `Fraudulent transaction detected at ${moment().toISOString()}.`, + `Payment pending approval for transaction .`, + `Payment gateway configuration error.`, + `Payment of $${(Math.random() * 200).toFixed( + 2 + )} canceled by user .`, + `Recurring payment of $${(Math.random() * 50).toFixed(2)} initiated.`, + `Subscription for renewed successfully.`, + `Subscription for canceled.`, + `Invoice generated.`, + `Invoice sent to user .`, + `Payment method added for user .`, + `Payment method removed for user .`, + `Credit limit exceeded for user .`, + `Insufficient funds for transaction .`, + `Transaction rollback initiated for .`, + `Chargeback received for transaction .`, + ], + 'inventory-service': [ + `Stock level updated for item : ${Math.floor( + Math.random() * 500 + )} units remaining.`, + `Item added to catalog at ${moment().toISOString()}.`, + `Item removed from catalog.`, + `Stock alert for item : Low inventory (${Math.floor( + Math.random() * 20 + )} units left).`, + `Stock replenished for item .`, + `Inventory check completed for warehouse .`, + `Item flagged as discontinued.`, + `Bulk update performed on inventory.`, + `Price updated for item .`, + `Warehouse status: Operational.`, + `Warehouse reported system failure.`, + `Item backordered.`, + `New shipment received for item .`, + `Item sold out.`, + `Item marked for promotion.`, + `Warehouse restocked.`, + `Stock audit started at ${moment().toISOString()}.`, + `Inventory discrepancy reported for item .`, + `Restock delayed for item .`, + `Item reserved for order .`, + ], + 'ui-service': [ + `Page rendered successfully at ${moment().toISOString()}.`, + `User clicked button .`, + `API call to completed in ${Math.floor( + Math.random() * 300 + )}ms.`, + `UI component loaded successfully.`, + `UI component failed to load.`, + `Session timeout for user .`, + `Error rendering component : Invalid data.`, + `User navigated to .`, + `CSS stylesheet loaded.`, + `JavaScript file executed.`, + `UI error at ${moment().toISOString()}: Cannot read property 'undefined'.`, + `Form submitted by user .`, + `Dialog displayed.`, + `Modal closed by user.`, + `Drag-and-drop interaction started.`, + `Drag-and-drop interaction completed.`, + `Keyboard shortcut activated: Ctrl+${String.fromCharCode( + 65 + Math.floor(Math.random() * 26) + )}.`, + `New notification displayed to user .`, + `UI settings updated by user .`, + `User logged out from UI.`, + ], + 'notification-service': [ + `Email sent to user .`, + `Push notification delivered to user .`, + `SMS sent to phone number .`, + `Email delivery failed for user .`, + `Push notification failed for user .`, + `SMS delivery failed for phone number .`, + `User opted out of notifications.`, + `New email template created.`, + `New push notification template created.`, + `New SMS template created.`, + `Batch email sent to ${Math.floor(Math.random() * 500)} recipients.`, + `Batch push notifications sent to ${Math.floor(Math.random() * 500)} recipients.`, + `Batch SMS sent to ${Math.floor(Math.random() * 500)} recipients.`, + `Template deleted.`, + `Notification settings updated for user .`, + `Email verification sent to user .`, + `Password reset notification sent to user .`, + `Marketing email sent to user .`, + `Reminder notification sent to user .`, + `System maintenance notification sent to all users.`, + ], + } as Record); + +export const getJavaMessages = () => [ '[main] com.example1.core.ApplicationCore - Critical failure: NullPointerException encountered during startup', - '[main] com.example.service.UserService - User registration completed for userId: 12345', + '[main] com.example1.core.ApplicationCore - Application started successfully in 3456ms', + '[main] com.example1.core.ApplicationCore - Configuring bean "dataSource" of type HikariCP', + '[main] com.example1.core.ApplicationCore - Memory usage threshold exceeded. GC invoked.', + '[main] com.example1.core.ApplicationCore - Shutting down gracefully on SIGTERM', + + '[main] com.example2.service.PaymentService - Payment processed successfully for orderId: ORD-' + + generateShortId(), + '[main] com.example2.service.PaymentService - Failed to process payment for orderId: ORD-' + + generateShortId() + + '. Reason: Insufficient funds.', + '[main] com.example2.service.PaymentService - Payment gateway timeout for orderId: ORD-' + + generateShortId(), + '[main] com.example2.service.PaymentService - Initiating refund for transactionId: TXN-' + + generateShortId(), + '[main] com.example2.service.PaymentService - Payment retry attempt started for orderId: ORD-' + + generateShortId(), + '[main] com.example3.util.JsonParser - Parsing JSON response from external API', - '[main] com.example4.security.AuthManager - Unauthorized access attempt detected for userId: 67890', + '[main] com.example3.util.JsonParser - Invalid JSON encountered: {"invalid_key":"missing_value"}', + '[main] com.example3.util.JsonParser - Successfully parsed JSON for userId: ' + + Math.floor(Math.random() * 10000), + '[main] com.example3.util.JsonParser - JSON parsing failed due to org.json.JSONException: Unterminated string', + '[main] com.example3.util.JsonParser - Fallback to default configuration triggered due to parsing error', + + '[main] com.example4.security.AuthManager - Unauthorized access attempt detected for userId: ' + + Math.floor(Math.random() * 100000), + '[main] com.example4.security.AuthManager - Password updated for userId: ' + + Math.floor(Math.random() * 100000), + '[main] com.example4.security.AuthManager - User account locked after 3 failed login attempts for userId: ' + + Math.floor(Math.random() * 100000), + '[main] com.example4.security.AuthManager - Token validation failed for token: TOKEN-' + + generateShortId(), + '[main] com.example4.security.AuthManager - User session terminated for userId: ' + + Math.floor(Math.random() * 100000), + '[main] com.example5.dao.UserDao - Database query failed: java.sql.SQLException: Timeout expired', + '[main] com.example5.dao.UserDao - Retrieved 10 results for query: SELECT * FROM users WHERE status = "active"', + '[main] com.example5.dao.UserDao - Connection pool exhausted. Waiting for available connection.', + '[main] com.example5.dao.UserDao - Insert operation succeeded for userId: ' + + Math.floor(Math.random() * 100000), + '[main] com.example5.dao.UserDao - Detected stale connection. Retrying operation.', + + '[main] com.example6.metrics.MetricsCollector - Reporting CPU usage: ' + + (Math.random() * 100).toFixed(2) + + '%', + '[main] com.example6.metrics.MetricsCollector - Application uptime: ' + + Math.floor(Math.random() * 86400) + + ' seconds', + '[main] com.example6.metrics.MetricsCollector - Memory usage: Heap=128MB Non-Heap=64MB', + '[main] com.example6.metrics.MetricsCollector - GC activity detected. Time taken: ' + + Math.floor(Math.random() * 100) + + 'ms', + '[main] com.example6.metrics.MetricsCollector - Collected metrics for 15 services', + + '[main] com.example7.messaging.MessageQueue - Message published to queue "orders" with messageId: MSG-' + + generateShortId(), + '[main] com.example7.messaging.MessageQueue - Consumer failed to process messageId: MSG-' + + generateShortId() + + '. Error: NullPointerException', + '[main] com.example7.messaging.MessageQueue - Queue "notifications" has 50 pending messages', + '[main] com.example7.messaging.MessageQueue - Retrying message delivery for messageId: MSG-' + + generateShortId(), + '[main] com.example7.messaging.MessageQueue - Dead-letter queue reached maximum size. Oldest messages purged.', + + '[main] com.example8.integration.ExternalServiceClient - HTTP 200: Successfully received response from "https://api.example.com/v1/resource"', + '[main] com.example8.integration.ExternalServiceClient - HTTP 500: Internal Server Error while accessing "https://api.example.com/v1/resource"', + '[main] com.example8.integration.ExternalServiceClient - Connection timeout occurred after 30 seconds', + '[main] com.example8.integration.ExternalServiceClient - Retrying request to endpoint "https://api.example.com/v1/resource"', + '[main] com.example8.integration.ExternalServiceClient - API key validation failed for key: APIKEY-' + + generateShortId(), ]; const IP_ADDRESSES = [ @@ -71,14 +410,25 @@ export const getCloudRegion = (index?: number) => getAtIndexOrRandom(CLOUD_REGIO export const getServiceName = (index?: number) => getAtIndexOrRandom(SERVICE_NAMES, index); export const getAgentName = (index?: number) => getAtIndexOrRandom(ELASTIC_AGENT_NAMES, index); -export const getJavaLog = () => - `${moment().format('YYYY-MM-DD HH:mm:ss,SSS')} ${getAtIndexOrRandom( - LOG_LEVELS - )} ${getAtIndexOrRandom(JAVA_LOG_MESSAGES)}`; +export const getJavaLogs = () => { + const javaLogMessages = getJavaMessages(); + return getRandomRange().map( + () => + `${moment().format('YYYY-MM-DD HH:mm:ss,SSS')} ${getAtIndexOrRandom( + LOG_LEVELS + )} ${getAtIndexOrRandom(javaLogMessages)}` + ); +}; + +export function getRandomRange() { + return Array.from({ length: Math.floor(Math.random() * 1000) + 1 }).fill(null); +} -export const getWebLog = () => { - const path = `/api/${noun()}/${verb()}`; - const bytes = randomInt(100, 4000); +export const getWebLogs = () => { + return getRandomRange().map(() => { + const path = `/api/${noun()}/${verb()}`; + const bytes = randomInt(100, 4000); - return `${ipv4()} - - [${moment().toISOString()}] "${httpMethod()} ${path} HTTP/1.1" ${httpStatusCode()} ${bytes} "-" "${userAgent()}"`; + return `${ipv4()} - - [${moment().toISOString()}] "${httpMethod()} ${path} HTTP/1.1" ${httpStatusCode()} ${bytes} "-" "${userAgent()}"`; + }); }; diff --git a/packages/kbn-apm-synthtrace/src/scenarios/slash_logs.ts b/packages/kbn-apm-synthtrace/src/scenarios/slash_logs.ts new file mode 100644 index 0000000000000..26c998f658661 --- /dev/null +++ b/packages/kbn-apm-synthtrace/src/scenarios/slash_logs.ts @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { LogDocument, generateShortId, log } from '@kbn/apm-synthtrace-client'; +import { Scenario } from '../cli/scenario'; +import { withClient } from '../lib/utils/with_client'; +import { + getAgentName, + getCloudProvider, + getCloudRegion, + getIpAddress, + getJavaLogs, + getServiceName, + getWebLogs, + getKubernetesMessages, + getLinuxMessages, + KUBERNETES_SERVICES, + getStableShortId, + getRandomRange, +} from './helpers/logs_mock_data'; +import { getAtIndexOrRandom } from './helpers/get_at_index_or_random'; + +const LINUX_PROCESSES = ['cron', 'sshd', 'systemd', 'nginx', 'apache2']; + +const scenario: Scenario = async (runOptions) => { + const constructCommonMetadata = () => ({ + 'agent.name': getAgentName(), + 'cloud.provider': getCloudProvider(), + 'cloud.region': getCloudRegion(Math.floor(Math.random() * 3)), + 'cloud.availability_zone': `${getCloudRegion(0)}a`, + 'cloud.instance.id': generateShortId(), + 'cloud.project.id': generateShortId(), + }); + + const generateNginxLogs = (timestamp: number) => { + return getWebLogs().map((message) => { + return log + .createForIndex('logs') + .setHostIp(getIpAddress()) + .message(message) + .defaults({ + ...constructCommonMetadata(), + 'log.file.path': `/var/log/nginx/access-${getStableShortId()}.log`, + }) + .timestamp(timestamp); + }); + }; + + const generateSyslogData = (timestamp: number) => { + const messages: Record = getLinuxMessages(); + + return getRandomRange().map(() => { + const processName = getAtIndexOrRandom(LINUX_PROCESSES); + const message = getAtIndexOrRandom(messages[processName]); + return log + .createForIndex('logs') + .message(message) + .setHostIp(getIpAddress()) + .defaults({ + ...constructCommonMetadata(), + 'process.name': processName, + 'log.file.path': `/var/log/${processName}.log`, + }) + .timestamp(timestamp); + }); + }; + + const generateKubernetesLogs = (timestamp: number) => { + const messages: Record = getKubernetesMessages(); + + return getRandomRange().map(() => { + const service = getAtIndexOrRandom(KUBERNETES_SERVICES); + const isStringifiedJSON = Math.random() > 0.5; + const message = isStringifiedJSON + ? JSON.stringify({ + serviceName: service, + message: getAtIndexOrRandom(messages[service]), + }) + : getAtIndexOrRandom(messages[service]); + return log + .createForIndex('logs') + .message(message) + .setHostIp(getIpAddress()) + .defaults({ + ...constructCommonMetadata(), + 'kubernetes.namespace': 'default', + 'kubernetes.pod.name': `${service}-pod-${getStableShortId()}`, + 'kubernetes.container.name': `${service}-container`, + 'orchestrator.resource.name': service, + }) + .timestamp(timestamp); + }); + }; + + const generateUnparsedJavaLogs = (timestamp: number) => { + return getJavaLogs().map((message) => { + const serviceName = getServiceName(Math.floor(Math.random() * 3)); + return log + .createForIndex('logs') + .message(message) + .defaults({ + ...constructCommonMetadata(), + 'service.name': serviceName, + }) + .timestamp(timestamp); + }); + }; + + return { + generate: ({ range, clients: { logsEsClient } }) => { + const { logger } = runOptions; + + const nginxLogs = range.interval('1m').generator(generateNginxLogs); + const syslogData = range.interval('1m').generator(generateSyslogData); + const kubernetesLogs = range.interval('1m').generator(generateKubernetesLogs); + const unparsedJavaLogs = range.interval('1m').generator(generateUnparsedJavaLogs); + + return withClient( + logsEsClient, + logger.perf('generating_messy_logs', () => [ + nginxLogs, + syslogData, + kubernetesLogs, + unparsedJavaLogs, + ]) + ); + }, + }; +}; + +export default scenario; diff --git a/packages/kbn-apm-synthtrace/src/scenarios/unstructured_logs.ts b/packages/kbn-apm-synthtrace/src/scenarios/unstructured_logs.ts index 704cfd21bbc09..b87fd7038a7d3 100644 --- a/packages/kbn-apm-synthtrace/src/scenarios/unstructured_logs.ts +++ b/packages/kbn-apm-synthtrace/src/scenarios/unstructured_logs.ts @@ -12,7 +12,7 @@ import { Scenario } from '../cli/scenario'; import { IndexTemplateName } from '../lib/logs/custom_logsdb_index_templates'; import { withClient } from '../lib/utils/with_client'; import { parseLogsScenarioOpts } from './helpers/logs_scenario_opts_parser'; -import { getJavaLog, getWebLog } from './helpers/logs_mock_data'; +import { getJavaLogs, getWebLogs } from './helpers/logs_mock_data'; const scenario: Scenario = async (runOptions) => { const { isLogsDb } = parseLogsScenarioOpts(runOptions.scenarioOpts); @@ -23,19 +23,18 @@ const scenario: Scenario = async (runOptions) => { generate: ({ range, clients: { logsEsClient } }) => { const { logger } = runOptions; - const datasetJavaLogs = (timestamp: number) => - log.create({ isLogsDb }).dataset('java').message(getJavaLog()).timestamp(timestamp); - - const datasetWebLogs = (timestamp: number) => - log.create({ isLogsDb }).dataset('web').message(getWebLog()).timestamp(timestamp); - const logs = range .interval('1m') .rate(1) .generator((timestamp) => { - return Array(200) - .fill(0) - .flatMap((_, index) => [datasetJavaLogs(timestamp), datasetWebLogs(timestamp)]); + return [ + ...getJavaLogs().map((message) => + log.create({ isLogsDb }).dataset('java').message(message).timestamp(timestamp) + ), + ...getWebLogs().map((message) => + log.create({ isLogsDb }).dataset('web').message(message).timestamp(timestamp) + ), + ]; }); return withClient( diff --git a/x-pack/plugins/streams/common/types.ts b/x-pack/plugins/streams/common/types.ts index 6cdb2f923f6f4..d3aa43911ec2c 100644 --- a/x-pack/plugins/streams/common/types.ts +++ b/x-pack/plugins/streams/common/types.ts @@ -72,7 +72,7 @@ export const streamWithoutIdDefinitonSchema = z.object({ .array( z.object({ id: z.string(), - condition: conditionSchema, + condition: z.optional(conditionSchema), }) ) .default([]), diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts index 82c89c9ab9171..4763aacb44478 100644 --- a/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts @@ -30,7 +30,8 @@ export function generateLayer( template: { settings: isRoot(definition.id) ? logsSettings : {}, mappings: { - subobjects: false, + subobjects: true, // TODO set to false once this works on Elasticsearch side - right now fields are not properly indexed. + dynamic: false, properties, }, }, diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts index aab7f27f12d14..8c63d7caa8811 100644 --- a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts +++ b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts @@ -7,48 +7,48 @@ import { conditionToPainless } from './condition_to_painless'; -const operatorConditionAndResutls = [ +const operatorConditionAndResults = [ { condition: { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, - result: 'ctx.log?.logger == "nginx_proxy"', + result: '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy")', }, { condition: { field: 'log.logger', operator: 'neq' as const, value: 'nginx_proxy' }, - result: 'ctx.log?.logger != "nginx_proxy"', + result: '(ctx.log?.logger !== null && ctx.log?.logger != "nginx_proxy")', }, { condition: { field: 'http.response.status_code', operator: 'lt' as const, value: 500 }, - result: 'ctx.http?.response?.status_code < 500', + result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code < 500)', }, { condition: { field: 'http.response.status_code', operator: 'lte' as const, value: 500 }, - result: 'ctx.http?.response?.status_code <= 500', + result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code <= 500)', }, { condition: { field: 'http.response.status_code', operator: 'gt' as const, value: 500 }, - result: 'ctx.http?.response?.status_code > 500', + result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code > 500)', }, { condition: { field: 'http.response.status_code', operator: 'gte' as const, value: 500 }, - result: 'ctx.http?.response?.status_code >= 500', + result: '(ctx.http?.response?.status_code !== null && ctx.http?.response?.status_code >= 500)', }, { condition: { field: 'log.logger', operator: 'startsWith' as const, value: 'nginx' }, - result: 'ctx.log?.logger.startsWith("nginx")', + result: '(ctx.log?.logger !== null && ctx.log?.logger.startsWith("nginx"))', }, { condition: { field: 'log.logger', operator: 'endsWith' as const, value: 'proxy' }, - result: 'ctx.log?.logger.endsWith("proxy")', + result: '(ctx.log?.logger !== null && ctx.log?.logger.endsWith("proxy"))', }, { condition: { field: 'log.logger', operator: 'contains' as const, value: 'proxy' }, - result: 'ctx.log?.logger.contains("proxy")', + result: '(ctx.log?.logger !== null && ctx.log?.logger.contains("proxy"))', }, ]; describe('conditionToPainless', () => { describe('operators', () => { - operatorConditionAndResutls.forEach((setup) => { + operatorConditionAndResults.forEach((setup) => { test(`${setup.condition.operator}`, () => { expect(conditionToPainless(setup.condition)).toEqual(setup.result); }); @@ -65,7 +65,7 @@ describe('conditionToPainless', () => { }; expect( expect(conditionToPainless(condition)).toEqual( - 'ctx.log?.logger == "nginx_proxy" && ctx.log?.level == "error"' + '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") && (ctx.log?.level !== null && ctx.log?.level == "error")' ) ); }); @@ -81,7 +81,7 @@ describe('conditionToPainless', () => { }; expect( expect(conditionToPainless(condition)).toEqual( - 'ctx.log?.logger == "nginx_proxy" || ctx.log?.level == "error"' + '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") || (ctx.log?.level !== null && ctx.log?.level == "error")' ) ); }); @@ -102,7 +102,7 @@ describe('conditionToPainless', () => { }; expect( expect(conditionToPainless(condition)).toEqual( - 'ctx.log?.logger == "nginx_proxy" && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")' + '(ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") && ((ctx.log?.level !== null && ctx.log?.level == "error") || (ctx.log?.level !== null && ctx.log?.level == "ERROR"))' ) ); }); @@ -125,7 +125,7 @@ describe('conditionToPainless', () => { }; expect( expect(conditionToPainless(condition)).toEqual( - '(ctx.log?.logger == "nginx_proxy" || ctx.service?.name == "nginx") && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")' + '((ctx.log?.logger !== null && ctx.log?.logger == "nginx_proxy") || (ctx.service?.name !== null && ctx.service?.name == "nginx")) && ((ctx.log?.level !== null && ctx.log?.level == "error") || (ctx.log?.level !== null && ctx.log?.level == "ERROR"))' ) ); }); diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts index 539ad3603535b..2cccef260d7e1 100644 --- a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts +++ b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts @@ -69,7 +69,7 @@ function toPainless(condition: FilterCondition) { export function conditionToPainless(condition: Condition, nested = false): string { if (isFilterCondition(condition)) { - return toPainless(condition); + return `(${safePainlessField(condition)} !== null && ${toPainless(condition)})`; } if (isAndCondition(condition)) { const and = condition.and.map((filter) => conditionToPainless(filter, true)).join(' && '); diff --git a/x-pack/plugins/streams/server/routes/streams/delete.ts b/x-pack/plugins/streams/server/routes/streams/delete.ts index 369568ff9b7f0..a2092838792cf 100644 --- a/x-pack/plugins/streams/server/routes/streams/delete.ts +++ b/x-pack/plugins/streams/server/routes/streams/delete.ts @@ -52,6 +52,7 @@ export const deleteStreamRoute = createServerRoute({ throw new MalformedStreamId('Cannot delete root stream'); } + // need to update parent first to cut off documents streaming down await updateParentStream(scopedClusterClient, params.path.id, parentId, logger); await deleteStream(scopedClusterClient, params.path.id, logger); diff --git a/x-pack/plugins/streams/server/routes/streams/edit.ts b/x-pack/plugins/streams/server/routes/streams/edit.ts index 378f1ba3c8f01..cda73907d2302 100644 --- a/x-pack/plugins/streams/server/routes/streams/edit.ts +++ b/x-pack/plugins/streams/server/routes/streams/edit.ts @@ -57,22 +57,10 @@ export const editStreamRoute = createServerRoute({ const parentId = getParentId(params.path.id); let parentDefinition: StreamDefinition | undefined; - if (parentId) { - parentDefinition = await updateParentStream( - scopedClusterClient, - parentId, - params.path.id, - logger - ); - } const streamDefinition = { ...params.body }; - await syncStream({ - scopedClusterClient, - definition: { ...streamDefinition, id: params.path.id }, - rootDefinition: parentDefinition, - logger, - }); + // always need to go from the leaves to the parent when syncing ingest pipelines, otherwise data + // will be routed before the data stream is ready for (const child of streamDefinition.children) { const streamExists = await checkStreamExists({ @@ -97,6 +85,22 @@ export const editStreamRoute = createServerRoute({ }); } + await syncStream({ + scopedClusterClient, + definition: { ...streamDefinition, id: params.path.id }, + rootDefinition: parentDefinition, + logger, + }); + + if (parentId) { + parentDefinition = await updateParentStream( + scopedClusterClient, + parentId, + params.path.id, + logger + ); + } + return { acknowledged: true }; } catch (e) { if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { diff --git a/x-pack/plugins/streams/server/routes/streams/fork.ts b/x-pack/plugins/streams/server/routes/streams/fork.ts index 8294aebb0b0e9..12dce248dcdd1 100644 --- a/x-pack/plugins/streams/server/routes/streams/fork.ts +++ b/x-pack/plugins/streams/server/routes/streams/fork.ts @@ -76,21 +76,22 @@ export const forkStreamsRoute = createServerRoute({ params.body.stream.fields ); - rootDefinition.children.push({ - id: params.body.stream.id, - condition: params.body.condition, - }); - + // need to create the child first, otherwise we risk streaming data even though the child data stream is not ready await syncStream({ scopedClusterClient, - definition: rootDefinition, + definition: childDefinition, rootDefinition, logger, }); + rootDefinition.children.push({ + id: params.body.stream.id, + condition: params.body.condition, + }); + await syncStream({ scopedClusterClient, - definition: childDefinition, + definition: rootDefinition, rootDefinition, logger, });