Skip to content

Commit

Permalink
feature: multi node migration support (#25)
Browse files Browse the repository at this point in the history
* Basic implementation of multi node migration support.
    Warning: This has not been tested in the wild yet!
  • Loading branch information
yss14 authored Dec 8, 2019
1 parent 291453d commit ee88e2c
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 30 deletions.
47 changes: 47 additions & 0 deletions src/__test__/database-schema.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,50 @@ describe('migrate to version', () => {
.rejects.toThrowError('Target version of migrateToVersion() has to be greater 1')
})
})

describe('multi-node environment', () => {
const simulateNode = async (nodeOprations: (...args: any[]) => Promise<void>, nodeName: string) => {
await nodeOprations(nodeName)
}

test('test', async () => {
const { database } = await setupTest()

const migration2 = jest.fn()
const migration3 = jest.fn()
const migration4 = jest.fn()
const migration5 = jest.fn()

const migrations = new Map<number, IMigration>()
migrations.set(5, Migration(migration5))
migrations.set(2, Migration(migration2))
migrations.set(3, Migration(migration3))
migrations.set(4, Migration(migration4))

const operations = async (nodeName: string) => {
const databaseSchema = DatabaseSchema({
name: 'TestSchema',
client: database,
createStatements: composeCreateTableStatements(TestTables),
migrations,
})

await databaseSchema.init()
await databaseSchema.migrateLatest()

expect(databaseSchema.getVersion()).toBe(5)
}

await Promise.all([
simulateNode(operations, 'Node1'),
simulateNode(operations, 'Node2'),
simulateNode(operations, 'Node3'),
simulateNode(operations, 'Node4'),
])

expect(migration2).toHaveBeenCalledTimes(1)
expect(migration3).toHaveBeenCalledTimes(1)
expect(migration4).toHaveBeenCalledTimes(1)
expect(migration5).toHaveBeenCalledTimes(1)
})
})
100 changes: 71 additions & 29 deletions src/database-schema.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
import { IDatabaseClient, IDatabaseBaseClient } from './database-client'
import { TableSchema, ColumnType, NativeFunction, Table } from './table';
import { SQL } from './sql';

const schema_management = TableSchema({
name: { type: ColumnType.Varchar, primaryKey: true, nullable: false },
name: { type: ColumnType.Varchar, primaryKey: true, nullable: false, unique: true },
version: { type: ColumnType.Integer, nullable: false },
date_added: { type: ColumnType.TimestampTZ, nullable: false, defaultValue: { func: NativeFunction.Now } },
locked: { type: ColumnType.Boolean, nullable: false, defaultValue: false },
})
const SchemaManagementTable = Table({ schema_management }, 'schema_management')

const selectVersionQuery = (name: string) => SchemaManagementTable.select('*', ['name'])([name])
const insertSchemaQuery = (name: string, version: number) => SchemaManagementTable.insertFromObj({ name, version })
const updateSchemaVersionQuery = (name: string, newVersion: number) => SchemaManagementTable.update(['version'], ['name'])([newVersion], [name])

export interface IDatabaseSchema {
readonly name: string;
getVersion(): number;
init(): Promise<void>;
migrateLatest(): Promise<void>;
migrateToVersion(version: number): Promise<void>;
}
export type IDatabaseSchema = ReturnType<typeof DatabaseSchema>

export interface IMigration {
up: (client: IDatabaseBaseClient) => Promise<void>;
Expand All @@ -36,7 +32,7 @@ export interface IDatabaseSchemaArgs {
logMigrations?: boolean;
}

export const DatabaseSchema = ({ client, createStatements, name, migrations, logMigrations }: IDatabaseSchemaArgs): IDatabaseSchema => {
export const DatabaseSchema = ({ client, createStatements, name, migrations, logMigrations }: IDatabaseSchemaArgs) => {
let version = 0
let isInitialized = false

Expand All @@ -45,22 +41,28 @@ export const DatabaseSchema = ({ client, createStatements, name, migrations, log
throw new Error(`Database schema ${name} has already been initialized.`)
}

await client.transaction(async (transaction) => {
await transaction.query(SchemaManagementTable.create())
try {
await client.transaction(async (transaction) => {
await transaction.query(SchemaManagementTable.create())

const versionDBResults = await transaction.query(selectVersionQuery(name))
const versionDBResults = await transaction.query(selectVersionQuery(name))

if (versionDBResults.length === 0) {
await transaction.query({
sql: createStatements.join('\n'),
})
await transaction.query(insertSchemaQuery(name, 1))
if (versionDBResults.length === 0) {
await transaction.query({
sql: createStatements.join('\n'),
})
await transaction.query(insertSchemaQuery(name, 1))

version = 1
} else {
version = versionDBResults[0].version
version = 1
} else {
version = versionDBResults[0].version
}
})
} catch (err) {
if (err.message.indexOf('duplicate key value violates unique constraint') === -1) {
throw err
}
})
}

isInitialized = true
}
Expand All @@ -69,33 +71,73 @@ export const DatabaseSchema = ({ client, createStatements, name, migrations, log
throw new Error(`Migration failed, database schema is not initialized. Please call init() first on your database schema.`)
}

const lockSchemaTableQuery = SQL.raw(`
LOCK TABLE ${SchemaManagementTable.name} IN ACCESS EXCLUSIVE MODE;
`, [])
const getSchemaVersionQuery = (awaitLock: boolean) => SQL.raw<typeof schema_management>(`
SELECT * FROM ${SchemaManagementTable.name}
WHERE name = $1 ${!awaitLock ? 'FOR UPDATE NOWAIT' : ''};
`, [name])
const setSchemaLockQuery = (locked: boolean) => SQL.raw(`
UPDATE ${SchemaManagementTable.name} SET locked = $1 WHERE name=$2;
`, [locked, name])

/*
Locks schema_management table for given transaction and retrievs current schema version
If table is already locked, the postgres client is advised to await execution until lock is released
This ensures, that in a multi-node environment all starting nodes proceed code execution after all migrations are done
*/
const getCurrentVersionAndLockSchema = async (client: IDatabaseBaseClient, awaitLock: boolean) => {
await client.query(lockSchemaTableQuery)
const dbResults = await client.query(getSchemaVersionQuery(awaitLock))

if (dbResults.length === 1 && dbResults[0].locked === false) {
await client.query(setSchemaLockQuery(true))

return dbResults[0].version
}

return null
}

const migrateToVersion = async (targetVersion: number) => {
if (!isInitialized) throwNotInitialized()

if (targetVersion <= 1) {
throw new Error('Target version of migrateToVersion() has to be greater 1')
}

const currentVersion = version

for (let newVersion = currentVersion + 1; newVersion <= targetVersion; newVersion -= -1) {
for (let newVersion = version; newVersion <= targetVersion; newVersion++) {
await client.transaction(async (transaction) => {
const currentVersion = await getCurrentVersionAndLockSchema(transaction, true)

if (currentVersion === null || currentVersion >= newVersion) {
if (currentVersion) {
await transaction.query(setSchemaLockQuery(false))
}

return
}

const migration = migrations.get(newVersion)

if (!migration) {
await transaction.query(setSchemaLockQuery(false))

throw new Error(`Migration with version ${newVersion} not found. Aborting migration process...`)
}

await migration.up(transaction)
await transaction.query(updateSchemaVersionQuery(name, newVersion))
await transaction.query(setSchemaLockQuery(false))

// istanbul ignore next
if (!(logMigrations === false)) {
console.info(`Successfully migrated ${name} from version ${version} to ${newVersion}`)
}
})

version = newVersion

// istanbul ignore next
if (!(logMigrations === false)) {
console.info(`Successfully migrated ${name} from version ${version - 1} to ${version}`)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/sql.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Columns, Column, IReferenceConstraintInternal, isCollection, isSQLFunction, ForeignKeyUpdateDeleteRule, ICreateIndexStatement, IQuery, isJSONType, IWhereConditionColumned, ISQLArg } from "./table";
import * as pgEscape from 'pg-escape';
import { dateToSQLUTCFormat } from "./sql-utils";
import moment from 'moment'
import * as moment from 'moment'
import { flatten } from './utils'

const isStringArray = (arr: any): arr is string[] => Array.isArray(arr) && arr.every(item => typeof item === 'string')
Expand Down

0 comments on commit ee88e2c

Please sign in to comment.