Skip to content

Commit

Permalink
Merge pull request #125 from sgrigorev/master
Browse files Browse the repository at this point in the history
fix reconnect logic
  • Loading branch information
alexkvak authored Dec 1, 2022
2 parents 0bbec22 + 623dbd2 commit e2a1141
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 37 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ When sending a message indicating the recipients, the message sent to their queu
### Subscribe to broadcast messages by message type

```javascript
const logger = {
info: console.log,
warn: console.log,
error: console.error,
}
const client = createClient({
serviceName: 'news',
logger,
connectOptions: {
username: 'test',
password: '123',
Expand Down
12 changes: 0 additions & 12 deletions src/connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,4 @@ Array [
]
`);
});

it('should fail if logger is empty', () => {
const optionsMock = {
connectOptions: {
username: 'stub',
password: 'test'
},
serviceName: 'test'
};

expect(() => connect(optionsMock)).toThrowError('logger is required.');
})
});
4 changes: 0 additions & 4 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ export enum ConnectionStatus {
const connect = (options: CreateServiceOptions): { service: ServiceConnection; connection: Promise<AMQPConnection> } => {
const { connectOptions, serviceName } = options;

if (!options.logger) {
throw new Error('logger is required.');
}

return connectServiceQueues(getAMQPNodeAdapter(), connectOptions, serviceName, options.logger);
};

Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { MessageHandlerOptions, MessageHandler } from './message';
export interface CreateServiceOptions {
appId?: string;
serviceName: string;
logger?: Logger;
logger: Logger;
connectOptions: AMQPOptions;
}

Expand Down
47 changes: 41 additions & 6 deletions src/service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import connectService, { ServiceConnection } from './service';
import { AMQPConnection } from './adapters/amqp-node';
import { ConnectionStatus } from './connection';
import {
amqpConnectError,
amqpConnectGracefullyStopped,
ConnectionNotInitialized
} from './errors';
import { amqpConnectError, amqpConnectGracefullyStopped, ConnectionNotInitialized } from './errors';

const testAdapter = { connect: jest.fn() };
const optionsMock = {
Expand Down Expand Up @@ -171,7 +167,8 @@ describe('#getConnection', () => {
});

describe('#handleConnectionClose', () => {
it('reconnects and rebinds handlers', async () => {
it('reconnects if status is CONNECTED', async () => {
serviceConnection.status = ConnectionStatus.CONNECTED;
serviceConnection.unsubscribe = jest.fn().mockResolvedValue({});
serviceConnection.connect = jest.fn();
serviceConnection.initQueue = jest.fn();
Expand All @@ -184,6 +181,7 @@ describe('#handleConnectionClose', () => {
});

it('reconnects and rebinds handlers', async () => {
serviceConnection.status = ConnectionStatus.CONNECTED;
serviceConnection.unsubscribe = jest.fn().mockResolvedValue({});
const connection = { queueBind: jest.fn() };
serviceConnection.connect = jest.fn().mockResolvedValue(connection);
Expand All @@ -199,10 +197,47 @@ describe('#handleConnectionClose', () => {
expect(serviceConnection.initQueue).toBeCalled();
expect(connection.queueBind).toBeCalledWith('dispatcher', 'dispatcher', '*.handler1');
});

it('reconnects and rebinds handlers only once', async () => {
serviceConnection.status = ConnectionStatus.CONNECTED;
serviceConnection.unsubscribe = jest.fn().mockResolvedValue({});
const connection = { queueBind: jest.fn() };
serviceConnection.connect = jest.fn().mockResolvedValue(connection);
serviceConnection.initQueue = jest.fn();

const handlerMock = async () => undefined;
serviceConnection.setActionHandler('handler1', handlerMock);
serviceConnection.setActionHandler('handler1', handlerMock);

await Promise.all([
serviceConnection.handleConnectionClose({} as unknown as Error),
serviceConnection.handleConnectionClose({} as unknown as Error),
serviceConnection.handleConnectionClose({} as unknown as Error)
]);

expect(serviceConnection.unsubscribe).toBeCalledTimes(1);
expect(serviceConnection.connect).toBeCalledTimes(1);
expect(serviceConnection.initQueue).toBeCalledTimes(1);
expect(connection.queueBind).toBeCalledTimes(1);
});

it.each([ConnectionStatus.DISCONNECTING, ConnectionStatus.CONNECTING])('does not reconnect if status is %s', async (status) => {
serviceConnection.status = status;
serviceConnection.unsubscribe = jest.fn().mockResolvedValue({});
serviceConnection.connect = jest.fn();
serviceConnection.initQueue = jest.fn();

await serviceConnection.handleConnectionClose({} as unknown as Error);

expect(serviceConnection.unsubscribe).not.toBeCalled();
expect(serviceConnection.connect).not.toBeCalled();
expect(serviceConnection.initQueue).not.toBeCalled();
});
});

describe('#handleConnectionError', () => {
it('logs error to console', () => {
serviceConnection.status = ConnectionStatus.CONNECTED;
serviceConnection.unsubscribe = jest.fn();
serviceConnection.connect = jest.fn();
const message = { content: 'message' };
Expand Down
33 changes: 19 additions & 14 deletions src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import EventEmitter from 'events';
import timeout from './timeout';
import randomPickConnectionString from './random-pick';
import {
EmptyMessageError,
AmqpConnectGracefullyStopped,
emptyMessageError,
amqpConnectError,
AmqpConnectGracefullyStopped,
amqpConnectGracefullyStopped,
ConnectionNotInitialized,
emptyMessageError,
EmptyMessageError,
unexpectedNonStringAction
} from './errors';
import { MessageOptions, MessageHandlerOptions, MessageHandler } from './message';
import { MessageHandler, MessageHandlerOptions, MessageOptions } from './message';
import { ConnectionStatus } from './connection';
import {
AMQPAdapter,
Expand Down Expand Up @@ -211,24 +211,29 @@ export class ServiceConnection extends EventEmitter {
* Handle connection close
*/
async handleConnectionClose(error: Error): Promise<void> {
if (this.status === ConnectionStatus.DISCONNECTING || this.status === ConnectionStatus.CONNECTING) {
return;
}

this.setConnectionStatus(ConnectionStatus.DISCONNECTED);
// set the "connecting" status in order to avoid concurrent connection in case
// when the handler is called several times in the short period of time
this.status = ConnectionStatus.CONNECTING;

const { password, ...restOptions } = this.options;
this.log.error('[amqp-connection] Connection closed.', error, restOptions);

this.emit(ConnectionStatus.DISCONNECTED);

await this.unsubscribe();

if (this.status !== ConnectionStatus.DISCONNECTING) {
const connection = await this.connect();
const connection = await this.connect();

const handlers = Object.keys(this.handlers).filter(name => name !== DEFAULT_ACTION);
const handlers = Object.keys(this.handlers).filter(name => name !== DEFAULT_ACTION);

if (handlers.length > 0) {
await this.initQueue(this.name);
if (handlers.length > 0) {
await this.initQueue(this.name);

for (const handler of handlers) {
await connection.queueBind(this.name, ServiceConnection.getTopicExchange(this.options.exchange), `*.${handler}`);
}
for (const handler of handlers) {
await connection.queueBind(this.name, ServiceConnection.getTopicExchange(this.options.exchange), `*.${handler}`);
}
}
}
Expand Down

0 comments on commit e2a1141

Please sign in to comment.