Skip to content

Commit

Permalink
Merge pull request #143 from sgrigorev/master
Browse files Browse the repository at this point in the history
fix: change default host in getConnectionStringStandalone()

Delete unused appId parameter from CreateServiceOptions.

Enhance documentation: add more samples and JSDocs.
  • Loading branch information
alexkvak authored Feb 13, 2023
2 parents a844e3f + cfc8cb9 commit 0bf7462
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 29 deletions.
91 changes: 67 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,45 @@

The MB client creates an abstraction over the inter-service interaction on top of RabbitMQ. The library defines a common interface for messages and provides ways to send and subscribe to them. The client supports automatic re-connections to RabbitMQ and support for the Rabbit cluster.

The mechanism is quite simple and currently supports 2 simple operation modes (sending directly to the queue, sending to topic exchange)
The mechanism is quite simple and currently supports 2 simple operation modes (sending directly to the queue, sending to topic exchange).

When a client created, a durable topic exchange ("dispatcher" by default) is automatically created, and a service queue (with the name that was passed as serviceName during initialization).

When sending a message indicating the recipients, the message sent to their queue directly. Otherwise, the message sent via routingKey "{serviceName}. {Action}" to the dispatcher exchange.
When sending a message indicating the recipients, the message sent to their queue directly. Otherwise, the message sent via routingKey "{serviceName}.{Action}" to the dispatcher exchange.

# Examples
Check [FAQ](#FAQ) if you have questions.

## Create client

```javascript
import { createClient } from 'mbclient';

const logger = {
info: console.log,
warn: console.log,
error: console.error,
}

const client = createClient({
serviceName: 'news',
logger,
connectOptions: {
username: 'test',
password: '123',
host: 'localhost',
amqps: true,
frameMax: 8192,
},
});
```

See [AMQPOptions](https://github.com/Tinkoff/mbclient/blob/master/src/adapters/amqp-node.ts#L3) interface to get all available options.

## Subscribing

### Subscribe to broadcast messages by message type
### Subscribe to messages by message type

```javascript
const logger = {
info: console.log,
warn: console.log,
error: console.error,
}
const client = createClient({
serviceName: 'news',
logger,
connectOptions: {
username: 'test',
password: '123',
host: 'localhost',
port: 5672,
amqps: true,
frameMax: 8192,
},
});

// listening to messages
client.consumeByAction('logAction', ({ message, ack, nack }) => {
// do something
Expand All @@ -43,9 +51,21 @@ When sending a message indicating the recipients, the message sent to their queu
// do something
ack();
});
```
```

### Subscribe to all messages from service's queue

The handler will be called if there is no handler for specified action type.

```javascript
client.consume(({ message, ack, nack }) => {
// do something
ack();
});
```

## Publishing

### Publishing a message indicating recipients

```javascript
Expand All @@ -60,7 +80,7 @@ When sending a message indicating the recipients, the message sent to their queu
### Publication without specifying recipients (broadcast)
```javascript
client.send({
action: 'comeAction', // Everyone who consumed on comeAction will receive this message
action: 'someAction', // Everyone who consumed on someAction will receive this message
payload: 'some payload',
requestId: 'id',
});
Expand Down Expand Up @@ -89,6 +109,29 @@ Supported Events:
`disconnecting` - Close the connection (usually emitted when calling close for a graceful disconnect)

`disconnected` - Loss of connection with amqp due to an error or as a result of processing close ()

## FAQ

### How to send a message to an exchange other than the default one?

Create one more client instance with required exchange name in connectOptions.

### How to send a message with specific routing key?

```javascript
client.send({
action: 'someAction',
payload: 'some payload',
routingKey: 'my.routingKey'
});
```

### My action handler receive unexpected messages. How is this possible?

It may happen because of action names collision. For example, service A sends messages with action `entityCreated`
to service B directly (with specifying `recipients: ['serviceB']`). Later a service C was added that sends messages
with the same action, but uses broadcast sending. In this case service B will also receive messages from service C.

## License

```
Expand Down
44 changes: 43 additions & 1 deletion src/adapters/amqp-node.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,58 @@
import { AMQPClient, AMQPError } from '@cloudamqp/amqp-client';

export interface AMQPOptions {
/**
* Username for authentication
*/
username: string;
/**
* Password
*/
password: string;
/**
* AMQP host to connect to
* @default localhost:5672
*/
host?: string;
/**
* List of hosts if AMQP cluster is used
*/
cluster?: string[];
/**
* Enable secure connection
* @default false
*/
amqps?: boolean;
/**
* Virtual Host
* @default /
*/
vhost?: string;
/**
* Heartbeat timeout value in seconds (an integer) to negotiate with the server
* @default 30
*/
heartbeat?: number;
/**
* The size in bytes of the maximum frame allowed over the connection
* @default 4096
*/
frameMax?: number;
/**
* Max reconnect attempts
* @default Infinity
*/
maxReconnects?: number;
retryStrategy?: (times: number) => number;
/**
* Function that returns milliseconds number to delay before reconnect attempt.
* @default Default strategy implements exponential backoff algorithm.
* @param attempt attempt number
*/
retryStrategy?: (attempt: number) => number;
/**
* AMQP exchange name to bind to. It will be created if it doesn't exist.
* @default dispatcher
*/
exchange?: string;
}

Expand Down
36 changes: 33 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import connect from './connection';
import { AMQPOptions } from './adapters/amqp-node';
import { Logger } from './logger';
import { MessageHandlerOptions, MessageHandler } from './message';
import { MessageHandler } from './message';

export interface CreateServiceOptions {
appId?: string;
serviceName: string;
logger: Logger;
connectOptions: AMQPOptions;
Expand Down Expand Up @@ -41,6 +40,12 @@ export function createClient(options: CreateServiceOptions): Client {
const { service, connection } = connect(options);

return {
/**
* Send a message.
* When sending a message indicating the recipients, the message sent to their queue directly.
* Otherwise, the message sent via routingKey "{serviceName}.{Action}" to the dispatcher exchange.
* @param clientSendMessageOptions
*/
send: (clientSendMessageOptions: ClientSendMessage): Promise<void> =>
connection.then(() => {
const { payload, action, requestId, recipients = [], correlationId, routingKey, isOriginalContent = false } = clientSendMessageOptions;
Expand All @@ -60,17 +65,42 @@ export function createClient(options: CreateServiceOptions): Client {
return service.postMessage(recipients, payload, sendMessageOptions);
}),

/**
* Subscribe to messages from service queue.
* The handler will be called if there is no handler for specified action type.
* @param callback
*/
consume: (callback: MessageHandler): Promise<void> =>
connection.then(() => service.subscribe(callback)),

/**
* Subscribe to messages from service queue and handle only specific actions.
* @param actionType
* @param callback
*/
consumeByAction: (
actionType: string,
callback: (options: MessageHandlerOptions) => Promise<void>
callback: MessageHandler
): Promise<void> => connection.then(() => service.subscribeOn(actionType, callback)),

/**
* Unsubscribe to queue
*/
cancel: (): Promise<void> => connection.then(() => service.unsubscribe()),

/**
* Close connection
*/
close: (): Promise<void> => service.close(),

/**
* Subscribe to connection events
*/
on: service.on.bind(service),

/**
* Subscribe to connection events
*/
once: service.on.bind(service)
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ export class ServiceConnection extends EventEmitter {
* Extract connection string from options using 'host' parameter
*/
getConnectionStringStandalone(): string {
const { username, password, amqps = false, host = '', vhost = '', heartbeat = DEFAULT_HEART_BEAT, frameMax = DEFAULT_FRAME_MAX } = this.options;
const { username, password, amqps = false, host = 'localhost:5672', vhost = '', heartbeat = DEFAULT_HEART_BEAT, frameMax = DEFAULT_FRAME_MAX } = this.options;
const protocol = amqps ? 'amqps' : 'amqp';
const connectionString = `${protocol}://${username}:${password}@${host}/${vhost}?frameMax=${frameMax}&heartbeat=${heartbeat}`;

Expand Down

0 comments on commit 0bf7462

Please sign in to comment.