Skip to content

Commit

Permalink
[Reporting] Establish event-based telemetry for report job lifecycles (
Browse files Browse the repository at this point in the history
…#176775)

## Summary

Closes #158501


### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] Use internal staging cluster to Examine events of telemetry

<img width="1593" alt="Screenshot 2024-02-15 at 9 27 01 PM"
src="https://github.com/elastic/kibana/assets/908371/8b0521d7-0768-412a-8e12-a666e73e4ea3">
  • Loading branch information
tsullivan authored Feb 21, 2024
1 parent 633509b commit 34026be
Show file tree
Hide file tree
Showing 17 changed files with 745 additions and 112 deletions.
20 changes: 18 additions & 2 deletions x-pack/plugins/reporting/server/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as Rx from 'rxjs';
import { map, take } from 'rxjs/operators';

import type {
AnalyticsServiceStart,
CoreSetup,
DocLinksServiceSetup,
IBasePath,
Expand Down Expand Up @@ -53,6 +54,7 @@ import { reportingEventLoggerFactory } from './lib/event_logger/logger';
import type { IReport, ReportingStore } from './lib/store';
import { ExecuteReportTask, ReportTaskParams } from './lib/tasks';
import type { ReportingPluginRouter } from './types';
import { EventTracker } from './usage';

export interface ReportingInternalSetup {
basePath: Pick<IBasePath, 'set'>;
Expand All @@ -69,6 +71,7 @@ export interface ReportingInternalSetup {

export interface ReportingInternalStart {
store: ReportingStore;
analytics: AnalyticsServiceStart;
savedObjects: SavedObjectsServiceStart;
uiSettings: UiSettingsServiceStart;
esClient: IClusterClient;
Expand Down Expand Up @@ -301,13 +304,26 @@ export class ReportingCore {
}

/*
*
* Track usage of code paths for telemetry
* Track usage of API endpoints
*/
public getUsageCounter(): UsageCounter | undefined {
return this.pluginSetupDeps?.usageCounter;
}

/*
* Track metrics of internal events
*/
public getEventTracker(
reportId: string,
exportType: string,
objectType: string
): EventTracker | undefined {
const { analytics } = this.pluginStartDeps ?? {};
if (analytics) {
return new EventTracker(analytics, reportId, exportType, objectType);
}
}

/*
* Gives async access to the startDeps
*/
Expand Down
18 changes: 13 additions & 5 deletions x-pack/plugins/reporting/server/lib/tasks/execute_report.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
* 2.0.
*/

import { estypes } from '@elastic/elasticsearch';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { KibanaShuttingDownError } from '@kbn/reporting-common';
import { ReportDocument } from '@kbn/reporting-common/types';
import { createMockConfigSchema } from '@kbn/reporting-mocks-server';
import type { ExportType, ReportingConfigType } from '@kbn/reporting-server';
import type { RunContext } from '@kbn/task-manager-plugin/server';
Expand All @@ -15,7 +17,6 @@ import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { ExecuteReportTask } from '.';
import type { ReportingCore } from '../..';
import { createMockReportingCore } from '../../test_helpers';
import type { SavedReport } from '../store';

const logger = loggingSystemMock.createLogger();

Expand Down Expand Up @@ -97,11 +98,18 @@ describe('Execute Report Task', () => {
validLicenses: [],
} as unknown as ExportType);
const store = await mockReporting.getStore();
store.setReportError = jest.fn(() => Promise.resolve({} as any));
const task = new ExecuteReportTask(mockReporting, configType, logger);
task._claimJob = jest.fn(() =>
Promise.resolve({ _id: 'test', jobtype: 'noop', status: 'pending' } as SavedReport)
store.setReportError = jest.fn(() =>
Promise.resolve({
_id: 'test',
jobtype: 'noop',
status: 'processing',
} as unknown as estypes.UpdateUpdateWriteResponseBase<ReportDocument>)
);
const task = new ExecuteReportTask(mockReporting, configType, logger);
jest
// @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance
.spyOn(task, '_claimJob')
.mockResolvedValueOnce({ _id: 'test', jobtype: 'noop', status: 'pending' } as never);
const mockTaskManager = taskManagerMock.createStart();
await task.init(mockTaskManager);

Expand Down
71 changes: 62 additions & 9 deletions x-pack/plugins/reporting/server/lib/tasks/execute_report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
isExecutionError,
mapToReportingError,
} from '../../../common/errors/map_to_reporting_error';
import { EventTracker } from '../../usage';
import type { ReportingStore } from '../store';
import { Report, SavedReport } from '../store';
import type { ReportFailedFields, ReportProcessingFields } from '../store/store';
Expand Down Expand Up @@ -103,8 +104,9 @@ export class ExecuteReportTask implements ReportingTask {
private taskManagerStart?: TaskManagerStartContract;
private kibanaId?: string;
private kibanaName?: string;
private store?: ReportingStore;
private exportTypesRegistry: ExportTypesRegistry;
private store?: ReportingStore;
private eventTracker?: EventTracker;

constructor(
private reporting: ReportingCore,
Expand Down Expand Up @@ -146,12 +148,26 @@ export class ExecuteReportTask implements ReportingTask {
return this.taskManagerStart;
}

private getEventTracker(report: Report) {
if (this.eventTracker) {
return this.eventTracker;
}

const eventTracker = this.reporting.getEventTracker(
report._id,
report.jobtype,
report.payload.objectType
);
this.eventTracker = eventTracker;
return this.eventTracker;
}

private getJobContentEncoding(jobType: string) {
const exportType = this.exportTypesRegistry.getByJobType(jobType);
return exportType.jobContentEncoding;
}

public async _claimJob(task: ReportTaskParams): Promise<SavedReport> {
private async _claimJob(task: ReportTaskParams): Promise<SavedReport> {
if (this.kibanaId == null) {
throw new Error(`Kibana instance ID is undefined!`);
}
Expand Down Expand Up @@ -218,6 +234,11 @@ export class ExecuteReportTask implements ReportingTask {
`[process_expiration: ${expirationTime}]`
);

// event tracking of claimed job
const eventTracker = this.getEventTracker(report);
const timeSinceCreation = Date.now() - new Date(report.created_at).valueOf();
eventTracker?.claimJob({ timeSinceCreation });

const resp = await store.setReportClaimed(claimedReport, doc);
claimedReport._seq_no = resp._seq_no;
claimedReport._primary_term = resp._primary_term;
Expand All @@ -241,12 +262,21 @@ export class ExecuteReportTask implements ReportingTask {

// update the report in the store
const store = await this.getStore();
const completedTime = moment().toISOString();
const completedTime = moment();
const doc: ReportFailedFields = {
completed_at: completedTime,
completed_at: completedTime.toISOString(),
output: docOutput ?? null,
};

// event tracking of failed job
const eventTracker = this.getEventTracker(report);
const timeSinceCreation = Date.now() - new Date(report.created_at).valueOf();
eventTracker?.failJob({
timeSinceCreation,
errorCode: docOutput?.error_code ?? 'unknown',
errorMessage: error?.message ?? 'unknown',
});

return await store.setReportFailed(report, doc);
}

Expand Down Expand Up @@ -293,7 +323,7 @@ export class ExecuteReportTask implements ReportingTask {
return docOutput;
}

public async _performJob(
private async _performJob(
task: ReportTaskParams,
taskInstanceFields: TaskInstanceFields,
cancellationToken: CancellationToken,
Expand All @@ -314,19 +344,19 @@ export class ExecuteReportTask implements ReportingTask {
);
}

public async _completeJob(
private async _completeJob(
report: SavedReport,
output: CompletedReportOutput
): Promise<SavedReport> {
let docId = `/${report._index}/_doc/${report._id}`;

this.logger.debug(`Saving ${report.jobtype} to ${docId}.`);

const completedTime = moment().toISOString();
const completedTime = moment();
const docOutput = this._formatOutput(output);
const store = await this.getStore();
const doc = {
completed_at: completedTime,
completed_at: completedTime.toISOString(),
metrics: output.metrics,
output: docOutput,
};
Expand All @@ -337,12 +367,35 @@ export class ExecuteReportTask implements ReportingTask {
this.logger.info(`Saved ${report.jobtype} job ${docId}`);
report._seq_no = resp._seq_no;
report._primary_term = resp._primary_term;

// event tracking of completed job
const eventTracker = this.getEventTracker(report);
const byteSize = docOutput.size;
const timeSinceCreation = completedTime.valueOf() - new Date(report.created_at).valueOf();

if (output.metrics?.csv != null) {
eventTracker?.completeJobCsv({
byteSize,
timeSinceCreation,
csvRows: output.metrics.csv.rows ?? -1,
});
} else if (output.metrics?.pdf != null || output.metrics?.png != null) {
const { width, height } = report.payload.layout?.dimensions ?? {};
eventTracker?.completeJobScreenshot({
byteSize,
timeSinceCreation,
screenshotLayout: report.payload.layout?.id ?? 'preserve_layout',
numPages: output.metrics.pdf?.pages ?? -1,
screenshotPixels: Math.round((width ?? 0) * (height ?? 0)),
});
}

return report;
}

// Generic is used to let TS infer the return type at call site.
private async throwIfKibanaShutsDown<T>(): Promise<T> {
await this.reporting.getKibanaShutdown$().toPromise();
await Rx.firstValueFrom(this.reporting.getKibanaShutdown$());
throw new KibanaShuttingDownError();
}

Expand Down
4 changes: 3 additions & 1 deletion x-pack/plugins/reporting/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import type {
ReportingStartDeps,
} from './types';
import { ReportingRequestHandlerContext } from './types';
import { registerReportingUsageCollector } from './usage';
import { registerReportingEventTypes, registerReportingUsageCollector } from './usage';

/*
* @internal
Expand Down Expand Up @@ -74,6 +74,7 @@ export class ReportingPlugin
registerUiSettings(core);
registerDeprecations({ core, reportingCore });
registerReportingUsageCollector(reportingCore, plugins.usageCollection);
registerReportingEventTypes(core);

// Routes
registerRoutes(reportingCore, this.logger);
Expand Down Expand Up @@ -108,6 +109,7 @@ export class ReportingPlugin
await reportingCore.pluginStart({
logger,
esClient: elasticsearch.client,
analytics: core.analytics,
savedObjects,
uiSettings,
store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const validation = {

/**
* Handles the common parts of requests to generate a report
* Serves report job handling in the context of the request to generate the report
*/
export class RequestHandler {
constructor(
Expand Down Expand Up @@ -210,6 +211,15 @@ export class RequestHandler {

// return task manager's task information and the download URL
counters.usageCounter();
const eventTracker = reporting.getEventTracker(
report._id,
exportTypeId,
jobParams.objectType
);
eventTracker?.createReport({
isDeprecated: Boolean(report.payload.isDeprecated),
isPublicApi: this.path.match(/internal/) === null,
});

return this.res.ok<ReportingJobResponse>({
headers: { 'content-type': 'application/json' },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => {
};

if (filename) {
// event tracking of the downloaded file, if
// the report job was completed successfully
// and a file is available
const eventTracker = reporting.getEventTracker(docId, doc.jobtype, doc.payload.objectType);
const timeSinceCreation = Date.now() - new Date(doc.created_at).valueOf();
eventTracker?.downloadReport({ timeSinceCreation });

return res.file({ body, headers, filename });
}

Expand Down Expand Up @@ -109,6 +116,11 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => {

await jobsQuery.delete(docIndex, docId);

// event tracking of the deleted report
const eventTracker = reporting.getEventTracker(docId, doc.jobtype, doc.payload.objectType);
const timeSinceCreation = Date.now() - new Date(doc.created_at).valueOf();
eventTracker?.deleteReport({ timeSinceCreation });

return res.ok({
body: { deleted: true },
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import Boom from '@hapi/boom';

import { IKibanaResponse, kibanaResponseFactory } from '@kbn/core/server';
import { ReportApiJSON } from '@kbn/reporting-common/types';
import { JobId, ReportApiJSON } from '@kbn/reporting-common/types';
import { i18n } from '@kbn/i18n';
import { Counters } from '..';
import { ReportingCore } from '../../..';
Expand All @@ -26,7 +26,7 @@ type JobManagementResponseHandler = (doc: ReportApiJSON) => Promise<IKibanaRespo
export const jobManagementPreRouting = async (
reporting: ReportingCore,
res: typeof kibanaResponseFactory,
docId: string,
jobId: JobId,
user: ReportingUser,
counters: Counters,
cb: JobManagementResponseHandler
Expand All @@ -38,7 +38,7 @@ export const jobManagementPreRouting = async (

const jobsQuery = jobsQueryFactory(reporting);

const doc = await jobsQuery.get(user, docId);
const doc = await jobsQuery.get(user, jobId);
if (!doc) {
return res.notFound();
}
Expand Down
Loading

0 comments on commit 34026be

Please sign in to comment.