Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
fix(ProcessBootstrapper): only ack if it's a owned shard and use repl…
Browse files Browse the repository at this point in the history
…icaId as appId
  • Loading branch information
Hazmi35 authored Feb 24, 2024
1 parent cf7ced0 commit a2cc713
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 50 deletions.
47 changes: 0 additions & 47 deletions services/kanao-gateway/.example.env

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { migrate } from "drizzle-orm/better-sqlite3/migrator";
import type { Listener } from "../../Stores/Listener.js";
import { ListenerStore } from "../../Stores/ListenerStore.js";
import * as schema from "../../Structures/DatabaseSchema.js";
import { discordToken, storeLogs, lokiHost, amqp, clientId } from "../../config.js";
import { discordToken, storeLogs, lokiHost, amqp, clientId, replicaId } from "../../config.js";
import { createLogger } from "../Logger.js";
import { ProcessContextFetchingStrategy } from "./ProcessContextFetchingStrategy.js";

Expand Down Expand Up @@ -100,7 +100,7 @@ export class ProcessBootstrapper {
setup: async (channel: Channel) => {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${this.data.processId}`);
const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}}`);
const { queue } = await channel.assertQueue(routing.queue, { durable: false });

for (const shard of this.data.shardIds) {
Expand All @@ -127,9 +127,11 @@ export class ProcessBootstrapper {

public async onConsumeMessage(channel: Channel, message: ConsumeMessage | null): Promise<void> {
if (!message) return;
channel.ack(message);
const content = JSON.parse(message.content.toString()) as { op: ShardOp; data: unknown; };
const shardId = ShardedRoutedQueue.routingKeyToShardId(message.fields.routingKey);
if (shardId === null) return;
if (!this.shards.has(shardId)) return;
channel.ack(message);
switch (content.op) {
case ShardOp.SEND: {
const shard = this.shards.get(shardId);
Expand Down

0 comments on commit a2cc713

Please sign in to comment.