Skip to content

Commit

Permalink
Merge pull request #19 from efrei-craft/dev
Browse files Browse the repository at this point in the history
Emitting through Redis
  • Loading branch information
JiveOff authored Sep 5, 2023
2 parents 6172e69 + 9ea243d commit 3994a17
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/clients/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default class RedisClient {
private static instance: RedisClient

// eslint-disable-next-line @typescript-eslint/no-empty-function
private constructor() {}
constructor() {}

public async publishToPlugin(channel: string, ...args: Array<string>) {
await this.client.publish(channel, args.join(RedisClient.SEPARATOR))
Expand Down
44 changes: 44 additions & 0 deletions src/realms/rest/emitter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import EventEmitter from "events"
import { Emitter } from "./helpers/Emitter"
import { Static, Type } from "@sinclair/typebox"
import { SocketStream } from "@fastify/websocket"
import { WebSocket } from "ws"
import RedisClient from "../../clients/Redis"

export const websockets = new Map<SocketStream, Set<EmitterMessageType>>()

export const emitter = new EventEmitter() as Emitter<EmitterMessage>
emitter.setMaxListeners(0)
Expand Down Expand Up @@ -52,3 +57,42 @@ export const emitterMessage = Type.Union(
)

export type EmitterMessage = Static<typeof emitterMessage>

export const prepareRedisListeners = () => {
const redisSubscriptionClient = new RedisClient()
redisSubscriptionClient.client.subscribe("emitter")
redisSubscriptionClient.client.on("message", (channel, message) => {
if (channel === "emitter") {
const body = JSON.parse(message.toString()) as EmitterMessage
emitter.emit(body.type, body.payload)
}
})

for (const messageType of Object.keys(EmitterMessageTypes)) {
const type = messageType as EmitterMessageType
emitter.on(type, (payload) => {
for (const [connection, subscriptions] of websockets.entries()) {
if (
subscriptions.has(type) &&
connection.socket.readyState === WebSocket.OPEN
) {
connection.socket.send(JSON.stringify({ type, payload }))
} else if (connection.socket.readyState !== WebSocket.OPEN) {
websockets.delete(connection)
}
}
})
}
}

export const emitMessage = <T extends EmitterMessageType>(
type: T,
payload: Extract<EmitterMessage, { type: T }> extends { type: T }
? Extract<EmitterMessage, { type: T }>["payload"]
: never
) => {
RedisClient.getInstance().client.publish(
"emitter",
JSON.stringify({ type, payload })
)
}
6 changes: 3 additions & 3 deletions src/realms/rest/endpoints/controllers/Player.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { HasSchemaScope } from "../../helpers/decorators/HasSchemaScope"
import { Permission } from "@prisma/client"
import QueueService from "../services/Queue.service"
import PartyService from "../services/Party.service"
import { emitter } from "../../emitter"
import { emitMessage } from "../../emitter"

@Controller({ route: "/players" })
export default class PlayerController {
Expand Down Expand Up @@ -161,7 +161,7 @@ export default class PlayerController {
const fetchedPlayer = await this.playerService.disconnectPlayer(
req.params.uuid
)
emitter.emit("serverPlayersChanged", null)
emitMessage("serverPlayersChanged", null)
return reply.code(200).send(fetchedPlayer)
}

Expand Down Expand Up @@ -192,7 +192,7 @@ export default class PlayerController {
} catch (_) {
/* empty */
}
emitter.emit("serverPlayersChanged", null)
emitMessage("serverPlayersChanged", null)
return reply.code(200).send(serverPlayer)
}

Expand Down
37 changes: 4 additions & 33 deletions src/realms/rest/endpoints/services/Misc.service.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,11 @@
import { Service } from "fastify-decorators"
import { SocketStream } from "@fastify/websocket"
import {
EmitterMessage,
EmitterMessageType,
EmitterMessageTypes,
emitter
} from "../../emitter"
import { EmitterMessage, emitMessage, websockets } from "../../emitter"
import { AnimusRestServer } from "../.."
import { WebSocket } from "ws"

@Service()
export default class MiscService {
websockets = new Map<SocketStream, Set<EmitterMessageType>>()

constructor() {
this.prepareListeners()
}

private prepareListeners() {
for (const messageType of Object.keys(EmitterMessageTypes)) {
const type = messageType as EmitterMessageType
emitter.on(type, (payload) => {
for (const [connection, subscriptions] of this.websockets.entries()) {
if (
subscriptions.has(type) &&
connection.socket.readyState === WebSocket.OPEN
) {
connection.socket.send(JSON.stringify({ type, payload }))
} else if (connection.socket.readyState !== WebSocket.OPEN) {
this.websockets.delete(connection)
}
}
})
}
}

handleWebSocket(connection: SocketStream) {
AnimusRestServer.getInstance().getLogger().debug("New websocket connection")

Expand All @@ -44,9 +15,9 @@ export default class MiscService {

if (body.type === "setSubscriptions") {
const subscriptions = new Set(body.payload.subscriptions)
this.websockets.set(connection, subscriptions)
websockets.set(connection, subscriptions)
} else if (body.type === "hello") {
emitter.emit("hello", { ok: true })
emitMessage("hello", { ok: true })
} else if (body.type === "ping") {
connection.socket.send(JSON.stringify({ type: "pong" }))
}
Expand All @@ -58,7 +29,7 @@ export default class MiscService {
connection.socket.on("message", handleMessage)

connection.socket.on("close", () => {
this.websockets.delete(connection)
websockets.delete(connection)

AnimusRestServer.getInstance()
.getLogger()
Expand Down
2 changes: 2 additions & 0 deletions src/realms/rest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import FastifyWebsocket from "@fastify/websocket"

import { bootstrap } from "fastify-decorators"
import { TypeBoxTypeProvider } from "@fastify/type-provider-typebox"
import { prepareRedisListeners } from "./emitter"

export class AnimusRestServer {
private static instance: AnimusRestServer
Expand Down Expand Up @@ -94,6 +95,7 @@ export class AnimusRestServer {

async start() {
await this.registerServerRoutes()
prepareRedisListeners()

this.getServer()
.listen({
Expand Down

0 comments on commit 3994a17

Please sign in to comment.