Skip to content

Commit

Permalink
Fix multiple sockets opening without closing
Browse files Browse the repository at this point in the history
  • Loading branch information
eduhenke committed Jul 28, 2021
1 parent c910a6b commit 36880c6
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 12 deletions.
9 changes: 5 additions & 4 deletions src/cp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,23 @@ export default class ChargePoint {
private readonly csUrl: string
) { }

async connect(): Promise<void> {
async connect(): Promise<Connection<CentralSystemAction<'v1.6-json'>>> {
const url = `${this.csUrl}/${this.id}`;
const socket = new WebSocket(url, SUPPORTED_PROTOCOLS);

this.connection = new Connection(
const connection = new Connection(
socket,
this.requestHandler,
centralSystemActions,
chargePointActions,
);
this.connection = connection;
// this.socket.on('close', () => (this.socket = undefined));
socket.on('error', console.error);
socket.on('message', (data) => this.connection?.handleWebsocketData(data));
socket.on('message', (data) => connection?.handleWebsocketData(data));

return new Promise((resolve) => {
socket?.on('open', () => resolve());
socket?.on('open', () => resolve(connection));
});
}

Expand Down
23 changes: 16 additions & 7 deletions src/cs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,13 @@ type RequiredPick<T, K extends keyof T> = Omit<T, K> & Required<Pick<T, K>>
*/
export default class CentralSystem {
private cpHandler: RequestHandler<ChargePointAction, RequestMetadata>;
private connections: Record<string, Connection<ChargePointAction>> = {};
/** each chargepoint has a list of connections because there is an
* issue with some chargers that do not fully close the previous websocket connection
* after creating a new websocket connection, with this we can still keep track of all
* current opened sockets, and only remove the connections whose sockets have closed.
*
* (for more info: see the test "if two sockets open before the first one closing, should still remain the latest socket") */
private connections: Record<string, Array<Connection<ChargePointAction>>> = {};
private listeners: ConnectionListener[] = [];
private websocketsServer: WebSocket.Server;
private soapServer: soap.Server;
Expand Down Expand Up @@ -137,7 +143,8 @@ export default class CentralSystem {

switch (args.ocppVersion) {
case 'v1.6-json': {
const connection = this.connections[args.chargePointId];
// get the first available connection of this chargepoint
const [connection] = this.connections[args.chargePointId] ?? [];
if (!connection) return Left(new OCPPRequestError('there is no connection to this charge point'));

return connection
Expand Down Expand Up @@ -187,9 +194,6 @@ export default class CentralSystem {
const normalizeHeaders = (headers: Record<string, string>) =>
Object.entries(headers).reduce<Record<string, string>>((acc, [key, val]) => (acc[key.toLowerCase()] = val, acc), {});

server.addSoapHeader((action: any, args: any, headers: Record<string, string>) => ({
chargeBoxIdentity: normalizeHeaders(headers).chargeboxidentity
}), '', 'ocpp', 'urn://Ocpp/Cs/2012/06/');
server.addSoapHeader((action: any, args: any, headers: any) => ({
'Action': '/' + action + 'Response',
'MessageID': uuid.v4(),
Expand Down Expand Up @@ -256,14 +260,19 @@ export default class CentralSystem {
centralSystemActions,
this.options.rejectInvalidRequests,
);
this.connections[chargePointId] = connection;

if (!this.connections[chargePointId]) {
this.connections[chargePointId] = [];
}
this.connections[chargePointId].push(connection);

socket.on('message', (data) => {
this.options.onRawWebsocketData?.(data, metadata);
connection.handleWebsocketData(data)
});
socket.on('close', () => {
delete this.connections[chargePointId];
const closedIndex = this.connections[chargePointId].findIndex(c => c === connection);
this.connections[chargePointId].splice(closedIndex, 1);
clearInterval(pingInterval);
this.listeners.forEach((f) => f(chargePointId, 'disconnected'));
});
Expand Down
68 changes: 68 additions & 0 deletions src/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,72 @@ describe('test cs<->cp communication', () => {

afterAll(() => cs.close());
});

it('if two sockets open before the first one closing, should still remain the latest socket', async () => {
const PORT = 8082;
const cs = new CentralSystem(PORT, (_req, _cpId) => {
throw new Error('cs');
});

const cp = new ChargePoint(
'123',
req => {
switch (req.action) {
case 'GetConfiguration': return {
action: req.action,
ocppVersion: req.ocppVersion,
configurationKey: []
}
default: throw new Error('unsupported')
}
},
`ws://localhost:${PORT}`
);

let triggerConnected = (_cpId: string) => { };
let triggerDisconnected = (_cpId: string) => { };
cs.addConnectionListener((cpId, status) => {
if (status === 'connected') triggerConnected(cpId);
if (status === 'disconnected') triggerDisconnected(cpId);
});
const waitForConnection = (cpId: string) =>
new Promise((resolve) => {
triggerConnected = (connectedId) => {
if (connectedId == cpId) resolve(cpId);
};
});

const waitForDisconnection = (cpId: string) =>
new Promise((resolve) => {
triggerDisconnected = (connectedId) => {
if (connectedId == cpId) resolve(cpId);
};
});

// connecting once
let waitCentralSystem = waitForConnection(cp.id);
const firstConnection = await cp.connect();
await waitCentralSystem;

// connecting twice
waitCentralSystem = waitForConnection(cp.id);
const secondConnection = await cp.connect();
await waitCentralSystem;

waitCentralSystem = waitForDisconnection(cp.id);
firstConnection.close();
await waitCentralSystem;

// should send request to the second connection
(await cs.sendRequest({
action: 'GetConfiguration',
ocppVersion: 'v1.6-json',
chargePointId: cp.id,
payload: {}
})).unsafeCoerce();

// cleanup
cp.close();
cs.close()
})
});
2 changes: 1 addition & 1 deletion src/ws/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { EitherAsync, Left, Right, Just, Nothing, MaybeAsync } from 'purify-ts';
export default class Connection<ReqAction extends ActionName<'v1.6-json'>> {
private messageTriggers: Record<string, (m: OCPPJMessage) => void> = {};
constructor(
private readonly socket: WebSocket,
public readonly socket: WebSocket,
private readonly requestHandler: RequestHandler<ReqAction, ValidationError | undefined, 'v1.6-json'>,
private readonly requestedActions: ReqAction[],
private readonly respondedActions: ActionName<'v1.6-json'>[],
Expand Down

0 comments on commit 36880c6

Please sign in to comment.