Decouple websocket server and message processing #263
Could you clarify what you mean by processing? I assume you want to create a distributed (scalable) CPMS application. Just copy the AWS concept and create a dedicated OCPP service, which will handle the communication with the charge points, and then send the messages to dedicated services, which will handle your application logic. This is what we've done in our company; we used RabbitMQ instead of IoT Core. Just be cautious, because you also need to handle messages going the other way (sent by the application to the charge point), so routing will be a bit more complex. There's a similar answer in #59. |
Hi @xBlaz3kx, By "decoupled processing" we mean a dedicated server (Fargate) that only runs the WebSocket server and maintains connections with the clients. We want this server to offload the processing (i.e. taking the request object and returning the response object) to a Lambda through an MQTT broker (IoT Core). This is what we're aiming for:
And this is how I pictured this library/repo being used:
And I want to know whether this is possible in this repo:
|
Hey @V4G4X, I now better understand what you're trying to do - you just want to forward raw websocket messages to an MQTT broker, process the OCPP request messages in another service, and respond with an OCPP response message (basically mapping websockets to MQTT). I think that's a good approach, but it might cause a lot of headaches if something goes wrong during the forwarding phase. As far as I can tell from the code, this is not possible without substantial modification of both the websocket and OCPP-J server implementations, since the OCPP-J server requires the websocket server as a dependency, and you'd like to split the responsibility between the two. |
Thanks for the quick reply @xBlaz3kx. I'm having trouble understanding how your approach differs from our approach of "mapping websockets to MQTT". Could you help me understand this a little better? |
@V4G4X the lib is designed to read directly from your websocket connections, hence the suggested architecture is typically: I'm not familiar with the specific solution offered by AWS, but after quickly looking at it I'm left wondering whether this lib is best suited for your solution: you would lose all the state management (such as client state and message queuing) that the library offers. Also, you'd have to bootstrap the entire lib on every Lambda invocation, which introduces quite a bit of overhead for processing a single message. |
That being said, all three layers provided in the lib have a clean separation and can potentially be swapped out with custom code. Here is a working example that simply replaces the websocket layer with a dummy (not tested with the full AWS solution though):

```go
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
	ocpp16 "github.com/lorenzodonini/ocpp-go/ocpp1.6"
	"github.com/lorenzodonini/ocpp-go/ocpp1.6/core"
	"github.com/lorenzodonini/ocpp-go/ocpp1.6/types"
	"github.com/lorenzodonini/ocpp-go/ws"
)

// --------- Mock structs ---------

// CloudDummyChannel mimics a single charge point connection.
type CloudDummyChannel struct {
	ws.Channel
	ClientID string
}

func (c *CloudDummyChannel) ID() string {
	return c.ClientID
}

func (c *CloudDummyChannel) RemoteAddr() net.Addr {
	return nil
}

func (c *CloudDummyChannel) TLSConnectionState() *tls.ConnectionState {
	return nil
}

// CloudWebsocketServer replaces the real websocket server with a dummy
// that exposes outgoing data on a channel instead of writing to a socket.
type CloudWebsocketServer struct {
	ws.WsServer
	MessageHandler            func(ws ws.Channel, data []byte) error
	NewClientHandler          func(ws ws.Channel)
	DisconnectedClientHandler func(ws ws.Channel)
	errC                      chan error
	DataC                     chan []byte
}

func (s *CloudWebsocketServer) Start(port int, listenPath string) {
	// Dummy
}

func (s *CloudWebsocketServer) Stop() {
	// Dummy
}

func (s *CloudWebsocketServer) Write(_ string, data []byte) error {
	// Forward the data to the custom channel, so it can be picked up externally
	s.DataC <- data
	return nil
}

func (s *CloudWebsocketServer) SetMessageHandler(handler func(ws ws.Channel, data []byte) error) {
	s.MessageHandler = handler
}

func (s *CloudWebsocketServer) SetNewClientHandler(handler func(ws ws.Channel)) {
	s.NewClientHandler = handler
}

func (s *CloudWebsocketServer) SetDisconnectedClientHandler(handler func(ws ws.Channel)) {
	s.DisconnectedClientHandler = handler
}

func (s *CloudWebsocketServer) AddSupportedSubprotocol(_ string) {
	// Dummy
}

func (s *CloudWebsocketServer) SetCheckClientHandler(handler func(id string, r *http.Request) bool) {
	// Dummy
}

func (s *CloudWebsocketServer) Errors() <-chan error {
	if s.errC == nil {
		s.errC = make(chan error, 1)
	}
	return s.errC
}

// --------- Custom logic for AWS ---------

func NewCloudWebsocketServer(dataC chan []byte) *CloudWebsocketServer {
	return &CloudWebsocketServer{
		DataC: dataC,
	}
}

type LambdaMessage struct {
	ClientID string
	Message  []byte
}

type CoreHandler struct {
	core.CentralSystemHandler
}

func (c *CoreHandler) OnBootNotification(chargePointId string, request *core.BootNotificationRequest) (confirmation *core.BootNotificationConfirmation, err error) {
	fmt.Println(request)
	// Custom handler logic goes here
	return core.NewBootNotificationConfirmation(types.NewDateTime(time.Now()), 60, core.RegistrationStatusAccepted), nil
}

func HandleRequest(ctx context.Context, msg *LambdaMessage) (out []byte, err error) {
	dataC := make(chan []byte)
	wsDummy := NewCloudWebsocketServer(dataC)
	// Start OCPP server
	ocppServer := ocpp16.NewCentralSystem(nil, wsDummy)
	coreHandler := &CoreHandler{}
	ocppServer.SetCoreHandler(coreHandler)
	ocppServer.Start(0, "")
	// Simulate new client connection
	wsChannel := &CloudDummyChannel{
		ClientID: msg.ClientID,
	}
	wsDummy.NewClientHandler(wsChannel)
	// Pass message to channel
	err = wsDummy.MessageHandler(wsChannel, msg.Message)
	if err != nil {
		return
	}
	// Wait for response
	out = <-dataC
	fmt.Println(string(out))
	// Close OCPP server cleanly to prevent error logs
	ocppServer.Stop()
	// Return response
	return
}

func main() {
	lambda.Start(HandleRequest)
	// Uncomment to test locally
	//_, _ = HandleRequest(context.Background(), &LambdaMessage{
	//	ClientID: "test",
	//	Message:  []byte(`[2,"test","BootNotification",{"chargePointVendor":"test","chargePointModel":"test"}]`),
	//})
}
```

To efficiently process a single message without starting the entire OCPP server on every request, the lib would require some re-design (I'm not even sure if this would be worth it). |
Here is how we usually do it using Proto.Actor and actors + virtual actors (https://github.com/asynkron/protoactor-go): We have a WebSocket gateway, which basically just keeps the WebSocket connections open. This means that the central system cluster can be redeployed, bug-fixed, etc. without dropping the WebSocket connections to the chargers. I would love to give this a try using this lib, assuming I can decouple the WebSocket part from the actual OCPP message processing. |
We are trying to implement the OCPP message processing by referring to the AWS blog post: https://aws.amazon.com/blogs/iot/building-an-ocpp-compliant-electric-vehicle-charge-point-operator-solution-using-aws-iot-core/
In the post, the websocket server runs inside the Fargate container, which forwards each message to an IoT topic; the subscribed Lambda is invoked when a message is received.
Please help if you can point out how to segregate the two, or provide wrappers to do so.