diff --git a/.gitignore b/.gitignore index 161a448..643ff53 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .history/ *.exe *.txt -*.bak \ No newline at end of file +*.bak +Go_TCP_Websocket_Adapter diff --git a/Config.json b/Config.json index dcd4efd..0d2477f 100644 --- a/Config.json +++ b/Config.json @@ -7,11 +7,10 @@ "TCPRxConfig": { "Port": "10010" }, - "WebSocketTxConfig": { - "Port": "10100", - "RegisteredChunks": [ - "TimeChunk", - "FFTMagnitudeChunk" - ] + "WebSocketReportingTxConfig": { + "Port": "10101" + }, + "WebSocketDataTxConfig": { + "Port": "10100" } -} \ No newline at end of file +} diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index 078d449..3853853 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -2,12 +2,20 @@ package Routines import ( "sync" - "time" + "encoding/json" "github.com/gin-gonic/gin" "github.com/rs/zerolog" "github.com/gorilla/websocket" + "time" + "strconv" + "sync/atomic" ) +var upgrader = websocket.Upgrader{ + ReadBufferSize: 8096, + WriteBufferSize: 8096, +} + /// /// ROUTINE SAFE MAP FUNCTIONS /// @@ -18,46 +26,64 @@ Each string corresponds to channel to send a chunk type to a routine that shall handle that chunk */ type ChunkTypeToChannelMap struct { - mu sync.Mutex // Mutex to protect access to the map - chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs + loggingOutputChannel chan map[zerolog.Level]string // Channel to stream logging messages + reportingOutputChannel chan string // Channel to stream Reporting messages + chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs + mu sync.Mutex // Mutex to protect access to the map } +func NewChunkTypeToChannelMap(loggingOutputChannel chan map[zerolog.Level]string, reportingOutputChannel chan string) *ChunkTypeToChannelMap { + p := new(ChunkTypeToChannelMap) + p.loggingOutputChannel = loggingOutputChannel + p.reportingOutputChannel = reportingOutputChannel + return p +} /* Data string will be routed in the map given that chunk type key exists */ -func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeKey string, data string, router *gin.Engine) bool { +func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeKey string, data string, router *gin.Engine) { // We first check if the channel exists // And wait to try get it chunkRoutingChannel, channelExists := s.TryGetChannel(chunkTypeKey) if channelExists { - // and pass the data if it does + // and try pass the data if it does if there is space in the queue + if len(chunkRoutingChannel) >= cap(chunkRoutingChannel) { + + // Note: One should see issues in the status messages. It may be useful to log this but the current + // Implementation does not gaurentee that the UI will service all queues which will lead most of them + // To be overloaded a lot of the time as only the loaded page will service a particular websocket + + //loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeKey+" - Routing queue overflowwing") + return + } chunkRoutingChannel <- data - return channelExists + } else { - // or drop data and return false - // operate on the router itself - loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Here") + // If it does not set up a weboscket connection + // To manage connections for this chunk type s.RegisterChunkOnWebSocket(loggingChannel, chunkTypeKey, router) - return channelExists + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeKey+" - registered for routing") } } func (s *ChunkTypeToChannelMap) GetChannelData(chunkTypeKey string) (dataString string, success bool) { + // We first check if the channel exists // And wait to try get it chunkRoutingChannel, channelExists := s.TryGetChannel(chunkTypeKey) - if channelExists { - // and pass the data if it does - var data = <-chunkRoutingChannel - success = true - return data, success - } else { - // or drop data and return false - success = false - return "", success + if !channelExists { + return "", false } + var timeoutCh = time.After(250 * time.Millisecond) + + select { + case data := <-chunkRoutingChannel: + return data, true + case <-timeoutCh: + return "", false + } } /* @@ -68,6 +94,7 @@ func (s *ChunkTypeToChannelMap) TryGetChannel(chunkType string) (extractedChanne for { var lockAcquired = make(chan struct{}, 1) + // Spin this lambda up and try get the channel go func() { s.mu.Lock() extractedChannel, exists = s.chunkTypeRoutingMap[chunkType] @@ -75,17 +102,28 @@ func (s *ChunkTypeToChannelMap) TryGetChannel(chunkType string) (extractedChanne defer close(lockAcquired) }() + // Wait here until lock aquired channel is closed select { case <-lockAcquired: // Lock was acquired return extractedChannel, exists case <-time.After(1 * time.Millisecond): // Lock was not acquired, sleep and retry - time.Sleep(1 * time.Millisecond) // Sleep for 500 milliseconds, you can adjust the duration as needed. + time.Sleep(1 * time.Millisecond) } } } +func (s *ChunkTypeToChannelMap) GetChannelLengthAndCapacity(chunkTypeString string) (length int, capacity int, exists bool) { + var channel,exits = s.TryGetChannel(chunkTypeString) + + if !exits { + return -1, -1, exits + } + + return len(channel), cap(channel), exits +} + func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeString string, router *gin.Engine) { @@ -97,33 +135,121 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[ s.chunkTypeRoutingMap = chunkTypeChannelMap } - s.chunkTypeRoutingMap[chunkTypeString] = make(chan string, 100) + s.chunkTypeRoutingMap[chunkTypeString] = make(chan string, 1000) - router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) { - // Upgrade the HTTP request into a websocket - WebSocketConnection, _ := upgrader.Upgrade(c.Writer, c.Request, nil) + // When you get this HTTP request open the websocket + // This permenantly add this to the http + // Router as we are using a reference + router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) { - defer WebSocketConnection.Close() + // Upgrade the HTTP request into a websocket + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Client calling for upgrade on /DataTypes/"+chunkTypeString) + WebSocketConnection, err := upgrader.Upgrade(c.Writer, c.Request, nil) + defer WebSocketConnection.Close() - currentTime := time.Now() - lastTime := currentTime + if err != nil { + loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error upgrading to WebSocket:"+ err.Error()) + return + } - // Then start up - var dataString, _ = s.GetChannelData(chunkTypeString) + // Spin up Routines to manage this websocket upgrade request + // When this socket is closed all management of this queue is + // Stopped leading it to grow to its max capacity and lock up - WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) - for { - currentTime = time.Now() - timeDiff := currentTime.Sub(lastTime) + var AtomicWebsocketClosed atomic.Bool // Atomic integer used as a flag (0: false, 1: true) + AtomicWebsocketClosed.Store(false) - var dataString, _ = s.GetChannelData(chunkTypeString) + var wg sync.WaitGroup + wg.Add(2) + go s.HandleReceivedSignals(loggingChannel, WebSocketConnection, &wg, &AtomicWebsocketClosed); + go s.HandleSignalTransmissions(loggingChannel ,WebSocketConnection, chunkTypeString, &wg, &AtomicWebsocketClosed ) + wg.Wait() + + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, chunkTypeString + " Routine shut down") - // Rate limiting - if timeDiff > (time.Millisecond * 1) { - WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) - lastTime = currentTime - } - } }) } + +func (s *ChunkTypeToChannelMap)HandleReceivedSignals(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, wg *sync.WaitGroup, AtomicWebsocketClosed *atomic.Bool) { + + defer wg.Done() + + for{ + // We now just wait for the close message + _, _, err := WebSocketConnection.ReadMessage() + if err != nil { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue reading message from WebSocket:" + err.Error()) + AtomicWebsocketClosed.Store(true) + break; + } + } +} + +func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, chunkTypeString string, wg *sync.WaitGroup, AtomicWebsocketClosed *atomic.Bool) { + + defer wg.Done() + currentTime := time.Now() + + for { + // Unmarshal the JSON string into a map + var bChannelExists = false + var strJSONData string + + var chstrJSONData = make(chan string, 1) + var chbCannelExists = make(chan bool, 1) + + // Launch a goroutine to try fetch data + go func() { + strJSONDataTemp, bChannelExistsTmp := s.GetChannelData(chunkTypeString) + chbCannelExists <- bChannelExistsTmp + chstrJSONData <- strJSONDataTemp + }() + + // While also limiting this with a timeout procedure + var chTimeout = time.After(250 * time.Millisecond) + + // And see which finihsed first + select { + case bChannelExists = <-chbCannelExists: + strJSONData = <- chstrJSONData + case <-chTimeout: + // And continure if a timeout occurred + continue + } + + // When we have data check the client has not closed out beautiful, stunning and special connection + if AtomicWebsocketClosed.Load() { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "RX Websocket closed, exiting write routine") + break + } + + // If the connection is open check we successfully got access to the channel and transmit data + if bChannelExists { + err := WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(strJSONData)) + if err != nil { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue writing message to WebSocket:"+ err.Error()) + AtomicWebsocketClosed.Store(true) + break + } + } + + // Then check if we should report the length of this chunks data channel + if time.Since(currentTime) > 1000*time.Millisecond { + + var len, cap, _ = s.GetChannelLengthAndCapacity(chunkTypeString) + currentTime= time.Now() + + // Create the reporting message + QueueLogMessage := SystemInfo{SystemStat:SystemStatistic{ + StatEnvironment: "TCP_WS_Adapter", + StatName: chunkTypeString + "_Channel", + StatStaus: strconv.Itoa(len) + "/" + strconv.Itoa(cap), + }} + + // And send it to the dedicated reporting routine + data, _ := json.Marshal(QueueLogMessage) + s.reportingOutputChannel <- string(data) + } + } +} \ No newline at end of file diff --git a/Routines/LoggingRoutine.go b/Routines/LoggingRoutine.go index a1628ba..48ca48d 100644 --- a/Routines/LoggingRoutine.go +++ b/Routines/LoggingRoutine.go @@ -7,7 +7,17 @@ import ( "github.com/rs/zerolog" ) -func HandleLogging(configJson map[string]interface{}, routineCompleteChannel chan bool, dataChannel chan map[zerolog.Level]string) { +type SystemStatistic struct { + StatEnvironment string `json:"StatEnvironment"` + StatName string `json:"StatName"` + StatStaus string `json:"StatStaus"` +} + +type SystemInfo struct { + SystemStat SystemStatistic `json:"SystemInfo"` +} + +func HandleLogging(configJson map[string]interface{}, routineCompleteChannel chan bool, incomingDataChannel chan map[zerolog.Level]string) { // And finally create a logger var LogLevel = zerolog.DebugLevel @@ -79,7 +89,7 @@ func HandleLogging(configJson map[string]interface{}, routineCompleteChannel cha logger.Info().Msg("Starting logging routine") for { - levelMessageMap := <-dataChannel + levelMessageMap := <-incomingDataChannel for logLevelKey, LogMessageString := range levelMessageMap { if logLevelKey == zerolog.DebugLevel { diff --git a/Routines/WSDataTxRoutine.go b/Routines/WSDataTxRoutine.go new file mode 100644 index 0000000..2c3d22d --- /dev/null +++ b/Routines/WSDataTxRoutine.go @@ -0,0 +1,97 @@ +package Routines + +import ( + "encoding/json" + "net/http" + "os" + "time" + "github.com/gin-gonic/gin" + "github.com/rs/zerolog" + "strconv" +) + +func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, OutgoingReportingChannel chan string) { + + // Create websocket variables + var port string + + // And then try parse the JSON string + if WebSocketTxConfig, exists := configJson["WebSocketDataTxConfig"].(map[string]interface{}); exists { + port = WebSocketTxConfig["Port"].(string) + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "WebSocketDataTxConfig opening on port" + port) + } else { + loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketDataTxConfig Config not found or not correct") + os.Exit(1) + return + } + + // Then we run the HTTP router + router := gin.Default() + // Allow all origins to connect + // Note that is is not safe + upgrader.CheckOrigin = func(r *http.Request) bool { + return true + } + + go RunChunkRoutingRoutine(loggingChannel, incomingDataChannel, router, OutgoingReportingChannel) + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Starting http router") + router.Run(":" + port) + +} + +func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, router *gin.Engine, OutgoingReportingChannel chan string) { + + chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel, OutgoingReportingChannel) + currentTime := time.Now() + + for { + + bSendData := false + var strJSONData string + timeoutCh := time.After(5 * time.Millisecond) + + // Try get data while waiting for a timeout + select { + case strJSONData = <-incomingDataChannel: + bSendData = true + case <-timeoutCh: + } + + // Then try send the data + if bSendData { + + var JSONData map[string]interface{} + + if err := json.Unmarshal([]byte(strJSONData), &JSONData); err != nil { + loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error unmarshaling JSON in routing routine:"+err.Error()) + continue + } + + // Then try forward the JSON data onwards + // By first getting the root JSON Key (ChunkType) + var chunkTypeStringKey string + for key := range JSONData { + chunkTypeStringKey = key + break // We assume there's only one root key + } + + // And checking if it exists and trying to route it + chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, strJSONData, router) + } + + if time.Since(currentTime) > 1000*time.Millisecond { + + currentTime= time.Now() + + QueueLogMessage := SystemInfo{SystemStat:SystemStatistic{ + StatEnvironment: "TCP_WS_Adapter", + StatName: "Routing_Output_Channel", + StatStaus: strconv.Itoa(len(incomingDataChannel)) + "/" + strconv.Itoa(cap(incomingDataChannel)), + }} + + data, _ := json.Marshal(QueueLogMessage) + OutgoingReportingChannel <- string(data) + } + } +} + diff --git a/Routines/WSReportingTxRoutine.go b/Routines/WSReportingTxRoutine.go new file mode 100644 index 0000000..503a2aa --- /dev/null +++ b/Routines/WSReportingTxRoutine.go @@ -0,0 +1,75 @@ +package Routines + +import ( + "encoding/json" + "net/http" + "os" + "github.com/gin-gonic/gin" + "github.com/rs/zerolog" +) + + +func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChannel chan bool, loggingChannel chan map[zerolog.Level]string, incomingDataChannel chan string) { + + // Create websocket variables + var port string + + // And then try parse the JSON string + if WebSocketTxConfig, exists := configJson["WebSocketReportingTxConfig"].(map[string]interface{}); exists { + port = WebSocketTxConfig["Port"].(string) + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "WebSocketReportingTxConfig opening on port" + port) + } else { + + loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketReportingTxConfig Config not found or not correct") + os.Exit(1) + return + } + + // Then we run the HTTP router + router := gin.Default() + // Allow all origins to connect + // Note that is is not safe + upgrader.CheckOrigin = func(r *http.Request) bool { + return true + } + + go RunReportingRoutine(loggingChannel, routineCompleteChannel, incomingDataChannel, router) + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Starting http router") + router.Run(":" + port) + +} + +func RunReportingRoutine(loggingChannel chan map[zerolog.Level]string, routineCompleteChannel chan bool, incomingDataChannel chan string, router *gin.Engine) { + + chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel, incomingDataChannel) + + for { + + // Unmarshal the JSON string into a map + JSONDataString := <-incomingDataChannel + var JSONData map[string]interface{} + + // Try convert the JSON doc to a string + if err := json.Unmarshal([]byte(JSONDataString), &JSONData); err != nil { + // If it fails, then skip to next interation + loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error unmarshaling JSON in routing routine:"+err.Error()) + continue + } + + // Then try forward the JSON data onwards + // By first getting the root JSON Key (ChunkType) + var chunkTypeStringKey string + + // We assume there's only one root key + for key := range JSONData { + chunkTypeStringKey = key + break + } + + // And try tranmit it on the routing threads + chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) + } + + routineCompleteChannel <- true +} + diff --git a/Routines/WSTxRoutine.go b/Routines/WSTxRoutine.go deleted file mode 100644 index 60b9235..0000000 --- a/Routines/WSTxRoutine.go +++ /dev/null @@ -1,77 +0,0 @@ -package Routines - -import ( - "encoding/json" - "net/http" - "os" - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" - "github.com/rs/zerolog" -) - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, -} - -func HandleWebSocketChunkTransmissions(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string) { - - // Create websocket variables - var port string - - // And then try parse the JSON string - if WebSocketTxConfig, exists := configJson["WebSocketTxConfig"].(map[string]interface{}); exists { - port = WebSocketTxConfig["Port"].(string) - } else { - loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketTxConfig Config not found or not correct") - os.Exit(1) - return - } - - // Then we run the HTTP router - router := gin.Default() - // Allow all origins to connect - // Note that is is not safe - upgrader.CheckOrigin = func(r *http.Request) bool { - return true - } - - go RunChunkRoutingRoutine(loggingChannel, incomingDataChannel, router) - loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Starting http router") - router.Run(":" + port) - -} - -func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, router *gin.Engine) { - - chunkTypeRoutingMap := new(ChunkTypeToChannelMap) - // start up and handle JSON chunks - for { - - // Unmarshal the JSON string into a map - JSONDataString := <-incomingDataChannel - var JSONData map[string]interface{} - - if err := json.Unmarshal([]byte(JSONDataString), &JSONData); err != nil { - loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error unmarshaling JSON in routing routine:"+err.Error()) - // loggingChannel <- CreateLogMessage(zerolog.DebugLevel, "Received JSON as follows { "+JSONDataString+" }") - // return - } else { - // Then try forward the JSON data onwards - // By first getting the root JSON Key (ChunkType) - var chunkTypeStringKey string - - for key := range JSONData { - chunkTypeStringKey = key - break // We assume there's only one root key - } - - // And checking if it exists and trying to route it - sentSuccessfully := chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) - if !sentSuccessfully { - loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeStringKey+" - newly registered in routing map") - } - } - } -} - diff --git a/main.go b/main.go index f765570..94c9a78 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "time" - "github.com/Sense-Scape/Go_TCP_Websocket_Adapter/v2/Routines" "github.com/rs/zerolog" ) @@ -35,17 +34,21 @@ func main() { return } - LoggingChannel := make(chan map[zerolog.Level]string) - routineCount = routineCount + 1 + LoggingChannel := make(chan map[zerolog.Level]string, 1000) + go Routines.HandleLogging(serverConfigStringMap, routineCompleteChannel, LoggingChannel) routineCount = routineCount + 1 - GenericChunkChannel := make(chan string) + ReportingChannel := make(chan string, 1000) + go Routines.HandleWSReportingTx(serverConfigStringMap,routineCompleteChannel,LoggingChannel,ReportingChannel) + + routineCount = routineCount + 1 + GenericChunkChannel := make(chan string, 1000) go Routines.HandleTCPReceivals(serverConfigStringMap, LoggingChannel, GenericChunkChannel) routineCount = routineCount + 1 - go Routines.HandleWebSocketChunkTransmissions(serverConfigStringMap, LoggingChannel, GenericChunkChannel) + go Routines.HandleWSDataChunkTx(serverConfigStringMap, LoggingChannel, GenericChunkChannel, ReportingChannel) for { time.Sleep(60 * time.Second)