From 9ea243d83572246278e077eec7f382d41ca40922 Mon Sep 17 00:00:00 2001 From: JiveOff Date: Tue, 5 Sep 2023 22:51:10 +0200 Subject: [PATCH] Emitting through Redis --- src/clients/Redis.ts | 2 +- src/realms/rest/emitter.ts | 44 +++++++++++++++++++ .../controllers/Player.controller.ts | 6 +-- .../rest/endpoints/services/Misc.service.ts | 37 ++-------------- src/realms/rest/index.ts | 2 + 5 files changed, 54 insertions(+), 37 deletions(-) diff --git a/src/clients/Redis.ts b/src/clients/Redis.ts index 3b21527..770624f 100644 --- a/src/clients/Redis.ts +++ b/src/clients/Redis.ts @@ -8,7 +8,7 @@ export default class RedisClient { private static instance: RedisClient // eslint-disable-next-line @typescript-eslint/no-empty-function - private constructor() {} + constructor() {} public async publishToPlugin(channel: string, ...args: Array) { await this.client.publish(channel, args.join(RedisClient.SEPARATOR)) diff --git a/src/realms/rest/emitter.ts b/src/realms/rest/emitter.ts index 1a133ef..c726906 100644 --- a/src/realms/rest/emitter.ts +++ b/src/realms/rest/emitter.ts @@ -1,6 +1,11 @@ import EventEmitter from "events" import { Emitter } from "./helpers/Emitter" import { Static, Type } from "@sinclair/typebox" +import { SocketStream } from "@fastify/websocket" +import { WebSocket } from "ws" +import RedisClient from "../../clients/Redis" + +export const websockets = new Map>() export const emitter = new EventEmitter() as Emitter emitter.setMaxListeners(0) @@ -52,3 +57,42 @@ export const emitterMessage = Type.Union( ) export type EmitterMessage = Static + +export const prepareRedisListeners = () => { + const redisSubscriptionClient = new RedisClient() + redisSubscriptionClient.client.subscribe("emitter") + redisSubscriptionClient.client.on("message", (channel, message) => { + if (channel === "emitter") { + const body = JSON.parse(message.toString()) as EmitterMessage + emitter.emit(body.type, body.payload) + } + }) + + for (const messageType of Object.keys(EmitterMessageTypes)) { + const type = messageType as EmitterMessageType + emitter.on(type, (payload) => { + for (const [connection, subscriptions] of websockets.entries()) { + if ( + subscriptions.has(type) && + connection.socket.readyState === WebSocket.OPEN + ) { + connection.socket.send(JSON.stringify({ type, payload })) + } else if (connection.socket.readyState !== WebSocket.OPEN) { + websockets.delete(connection) + } + } + }) + } +} + +export const emitMessage = ( + type: T, + payload: Extract extends { type: T } + ? Extract["payload"] + : never +) => { + RedisClient.getInstance().client.publish( + "emitter", + JSON.stringify({ type, payload }) + ) +} diff --git a/src/realms/rest/endpoints/controllers/Player.controller.ts b/src/realms/rest/endpoints/controllers/Player.controller.ts index 3db9315..56810c8 100644 --- a/src/realms/rest/endpoints/controllers/Player.controller.ts +++ b/src/realms/rest/endpoints/controllers/Player.controller.ts @@ -33,7 +33,7 @@ import { HasSchemaScope } from "../../helpers/decorators/HasSchemaScope" import { Permission } from "@prisma/client" import QueueService from "../services/Queue.service" import PartyService from "../services/Party.service" -import { emitter } from "../../emitter" +import { emitMessage } from "../../emitter" @Controller({ route: "/players" }) export default class PlayerController { @@ -161,7 +161,7 @@ export default class PlayerController { const fetchedPlayer = await this.playerService.disconnectPlayer( req.params.uuid ) - emitter.emit("serverPlayersChanged", null) + emitMessage("serverPlayersChanged", null) return reply.code(200).send(fetchedPlayer) } @@ -192,7 +192,7 @@ export default class PlayerController { } catch (_) { /* empty */ } - emitter.emit("serverPlayersChanged", null) + emitMessage("serverPlayersChanged", null) return reply.code(200).send(serverPlayer) } diff --git a/src/realms/rest/endpoints/services/Misc.service.ts b/src/realms/rest/endpoints/services/Misc.service.ts index cc6ada3..88e6def 100644 --- a/src/realms/rest/endpoints/services/Misc.service.ts +++ b/src/realms/rest/endpoints/services/Misc.service.ts @@ -1,40 +1,11 @@ import { Service } from "fastify-decorators" import { SocketStream } from "@fastify/websocket" -import { - EmitterMessage, - EmitterMessageType, - EmitterMessageTypes, - emitter -} from "../../emitter" +import { EmitterMessage, emitMessage, websockets } from "../../emitter" import { AnimusRestServer } from "../.." import { WebSocket } from "ws" @Service() export default class MiscService { - websockets = new Map>() - - constructor() { - this.prepareListeners() - } - - private prepareListeners() { - for (const messageType of Object.keys(EmitterMessageTypes)) { - const type = messageType as EmitterMessageType - emitter.on(type, (payload) => { - for (const [connection, subscriptions] of this.websockets.entries()) { - if ( - subscriptions.has(type) && - connection.socket.readyState === WebSocket.OPEN - ) { - connection.socket.send(JSON.stringify({ type, payload })) - } else if (connection.socket.readyState !== WebSocket.OPEN) { - this.websockets.delete(connection) - } - } - }) - } - } - handleWebSocket(connection: SocketStream) { AnimusRestServer.getInstance().getLogger().debug("New websocket connection") @@ -44,9 +15,9 @@ export default class MiscService { if (body.type === "setSubscriptions") { const subscriptions = new Set(body.payload.subscriptions) - this.websockets.set(connection, subscriptions) + websockets.set(connection, subscriptions) } else if (body.type === "hello") { - emitter.emit("hello", { ok: true }) + emitMessage("hello", { ok: true }) } else if (body.type === "ping") { connection.socket.send(JSON.stringify({ type: "pong" })) } @@ -58,7 +29,7 @@ export default class MiscService { connection.socket.on("message", handleMessage) connection.socket.on("close", () => { - this.websockets.delete(connection) + websockets.delete(connection) AnimusRestServer.getInstance() .getLogger() diff --git a/src/realms/rest/index.ts b/src/realms/rest/index.ts index 542156f..9a5363c 100644 --- a/src/realms/rest/index.ts +++ b/src/realms/rest/index.ts @@ -12,6 +12,7 @@ import FastifyWebsocket from "@fastify/websocket" import { bootstrap } from "fastify-decorators" import { TypeBoxTypeProvider } from "@fastify/type-provider-typebox" +import { prepareRedisListeners } from "./emitter" export class AnimusRestServer { private static instance: AnimusRestServer @@ -94,6 +95,7 @@ export class AnimusRestServer { async start() { await this.registerServerRoutes() + prepareRedisListeners() this.getServer() .listen({