Skip to content

Commit

Permalink
feat: metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sanoel committed Sep 20, 2024
1 parent dee2d1d commit ec649d9
Show file tree
Hide file tree
Showing 6 changed files with 1,113 additions and 872 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@oada/jobs",
"version": "4.5.2",
"version": "4.6.0",
"description": "A library for oada job based microservices",
"source": "src/index.ts",
"main": "dist/index.js",
Expand Down Expand Up @@ -59,6 +59,7 @@
"packageManager": "[email protected]",
"dependencies": {
"@ava/typescript": "^4.1.0",
"@oada/lib-prom": "^3.8.0",
"@oada/list-lib": "^4.3.0",
"@oada/oadaify": "^2.1.0",
"@oada/types": "^3.5.3",
Expand Down
11 changes: 8 additions & 3 deletions src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,14 @@ export class Queue {

info(`[QueueId ${this._id}] Started WATCH.`);

// Clean up the resource and grab all existing jobs to run them before starting watch
trace(`[QueueId ${this._id}] Adding existing jobs`);
const jobs = stripResource(r.data as Record<string, unknown>);
this._service.metrics[`${this._service.name}_total_queued`].set(Object.keys(jobs).length);

if (skipQueue) {
info('Skipping existing jobs in the queue prior to startup.');
} else {
// Clean up the resource and grab all existing jobs to run them before starting watch
trace(`[QueueId ${this._id}] Adding existing jobs`);
const jobs = stripResource(r.data as Record<string, unknown>);
// AssertJobs(jobs);
await this.#doJobs(jobs as OADAJobs);
trace(
Expand Down Expand Up @@ -198,11 +200,14 @@ export class Queue {
async #doJobs(jobs: OADAJobs | OADAJobsChange): Promise<void> {
// Queue up the Runners in parallel
for (const [jobKey, value] of Object.entries(jobs)) {
this._service.metrics[`${this._service.name}_total_queued`].inc();
void this.#queue.add(async () => {
const { _id } = value as Link;
if (!_id) return;
// Fetch the job
const { job, isJob } = await Job.fromOada(this.#oada, _id);
const mtype = job.type.replaceAll('-', '_').replaceAll(' ', '_');
this._service.metrics[`${this._service.name}_queued_${mtype}`].inc();

// Instantiate a runner to manage the job
const runner = new Runner(this._service, jobKey, job, this.#oada);
Expand Down
47 changes: 37 additions & 10 deletions src/Runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export class Runner {
* appropriate.
*/
public async run(): Promise<void> {
const mtype = this.#job.type.replaceAll('-', '_').replaceAll(' ', '_');
this.#service.metrics[`${this.#service.name}_running_${mtype}`].inc();
this.#service.metrics[`${this.#service.name}_total_running`].inc();

// A quick check to ensure job isn't already completed
if (this.#job.status === 'success' || this.#job.status === 'failure') {
debug(`[Runner ${this.#job.oadaId}] Job already complete.`);
Expand Down Expand Up @@ -191,16 +195,7 @@ export class Runner {

// Link into success/failure event log
const date = moment(time).format('YYYY-MM-DD');
// Const finalpath = `/bookmarks/services/${this.service.name}/jobs/${status}/day-index/${date}`;
let finalpath: string | undefined;
if (status === 'failure') {
finalpath = failType
? `/bookmarks/services/${this.#service.name}/jobs/${status}/${failType}/day-index/${date}`
: `/bookmarks/services/${this.#service.name}/jobs/${status}/unknown/day-index/${date}`;
} else if (status === 'success') {
finalpath = `/bookmarks/services/${this.#service.name}/jobs/${status}/day-index/${date}`;
}

let finalpath = `/bookmarks/services/${this.#service.name}/jobs/${status}/day-index/${date}`;
info(
`[job ${this.#job.oadaId} ]: linking job to final resting place at ${finalpath}`,
);
Expand All @@ -215,12 +210,44 @@ export class Runner {
tree,
});

// If there is a failType, also link to the typed failure log
if (status === 'failure' && failType) {
let typedFailPath = `/bookmarks/services/${this.#service.name}/jobs/typed-${status}/${failType}/day-index/${date}`;
info(
`[job ${this.#job.oadaId} ]: linking job to final resting place at ${typedFailPath}`,
);
await this.#oada.put({
path: typedFailPath!,
data: {
[this.#jobKey]: {
_id: this.#job.oadaId,
_rev: 0,
},
},
tree,
});
}

// Remove from job queue
trace(`[job ${this.#job.oadaId} ]: removing from jobs queue`);
await this.#oada.delete({
path: `/bookmarks/services/${this.#service.name}/jobs/pending/${this.#jobKey}`,
});

trace(
`[job ${this.#job.oadaId} ]: marking job as ${status}`,
failType ?? 'unknown',
);
const mtype = this.#job.type.replaceAll('-', '_').replaceAll(' ', '_');
this.#service.metrics[`${this.#service.name}_total_queued`].dec();
this.#service.metrics[`${this.#service.name}_queued_${mtype}`].dec();

this.#service.metrics[`${this.#service.name}_total_running`].dec();
this.#service.metrics[`${this.#service.name}_running_${mtype}`].dec();

this.#service.metrics[`${this.#service.name}_total_${status}`].inc();
this.#service.metrics[`${this.#service.name}_${status}_${mtype}`].inc();

// Notify the status reporter if there is one
try {
const frs = this.#service.opts?.finishReporters;
Expand Down
86 changes: 86 additions & 0 deletions src/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import type { Config } from '@oada/client';
import { OADAClient } from '@oada/client';
import { assert as assertQueue } from '@oada/types/oada/service/queue.js';
import { Gauge } from '@oada/lib-prom';

import { Report, type ReportConstructor } from './Report.js';
import { debug, error, warn } from './utils.js';
import type { Job } from './Job.js';
import type { Json } from './index.js';
import type { Logger } from './Logger.js';
import moment from 'moment';
import { Queue } from './Queue.js';

export type Domain = string;
Expand Down Expand Up @@ -63,6 +65,10 @@ export interface ConstructorArguments {
opts?: ServiceOptions;
}

export interface ServiceMetrics {
[key: string]: any;
}

export const defaultServiceQueueName = 'default-service-queue';

/**
Expand All @@ -81,6 +87,7 @@ export class Service {
public domain: string;
public token: string;
public opts: ServiceOptions | undefined;
public metrics: ServiceMetrics;

readonly #oada: OADAClient;
// Readonly #clients = new Map<Domain, OADAClient>();
Expand Down Expand Up @@ -122,6 +129,29 @@ export class Service {
this.domain = this.#oada.getDomain();
this.token = this.#oada.getToken()[0]!;
this.concurrency = object.concurrency ?? this.#oada.getConcurrency();
this.metrics = {
[`${this.name}_total_failure`]: new Gauge({
name: `${this.name}_total_failure`,
help: `Number of ${this.name} jobs that failed`,
labelNames: ['service', this.name],
}),
[`${this.name}_total_queued`]: new Gauge({
name: `${this.name}_total_queued`,
help: `Number of ${this.name} jobs that were queued`,
labelNames: ['service', 'type'],
}),
[`${this.name}_total_success`]: new Gauge({
name: `${this.name}_total_success`,
help: `Number of ${this.name} jobs that succeeded`,
labelNames: ['service', this.name],
}),
[`${this.name}_total_running`]: new Gauge({
name: `${this.name}_total_running`,
help: `Number of ${this.name} jobs that are running`,
labelNames: ['service', this.name],
}),

}
if (object.opts) {
this.opts = object.opts;
}
Expand Down Expand Up @@ -155,6 +185,7 @@ export class Service {
}

public async start(): Promise<void> {
await this.#initTotalMetrics();
await this.#doQueue();
for (const r of this.#reports.values()) {
r.start();
Expand Down Expand Up @@ -184,6 +215,8 @@ export class Service {
* @param worker Worker function
*/
public on(type: string, timeout: number, work: WorkerFunction): void {
this.#ensureMetrics(type);
this.#initTypedMetrics(type);
this.#workers.set(type, { work, timeout });
}

Expand Down Expand Up @@ -259,4 +292,57 @@ export class Service {
debug('Unable to stop queue %0', error_);
}
}

/**
* Create the metrics
*/
#ensureMetrics(type: string): void {
const statuses = ['queued', 'running', 'success', 'failure'];
for (const status of statuses) {
let mtype = type.replaceAll('-', '_').replaceAll(' ', '_');
const name = `${this.name}_${status}_${mtype}`;
if (!this.metrics[name]) {
this.metrics[name] = new Gauge({
name: name,
help: `Number of ${this.name} jobs of type "${type}" that are of status "${status}"`,
labelNames: ['service', mtype, status],
});
}
}
}

async #initTotalMetrics(): Promise<void> {
const date = moment().format('YYYY-MM-DD');
for await (const status of ['success', 'failure']) {
try {
let { data } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}`
})
let keys = Object.keys(data as Record<string, any>).filter(key => !key.startsWith('_'));
this.metrics[`${this.name}_total_${status}`].set(keys.length);
} catch(err) {
this.metrics[`${this.name}_total_${status}`].set(0);
}
}
}

async #initTypedMetrics(type: string): Promise<void> {
let mtype = type.replaceAll('-', '_').replaceAll(' ', '_');
const date = moment().format('YYYY-MM-DD');
for await (const status of ['success', 'failure']) {
try {
this.metrics[`${this.name}_${status}_${mtype}`].set(0);
let { data } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}`
})
for await (const job of Object.keys(data as Record<string, any>)) {
let { data: j } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}/${job}`
}) as unknown as { data: { j: string, [k: string]: any } };
if (j.type === type) this.metrics[`${this.name}_${status}_${mtype}`].inc();
}
} catch(err) {
}
}
}
}
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"forceConsistentCasingInFileNames": true,
"rootDir": "src",
"outDir": "dist",
"lib": ["ES2015"]
"lib": ["ES2021"]
},
"include": ["src"]
}
Loading

0 comments on commit ec649d9

Please sign in to comment.