Skip to content

Commit

Permalink
feat(replication): adds read replica functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
goldcaddy77 committed Feb 24, 2021
1 parent d7fb0f2 commit f4e22fb
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 161 deletions.
4 changes: 3 additions & 1 deletion examples/12-replica-db-connection/.env
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ WARTHOG_DB_HOST=localhost
WARTHOG_DB_PORT=5432
WARTHOG_DB_REPLICA_HOST=localhost
WARTHOG_DB_REPLICA_PORT=5432
WARTHOG_DB_CONNECT_REPLICA=true
WARTHOG_DB_REPLICA_DATABASE=warthog-example-12
WARTHOG_DB_REPLICA_USERNAME=postgres
WARTHOG_DB_REPLICA_PASSWORD=
1 change: 1 addition & 0 deletions examples/12-replica-db-connection/generated/binding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as schema from './schema.graphql'

export interface Query {
users: <T = Array<User>>(args: { offset?: Int | null, limit?: Int | null, where?: UserWhereInput | null, orderBy?: UserOrderByInput | null }, info?: GraphQLResolveInfo | string, options?: Options) => Promise<T> ,
usersFromMaster: <T = Array<User>>(args?: {}, info?: GraphQLResolveInfo | string, options?: Options) => Promise<T> ,
user: <T = User>(args: { where: UserWhereUniqueInput }, info?: GraphQLResolveInfo | string, options?: Options) => Promise<T>
}

Expand Down
1 change: 1 addition & 0 deletions examples/12-replica-db-connection/generated/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type PageInfo {

type Query {
users(offset: Int, limit: Int = 50, where: UserWhereInput, orderBy: UserOrderByInput): [User!]!
usersFromMaster: [User!]!
user(where: UserWhereUniqueInput!): User!
}

Expand Down
14 changes: 12 additions & 2 deletions examples/12-replica-db-connection/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'reflect-metadata';
import { AdvancedConsoleLogger, Logger, QueryRunner } from 'typeorm';

import { BaseContext, Server } from '../../../src';

Expand All @@ -10,6 +11,16 @@ interface Context extends BaseContext {
};
}

export class CustomLogger extends AdvancedConsoleLogger implements Logger {
logQuery(query: string, parameters?: any[], queryRunner?: QueryRunner) {
if (!queryRunner) {
return console.log(query);
}

console.log(`[${(queryRunner as any).mode}] ${query}`);
}
}

export function getServer(AppOptions = {}, dbOptions = {}) {
return new Server<Context>(
{
Expand All @@ -23,9 +34,8 @@ export function getServer(AppOptions = {}, dbOptions = {}) {
}
};
},
connectDBReplica: true,
...AppOptions
},
dbOptions
{ ...dbOptions, logger: new CustomLogger() }
);
}
6 changes: 6 additions & 0 deletions examples/12-replica-db-connection/src/user.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export class UserResolver {
return this.userService.find<UserWhereInput>(where, orderBy, limit, offset);
}

// This query will use the "master" server, even though it would typically use "slave" by default for doing a find
@Query(() => [User])
async usersFromMaster(): Promise<User[]> {
return this.userService.findFromMaster();
}

@Query(() => User)
async user(@Arg('where') where: UserWhereUniqueInput): Promise<User> {
return this.userService.findOne<UserWhereUniqueInput>(where);
Expand Down
13 changes: 6 additions & 7 deletions examples/12-replica-db-connection/src/user.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Service } from 'typedi';
import { DeepPartial, Repository } from 'typeorm';
import { Repository } from 'typeorm';
import { InjectRepository } from 'typeorm-typedi-extensions';

import { BaseService } from '../../../src';
Expand All @@ -12,11 +12,10 @@ export class UserService extends BaseService<User> {
super(User, repository);
}

async create(data: DeepPartial<User>, userId: string): Promise<User> {
const newUser = await super.create(data, userId);

// Perform some side effects

return newUser;
// This query will use the "master" server, even though it would typically use "slave" by default for doing a find
async findFromMaster(): Promise<User[]> {
return super.find(undefined, undefined, undefined, undefined, undefined, {
replicationMode: 'master'
});
}
}
44 changes: 36 additions & 8 deletions src/core/BaseService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
DeepPartial,
EntityManager,
getRepository,
QueryBuilder,
ReplicationMode,
Repository,
SelectQueryBuilder
} from 'typeorm';
Expand Down Expand Up @@ -34,6 +36,10 @@ export interface BaseOptions {
manager?: EntityManager; // Allows consumers to pass in a TransactionManager
}

export interface AdvancedFindOptions {
replicationMode?: ReplicationMode;
}

interface WhereFilterAttributes {
[key: string]: string | number | null;
}
Expand Down Expand Up @@ -114,12 +120,13 @@ export class BaseService<E extends BaseModel> {
orderBy?: string,
limit?: number,
offset?: number,
fields?: string[]
fields?: string[],
options: AdvancedFindOptions = {}
): Promise<E[]> {
// TODO: FEATURE - make the default limit configurable
limit = limit ?? 20;
debugStatement('find:buildQuery');
const qb = this.buildFindQuery<W>(where, orderBy, { limit, offset }, fields);
const qb = this.buildFindQuery<W>(where, orderBy, { limit, offset }, fields, options);
try {
debugStatement('find:gettingMany');
const records = await qb.getMany();
Expand All @@ -129,6 +136,8 @@ export class BaseService<E extends BaseModel> {
debugStatement('find:error');
logger.error('failed on getMany', e);
throw e;
} finally {
this.cleanUpQueryBuilder(qb);
}
}

Expand All @@ -137,7 +146,8 @@ export class BaseService<E extends BaseModel> {
whereUserInput: any = {}, // V3: WhereExpression = {},
orderBy?: string | string[],
_pageOptions: RelayPageOptionsInput = {},
fields?: ConnectionInputFields
fields?: ConnectionInputFields,
options: AdvancedFindOptions = {}
): Promise<ConnectionResult<E>> {
// TODO: if the orderby items aren't included in `fields`, should we automatically include?

Expand Down Expand Up @@ -176,7 +186,8 @@ export class BaseService<E extends BaseModel> {
whereCombined,
this.relayService.effectiveOrderStrings(sorts, relayPageOptions),
{ limit: limit + 1 }, // We ask for 1 too many so that we know if there is an additional page
requestedFields.selectFields
requestedFields.selectFields,
options
);

let rawData;
Expand All @@ -189,6 +200,8 @@ export class BaseService<E extends BaseModel> {
rawData = await qb.getMany();
}

this.cleanUpQueryBuilder(qb);

// If we got the n+1 that we requested, pluck the last item off
const returnData = rawData.length > limit ? rawData.slice(0, limit) : rawData;

Expand All @@ -209,13 +222,19 @@ export class BaseService<E extends BaseModel> {
where: WhereExpression = {},
orderBy?: string | string[],
pageOptions?: LimitOffset,
fields?: string[]
fields?: string[],
options: AdvancedFindOptions = {}
): SelectQueryBuilder<E> {
try {
const DEFAULT_LIMIT = 50;
let qb = this.manager.connection
.createQueryBuilder<E>(this.entityClass, this.klass)
.setQueryRunner(this.manager.connection.createQueryRunner('slave'));
let qb = this.manager.connection.createQueryBuilder<E>(this.entityClass, this.klass);
if (options.replicationMode) {
const queryRunner = this.manager.connection.createQueryRunner(options.replicationMode);
qb.setQueryRunner(queryRunner);
(qb as any).warthogQueryRunnerOverride = queryRunner;
}

//
if (!pageOptions) {
pageOptions = {
limit: DEFAULT_LIMIT
Expand Down Expand Up @@ -476,6 +495,15 @@ export class BaseService<E extends BaseModel> {
return { id: found.id };
}

// This is really ugly. Shouldn't be attaching to the querybuilder, but need to keep track of whether this
// instance of the queryBuilder was created with a custom query runner so that it can be cleaned up
cleanUpQueryBuilder(qb: QueryBuilder<E>) {
// console.log(qb);
if ((qb as any).warthogQueryRunnerOverride) {
(qb as any).warthogQueryRunnerOverride.release();
}
}

attrsToDBColumns = (attrs: string[]): string[] => {
return attrs.map(this.attrToDBColumn);
};
Expand Down
10 changes: 8 additions & 2 deletions src/core/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ describe('Server', () => {
const customExpressApp: express.Application = express();
const appListenSpy = jest.spyOn(customExpressApp, 'listen');
server = buildServer(
{ expressApp: customExpressApp, connectDBReplica: true },
{ WARTHOG_DB_CONNECT_REPLICA: 'true' }
{ expressApp: customExpressApp },
{
WARTHOG_DB_REPLICA_HOST: 'localhost',
WARTHOG_DB_REPLICA_DATABASE: 'warthog-test',
WARTHOG_DB_REPLICA_PORT: '5432',
WARTHOG_DB_REPLICA_USERNAME: 'postgres',
WARTHOG_DB_REPLICA_PASSWORD: ''
}
);
await server.start();
const binding = await server.getBinding();
Expand Down
20 changes: 4 additions & 16 deletions src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { Connection, ConnectionOptions, useContainer as TypeORMUseContainer } fr
import { logger, Logger } from '../core/logger';
import { getRemoteBinding } from '../gql';
import { DataLoaderMiddleware, healthCheckMiddleware } from '../middleware';
import { createDBConnection, createReplicatedDBConnection } from '../torm';
import { createDBConnection, WarthogDBConnectionOptions } from '../torm';

import { CodeGenerator } from './code-generator';
import { Config } from './config';
Expand Down Expand Up @@ -47,7 +47,6 @@ export interface ServerOptions<T> {
bodyParserConfig?: OptionsJson;
onBeforeGraphQLMiddleware?: (app: express.Application) => void;
onAfterGraphQLMiddleware?: (app: express.Application) => void;
connectDBReplica?: boolean;
}

export class Server<C extends BaseContext> {
Expand All @@ -66,8 +65,7 @@ export class Server<C extends BaseContext> {

constructor(
private appOptions: ServerOptions<C>,
private dbOptions: Partial<ConnectionOptions> = {},
private dbReplicaOptions: Partial<ConnectionOptions> = {}
private dbOptions: Partial<WarthogDBConnectionOptions> = {}
) {
if (typeof this.appOptions.host !== 'undefined') {
process.env.WARTHOG_APP_HOST = this.appOptions.host;
Expand All @@ -93,11 +91,6 @@ export class Server<C extends BaseContext> {
? 'true'
: 'false';
}
if (typeof this.appOptions.connectDBReplica !== 'undefined') {
process.env.WARTHOG_DB_CONNECT_REPLICA = this.appOptions.connectDBReplica ? 'true' : 'false';
} else {
process.env.WARTHOG_DB_CONNECT_REPLICA = 'false';
}

// Ensure that Warthog, TypeORM and TypeGraphQL are all using the same typedi container
this.container = this.appOptions.container || Container;
Expand Down Expand Up @@ -132,13 +125,8 @@ export class Server<C extends BaseContext> {
async establishDBConnection(): Promise<Connection> {
if (!this.connection) {
debug('establishDBConnection:start');
if (this.config.get('WARTHOG_DB_CONNECT_REPLICA')) {
this.connection = await createReplicatedDBConnection(this.dbOptions);
this.allConnections = [this.connection];
} else {
this.connection = await createDBConnection(this.dbOptions);
this.allConnections = [this.connection];
}
this.connection = await createDBConnection(this.dbOptions);
this.allConnections = [this.connection];
debug('establishDBConnection:end');
}

Expand Down
Loading

0 comments on commit f4e22fb

Please sign in to comment.