Skip to content

Commit

Permalink
fix bugs with index file
Browse files Browse the repository at this point in the history
  • Loading branch information
skif48 committed Feb 7, 2019
1 parent 1e7908b commit 0e21e33
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async function main() {
consumer.startConsuming().subscribe({next: console.log, error: console.error});

const anotherConnection = await connectionFactory.newConnection();
const publisherFactory = new PublisherFactory(anotherConnection, {exchange: {name: 'super_exchange'}});
const publisherFactory = new PublisherFactory(anotherConnection, {exchange: {name: 'exchange_super_queue'}});
const publisher = await publisherFactory.newPublisher();
setInterval(() => publisher.publishMessage(Buffer.from('hello hello!'), 'topic'), 1000);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async function main() {
consumer.startConsuming().subscribe({next: console.log, error: console.error});

const anotherConnection = await connectionFactory.newConnection();
const publisherFactory = new PublisherFactory(anotherConnection, {exchange: {name: 'super_exchange'}});
const publisherFactory = new PublisherFactory(anotherConnection, {exchange: {name: 'exchange_super_queue'}});
const publisher = await publisherFactory.newPublisher();
setInterval(() => publisher.publishMessage(Buffer.from('hello hello!'), 'topic'), 1000);
}
Expand Down
14 changes: 9 additions & 5 deletions src/models/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,22 @@ export class Consumer implements RabbitMqPeer {
await this.channel.bindQueue(dlqMetadata.queue, `exchange_dlq_${this.configs.queue.name}`, '');
}

let exchangeName = `exchange_${this.configs.queue.name}`;
let exchangeType = 'topic';
if (this.configs.exchange) {
exchangeOptions.durable = this.configs.exchange.durable || false;
exchangeOptions.durable = typeof this.configs.exchange.durable === 'undefined' ? true : this.configs.exchange.durable;
exchangeOptions.arguments = this.configs.exchange.arguments || {};
exchangeName = this.configs.exchange.name;
exchangeType = this.configs.exchange.type;
}

await this.channel.assertExchange(this.configs.exchange.name, this.configs.exchange.type, exchangeOptions);
await this.channel.assertExchange(exchangeName, exchangeType, exchangeOptions);
const queueMetadata = await this.channel.assertQueue(this.configs.queue.name, {
durable: this.configs.queue.durable || false,
durable: typeof this.configs.queue.durable === 'undefined' ? true : this.configs.queue.durable,
arguments: this.configs.queue.arguments,
deadLetterExchange: this.configs.noDeadLetterQueue ? undefined : `dlq_${this.configs.queue.name}`,
});
await this.channel.bindQueue(queueMetadata.queue, this.configs.exchange.name, this.configs.queue.topic || '');
await this.channel.bindQueue(queueMetadata.queue, exchangeName, this.configs.queue.topic || exchangeType);

await this.channel.prefetch(this.configs.prefetch || DEFAULT_PREFETCH);

Expand All @@ -64,7 +68,7 @@ export class Consumer implements RabbitMqPeer {

amqpConnection.on('error', (err) => {
if (this.configs.autoReconnect !== false)
this.reconnect().toPromise().then(() => console.log(`Successfully reconnected to ${this.connection.getUri()}`));
this.reconnect().toPromise().then(() => console.log('Successfully reconnected to server'));
this.subject.error(new RabbitMqConnectionError(err.message))
});
amqpConnection.on('close', () => this.subject.error(new RabbitMqConnectionClosedError('AMQP server closed connection')));
Expand Down
4 changes: 2 additions & 2 deletions src/models/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ export class Publisher implements RabbitMqPeer {
.then(() => subscriber.complete())
.catch((err) => {
if (err instanceof RabbitMqConnectionError)
console.error(`Error while reconnecting to RabbitMQ: ${err.code}`);
console.error(`Error while reconnecting to server: ${err.code}`);
else
console.error(`Error while reconnecting to RabbitMQ: ${err.message}`);
console.error(`Error while reconnecting to server: ${err.message}`);
});
}).pipe(
timeout(this.configs.reconnectTimeoutMillis || DEFAULT_RECONNECT_TIMEOUT_MILLIS),
Expand Down

0 comments on commit 0e21e33

Please sign in to comment.