From b65fd528f462f513348dcbf07a6fecb267fef1dc Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 30 May 2024 07:28:34 +0200 Subject: [PATCH 01/16] Fix: Removing explicit chunk registration - now automated --- Config.json | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Config.json b/Config.json index dcd4efd..1d0df15 100644 --- a/Config.json +++ b/Config.json @@ -8,10 +8,6 @@ "Port": "10010" }, "WebSocketTxConfig": { - "Port": "10100", - "RegisteredChunks": [ - "TimeChunk", - "FFTMagnitudeChunk" - ] + "Port": "10100" } } \ No newline at end of file From 1a68fd6d0a071a2e31ce3349c7576c2e812b56af Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 30 May 2024 07:35:35 +0200 Subject: [PATCH 02/16] Chore: Adding bin name to ignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 161a448..643ff53 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .history/ *.exe *.txt -*.bak \ No newline at end of file +*.bak +Go_TCP_Websocket_Adapter From dd36d9127092daae04a5e57f909206eae3c85e45 Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 30 May 2024 07:35:58 +0200 Subject: [PATCH 03/16] Chore: Standardising naming --- Routines/{WSTxRoutine.go => WSDataTxRoutine.go} | 2 +- main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename Routines/{WSTxRoutine.go => WSDataTxRoutine.go} (93%) diff --git a/Routines/WSTxRoutine.go b/Routines/WSDataTxRoutine.go similarity index 93% rename from Routines/WSTxRoutine.go rename to Routines/WSDataTxRoutine.go index 60b9235..8f64d6c 100644 --- a/Routines/WSTxRoutine.go +++ b/Routines/WSDataTxRoutine.go @@ -14,7 +14,7 @@ var upgrader = websocket.Upgrader{ WriteBufferSize: 1024, } -func HandleWebSocketChunkTransmissions(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string) { +func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string) { // Create websocket variables var port string diff --git a/main.go b/main.go index f765570..2a39b27 100644 --- a/main.go +++ b/main.go @@ -45,7 +45,7 @@ func main() { go Routines.HandleTCPReceivals(serverConfigStringMap, LoggingChannel, GenericChunkChannel) routineCount = routineCount + 1 - go Routines.HandleWebSocketChunkTransmissions(serverConfigStringMap, LoggingChannel, GenericChunkChannel) + go Routines.HandleWSDataChunkTx(serverConfigStringMap, LoggingChannel, GenericChunkChannel) for { time.Sleep(60 * time.Second) From 5fe959ec7de8a606d9afd238fb9f79e01adcf0bd Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 30 May 2024 07:51:50 +0200 Subject: [PATCH 04/16] Feat: First non functional building commit of reporting routine --- Routines/ChunkRouter.go | 5 +++ Routines/WSDataTxRoutine.go | 12 ++---- Routines/WSReportingTxRoutine.go | 74 ++++++++++++++++++++++++++++++++ main.go | 5 ++- 4 files changed, 86 insertions(+), 10 deletions(-) create mode 100644 Routines/WSReportingTxRoutine.go diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index 078d449..79e9c1e 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -8,6 +8,11 @@ import ( "github.com/gorilla/websocket" ) +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + /// /// ROUTINE SAFE MAP FUNCTIONS /// diff --git a/Routines/WSDataTxRoutine.go b/Routines/WSDataTxRoutine.go index 8f64d6c..a67400b 100644 --- a/Routines/WSDataTxRoutine.go +++ b/Routines/WSDataTxRoutine.go @@ -5,25 +5,19 @@ import ( "net/http" "os" "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" "github.com/rs/zerolog" ) -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, -} - func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string) { - + // Create websocket variables var port string // And then try parse the JSON string - if WebSocketTxConfig, exists := configJson["WebSocketTxConfig"].(map[string]interface{}); exists { + if WebSocketTxConfig, exists := configJson["WebSocketDataTxConfig"].(map[string]interface{}); exists { port = WebSocketTxConfig["Port"].(string) } else { - loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketTxConfig Config not found or not correct") + loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketDataTxConfig Config not found or not correct") os.Exit(1) return } diff --git a/Routines/WSReportingTxRoutine.go b/Routines/WSReportingTxRoutine.go new file mode 100644 index 0000000..79c46eb --- /dev/null +++ b/Routines/WSReportingTxRoutine.go @@ -0,0 +1,74 @@ +package Routines + +import ( + "encoding/json" + "net/http" + "os" + "github.com/gin-gonic/gin" + "github.com/rs/zerolog" +) + + +func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChannel chan bool, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string) { + + // Create websocket variables + var port string + + // And then try parse the JSON string + if WebSocketTxConfig, exists := configJson["WebSocketReportingTxConfig"].(map[string]interface{}); exists { + port = WebSocketTxConfig["Port"].(string) + } else { + loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketReportingTxConfig Config not found or not correct") + os.Exit(1) + return + } + + // Then we run the HTTP router + router := gin.Default() + // Allow all origins to connect + // Note that is is not safe + upgrader.CheckOrigin = func(r *http.Request) bool { + return true + } + + go RunReportingRoutine(loggingChannel, routineCompleteChannel, incomingDataChannel, router) + loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Starting http router") + router.Run(":" + port) + +} + +func RunReportingRoutine(loggingChannel chan map[zerolog.Level]string, routineCompleteChannel chan bool, incomingDataChannel <-chan string, router *gin.Engine) { + + chunkTypeRoutingMap := new(ChunkTypeToChannelMap) + + for { + + // Unmarshal the JSON string into a map + JSONDataString := <-incomingDataChannel + var JSONData map[string]interface{} + + if err := json.Unmarshal([]byte(JSONDataString), &JSONData); err != nil { + loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error unmarshaling JSON in routing routine:"+err.Error()) + // loggingChannel <- CreateLogMessage(zerolog.DebugLevel, "Received JSON as follows { "+JSONDataString+" }") + // return + } else { + // Then try forward the JSON data onwards + // By first getting the root JSON Key (ChunkType) + var chunkTypeStringKey string + + for key := range JSONData { + chunkTypeStringKey = key + break // We assume there's only one root key + } + + // And checking if it exists and trying to route it + sentSuccessfully := chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) + if !sentSuccessfully { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeStringKey+" - newly registered in routing map") + } + } + } + + routineCompleteChannel <- true +} + diff --git a/main.go b/main.go index 2a39b27..5ac52ee 100644 --- a/main.go +++ b/main.go @@ -35,10 +35,13 @@ func main() { return } + routineCount = routineCount + 1 LoggingChannel := make(chan map[zerolog.Level]string) + go Routines.HandleLogging(serverConfigStringMap, routineCompleteChannel, LoggingChannel) routineCount = routineCount + 1 - go Routines.HandleLogging(serverConfigStringMap, routineCompleteChannel, LoggingChannel) + ReportingChannel := make(chan string) + go Routines.HandleWSReportingTx(serverConfigStringMap,routineCompleteChannel,LoggingChannel,ReportingChannel) routineCount = routineCount + 1 GenericChunkChannel := make(chan string) From 7630afe46efc33afa2d8a8f1fb1651ad081e2a98 Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 30 May 2024 08:30:04 +0200 Subject: [PATCH 05/16] Fix: Removing rate limiting from chunk routing --- Routines/ChunkRouter.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index 79e9c1e..2fdfcd1 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -107,28 +107,11 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[ router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) { // Upgrade the HTTP request into a websocket WebSocketConnection, _ := upgrader.Upgrade(c.Writer, c.Request, nil) - defer WebSocketConnection.Close() - currentTime := time.Now() - lastTime := currentTime - - // Then start up - var dataString, _ = s.GetChannelData(chunkTypeString) - - WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) for { - - currentTime = time.Now() - timeDiff := currentTime.Sub(lastTime) - var dataString, _ = s.GetChannelData(chunkTypeString) - - // Rate limiting - if timeDiff > (time.Millisecond * 1) { - WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) - lastTime = currentTime - } + WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) } }) } From 800bcf284e86759fd94458fda53619ded702b967 Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 30 May 2024 16:53:17 +0200 Subject: [PATCH 06/16] Fix: now running on start --- Config.json | 7 +++++-- Routines/ChunkRouter.go | 2 +- Routines/WSDataTxRoutine.go | 15 +++++++++++---- Routines/WSReportingTxRoutine.go | 3 ++- main.go | 9 +++++---- 5 files changed, 24 insertions(+), 12 deletions(-) diff --git a/Config.json b/Config.json index 1d0df15..35fb82d 100644 --- a/Config.json +++ b/Config.json @@ -7,7 +7,10 @@ "TCPRxConfig": { "Port": "10010" }, - "WebSocketTxConfig": { + "WebSocketReportingTxConfig": { "Port": "10100" + }, + "WebSocketDataTxConfig": { + "Port": "10101" } -} \ No newline at end of file +} \ No newline at end of file diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index 2fdfcd1..ae10311 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -2,10 +2,10 @@ package Routines import ( "sync" - "time" "github.com/gin-gonic/gin" "github.com/rs/zerolog" "github.com/gorilla/websocket" + "time" ) var upgrader = websocket.Upgrader{ diff --git a/Routines/WSDataTxRoutine.go b/Routines/WSDataTxRoutine.go index a67400b..e873dae 100644 --- a/Routines/WSDataTxRoutine.go +++ b/Routines/WSDataTxRoutine.go @@ -4,11 +4,12 @@ import ( "encoding/json" "net/http" "os" + "time" "github.com/gin-gonic/gin" "github.com/rs/zerolog" ) -func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string) { +func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, OutgoingReportingChannel chan<- string) { // Create websocket variables var port string @@ -30,15 +31,17 @@ func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan return true } - go RunChunkRoutingRoutine(loggingChannel, incomingDataChannel, router) - loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Starting http router") + go RunChunkRoutingRoutine(loggingChannel, incomingDataChannel, router, OutgoingReportingChannel) + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Starting http router") router.Run(":" + port) } -func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, router *gin.Engine) { +func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, router *gin.Engine, OutgoingReportingChannel chan<- string) { chunkTypeRoutingMap := new(ChunkTypeToChannelMap) + currentTime := time.Now() + // start up and handle JSON chunks for { @@ -65,6 +68,10 @@ func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomi if !sentSuccessfully { loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeStringKey+" - newly registered in routing map") } + + if time.Since(currentTime) > 500*time.Millisecond { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "-------------newly registered in routing map") + } } } } diff --git a/Routines/WSReportingTxRoutine.go b/Routines/WSReportingTxRoutine.go index 79c46eb..6872796 100644 --- a/Routines/WSReportingTxRoutine.go +++ b/Routines/WSReportingTxRoutine.go @@ -18,6 +18,7 @@ func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChann if WebSocketTxConfig, exists := configJson["WebSocketReportingTxConfig"].(map[string]interface{}); exists { port = WebSocketTxConfig["Port"].(string) } else { + loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketReportingTxConfig Config not found or not correct") os.Exit(1) return @@ -32,7 +33,7 @@ func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChann } go RunReportingRoutine(loggingChannel, routineCompleteChannel, incomingDataChannel, router) - loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Starting http router") + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Starting http router") router.Run(":" + port) } diff --git a/main.go b/main.go index 5ac52ee..b9d69fe 100644 --- a/main.go +++ b/main.go @@ -36,19 +36,20 @@ func main() { } routineCount = routineCount + 1 - LoggingChannel := make(chan map[zerolog.Level]string) + LoggingChannel := make(chan map[zerolog.Level]string, 1000) + go Routines.HandleLogging(serverConfigStringMap, routineCompleteChannel, LoggingChannel) routineCount = routineCount + 1 - ReportingChannel := make(chan string) + ReportingChannel := make(chan string, 1000) go Routines.HandleWSReportingTx(serverConfigStringMap,routineCompleteChannel,LoggingChannel,ReportingChannel) routineCount = routineCount + 1 - GenericChunkChannel := make(chan string) + GenericChunkChannel := make(chan string, 1000) go Routines.HandleTCPReceivals(serverConfigStringMap, LoggingChannel, GenericChunkChannel) routineCount = routineCount + 1 - go Routines.HandleWSDataChunkTx(serverConfigStringMap, LoggingChannel, GenericChunkChannel) + go Routines.HandleWSDataChunkTx(serverConfigStringMap, LoggingChannel, GenericChunkChannel, ReportingChannel) for { time.Sleep(60 * time.Second) From 3b8b6070d945a685f14c7a7db6e09a4970b6e824 Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Tue, 4 Jun 2024 20:37:51 +0200 Subject: [PATCH 07/16] Feat: Refactor chun router to close #38 --- Routines/ChunkRouter.go | 63 ++++++++++++++++++++++++++++++++++------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index ae10311..d86c71a 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -9,8 +9,8 @@ import ( ) var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, + ReadBufferSize: 8096, + WriteBufferSize: 8096, } /// @@ -25,6 +25,7 @@ routine that shall handle that chunk type ChunkTypeToChannelMap struct { mu sync.Mutex // Mutex to protect access to the map chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs + RoutineWaitGroup sync.WaitGroup } /* @@ -42,7 +43,6 @@ func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zer } else { // or drop data and return false // operate on the router itself - loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Here") s.RegisterChunkOnWebSocket(loggingChannel, chunkTypeKey, router) return channelExists } @@ -104,14 +104,55 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[ s.chunkTypeRoutingMap[chunkTypeString] = make(chan string, 100) - router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) { - // Upgrade the HTTP request into a websocket - WebSocketConnection, _ := upgrader.Upgrade(c.Writer, c.Request, nil) - defer WebSocketConnection.Close() + // When you get this HTTP request open the websocket + // This permenantly add this to the http + // Router as we are using a reference + router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) { + + // Upgrade the HTTP request into a websocket + WebSocketConnection, err := upgrader.Upgrade(c.Writer, c.Request, nil) + defer WebSocketConnection.Close() + + if err != nil { + loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error upgrading to WebSocket:"+ err.Error()) + return + } + + s.RoutineWaitGroup.Add(2) + go s.HandleReceivedSignals(loggingChannel, WebSocketConnection); + go s.HandleSignalTransmissions(loggingChannel ,WebSocketConnection, chunkTypeString ) + s.RoutineWaitGroup.Wait() - for { - var dataString, _ = s.GetChannelData(chunkTypeString) - WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) - } }) } + +func (s *ChunkTypeToChannelMap)HandleReceivedSignals(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn) { + + defer s.RoutineWaitGroup.Done() + + for{ + _, _, err := WebSocketConnection.ReadMessage() + if err != nil { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue reading message to WebSocket:"+ err.Error()) + WebSocketConnection.Close() + break; + } + } + +} + +func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, chunkTypeString string) { + + defer s.RoutineWaitGroup.Done() + + for { + var dataString, _ = s.GetChannelData(chunkTypeString) + err := WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) + if err != nil { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue writing message to WebSocket:"+ err.Error()) + WebSocketConnection.Close() + break + } + } + +} \ No newline at end of file From db45a34d81af3c0897f69d0e4d34090a23f3f590 Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 6 Jun 2024 08:01:17 +0200 Subject: [PATCH 08/16] Feat: Refactor to allow internal handling of full input queue --- Routines/ChunkRouter.go | 52 ++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index d86c71a..a6d2d64 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -23,28 +23,43 @@ Each string corresponds to channel to send a chunk type to a routine that shall handle that chunk */ type ChunkTypeToChannelMap struct { - mu sync.Mutex // Mutex to protect access to the map - chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs - RoutineWaitGroup sync.WaitGroup + loggingOutputChannel chan map[zerolog.Level]string // Channel to stream loggin messages + chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs + mu sync.Mutex // Mutex to protect access to the map + routineWaitGroup sync.WaitGroup } +func NewChunkTypeToChannelMap(loggingOutputChannel chan map[zerolog.Level]string) *ChunkTypeToChannelMap { + p := new(ChunkTypeToChannelMap) + p.loggingOutputChannel = loggingOutputChannel + return p +} /* Data string will be routed in the map given that chunk type key exists */ -func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeKey string, data string, router *gin.Engine) bool { +func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeKey string, data string, router *gin.Engine) { // We first check if the channel exists // And wait to try get it chunkRoutingChannel, channelExists := s.TryGetChannel(chunkTypeKey) if channelExists { - // and pass the data if it does + // and try pass the data if it does if there is space in the queue + if len(chunkRoutingChannel) >= cap(chunkRoutingChannel) { + + // Note: One should see issues in the status messages. It may be useful to log this but the current + // Implementation does not gaurentee that the UI will service all queues which will lead most of them + // To be overloaded a lot of the time as only the loaded page will service a particular websocket + + //loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeKey+" - Routing queue overflowwing") + return + } chunkRoutingChannel <- data - return channelExists + } else { - // or drop data and return false - // operate on the router itself + // If it does not set up a weboscket connection + // To manage connections for this chunk type s.RegisterChunkOnWebSocket(loggingChannel, chunkTypeKey, router) - return channelExists + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeKey+" - registered for routing") } } @@ -73,6 +88,7 @@ func (s *ChunkTypeToChannelMap) TryGetChannel(chunkType string) (extractedChanne for { var lockAcquired = make(chan struct{}, 1) + // Spin this lambda up and try get the channel go func() { s.mu.Lock() extractedChannel, exists = s.chunkTypeRoutingMap[chunkType] @@ -80,13 +96,14 @@ func (s *ChunkTypeToChannelMap) TryGetChannel(chunkType string) (extractedChanne defer close(lockAcquired) }() + // Wait here until lock aquired channel is closed select { case <-lockAcquired: // Lock was acquired return extractedChannel, exists case <-time.After(1 * time.Millisecond): // Lock was not acquired, sleep and retry - time.Sleep(1 * time.Millisecond) // Sleep for 500 milliseconds, you can adjust the duration as needed. + time.Sleep(1 * time.Millisecond) } } } @@ -118,19 +135,23 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[ return } - s.RoutineWaitGroup.Add(2) + // Spin up Routines to manage this websocket upgrade request + // When this socket is closed all management of this queue is + // Stopped leading it to grow to its max capacity and lock up + s.routineWaitGroup.Add(2) go s.HandleReceivedSignals(loggingChannel, WebSocketConnection); go s.HandleSignalTransmissions(loggingChannel ,WebSocketConnection, chunkTypeString ) - s.RoutineWaitGroup.Wait() + s.routineWaitGroup.Wait() }) } func (s *ChunkTypeToChannelMap)HandleReceivedSignals(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn) { - defer s.RoutineWaitGroup.Done() + defer s.routineWaitGroup.Done() for{ + // We now just wait for the close message _, _, err := WebSocketConnection.ReadMessage() if err != nil { loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue reading message to WebSocket:"+ err.Error()) @@ -138,14 +159,14 @@ func (s *ChunkTypeToChannelMap)HandleReceivedSignals(loggingChannel chan map[zer break; } } - } func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, chunkTypeString string) { - defer s.RoutineWaitGroup.Done() + defer s.routineWaitGroup.Done() for { + // Now we get the data to transmit on the websocket var dataString, _ = s.GetChannelData(chunkTypeString) err := WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) if err != nil { @@ -154,5 +175,4 @@ func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map break } } - } \ No newline at end of file From f1d465fd5e3508bf8d4d2a16aef1e5ddb4fc171c Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 6 Jun 2024 08:03:02 +0200 Subject: [PATCH 09/16] Feat: Adding initial loggin data struct - will change in future --- Routines/LoggingRoutine.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/Routines/LoggingRoutine.go b/Routines/LoggingRoutine.go index a1628ba..48ca48d 100644 --- a/Routines/LoggingRoutine.go +++ b/Routines/LoggingRoutine.go @@ -7,7 +7,17 @@ import ( "github.com/rs/zerolog" ) -func HandleLogging(configJson map[string]interface{}, routineCompleteChannel chan bool, dataChannel chan map[zerolog.Level]string) { +type SystemStatistic struct { + StatEnvironment string `json:"StatEnvironment"` + StatName string `json:"StatName"` + StatStaus string `json:"StatStaus"` +} + +type SystemInfo struct { + SystemStat SystemStatistic `json:"SystemInfo"` +} + +func HandleLogging(configJson map[string]interface{}, routineCompleteChannel chan bool, incomingDataChannel chan map[zerolog.Level]string) { // And finally create a logger var LogLevel = zerolog.DebugLevel @@ -79,7 +89,7 @@ func HandleLogging(configJson map[string]interface{}, routineCompleteChannel cha logger.Info().Msg("Starting logging routine") for { - levelMessageMap := <-dataChannel + levelMessageMap := <-incomingDataChannel for logLevelKey, LogMessageString := range levelMessageMap { if logLevelKey == zerolog.DebugLevel { From 0bc84cb8457fd3d0b9a52087942981649135e68a Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 6 Jun 2024 08:04:01 +0200 Subject: [PATCH 10/16] Chore: Commenting file --- Routines/WSReportingTxRoutine.go | 36 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/Routines/WSReportingTxRoutine.go b/Routines/WSReportingTxRoutine.go index 6872796..6990362 100644 --- a/Routines/WSReportingTxRoutine.go +++ b/Routines/WSReportingTxRoutine.go @@ -17,6 +17,7 @@ func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChann // And then try parse the JSON string if WebSocketTxConfig, exists := configJson["WebSocketReportingTxConfig"].(map[string]interface{}); exists { port = WebSocketTxConfig["Port"].(string) + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "WebSocketReportingTxConfig opening on port" + port) } else { loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketReportingTxConfig Config not found or not correct") @@ -40,7 +41,7 @@ func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChann func RunReportingRoutine(loggingChannel chan map[zerolog.Level]string, routineCompleteChannel chan bool, incomingDataChannel <-chan string, router *gin.Engine) { - chunkTypeRoutingMap := new(ChunkTypeToChannelMap) + chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel) for { @@ -48,26 +49,25 @@ func RunReportingRoutine(loggingChannel chan map[zerolog.Level]string, routineCo JSONDataString := <-incomingDataChannel var JSONData map[string]interface{} + // Try convert the JSON doc to a string if err := json.Unmarshal([]byte(JSONDataString), &JSONData); err != nil { + // If it fails, then skip to next interation loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error unmarshaling JSON in routing routine:"+err.Error()) - // loggingChannel <- CreateLogMessage(zerolog.DebugLevel, "Received JSON as follows { "+JSONDataString+" }") - // return - } else { - // Then try forward the JSON data onwards - // By first getting the root JSON Key (ChunkType) - var chunkTypeStringKey string - - for key := range JSONData { - chunkTypeStringKey = key - break // We assume there's only one root key - } - - // And checking if it exists and trying to route it - sentSuccessfully := chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) - if !sentSuccessfully { - loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeStringKey+" - newly registered in routing map") - } + continue + } + + // Then try forward the JSON data onwards + // By first getting the root JSON Key (ChunkType) + var chunkTypeStringKey string + + // We assume there's only one root key + for key := range JSONData { + chunkTypeStringKey = key + break } + + // And try tranmit it on the routing threads + chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) } routineCompleteChannel <- true From 0e962100d93cbf25750a480a329d1542c9af788d Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 6 Jun 2024 08:04:47 +0200 Subject: [PATCH 11/16] Feat: Initial implementation of input Q logging --- Routines/WSDataTxRoutine.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/Routines/WSDataTxRoutine.go b/Routines/WSDataTxRoutine.go index e873dae..7281840 100644 --- a/Routines/WSDataTxRoutine.go +++ b/Routines/WSDataTxRoutine.go @@ -7,6 +7,7 @@ import ( "time" "github.com/gin-gonic/gin" "github.com/rs/zerolog" + "strconv" ) func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, OutgoingReportingChannel chan<- string) { @@ -17,6 +18,7 @@ func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan // And then try parse the JSON string if WebSocketTxConfig, exists := configJson["WebSocketDataTxConfig"].(map[string]interface{}); exists { port = WebSocketTxConfig["Port"].(string) + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "WebSocketDataTxConfig opening on port" + port) } else { loggingChannel <- CreateLogMessage(zerolog.FatalLevel, "WebSocketDataTxConfig Config not found or not correct") os.Exit(1) @@ -39,7 +41,7 @@ func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, router *gin.Engine, OutgoingReportingChannel chan<- string) { - chunkTypeRoutingMap := new(ChunkTypeToChannelMap) + chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel) currentTime := time.Now() // start up and handle JSON chunks @@ -64,13 +66,20 @@ func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomi } // And checking if it exists and trying to route it - sentSuccessfully := chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) - if !sentSuccessfully { - loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeStringKey+" - newly registered in routing map") - } + chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) if time.Since(currentTime) > 500*time.Millisecond { - loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "-------------newly registered in routing map") + + currentTime= time.Now() + + QueueLogMessage := SystemInfo{SystemStat:SystemStatistic{ + StatEnvironment: "TCP_WS_Adapter", + StatName: chunkTypeStringKey, + StatStaus: strconv.Itoa(len(OutgoingReportingChannel)) + "/" + strconv.Itoa(cap(OutgoingReportingChannel)), + }} + + data, _ := json.Marshal(QueueLogMessage) + OutgoingReportingChannel <- string(data) } } } From 30b9a98abc23f749ea1f096556738698ae8aac8c Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Thu, 6 Jun 2024 08:05:19 +0200 Subject: [PATCH 12/16] Fix: Correcting default ports in config file --- Config.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Config.json b/Config.json index 35fb82d..0d2477f 100644 --- a/Config.json +++ b/Config.json @@ -8,9 +8,9 @@ "Port": "10010" }, "WebSocketReportingTxConfig": { - "Port": "10100" + "Port": "10101" }, "WebSocketDataTxConfig": { - "Port": "10101" + "Port": "10100" } -} \ No newline at end of file +} From 1cf67c550aa03e18c07dbb0f0e7a6fac1bdbea15 Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Sun, 9 Jun 2024 11:55:48 +0200 Subject: [PATCH 13/16] Chore: Removing extra line --- main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/main.go b/main.go index b9d69fe..94c9a78 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "time" - "github.com/Sense-Scape/Go_TCP_Websocket_Adapter/v2/Routines" "github.com/rs/zerolog" ) From 93b03de9cf829e544e96199e09d57b5e6c58c3dc Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Sun, 9 Jun 2024 12:17:43 +0200 Subject: [PATCH 14/16] Feat: Passing data to reporting channel --- Routines/ChunkRouter.go | 138 ++++++++++++++++++++++++------- Routines/WSDataTxRoutine.go | 60 ++++++++------ Routines/WSReportingTxRoutine.go | 6 +- 3 files changed, 146 insertions(+), 58 deletions(-) diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index a6d2d64..b0b3ca4 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -2,10 +2,13 @@ package Routines import ( "sync" + "encoding/json" "github.com/gin-gonic/gin" "github.com/rs/zerolog" "github.com/gorilla/websocket" "time" + "strconv" + "sync/atomic" ) var upgrader = websocket.Upgrader{ @@ -23,15 +26,16 @@ Each string corresponds to channel to send a chunk type to a routine that shall handle that chunk */ type ChunkTypeToChannelMap struct { - loggingOutputChannel chan map[zerolog.Level]string // Channel to stream loggin messages + loggingOutputChannel chan map[zerolog.Level]string // Channel to stream logging messages + reportingOutputChannel chan string // Channel to stream Reporting messages chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs mu sync.Mutex // Mutex to protect access to the map - routineWaitGroup sync.WaitGroup } -func NewChunkTypeToChannelMap(loggingOutputChannel chan map[zerolog.Level]string) *ChunkTypeToChannelMap { +func NewChunkTypeToChannelMap(loggingOutputChannel chan map[zerolog.Level]string, reportingOutputChannel chan string) *ChunkTypeToChannelMap { p := new(ChunkTypeToChannelMap) p.loggingOutputChannel = loggingOutputChannel + p.reportingOutputChannel = reportingOutputChannel return p } /* @@ -64,20 +68,22 @@ func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zer } func (s *ChunkTypeToChannelMap) GetChannelData(chunkTypeKey string) (dataString string, success bool) { + // We first check if the channel exists // And wait to try get it chunkRoutingChannel, channelExists := s.TryGetChannel(chunkTypeKey) - if channelExists { - // and pass the data if it does - var data = <-chunkRoutingChannel - success = true - return data, success - } else { - // or drop data and return false - success = false - return "", success + if !channelExists { + return "", false } + var timeoutCh = time.After(250 * time.Millisecond) + + select { + case data := <-chunkRoutingChannel: + return data, true + case <-timeoutCh: + return "", false + } } /* @@ -108,6 +114,16 @@ func (s *ChunkTypeToChannelMap) TryGetChannel(chunkType string) (extractedChanne } } +func (s *ChunkTypeToChannelMap) GetChannelLengthAndCapacity(chunkTypeString string) (length int, capacity int, exists bool) { + var channel,exits = s.TryGetChannel(chunkTypeString) + + if !exits { + return -1, -1, exits + } + + return len(channel), cap(channel), exits +} + func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeString string, router *gin.Engine) { @@ -119,7 +135,7 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[ s.chunkTypeRoutingMap = chunkTypeChannelMap } - s.chunkTypeRoutingMap[chunkTypeString] = make(chan string, 100) + s.chunkTypeRoutingMap[chunkTypeString] = make(chan string, 1000) // When you get this HTTP request open the websocket // This permenantly add this to the http @@ -127,6 +143,7 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[ router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) { // Upgrade the HTTP request into a websocket + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Client calling for upgrade on /DataTypes/"+chunkTypeString) WebSocketConnection, err := upgrader.Upgrade(c.Writer, c.Request, nil) defer WebSocketConnection.Close() @@ -138,41 +155,102 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[ // Spin up Routines to manage this websocket upgrade request // When this socket is closed all management of this queue is // Stopped leading it to grow to its max capacity and lock up - s.routineWaitGroup.Add(2) - go s.HandleReceivedSignals(loggingChannel, WebSocketConnection); - go s.HandleSignalTransmissions(loggingChannel ,WebSocketConnection, chunkTypeString ) - s.routineWaitGroup.Wait() + + + var AtomicWebsocketClosed atomic.Bool // Atomic integer used as a flag (0: false, 1: true) + AtomicWebsocketClosed.Store(false) + + var wg sync.WaitGroup + wg.Add(2) + go s.HandleReceivedSignals(loggingChannel, WebSocketConnection, &wg, &AtomicWebsocketClosed); + go s.HandleSignalTransmissions(loggingChannel ,WebSocketConnection, chunkTypeString, &wg, &AtomicWebsocketClosed ) + wg.Wait() + + loggingChannel <- CreateLogMessage(zerolog.InfoLevel, chunkTypeString + " Routine shut down") }) } -func (s *ChunkTypeToChannelMap)HandleReceivedSignals(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn) { +func (s *ChunkTypeToChannelMap)HandleReceivedSignals(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, wg *sync.WaitGroup, AtomicWebsocketClosed *atomic.Bool) { - defer s.routineWaitGroup.Done() + defer wg.Done() for{ // We now just wait for the close message _, _, err := WebSocketConnection.ReadMessage() if err != nil { - loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue reading message to WebSocket:"+ err.Error()) - WebSocketConnection.Close() + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue reading message from WebSocket:" + err.Error()) + AtomicWebsocketClosed.Store(true) break; } } } -func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, chunkTypeString string) { - - defer s.routineWaitGroup.Done() +func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, chunkTypeString string, wg *sync.WaitGroup, AtomicWebsocketClosed *atomic.Bool) { + defer wg.Done() + currentTime := time.Now() + for { - // Now we get the data to transmit on the websocket - var dataString, _ = s.GetChannelData(chunkTypeString) - err := WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString)) - if err != nil { - loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue writing message to WebSocket:"+ err.Error()) - WebSocketConnection.Close() + // Unmarshal the JSON string into a map + var bChannelExists = false + var strJSONData string + + var chstrJSONData = make(chan string, 1) + var chbCannelExists = make(chan bool, 1) + + // Launch a goroutine to try fetch data + go func() { + strJSONDataTemp, bChannelExistsTmp := s.GetChannelData(chunkTypeString) + chbCannelExists <- bChannelExistsTmp + chstrJSONData <- strJSONDataTemp + }() + + // While also limiting this with a timeout procedure + var chTimeout = time.After(250 * time.Millisecond) + + // And see which finihsed first + select { + case bChannelExists = <-chbCannelExists: + strJSONData = <- chstrJSONData + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "1") + case <-chTimeout: + // And continure if a timeout occurred + continue + } + + // When we have data check the client has not closed out beautiful, stunning and special connection + if AtomicWebsocketClosed.Load() { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "RX Websocket closed, exiting write routine") break } + + // If the connection is open check we successfully got access to the channel and transmit data + if bChannelExists { + err := WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(strJSONData)) + if err != nil { + loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue writing message to WebSocket:"+ err.Error()) + AtomicWebsocketClosed.Store(true) + break + } + } + + // Then check if we should report the length of this chunks data channel + if time.Since(currentTime) > 1000*time.Millisecond { + + var len, cap, _ = s.GetChannelLengthAndCapacity(chunkTypeString) + currentTime= time.Now() + + // Create the reporting message + QueueLogMessage := SystemInfo{SystemStat:SystemStatistic{ + StatEnvironment: "TCP_WS_Adapter", + StatName: chunkTypeString + "_Channel", + StatStaus: strconv.Itoa(len) + "/" + strconv.Itoa(cap), + }} + + // And send it to the dedicated reporting routine + data, _ := json.Marshal(QueueLogMessage) + s.reportingOutputChannel <- string(data) + } } } \ No newline at end of file diff --git a/Routines/WSDataTxRoutine.go b/Routines/WSDataTxRoutine.go index 7281840..5f17849 100644 --- a/Routines/WSDataTxRoutine.go +++ b/Routines/WSDataTxRoutine.go @@ -10,7 +10,7 @@ import ( "strconv" ) -func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, OutgoingReportingChannel chan<- string) { +func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, OutgoingReportingChannel chan string) { // Create websocket variables var port string @@ -39,27 +39,37 @@ func HandleWSDataChunkTx(configJson map[string]interface{}, loggingChannel chan } -func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, router *gin.Engine, OutgoingReportingChannel chan<- string) { +func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string, router *gin.Engine, OutgoingReportingChannel chan string) { - chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel) - currentTime := time.Now() + chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel, OutgoingReportingChannel) + currentTime := time.Now() - // start up and handle JSON chunks for { - // Unmarshal the JSON string into a map - JSONDataString := <-incomingDataChannel - var JSONData map[string]interface{} + bSendData := false + var strJSONData string + timeoutCh := time.After(5 * time.Millisecond) + + // Try get data while waiting for a timeout + select { + case strJSONData = <-incomingDataChannel: + bSendData = true + case <-timeoutCh: + } + + // Then try send the data + if bSendData { + + var JSONData map[string]interface{} + + if err := json.Unmarshal([]byte(strJSONData), &JSONData); err != nil { + loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error unmarshaling JSON in routing routine:"+err.Error()) + continue + } - if err := json.Unmarshal([]byte(JSONDataString), &JSONData); err != nil { - loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error unmarshaling JSON in routing routine:"+err.Error()) - // loggingChannel <- CreateLogMessage(zerolog.DebugLevel, "Received JSON as follows { "+JSONDataString+" }") - // return - } else { // Then try forward the JSON data onwards // By first getting the root JSON Key (ChunkType) var chunkTypeStringKey string - for key := range JSONData { chunkTypeStringKey = key break // We assume there's only one root key @@ -67,20 +77,20 @@ func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomi // And checking if it exists and trying to route it chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) + } - if time.Since(currentTime) > 500*time.Millisecond { + if time.Since(currentTime) > 1000*time.Millisecond { - currentTime= time.Now() + currentTime= time.Now() - QueueLogMessage := SystemInfo{SystemStat:SystemStatistic{ - StatEnvironment: "TCP_WS_Adapter", - StatName: chunkTypeStringKey, - StatStaus: strconv.Itoa(len(OutgoingReportingChannel)) + "/" + strconv.Itoa(cap(OutgoingReportingChannel)), - }} - - data, _ := json.Marshal(QueueLogMessage) - OutgoingReportingChannel <- string(data) - } + QueueLogMessage := SystemInfo{SystemStat:SystemStatistic{ + StatEnvironment: "TCP_WS_Adapter", + StatName: "Routing_Output_Channel", + StatStaus: strconv.Itoa(len(incomingDataChannel)) + "/" + strconv.Itoa(cap(incomingDataChannel)), + }} + + data, _ := json.Marshal(QueueLogMessage) + OutgoingReportingChannel <- string(data) } } } diff --git a/Routines/WSReportingTxRoutine.go b/Routines/WSReportingTxRoutine.go index 6990362..503a2aa 100644 --- a/Routines/WSReportingTxRoutine.go +++ b/Routines/WSReportingTxRoutine.go @@ -9,7 +9,7 @@ import ( ) -func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChannel chan bool, loggingChannel chan map[zerolog.Level]string, incomingDataChannel <-chan string) { +func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChannel chan bool, loggingChannel chan map[zerolog.Level]string, incomingDataChannel chan string) { // Create websocket variables var port string @@ -39,9 +39,9 @@ func HandleWSReportingTx(configJson map[string]interface{}, routineCompleteChann } -func RunReportingRoutine(loggingChannel chan map[zerolog.Level]string, routineCompleteChannel chan bool, incomingDataChannel <-chan string, router *gin.Engine) { +func RunReportingRoutine(loggingChannel chan map[zerolog.Level]string, routineCompleteChannel chan bool, incomingDataChannel chan string, router *gin.Engine) { - chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel) + chunkTypeRoutingMap := NewChunkTypeToChannelMap(loggingChannel, incomingDataChannel) for { From fcf4a7f0adfd18f05027b9633150e1893e1b981f Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Sun, 9 Jun 2024 12:20:34 +0200 Subject: [PATCH 15/16] Fix: to rename var --- Routines/WSDataTxRoutine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Routines/WSDataTxRoutine.go b/Routines/WSDataTxRoutine.go index 5f17849..2c3d22d 100644 --- a/Routines/WSDataTxRoutine.go +++ b/Routines/WSDataTxRoutine.go @@ -76,7 +76,7 @@ func RunChunkRoutingRoutine(loggingChannel chan map[zerolog.Level]string, incomi } // And checking if it exists and trying to route it - chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, JSONDataString, router) + chunkTypeRoutingMap.SendChunkToWebSocket(loggingChannel, chunkTypeStringKey, strJSONData, router) } if time.Since(currentTime) > 1000*time.Millisecond { From 4ca837d7db0842b0de5192e6492acef80057bb15 Mon Sep 17 00:00:00 2001 From: Grabt234 Date: Sun, 9 Jun 2024 12:22:25 +0200 Subject: [PATCH 16/16] Fix: to remove extra print --- Routines/ChunkRouter.go | 1 - 1 file changed, 1 deletion(-) diff --git a/Routines/ChunkRouter.go b/Routines/ChunkRouter.go index b0b3ca4..3853853 100644 --- a/Routines/ChunkRouter.go +++ b/Routines/ChunkRouter.go @@ -213,7 +213,6 @@ func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map select { case bChannelExists = <-chbCannelExists: strJSONData = <- chstrJSONData - loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "1") case <-chTimeout: // And continure if a timeout occurred continue