Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
feat: add amqp reconnect workaround (#454)
Browse files Browse the repository at this point in the history
* feat: add amqp reconnect workaround

* fix: resolve build error
  • Loading branch information
KagChi authored Jun 18, 2024
1 parent ccc2006 commit b395697
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 7 deletions.
1 change: 0 additions & 1 deletion .nvmrc

This file was deleted.

9 changes: 8 additions & 1 deletion packages/core/src/Structures/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,19 @@ export class Client extends EventEmitter {
public connect(): void {
this.amqp = createAmqpChannel(this.options.amqpUrl, {
setup: async (channel: Channel) => this.setupAmqp(channel)
});
}, { connectionOptions: { timeout: 360_000 } });

this.rest.setToken(this.options.token!);
}

public async setupAmqp(channel: Channel): Promise<void> {
channel.on("close", () => {
if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) {
// @ts-expect-error Reconnect workaround
this.amqp._onConnect({ connection: this.amqp._connectionManager.connection });
}
});

await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

const routing = new RoutedQueue(GatewayExchangeRoutes.DISPATCH, this.clientId, this.options.instanceName);
Expand Down
27 changes: 24 additions & 3 deletions services/kanao-cache/src/Structures/KanaoCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ import { clientId, storeLogs, lokiHost, databaseUrl, amqp, databaseConnectionLim
export class KanaoCache extends EventEmitter {
public cacheQueue = createAmqpChannel(amqp, {
setup: async (channel: Channel) => this.setupCacheQueue(channel)
});
}, { connectionOptions: { timeout: 360_000 } });

public rpcQueue = createAmqpChannel(amqp, {
setup: async (channel: Channel) => this.setupRpc(channel)
});
}, { connectionOptions: { timeout: 360_000 } });

public queryRpcQueue = createAmqpChannel(amqp, {
setup: async (channel: Channel) => this.setupQueryRpc(channel)
});
}, { connectionOptions: { timeout: 360_000 } });

public logger = createLogger("kanao-cache", clientId, storeLogs, lokiHost);

Expand Down Expand Up @@ -57,6 +57,13 @@ export class KanaoCache extends EventEmitter {
}

public async setupCacheQueue(channel: Channel): Promise<void> {
channel.on("close", () => {
if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) {
// @ts-expect-error Reconnect workaround
this.cacheQueue._onConnect({ connection: this.cacheQueue._connectionManager.connection });
}
});

await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

// Used for receiving receive events from the gateway
Expand All @@ -78,6 +85,13 @@ export class KanaoCache extends EventEmitter {
}

public async setupQueryRpc(channel: Channel): Promise<void> {
channel.on("close", () => {
if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) {
// @ts-expect-error Reconnect workaround
this.queryRpcQueue._onConnect({ connection: this.queryRpcQueue._connectionManager.connection });
}
});

const rpc = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.query`, clientId, "cache-query");
await channel.assertQueue(rpc.queue, { durable: false, autoDelete: true });
await channel.bindQueue(rpc.queue, RabbitMQ.GATEWAY_EXCHANGE, rpc.key);
Expand Down Expand Up @@ -117,6 +131,13 @@ export class KanaoCache extends EventEmitter {
}

public async setupRpc(channel: Channel): Promise<void> {
channel.on("close", () => {
if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) {
// @ts-expect-error Reconnect workaround
this.rpcQueue._onConnect({ connection: this.rpcQueue._connectionManager.connection });
}
});

// Used for Counts RPC
const rpc = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.counts`, clientId, "cache-rpc");
await channel.assertQueue(rpc.queue, { durable: false });
Expand Down
9 changes: 8 additions & 1 deletion services/kanao-gateway/src/Structures/KanaoGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ export class NezuGateway extends EventEmitter {
public setupAmqp(): void {
const amqpChannel = createAmqpChannel(amqp, {
setup: async (channel: Channel) => {
channel.on("close", () => {
if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) {
// @ts-expect-error Reconnect workaround
amqpChannel._onConnect({ connection: amqpChannel._connectionManager.connection });
}
});

await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

// Used for Stats RPC
Expand Down Expand Up @@ -163,7 +170,7 @@ export class NezuGateway extends EventEmitter {
}
});
}
});
}, { connectionOptions: { timeout: 360_000 } });

amqpChannel.on("error", err => this.logger.error(err, "AMQP Channel on main process Error"));
amqpChannel.on("close", () => this.logger.warn("AMQP Channel on main process Closed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ export class ProcessBootstrapper {

const amqpChannel = createAmqpChannel(amqp, {
setup: async (channel: Channel) => {
channel.on("close", () => {
if ("sendMessage" in channel && "sendOrEnqueue" in channel && channel.sendMessage === channel.sendOrEnqueue) {
// @ts-expect-error Reconnect workaround
amqpChannel._onConnect({ connection: amqpChannel._connectionManager.connection });
}
});

await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });
await channel.assertQueue(routing.queue, { durable: false, autoDelete: true });
await channel.consume(routing.queue, async m => this.onConsumeMessage(channel, m));
Expand All @@ -109,7 +116,7 @@ export class ProcessBootstrapper {
this.data.shardIds.map(async shardId => channel.bindQueue(routing.queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shardId).key))
);
}
});
}, { connectionOptions: { timeout: 360_000 } });

amqpChannel.on("error", err => this.logger.error(err, `AMQP Channel on process ${this.data.processId} Error`));
amqpChannel.on("close", () => this.logger.warn(`AMQP Channel on process ${this.data.processId} Closed`));
Expand Down

0 comments on commit b395697

Please sign in to comment.