Skip to content

Commit

Permalink
cleanup metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
awlayton committed Sep 20, 2024
1 parent b0dc54b commit 022a943
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 165 deletions.
10 changes: 8 additions & 2 deletions .eslintrc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ parser: '@typescript-eslint/parser'

parserOptions:
ecmaVersion: 2020
project: './**/tsconfig.*'
project: ./**/tsconfig.json

overrides:
- files: '*.{c,m,}ts'
Expand All @@ -48,7 +48,7 @@ overrides:
- prettier
parserOptions:
ecmaVersion: 2020
project: './**/tsconfig.*'
project: ./**/tsconfig.json
rules:
'@typescript-eslint/naming-convention':
[
Expand Down Expand Up @@ -78,6 +78,12 @@ overrides:
leadingUnderscore: require,
},

{
selector: memberLike,
modifiers: [static, readonly],
format: [PascalCase],
},

{ selector: typeLike, format: [PascalCase] },

{ selector: parameter, modifiers: [destructured], format: null },
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ service.on(
});

return { coolThing: r.data.thing };
}
},
);

service.start().catch((e: unknown) => {
Expand Down
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
"serialize-error": "^11.0.3",
"tiny-json-http": "^7.5.1",
"tslib": "^2.6.2",
"xlsx": "https://cdn.sheetjs.com/xlsx-0.20.1/xlsx-0.20.1.tgz"
"xlsx": "https://cdn.sheetjs.com/xlsx-0.20.2/xlsx-0.20.2.tgz"
},
"peerDependencies": {
"@oada/client": "^4.5.5"
Expand All @@ -96,6 +96,7 @@
"@commitlint/config-conventional": "^18.6.0",
"@oada/client": "^5.0.0",
"@tsconfig/node16": "^16.1.1",
"@tsconfig/node20": "^20.1.4",
"@types/chai": "^4.3.11",
"@types/clone-deep": "^4.0.4",
"@types/convict": "^6.1.6",
Expand Down Expand Up @@ -146,6 +147,7 @@
"node": "20.11.0"
},
"resolutions": {
"xksuid": "https://github.com/g12i/xksuid.git#fix-crypto-polyfill"
"xksuid": "https://github.com/g12i/xksuid.git#fix-crypto-polyfill",
"xlsx": "https://cdn.sheetjs.com/xlsx-0.20.2/xlsx-0.20.2.tgz"
}
}
}
7 changes: 7 additions & 0 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ export interface JobUpdate {
* Holds job data
*/
export class Job {
static readonly Statuses = [
'queued',
'running',
'success',
'failure',
] as const;

/**
* Fetch a Job from an OADA resource ID
* @param oada Authenticated OADAClient to fetch Job object
Expand Down
23 changes: 13 additions & 10 deletions src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

import PQueue from 'p-queue';
import moment from 'moment';

import type { Link } from '@oada/types/oada.js';
Expand All @@ -24,12 +25,10 @@ import type OADAJobsChange from '@oada/types/oada/service/jobs-change.js';
// TODO: Fix this type and get it back in here
// import { assert as assertJobs } from '@oada/types/oada/service/jobs-change.js';

import { error, info, stripResource, trace } from './utils.js';
import { Job } from './Job.js';
import PQueue from 'p-queue';
import { Runner } from './Runner.js';
import type { Service } from './Service.js';

import { error, info, stripResource, trace } from './utils.js';
import { tree } from './tree.js';

/**
Expand All @@ -39,19 +38,21 @@ export class Queue {
#watchRequestId: string | string[] = '';
readonly #oada: OADAClient;
readonly #queue: PQueue;
readonly #service;

/**
* Creates queue watcher
* @param _service The `Service` which the watch is operating under
* @param token? The token for the queue to watch, or undefined if an OADA client was passed
*/
constructor(
private readonly _service: Service,
_service: Service,
private readonly _id: string,
) {
// , token?: string) {
this.#service = _service;
this.#oada = _service.getClient(); // .clone(token ?? '');
this.#queue = new PQueue({ concurrency: this._service.concurrency });
this.#queue = new PQueue({ concurrency: this.#service.concurrency });
/*
If (typeof domainOrOada === 'string') {
this.oada = service.getClient().clone(token ?? '');
Expand All @@ -70,7 +71,7 @@ export class Queue {
* Opens the WATCH and begins processing jobs
*/
public async start(skipQueue = false): Promise<void> {
const root = `/bookmarks/services/${this._service.name}`;
const root = `/bookmarks/services/${this.#service.name}`;
const jobspath = `${root}/jobs/pending`;
const successpath = `${root}/jobs/success`;
const failurepath = `${root}/jobs/failure`;
Expand Down Expand Up @@ -199,17 +200,19 @@ export class Queue {
async #doJobs(jobs: OADAJobs | OADAJobsChange): Promise<void> {
// Queue up the Runners in parallel
for (const [jobKey, value] of Object.entries(jobs)) {
this._service.metrics[`${this._service.name}_total_queued`].inc();
void this.#queue.add(async () => {
const { _id } = value as Link;
if (!_id) return;
// Fetch the job
const { job, isJob } = await Job.fromOada(this.#oada, _id);
const mtype = job.type.replaceAll('-', '_').replaceAll(' ', '_');
this._service.metrics[`${this._service.name}_queued_${mtype}`].inc();
this.#service.metrics.inc({
service: this.#service.name,
type: job.type,
state: 'queued',
});

// Instantiate a runner to manage the job
const runner = new Runner(this._service, jobKey, job, this.#oada);
const runner = new Runner(this.#service, jobKey, job, this.#oada);

if (!isJob) {
await runner.finish('failure', {}, moment());
Expand Down
49 changes: 32 additions & 17 deletions src/Runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { tree } from './tree.js';
import { onFinish as slackOnFinish } from './finishReporters/slack/index.js';

export class JobError extends Error {
// eslint-disable-next-line @typescript-eslint/naming-convention
'JobError'?: string;
'constructor'(m: string, t?: string) {
super(m);
Expand Down Expand Up @@ -68,9 +69,11 @@ export class Runner {
* appropriate.
*/
public async run(): Promise<void> {
const mtype = this.#job.type.replaceAll('-', '_').replaceAll(' ', '_');
this.#service.metrics[`${this.#service.name}_running_${mtype}`].inc();
this.#service.metrics[`${this.#service.name}_total_running`].inc();
this.#service.metrics.inc({
service: this.#service.name,
type: this.#job.type,
state: 'running',
});

// A quick check to ensure job isn't already completed
if (this.#job.status === 'success' || this.#job.status === 'failure') {
Expand Down Expand Up @@ -125,7 +128,7 @@ export class Runner {
trace(`[job ${this.#job.oadaId}] Error: %O`, error_);

await (error_ instanceof TimeoutError
? this.finish('failure', error_, moment(), 'timeout')
? this.finish('failure', error_ as unknown as Json, moment(), 'timeout')
: this.finish('failure', error_, moment(), error_.JobError));
}
}
Expand Down Expand Up @@ -180,7 +183,8 @@ export class Runner {
}

trace(
`[job ${this.#job.oadaId} ]: putting to job resource the final {status,result} = ${data}`,
{ job: this.#job.oadaId, ...data },
'Putting to job resource the final status/result',
);
await this.#oada.put({
path: `/${this.#job.oadaId}`,
Expand All @@ -195,12 +199,12 @@ export class Runner {

// Link into success/failure event log
const date = moment(time).format('YYYY-MM-DD');
let finalpath = `/bookmarks/services/${this.#service.name}/jobs/${status}/day-index/${date}`;
const finalpath = `/bookmarks/services/${this.#service.name}/jobs/${status}/day-index/${date}`;
info(
`[job ${this.#job.oadaId} ]: linking job to final resting place at ${finalpath}`,
);
await this.#oada.put({
path: finalpath!,
path: finalpath,
data: {
[this.#jobKey]: {
_id: this.#job.oadaId,
Expand All @@ -212,12 +216,12 @@ export class Runner {

// If there is a failType, also link to the typed failure log
if (status === 'failure' && failType) {
let typedFailPath = `/bookmarks/services/${this.#service.name}/jobs/typed-${status}/${failType}/day-index/${date}`;
const typedFailPath = `/bookmarks/services/${this.#service.name}/jobs/typed-${status}/${failType}/day-index/${date}`;
info(
`[job ${this.#job.oadaId} ]: linking job to final resting place at ${typedFailPath}`,
);
await this.#oada.put({
path: typedFailPath!,
path: typedFailPath,
data: {
[this.#jobKey]: {
_id: this.#job.oadaId,
Expand All @@ -238,15 +242,26 @@ export class Runner {
`[job ${this.#job.oadaId} ]: marking job as ${status}`,
failType ?? 'unknown',
);
const mtype = this.#job.type.replaceAll('-', '_').replaceAll(' ', '_');
this.#service.metrics[`${this.#service.name}_total_queued`].dec();
this.#service.metrics[`${this.#service.name}_queued_${mtype}`].dec();

this.#service.metrics[`${this.#service.name}_total_running`].dec();
this.#service.metrics[`${this.#service.name}_running_${mtype}`].dec();
// Decrement queued job count?
this.#service.metrics.dec({
service: this.#service.name,
type: this.#job.type,
state: 'queued',
});
// Decrement running job count
this.#service.metrics.dec({
service: this.#service.name,
type: this.#job.type,
state: 'running',
});

this.#service.metrics[`${this.#service.name}_total_${status}`].inc();
this.#service.metrics[`${this.#service.name}_${status}_${mtype}`].inc();
// Increment successes or failures based on status
this.#service.metrics.inc({
service: this.#service.name,
type: this.#job.type,
state: status,
});

// Notify the status reporter if there is one
try {
Expand Down Expand Up @@ -275,7 +290,7 @@ export class Runner {
await slackOnFinish({
config: r,
service: this.#service,
finalpath: finalpath!,
finalpath,
job: finaljob,
jobId: this.#job.oadaId,
status,
Expand Down
Loading

0 comments on commit 022a943

Please sign in to comment.