diff --git a/src/cp/index.ts b/src/cp/index.ts index 98c5f26..b667ac6 100644 --- a/src/cp/index.ts +++ b/src/cp/index.ts @@ -54,22 +54,23 @@ export default class ChargePoint { private readonly csUrl: string ) { } - async connect(): Promise { + async connect(): Promise>> { const url = `${this.csUrl}/${this.id}`; const socket = new WebSocket(url, SUPPORTED_PROTOCOLS); - this.connection = new Connection( + const connection = new Connection( socket, this.requestHandler, centralSystemActions, chargePointActions, ); + this.connection = connection; // this.socket.on('close', () => (this.socket = undefined)); socket.on('error', console.error); - socket.on('message', (data) => this.connection?.handleWebsocketData(data)); + socket.on('message', (data) => connection?.handleWebsocketData(data)); return new Promise((resolve) => { - socket?.on('open', () => resolve()); + socket?.on('open', () => resolve(connection)); }); } diff --git a/src/cs/index.ts b/src/cs/index.ts index 2e98774..06901b6 100644 --- a/src/cs/index.ts +++ b/src/cs/index.ts @@ -88,7 +88,13 @@ type RequiredPick = Omit & Required> */ export default class CentralSystem { private cpHandler: RequestHandler; - private connections: Record> = {}; + /** each chargepoint has a list of connections because there is an + * issue with some chargers that do not fully close the previous websocket connection + * after creating a new websocket connection, with this we can still keep track of all + * current opened sockets, and only remove the connections whose sockets have closed. + * + * (for more info: see the test "if two sockets open before the first one closing, should still remain the latest socket") */ + private connections: Record>> = {}; private listeners: ConnectionListener[] = []; private websocketsServer: WebSocket.Server; private soapServer: soap.Server; @@ -137,7 +143,8 @@ export default class CentralSystem { switch (args.ocppVersion) { case 'v1.6-json': { - const connection = this.connections[args.chargePointId]; + // get the first available connection of this chargepoint + const [connection] = this.connections[args.chargePointId] ?? []; if (!connection) return Left(new OCPPRequestError('there is no connection to this charge point')); return connection @@ -187,9 +194,6 @@ export default class CentralSystem { const normalizeHeaders = (headers: Record) => Object.entries(headers).reduce>((acc, [key, val]) => (acc[key.toLowerCase()] = val, acc), {}); - server.addSoapHeader((action: any, args: any, headers: Record) => ({ - chargeBoxIdentity: normalizeHeaders(headers).chargeboxidentity - }), '', 'ocpp', 'urn://Ocpp/Cs/2012/06/'); server.addSoapHeader((action: any, args: any, headers: any) => ({ 'Action': '/' + action + 'Response', 'MessageID': uuid.v4(), @@ -256,14 +260,19 @@ export default class CentralSystem { centralSystemActions, this.options.rejectInvalidRequests, ); - this.connections[chargePointId] = connection; + + if (!this.connections[chargePointId]) { + this.connections[chargePointId] = []; + } + this.connections[chargePointId].push(connection); socket.on('message', (data) => { this.options.onRawWebsocketData?.(data, metadata); connection.handleWebsocketData(data) }); socket.on('close', () => { - delete this.connections[chargePointId]; + const closedIndex = this.connections[chargePointId].findIndex(c => c === connection); + this.connections[chargePointId].splice(closedIndex, 1); clearInterval(pingInterval); this.listeners.forEach((f) => f(chargePointId, 'disconnected')); }); diff --git a/src/integration.spec.ts b/src/integration.spec.ts index 8fb9f88..82c3901 100644 --- a/src/integration.spec.ts +++ b/src/integration.spec.ts @@ -106,4 +106,72 @@ describe('test cs<->cp communication', () => { afterAll(() => cs.close()); }); + + it('if two sockets open before the first one closing, should still remain the latest socket', async () => { + const PORT = 8082; + const cs = new CentralSystem(PORT, (_req, _cpId) => { + throw new Error('cs'); + }); + + const cp = new ChargePoint( + '123', + req => { + switch (req.action) { + case 'GetConfiguration': return { + action: req.action, + ocppVersion: req.ocppVersion, + configurationKey: [] + } + default: throw new Error('unsupported') + } + }, + `ws://localhost:${PORT}` + ); + + let triggerConnected = (_cpId: string) => { }; + let triggerDisconnected = (_cpId: string) => { }; + cs.addConnectionListener((cpId, status) => { + if (status === 'connected') triggerConnected(cpId); + if (status === 'disconnected') triggerDisconnected(cpId); + }); + const waitForConnection = (cpId: string) => + new Promise((resolve) => { + triggerConnected = (connectedId) => { + if (connectedId == cpId) resolve(cpId); + }; + }); + + const waitForDisconnection = (cpId: string) => + new Promise((resolve) => { + triggerDisconnected = (connectedId) => { + if (connectedId == cpId) resolve(cpId); + }; + }); + + // connecting once + let waitCentralSystem = waitForConnection(cp.id); + const firstConnection = await cp.connect(); + await waitCentralSystem; + + // connecting twice + waitCentralSystem = waitForConnection(cp.id); + const secondConnection = await cp.connect(); + await waitCentralSystem; + + waitCentralSystem = waitForDisconnection(cp.id); + firstConnection.close(); + await waitCentralSystem; + + // should send request to the second connection + (await cs.sendRequest({ + action: 'GetConfiguration', + ocppVersion: 'v1.6-json', + chargePointId: cp.id, + payload: {} + })).unsafeCoerce(); + + // cleanup + cp.close(); + cs.close() + }) }); diff --git a/src/ws/connection.ts b/src/ws/connection.ts index f6d690d..e7d827a 100644 --- a/src/ws/connection.ts +++ b/src/ws/connection.ts @@ -11,7 +11,7 @@ import { EitherAsync, Left, Right, Just, Nothing, MaybeAsync } from 'purify-ts'; export default class Connection> { private messageTriggers: Record void> = {}; constructor( - private readonly socket: WebSocket, + public readonly socket: WebSocket, private readonly requestHandler: RequestHandler, private readonly requestedActions: ReqAction[], private readonly respondedActions: ActionName<'v1.6-json'>[],