Skip to content

Commit

Permalink
[Reporting] Log the wait time for the stream's pending callbacks (ela…
Browse files Browse the repository at this point in the history
…stic#203156)

## Summary

This PR adds logging for the final step of report execution: waiting for
the writable stream's pending callbacks.

* **Debug**: log the beginning of the wait
* **Info**: log the end of the wait

Update: this also puts the report job ID in the context of the logger
utility used for running the task.

<img width="1983" alt="image"
src="https://github.com/user-attachments/assets/1ef68758-c2c9-4b6d-8090-50e496089140">

### Checklist

none

### Identify risks

none
  • Loading branch information
tsullivan authored Dec 9, 2024
1 parent 61dd1f5 commit cafab5e
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions x-pack/plugins/reporting/server/lib/tasks/execute_report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ export class ExecuteReportTask implements ReportingTask {

const store = await this.getStore();
const report = await store.findReportFromTask(task); // receives seq_no and primary_term
const logger = this.logger.get(report._id);

if (report.status === 'completed') {
throw new Error(`Can not claim the report job: it is already completed!`);
Expand All @@ -197,7 +198,7 @@ export class ExecuteReportTask implements ReportingTask {
err.stack = error.stack;
} else {
if (report.error && report.error instanceof Error) {
errorLogger(this.logger, 'Error executing report', report.error);
errorLogger(logger, 'Error executing report', report.error);
}
err = new QueueTimeoutError(
`Max attempts reached (${maxAttempts}). Queue timeout reached.`
Expand Down Expand Up @@ -226,7 +227,7 @@ export class ExecuteReportTask implements ReportingTask {
...doc,
});

this.logger.info(
logger.info(
`Claiming ${claimedReport.jobtype} ${report._id} ` +
`[_index: ${report._index}] ` +
`[_seq_no: ${report._seq_no}] ` +
Expand All @@ -251,14 +252,15 @@ export class ExecuteReportTask implements ReportingTask {
error?: ReportingError
): Promise<UpdateResponse<ReportDocument>> {
const message = `Failing ${report.jobtype} job ${report._id}`;
const logger = this.logger.get(report._id);

// log the error
let docOutput;
if (error) {
errorLogger(this.logger, message, error);
errorLogger(logger, message, error);
docOutput = this._formatOutput(error);
} else {
errorLogger(this.logger, message);
errorLogger(logger, message);
}

// update the report in the store
Expand Down Expand Up @@ -287,8 +289,9 @@ export class ExecuteReportTask implements ReportingTask {
): Promise<UpdateResponse<ReportDocument>> {
const message = `Saving execution error for ${report.jobtype} job ${report._id}`;
const errorParsed = parseError(failedToExecuteErr);
const logger = this.logger.get(report._id);
// log the error
errorLogger(this.logger, message, failedToExecuteErr);
errorLogger(logger, message, failedToExecuteErr);

// update the report in the store
const store = await this.getStore();
Expand Down Expand Up @@ -350,8 +353,9 @@ export class ExecuteReportTask implements ReportingTask {
output: CompletedReportOutput
): Promise<SavedReport> {
let docId = `/${report._index}/_doc/${report._id}`;
const logger = this.logger.get(report._id);

this.logger.debug(`Saving ${report.jobtype} to ${docId}.`);
logger.debug(`Saving ${report.jobtype} to ${docId}.`);

const completedTime = moment();
const docOutput = this._formatOutput(output);
Expand All @@ -365,7 +369,7 @@ export class ExecuteReportTask implements ReportingTask {

const resp = await store.setReportCompleted(report, doc);

this.logger.info(`Saved ${report.jobtype} job ${docId}`);
logger.info(`Saved ${report.jobtype} job ${docId}`);
report._seq_no = resp._seq_no!;
report._primary_term = resp._primary_term!;

Expand Down Expand Up @@ -464,11 +468,12 @@ export class ExecuteReportTask implements ReportingTask {

const { jobtype: jobType, attempts } = report;
const maxAttempts = this.getMaxAttempts();
const logger = this.logger.get(jobId);

this.logger.debug(
logger.debug(
`Starting ${jobType} report ${jobId}: attempt ${attempts} of ${maxAttempts}.`
);
this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);
logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);

const eventLog = this.reporting.getEventLogger(
new Report({ ...task, _id: task.id, _index: task.index })
Expand Down Expand Up @@ -502,7 +507,9 @@ export class ExecuteReportTask implements ReportingTask {

stream.end();

logger.debug(`Begin waiting for the stream's pending callbacks...`);
await finishedWithNoPendingCallbacks(stream);
logger.info(`The stream's pending callbacks have completed.`);

report._seq_no = stream.getSeqNo()!;
report._primary_term = stream.getPrimaryTerm()!;
Expand All @@ -513,24 +520,21 @@ export class ExecuteReportTask implements ReportingTask {
});

if (output) {
this.logger.debug(`Job output size: ${stream.bytesWritten} bytes.`);
logger.debug(`Job output size: ${stream.bytesWritten} bytes.`);
// Update the job status to "completed"
report = await this._completeJob(report, {
...output,
size: stream.bytesWritten,
});
}
// untrack the report for concurrency awareness
this.logger.debug(`Stopping ${jobId}.`);
logger.debug(`Stopping ${jobId}.`);
} catch (failedToExecuteErr) {
eventLog.logError(failedToExecuteErr);

await this._saveExecutionError(report, failedToExecuteErr).catch(
(failedToSaveError) => {
errorLogger(
this.logger,
`Error in saving execution error ${jobId}`,
failedToSaveError
);
errorLogger(logger, `Error in saving execution error ${jobId}`, failedToSaveError);
}
);

Expand All @@ -541,7 +545,7 @@ export class ExecuteReportTask implements ReportingTask {
throwRetryableError(error, new Date(Date.now() + TIME_BETWEEN_ATTEMPTS));
} finally {
this.reporting.untrackReport(jobId);
this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);
logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);
}
},

Expand All @@ -551,7 +555,7 @@ export class ExecuteReportTask implements ReportingTask {
*/
cancel: async () => {
if (jobId) {
this.logger.warn(`Cancelling job ${jobId}...`);
this.logger.get(jobId).warn(`Cancelling job ${jobId}...`);
}
cancellationToken.cancel();
},
Expand Down

0 comments on commit cafab5e

Please sign in to comment.