diff --git a/src/migrator/async.ts b/src/migrator/async.ts index 695eb29a7..1a31c66b3 100644 --- a/src/migrator/async.ts +++ b/src/migrator/async.ts @@ -26,6 +26,8 @@ import { filterAndSortPendingMigrations, MigrationStatus, BaseAsyncMigration, + migrationMetricsEmitter, + MigrationMetricsEventName, } from './utils'; export const run = async (tx: Tx, model: ApiRootModel): Promise => { @@ -122,7 +124,7 @@ const $run = async ( let asyncRunnerMigratorFn: (tx: Tx) => Promise; let initMigrationState: InitialMigrationStatus = { migration_key: key, - start_time: new Date(), + last_execution_time_ms: 0, last_run_time: new Date(), run_count: 0, migrated_row_count: 0, @@ -212,7 +214,7 @@ const $run = async ( ); return false; } - // sync on the last execution time between instances + // sync on the execution time between instances // precondition: All running instances are running on the same time/block // skip execution if (migrationState.last_run_time) { @@ -229,11 +231,15 @@ const $run = async ( try { // here a separate transaction is needed as this migration may fail // when it fails it would break the transaction for managing the migration status + const executionStartTimeMS = Date.now(); const migratedRows = await sbvrUtils.db.transaction( async (migrationTx) => { return (await asyncRunnerMigratorFn?.(migrationTx)) ?? 0; }, ); + migrationState.last_execution_time_ms = + Date.now() - executionStartTimeMS; + migrationState.migrated_row_count += migratedRows; if (migratedRows === 0) { // when all rows have been catched up once we only catch up less frequently @@ -250,7 +256,7 @@ const $run = async ( if (err instanceof Error) { if ( migrationState.error_count % - initMigrationState.errorThreshold === + initMigrationState.errorThreshold === 0 ) { (sbvrUtils.api.migrations?.logger.error ?? console.error)( @@ -268,6 +274,12 @@ const $run = async ( // either success or error release the lock migrationState.last_run_time = new Date(); migrationState.run_count += 1; + // just emit + migrationMetricsEmitter.emit( + MigrationMetricsEventName.async_migration_status, + migrationState, + ); + await updateMigrationStatus(tx, migrationState); } return migrationState; diff --git a/src/migrator/migrations.sbvr b/src/migrator/migrations.sbvr index 2b3c63061..606aa3f75 100644 --- a/src/migrator/migrations.sbvr +++ b/src/migrator/migrations.sbvr @@ -24,8 +24,8 @@ Fact Type: migration lock has model name Term: migration key Concept Type: Short Text (Type) -Term: start time - Concept Type: Date Time (Type) +Term: last execution time ms + Concept Type: Integer (Type) Term: last run time Concept Type: Date Time (Type) Term: run count @@ -45,8 +45,8 @@ Term: migration status Fact Type: migration status has migration key Necessity: each migration status has exactly one migration key -Fact Type: migration status has start time - Necessity: each migration status has at most one start time +Fact Type: migration status has last execution time ms + Necessity: each migration status has at most one last execution time ms Fact Type: migration status has last run time Necessity: each migration status has at most one last run time diff --git a/src/migrator/sync.ts b/src/migrator/sync.ts index ec2b830b0..483c91ab7 100644 --- a/src/migrator/sync.ts +++ b/src/migrator/sync.ts @@ -8,6 +8,8 @@ import { RunnableMigrations, filterAndSortPendingMigrations, getRunnableSyncMigrations, + migrationMetricsEmitter, + MigrationMetricsEventName, } from './utils'; import type { Tx } from '../database-layer/db'; import type { Config, Model } from '../config-loader/config-loader'; @@ -122,6 +124,8 @@ const executeMigration = async ( `Running migration ${JSON.stringify(key)}`, ); + const migrationStartTimeMs = Date.now(); + if (typeof migration === 'function') { await migration(tx, sbvrUtils); } else if (typeof migration === 'string') { @@ -129,6 +133,15 @@ const executeMigration = async ( } else { throw new MigrationError(`Invalid migration type: ${typeof migration}`); } + + // follow the same interface for migration_key and last_execution_time_ms as migration status + migrationMetricsEmitter.emit( + MigrationMetricsEventName.sync_migration_status, + { + migration_key: key, + last_execution_time_ms: Date.now() - migrationStartTimeMs, + }, + ); }; export const config: Config = { @@ -146,6 +159,10 @@ export const config: Config = { ALTER TABLE "migration lock" ADD COLUMN IF NOT EXISTS "modified at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL; `, + '14.55.0-last-execution-time': ` + ALTER TABLE "migration status" + ADD COLUMN IF NOT EXISTS "last execution time ms" INTEGER NULL; + `, }, }, ], diff --git a/src/migrator/utils.ts b/src/migrator/utils.ts index db210e2dd..202badb89 100644 --- a/src/migrator/utils.ts +++ b/src/migrator/utils.ts @@ -7,6 +7,14 @@ import { TypedError } from 'typed-error'; import { migrator as migratorEnv } from '../config-loader/env'; export { migrator as migratorEnv } from '../config-loader/env'; import { delay } from '../sbvr-api/control-flow'; +import { EventEmitter } from 'eventemitter3'; + +export const migrationMetricsEmitter = new EventEmitter(); + +export enum MigrationMetricsEventName { + 'sync_migration_status' = 'sync_migration_status', + 'async_migration_status' = 'async_migration_status', +} // tslint:disable-next-line:no-var-requires export const modelText = require('./migrations.sbvr'); @@ -87,7 +95,7 @@ export class MigrationError extends TypedError {} export type MigrationStatus = { migration_key: string; - start_time: Date; + last_execution_time_ms: number; last_run_time: Date | null; run_count: number; migrated_row_count: number; @@ -283,13 +291,13 @@ export const initMigrationStatus = async ( try { return await tx.executeSql( binds` -INSERT INTO "migration status" ("migration key", "start time", "is backing off", "run count") +INSERT INTO "migration status" ("migration key", "last execution time ms", "is backing off", "run count") SELECT ${1}, ${2}, ${3}, ${4} WHERE NOT EXISTS (SELECT 1 FROM "migration status" WHERE "migration key" = ${5}) `, [ migrationStatus['migration_key'], - migrationStatus['start_time'], + migrationStatus['last_execution_time_ms'], migrationStatus['is_backing_off'] ? 1 : 0, migrationStatus['run_count'], migrationStatus['migration_key'], @@ -316,8 +324,9 @@ SET "migrated row count" = ${3}, "error count" = ${4}, "converged time" = ${5}, -"is backing off" = ${6} -WHERE "migration status"."migration key" = ${7};`, +"is backing off" = ${6}, +"last execution time ms" = ${7} +WHERE "migration status"."migration key" = ${8};`, [ migrationStatus['run_count'], migrationStatus['last_run_time'], @@ -325,6 +334,7 @@ WHERE "migration status"."migration key" = ${7};`, migrationStatus['error_count'], migrationStatus['converged_time'], migrationStatus['is_backing_off'] ? 1 : 0, + migrationStatus['last_execution_time_ms'], migrationStatus['migration_key'], ], ); @@ -355,7 +365,7 @@ LIMIT 1;`, return { migration_key: data['migration key'], - start_time: data['start time'], + last_execution_time_ms: data['last execution time ms'], last_run_time: data['last run time'], run_count: data['run count'], migrated_row_count: data['migrated row count'], diff --git a/src/server-glue/module.ts b/src/server-glue/module.ts index 35286c0cb..bf1768841 100644 --- a/src/server-glue/module.ts +++ b/src/server-glue/module.ts @@ -18,7 +18,7 @@ export * as env from '../config-loader/env'; export * as types from '../sbvr-api/common-types'; export * as hooks from '../sbvr-api/hooks'; export type { configLoader as ConfigLoader }; -export type { migratorUtils as Migrator }; +export { migratorUtils as Migrator }; let envDatabaseOptions: dbModule.DatabaseOptions; if (dbModule.engines.websql != null) { diff --git a/test/03-async-migrator.test.ts b/test/03-async-migrator.test.ts index 4e8fa91ed..8124c69cc 100644 --- a/test/03-async-migrator.test.ts +++ b/test/03-async-migrator.test.ts @@ -202,6 +202,29 @@ describe('03 Async Migrations', async function () { expect(result[0]?.migrated_row_count - firstRowsMigrated).to.equal(1); expect(Date.now().valueOf() - startTime).to.be.greaterThan(4000); // backOff time from migrator }); + + it.only('should record last execution time ms of the last migration run', async function () { + let result: MigrationStatus[] = []; + + const startTime = Date.now().valueOf(); + // Wait until all migrations have run at least once + while (result.length === 0 || !result.every((row) => row.run_count > 0)) { + result = await getMigrationStatus(); + } + + const checkMigration0003 = result.find( + (row) => row.migration_key === '0003', + ); + expect(checkMigration0003?.last_execution_time_ms).to.be.greaterThan(500); // should be reported greater than 500 ms (from the async migration function) + expect(checkMigration0003?.last_execution_time_ms).to.be.lessThan(2 * 500); // should not exceed twice the last execution time ms defined in the async migration function + + /** + * Here it may need to be checked if the event for the async_migration_status metric was emitted. + * As the pine instance is running in a childprocess the EventEmitter is decoupled from the test instance. + * The EventEmitter exported from the migration utilities cannot be used for registering a listener, as it would + * instantiate a separate EventEmitter in this process, which is not linked to the child-process in which pine actually runs. + */ + }); }); describe('Init sync and async migrations for new model', function () { diff --git a/test/fixtures/03-async-migrator/01-migrations/migrations/0003-test-migration-execution.async.ts b/test/fixtures/03-async-migrator/01-migrations/migrations/0003-test-migration-execution.async.ts new file mode 100644 index 000000000..a0baffc76 --- /dev/null +++ b/test/fixtures/03-async-migrator/01-migrations/migrations/0003-test-migration-execution.async.ts @@ -0,0 +1,18 @@ +import { AsyncMigration } from '../../../../../src/migrator/utils'; + +const migration: AsyncMigration = { + asyncFn: async (tx, options) => { + options; + return await tx.executeSql(`SELECT pg_sleep(0.5);`); + }, + asyncBatchSize: 1, + syncFn: async (tx) => { + await tx.executeSql(`SELECT pg_sleep(0.5);`); + }, + delayMS: 1000, + backoffDelayMS: 4000, + errorThreshold: 15, + finalize: false, +}; + +export default migration;