Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
feat(KanaoGateway): handle latency in stats RPC
Browse files Browse the repository at this point in the history
Co-authored-by: KagChi <[email protected]>
  • Loading branch information
Hazmi35 and KagChi authored Feb 24, 2024
1 parent 4fc424b commit 1eb4277
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
12 changes: 7 additions & 5 deletions services/kanao-gateway/src/Structures/KanaoGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ import EventEmitter from "node:events";
import { join } from "node:path";
import process from "node:process";
import { fileURLToPath } from "node:url";
import type { Collection } from "@discordjs/collection";
import { REST } from "@discordjs/rest";
import { CompressionMethod, WebSocketManager, WebSocketShardEvents } from "@discordjs/ws";
import type { SessionInfo, ShardRange } from "@discordjs/ws";
import { GatewayExchangeRoutes, RabbitMQ } from "@nezuchan/constants";
import { RoutedQueue, Util, createAmqpChannel } from "@nezuchan/utilities";
import type { Awaitable } from "@sapphire/utilities";
import type { Channel } from "amqplib";
import Database from "better-sqlite3";
import { eq, sql } from "drizzle-orm";
import { drizzle } from "drizzle-orm/better-sqlite3";
import { migrate } from "drizzle-orm/better-sqlite3/migrator";
import APM from "prometheus-middleware";
import { createLogger } from "../Utilities/Logger.js";
import type { Status } from "../Utilities/WebSockets/ProcessShardingStrategy.js";
import { ProcessShardingStrategy } from "../Utilities/WebSockets/ProcessShardingStrategy.js";
import { amqp, clientId, discordToken, enablePrometheus, gatewayCompression, gatewayGuildPerShard, gatewayHandShakeTimeout, gatewayHelloTimeout, gatewayIntents, gatewayLargeThreshold, gatewayPresenceName, gatewayPresenceStatus, gatewayPresenceType, gatewayReadyTimeout, gatewayResume, gatewayShardCount, gatewayShardsPerWorkers, getShardCount, lokiHost, prometheusPath, prometheusPort, proxy, replicaId, storeLogs } from "../config.js";
import * as schema from "./DatabaseSchema.js";
Expand Down Expand Up @@ -137,11 +140,10 @@ export class NezuGateway extends EventEmitter {
if (message) {
const content = JSON.parse(message.content.toString()) as { replyTo: string; };
const stats = [];
for (const [shardId, status] of await this.ws.fetchStatus()) {
const stat = await this.drizzle.query.status.findFirst({
where: () => eq(schema.status.shardId, shardId)
});
stats.push({ shardId, status, latency: stat?.latency ?? -1 });

// @ts-expect-error Override
for (const [shardId, status] of await this.ws.fetchStatus() as Awaitable<Collection<number, Status>>) {
stats.push({ shardId, ...status });
}
channel.ack(message);
await amqpChannel.sendToQueue(content.replyTo, Buffer.from(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Result } from "@sapphire/result";
import type { Channel, ConsumeMessage } from "amqplib";
import Database from "better-sqlite3";
import type { GatewaySendPayload } from "discord-api-types/v10";
import { eq } from "drizzle-orm";
import { drizzle } from "drizzle-orm/better-sqlite3/driver";
import { migrate } from "drizzle-orm/better-sqlite3/migrator";
import type { Listener } from "../../Stores/Listener.js";
Expand Down Expand Up @@ -249,10 +250,15 @@ export class ProcessBootstrapper {
throw new Error(`Shard ${payload.shardId} does not exist`);
}

const status = await this.drizzle.query.status.findFirst({
where: eq(schema.status.shardId, payload.shardId)
});

const response = {
op: WorkerReceivePayloadOp.FetchStatusResponse,
status: shard.status,
nonce: payload.nonce
nonce: payload.nonce,
latency: status?.latency ?? -1
};

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { createLogger } from "../Logger.js";

const __dirname = url.fileURLToPath(new URL(".", import.meta.url));

export type Status = { status: WebSocketShardStatus; latency: number; };

/**
* Strategy used to spawn threads in child_process
*/
Expand All @@ -37,7 +39,7 @@ export class ProcessShardingStrategy implements IShardingStrategy {

private readonly destroyPromises = new Collection<number, () => void>();

private readonly fetchStatusPromises = new Collection<number, (status: WebSocketShardStatus) => void>();
private readonly fetchStatusPromises = new Collection<number, (status: Status) => void>();

private readonly waitForIdentifyControllers = new Collection<number, AbortController>();

Expand Down Expand Up @@ -142,7 +144,7 @@ export class ProcessShardingStrategy implements IShardingStrategy {
* {@inheritDoc IShardingStrategy.fetchStatus}
*/
public async fetchStatus(): Promise<Collection<number, WebSocketShardStatus>> {
const statuses = new Collection<number, WebSocketShardStatus>();
const statuses = new Collection<number, Status>();

for (const [shardId, worker] of this.#workerByShardId.entries()) {
const nonce = Math.random();
Expand All @@ -152,7 +154,7 @@ export class ProcessShardingStrategy implements IShardingStrategy {
nonce
} satisfies WorkerSendPayload;

const promise = new Promise<WebSocketShardStatus>(resolve => this.fetchStatusPromises.set(nonce, resolve));
const promise = new Promise<Status>(resolve => this.fetchStatusPromises.set(nonce, resolve));
try {
if (worker.connected) worker.send(payload);
} catch {
Expand All @@ -163,7 +165,8 @@ export class ProcessShardingStrategy implements IShardingStrategy {
statuses.set(shardId, status);
}

return statuses;
// @ts-expect-error - Override
return statuses as Collection<number, WebSocketShardStatus>;
}

private async setupWorker(workerData: WorkerData): Promise<void> {
Expand Down Expand Up @@ -332,7 +335,7 @@ export class ProcessShardingStrategy implements IShardingStrategy {
}

case WorkerReceivePayloadOp.FetchStatusResponse: {
this.fetchStatusPromises.get(payload.nonce)?.(payload.status);
this.fetchStatusPromises.get(payload.nonce)?.({ status: payload.status, latency: (payload as unknown as { latency: number; }).latency });
this.fetchStatusPromises.delete(payload.nonce);
break;
}
Expand Down

0 comments on commit 1eb4277

Please sign in to comment.