Skip to content

Commit

Permalink
Add websocket request/response listener to central-system
Browse files Browse the repository at this point in the history
  • Loading branch information
eduhenke committed Dec 21, 2021
1 parent 0ac2ebb commit baa2292
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
3 changes: 2 additions & 1 deletion examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const cs = new CentralSystem(8080, (req, metadata) => {
}, {
onRawSocketData: data => console.log(data.toString('ascii')),
onRawSoapData: console.log,
onRawWebsocketData: (data, { chargePointId }) => console.log(chargePointId, data.toString())
onRawWebsocketData: (data, { chargePointId }) => console.log(chargePointId, data.toString()),
onWebsocketRequestResponse: (initiator, type, data, { chargePointId }) => console.log({ initiator, type, data, chargePointId }),
});

cs.addConnectionListener(console.log);
Expand Down
13 changes: 12 additions & 1 deletion src/cs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import WebSocket from 'ws';
import { IncomingMessage, createServer, Server, ServerResponse } from 'http';
import { ActionName, Request, RequestHandler, Response } from '../messages';
import { ChargePointAction, chargePointActions } from '../messages/cp';
import { Connection, SUPPORTED_PROTOCOLS } from '../ws';
import { Connection, OCPPJMessage, SUPPORTED_PROTOCOLS } from '../ws';
import { CentralSystemAction, centralSystemActions } from '../messages/cs';
import { OCPPRequestError, ValidationError } from '../errors';
import { EitherAsync, Left } from 'purify-ts';
Expand All @@ -31,6 +31,9 @@ export type RequestMetadata = {
validationError?: ValidationError;
};

export type WebsocketRequestResponseListener =
(initiator: 'chargepoint' | 'central-system', type: 'request' | 'response', data: OCPPJMessage, metadata: Omit<RequestMetadata, 'validationError'>) => void;

export type CSSendRequestArgs<T extends CentralSystemAction<V>, V extends OCPPVersion> = {
ocppVersion: 'v1.6-json',
chargePointId: string,
Expand All @@ -57,6 +60,8 @@ export type CentralSystemOptions = {
onRawSocketData?: (data: Buffer) => void
onRawSoapData?: (type: 'replied' | 'received', data: string) => void
onRawWebsocketData?: (data: WebSocket.Data, metadata: Omit<RequestMetadata, 'validationError'>) => void,

onWebsocketRequestResponse?: WebsocketRequestResponseListener,
/** in milliseconds */
websocketPingInterval?: number
}
Expand Down Expand Up @@ -259,6 +264,12 @@ export default class CentralSystem {
chargePointActions,
centralSystemActions,
this.options.rejectInvalidRequests,
{
onReceiveRequest: message => this.options.onWebsocketRequestResponse?.('chargepoint', 'request', message, metadata),
onSendResponse: message => this.options.onWebsocketRequestResponse?.('chargepoint', 'response', message, metadata),
onReceiveResponse: message => this.options.onWebsocketRequestResponse?.('central-system', 'response', message, metadata),
onSendRequest: message => this.options.onWebsocketRequestResponse?.('central-system', 'request', message, metadata),
},
);

if (!this.connections[chargePointId]) {
Expand Down
25 changes: 18 additions & 7 deletions src/ws/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export default class Connection<ReqAction extends ActionName<'v1.6-json'>> {
private readonly requestedActions: ReqAction[],
private readonly respondedActions: ActionName<'v1.6-json'>[],
private readonly rejectInvalidRequests: boolean = true,
private readonly handlers?: {
onReceiveRequest: (m: OCPPJMessage) => void,
onSendResponse: (m: OCPPJMessage) => void,
onSendRequest: (m: OCPPJMessage) => void,
onReceiveResponse: (m: OCPPJMessage) => void,
},
) { }

public sendRequest<T extends ActionName<'v1.6-json'>>(action: T, { action: _, ocppVersion: __, ...payload }: Request<T, 'v1.6-json'>): EitherAsync<OCPPRequestError, Response<T, 'v1.6-json'>> {
Expand All @@ -28,22 +34,25 @@ export default class Connection<ReqAction extends ActionName<'v1.6-json'>> {
if (validateResult.isLeft())
return Left(new OCPPApplicationError(validateResult.extract().toString()))

await this.sendOCPPMessage({
const requestMessage = {
id,
type: MessageType.CALL,
action,
payload,
})
const message = await waitResponse;
} as const;
this.handlers?.onSendRequest(requestMessage);
await this.sendOCPPMessage(requestMessage);
const responseMessage = await waitResponse;

if (message.type === MessageType.CALL) return Left(
this.handlers?.onReceiveResponse(responseMessage);
if (responseMessage.type === MessageType.CALL) return Left(
new OCPPRequestError('response received was of CALL type, should be either CALLRESULT or CALLERROR')
);
if (message.type === MessageType.CALLERROR) return Left(
new OCPPRequestError('other side responded with error', message.errorCode, message.errorDescription, message.errorDetails)
if (responseMessage.type === MessageType.CALLERROR) return Left(
new OCPPRequestError('other side responded with error', responseMessage.errorCode, responseMessage.errorDescription, responseMessage.errorDetails)
);

return Right(message.payload as Response<T, 'v1.6-json'>);
return Right(responseMessage.payload as Response<T, 'v1.6-json'>);
})
}

Expand Down Expand Up @@ -71,6 +80,7 @@ export default class Connection<ReqAction extends ActionName<'v1.6-json'>> {
return MaybeAsync.fromPromise(async () => {
switch (message.type) {
case MessageType.CALL:
this.handlers?.onReceiveRequest(message);
const response =
await EitherAsync.liftEither(validateMessageRequest(message.action, message.payload ?? {}, this.requestedActions))
.map<{
Expand Down Expand Up @@ -111,6 +121,7 @@ export default class Connection<ReqAction extends ActionName<'v1.6-json'>> {
id: message.id,
payload,
}));
this.handlers?.onSendResponse(formattedResponse);
return Just(formattedResponse);
case MessageType.CALLERROR:
case MessageType.CALLRESULT:
Expand Down

0 comments on commit baa2292

Please sign in to comment.