From 1f6a57325a878b046dd3bf22e53f00ea403be1fd Mon Sep 17 00:00:00 2001 From: Nikita Klimin Date: Tue, 23 Jul 2024 11:49:24 +0300 Subject: [PATCH] feat: add s3 ops metrics (#317) * feat: add s3 ops metrics * changes * fix: add event emitter and change metric to Counter --- .dockerignore | 2 ++ .gitignore | 1 + ...-feat-add-s3-metrics_2024-07-22-11-40.json | 10 +++++++ ...-feat-add-s3-metrics_2024-07-22-11-40.json | 10 +++++++ ...-feat-add-s3-metrics_2024-07-22-11-40.json | 10 +++++++ common/config/rush/pnpm-lock.yaml | 3 +- util/util-internal-dump-cli/src/dumper.ts | 12 ++++++-- util/util-internal-dump-cli/src/prometheus.ts | 20 ++++++++++--- util/util-internal-fs/src/factory.ts | 10 ++++--- util/util-internal-fs/src/s3.ts | 12 +++++++- util/util-internal-ingest-cli/package.json | 4 ++- util/util-internal-ingest-cli/src/ingest.ts | 26 ++++++++++++++++- .../src/prometheus.ts | 29 +++++++++++++++++++ 13 files changed, 135 insertions(+), 14 deletions(-) create mode 100644 common/changes/@subsquid/util-internal-dump-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json create mode 100644 common/changes/@subsquid/util-internal-fs/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json create mode 100644 common/changes/@subsquid/util-internal-ingest-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json create mode 100644 util/util-internal-ingest-cli/src/prometheus.ts diff --git a/.dockerignore b/.dockerignore index e47a865a3..85b889157 100644 --- a/.dockerignore +++ b/.dockerignore @@ -32,3 +32,5 @@ docker-compose.yml /ops/docker-publish.sh **/.DS_Store + +*.temp \ No newline at end of file diff --git a/.gitignore b/.gitignore index 8868156ed..fef86899d 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ common/autoinstallers/*/.npmrc # IDE .idea +.vscode # Built js libs /*/*/lib diff --git a/common/changes/@subsquid/util-internal-dump-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json b/common/changes/@subsquid/util-internal-dump-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json new file mode 100644 index 000000000..73f42eaf7 --- /dev/null +++ b/common/changes/@subsquid/util-internal-dump-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@subsquid/util-internal-dump-cli", + "comment": "add prometheus metrics for S3 file system handler", + "type": "patch" + } + ], + "packageName": "@subsquid/util-internal-dump-cli" +} \ No newline at end of file diff --git a/common/changes/@subsquid/util-internal-fs/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json b/common/changes/@subsquid/util-internal-fs/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json new file mode 100644 index 000000000..4f57e76bf --- /dev/null +++ b/common/changes/@subsquid/util-internal-fs/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@subsquid/util-internal-fs", + "comment": "add metrics for S3 file system handler", + "type": "patch" + } + ], + "packageName": "@subsquid/util-internal-fs" +} \ No newline at end of file diff --git a/common/changes/@subsquid/util-internal-ingest-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json b/common/changes/@subsquid/util-internal-ingest-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json new file mode 100644 index 000000000..02005b7fd --- /dev/null +++ b/common/changes/@subsquid/util-internal-ingest-cli/octo-gone-feat-add-s3-metrics_2024-07-22-11-40.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@subsquid/util-internal-ingest-cli", + "comment": "add prometheus metrics", + "type": "minor" + } + ], + "packageName": "@subsquid/util-internal-ingest-cli" +} \ No newline at end of file diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 31e99ca5e..6ecfae756 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -7738,12 +7738,13 @@ packages: dev: false file:projects/util-internal-ingest-cli.tgz: - resolution: {integrity: sha512-aUnjqkCZ1K036D1WsGDalurA8VO5LOChQxgqrTXNHZwIsNU92wERfGdHQ8IkfXDQ/n7TcL/MRd1fNjS6HzftWg==, tarball: file:projects/util-internal-ingest-cli.tgz} + resolution: {integrity: sha512-hF3MhECI+li0SPLDDdrnzoKUFnIcyO2gsIKEIkAkoZaqF/GJ1S2AIhqvIAR/aSU+bUJMWbzekN6pt/4MqDunlA==, tarball: file:projects/util-internal-ingest-cli.tgz} name: '@rush-temp/util-internal-ingest-cli' version: 0.0.0 dependencies: '@types/node': 18.19.31 commander: 11.1.0 + prom-client: 14.2.0 typescript: 5.3.3 dev: false diff --git a/util/util-internal-dump-cli/src/dumper.ts b/util/util-internal-dump-cli/src/dumper.ts index 5a704d4dc..fab11b50c 100644 --- a/util/util-internal-dump-cli/src/dumper.ts +++ b/util/util-internal-dump-cli/src/dumper.ts @@ -8,6 +8,7 @@ import {createFs, Fs} from '@subsquid/util-internal-fs' import {assertRange, printRange, Range, rangeEnd} from '@subsquid/util-internal-range' import {Command} from 'commander' import {PrometheusServer} from './prometheus' +import {EventEmitter} from 'events' export interface DumperOptions { @@ -93,7 +94,7 @@ export abstract class Dumper this.getFinalizedHeight(), this.rpc(), this.log().child('prometheus') ) + this.eventEmitter().on('S3FsOperation', (op: string) => server.incS3Requests(op)) + return server } private async *ingest(from?: number, prevHash?: string): AsyncIterable { diff --git a/util/util-internal-dump-cli/src/prometheus.ts b/util/util-internal-dump-cli/src/prometheus.ts index 7d12b8c64..ef19aa755 100644 --- a/util/util-internal-dump-cli/src/prometheus.ts +++ b/util/util-internal-dump-cli/src/prometheus.ts @@ -1,7 +1,7 @@ import {Logger} from '@subsquid/logger' import {RpcClient} from '@subsquid/rpc-client' import {createPrometheusServer, ListeningServer} from '@subsquid/util-internal-prometheus-server' -import {collectDefaultMetrics, Gauge, Registry} from 'prom-client' +import {collectDefaultMetrics, Gauge, Counter, Registry} from 'prom-client' export class PrometheusServer { @@ -9,6 +9,7 @@ export class PrometheusServer { private chainHeightGauge: Gauge private lastWrittenBlockGauge: Gauge private rpcRequestsGauge: Gauge + private s3RequestsCounter: Counter constructor( private port: number, @@ -30,13 +31,13 @@ export class PrometheusServer { } this.set(chainHeight) } - }); + }) this.lastWrittenBlockGauge = new Gauge({ name: 'sqd_dump_last_written_block', help: 'Last saved block', registers: [this.registry] - }); + }) this.rpcRequestsGauge = new Gauge({ name: 'sqd_rpc_request_count', @@ -56,7 +57,14 @@ export class PrometheusServer { kind: 'failure' }, metrics.connectionErrors) } - }); + }) + + this.s3RequestsCounter = new Counter({ + name: 'sqd_s3_request_count', + help: 'Number of s3 requests made', + labelNames: ['kind'], + registers: [this.registry], + }) collectDefaultMetrics({register: this.registry}) } @@ -65,6 +73,10 @@ export class PrometheusServer { this.lastWrittenBlockGauge.set(block) } + incS3Requests(kind: string, value?: number) { + this.s3RequestsCounter.inc({kind}, value) + } + serve(): Promise { return createPrometheusServer(this.registry, this.port) } diff --git a/util/util-internal-fs/src/factory.ts b/util/util-internal-fs/src/factory.ts index a4204d443..38a339a38 100644 --- a/util/util-internal-fs/src/factory.ts +++ b/util/util-internal-fs/src/factory.ts @@ -2,14 +2,15 @@ import {S3Client} from '@aws-sdk/client-s3' import {Fs} from './interface' import {LocalFs} from './local' import {S3Fs} from './s3' +import {EventEmitter} from 'events' -export function createFs(url: string): Fs { +export function createFs(url: string, eventEmitter?: EventEmitter): Fs { if (url.includes('://')) { let protocol = new URL(url).protocol switch(protocol) { case 's3:': - return createS3Fs(url.slice('s3://'.length)) + return createS3Fs(url.slice('s3://'.length), eventEmitter) default: throw new Error(`Unsupported protocol: ${protocol}`) } @@ -19,12 +20,13 @@ export function createFs(url: string): Fs { } -function createS3Fs(root: string): S3Fs { +function createS3Fs(root: string, eventEmitter?: EventEmitter): S3Fs { let client = new S3Client({ endpoint: process.env.AWS_S3_ENDPOINT }) return new S3Fs({ root, - client + client, + eventEmitter }) } diff --git a/util/util-internal-fs/src/s3.ts b/util/util-internal-fs/src/s3.ts index b087e08f0..6d6bb0b7e 100644 --- a/util/util-internal-fs/src/s3.ts +++ b/util/util-internal-fs/src/s3.ts @@ -10,21 +10,25 @@ import assert from 'assert' import {Readable} from 'stream' import Upath from 'upath' import {Fs} from './interface' +import {EventEmitter} from 'events' export interface S3FsOptions { root: string client: S3Client + eventEmitter?: EventEmitter } export class S3Fs implements Fs { public readonly client: S3Client private root: string + private eventEmitter?: EventEmitter constructor(options: S3FsOptions) { this.client = options.client this.root = Upath.normalizeTrim(options.root) + this.eventEmitter = options.eventEmitter splitPath(this.root) } @@ -52,7 +56,8 @@ export class S3Fs implements Fs { cd(...path: string[]): S3Fs { return new S3Fs({ client: this.client, - root: this.resolve(path) + root: this.resolve(path), + eventEmitter: this.eventEmitter }) } @@ -74,6 +79,7 @@ export class S3Fs implements Fs { ContinuationToken }) ) + this.eventEmitter?.emit('S3FsOperation', 'ListObjectsV2') // process folder names if (res.CommonPrefixes) { @@ -116,6 +122,7 @@ export class S3Fs implements Fs { Key, Body: content })) + this.eventEmitter?.emit('S3FsOperation', 'PutObject') } async delete(path: string): Promise { @@ -129,6 +136,7 @@ export class S3Fs implements Fs { ContinuationToken }) ) + this.eventEmitter?.emit('S3FsOperation', 'ListObjectsV2') if (list.Contents) { let Objects: ObjectIdentifier[] = [] @@ -144,6 +152,7 @@ export class S3Fs implements Fs { Objects } })) + this.eventEmitter?.emit('S3FsOperation', 'DeleteObjects') } if (list.IsTruncated) { @@ -160,6 +169,7 @@ export class S3Fs implements Fs { Bucket, Key })) + this.eventEmitter?.emit('S3FsOperation', 'GetObject') assert(res.Body instanceof Readable) return res.Body } diff --git a/util/util-internal-ingest-cli/package.json b/util/util-internal-ingest-cli/package.json index 33059ddbc..a65b02e07 100644 --- a/util/util-internal-ingest-cli/package.json +++ b/util/util-internal-ingest-cli/package.json @@ -22,9 +22,11 @@ "@subsquid/util-internal-archive-layout": "^0.4.0", "@subsquid/util-internal-commander": "^1.4.0", "@subsquid/util-internal-fs": "^0.1.2", + "@subsquid/util-internal-prometheus-server": "^1.3.0", "@subsquid/util-internal-http-server": "^2.0.0", "@subsquid/util-internal-range": "^0.3.0", - "commander": "^11.1.0" + "commander": "^11.1.0", + "prom-client": "^14.2.0" }, "devDependencies": { "@types/node": "^18.18.14", diff --git a/util/util-internal-ingest-cli/src/ingest.ts b/util/util-internal-ingest-cli/src/ingest.ts index 1efa70d85..c25d90558 100644 --- a/util/util-internal-ingest-cli/src/ingest.ts +++ b/util/util-internal-ingest-cli/src/ingest.ts @@ -8,6 +8,8 @@ import {HttpApp, HttpContext, HttpError, waitForInterruption} from '@subsquid/ut import {assertRange, isRange, Range} from '@subsquid/util-internal-range' import {Command} from 'commander' import {Writable} from 'stream' +import {PrometheusServer} from './prometheus' +import {EventEmitter} from 'events' export interface IngestOptions { @@ -19,6 +21,7 @@ export interface IngestOptions { endpointCapacity?: number endpointRateLimit?: number endpointMaxBatchCallSize?: number + metrics?: number } @@ -55,6 +58,7 @@ export class Ingest { program.option('--first-block ', 'Height of the first block to ingest', nat) program.option('--last-block ', 'Height of the last block to ingest', nat) program.option('--service ', 'Run as HTTP data service', nat) + program.option('--metrics ', 'Enable prometheus metrics server', nat) return program } @@ -95,10 +99,24 @@ export class Ingest { @def protected archive(): ArchiveLayout { let url = assertNotNull(this.options().rawArchive, 'archive is not specified') - let fs = createFs(url) + let fs = createFs(url, this.eventEmitter()) return new ArchiveLayout(fs) } + @def + protected eventEmitter(): EventEmitter { + return new EventEmitter() + } + + @def + protected prometheus() { + let server = new PrometheusServer( + this.options().metrics ?? 0, + ) + this.eventEmitter().on('S3FsOperation', (op: string) => server.incS3Requests(op)) + return server + } + private async ingest(range: Range, writable: Writable): Promise { for await (let blocks of this.getBlocks(range)) { await waitDrain(writable) @@ -113,6 +131,7 @@ export class Ingest { let log = this.log().child('service') let app = new HttpApp() let self = this + let prometheus = this.prometheus() app.setMaxRequestBody(1024) app.setLogger(log) @@ -138,6 +157,11 @@ export class Ingest { } }) + if (this.options().metrics != null) { + let server = await prometheus.serve() + this.log().info(`prometheus metrics are available on port ${server.port}`) + } + let server = await app.listen(port) log.info( `Data service is listening on port ${server.port}. ` + diff --git a/util/util-internal-ingest-cli/src/prometheus.ts b/util/util-internal-ingest-cli/src/prometheus.ts new file mode 100644 index 000000000..118bca38e --- /dev/null +++ b/util/util-internal-ingest-cli/src/prometheus.ts @@ -0,0 +1,29 @@ +import {createPrometheusServer, ListeningServer} from '@subsquid/util-internal-prometheus-server' +import {collectDefaultMetrics, Counter, Registry} from 'prom-client' + + +export class PrometheusServer { + private registry = new Registry() + private s3RequestsCounter: Counter + + constructor( + private port: number, + ) { + this.s3RequestsCounter = new Counter({ + name: 'sqd_s3_request_count', + help: 'Number of s3 requests made', + labelNames: ['kind'], + registers: [this.registry], + }) + + collectDefaultMetrics({register: this.registry}) + } + + incS3Requests(kind: string, value?: number) { + this.s3RequestsCounter.inc({kind}, value) + } + + serve(): Promise { + return createPrometheusServer(this.registry, this.port) + } +}