Skip to content

Commit

Permalink
Merge pull request #40 from Sense-Scape/38-feat-add-support-for-queue…
Browse files Browse the repository at this point in the history
…-size-reporting

38 feat add support for queue size reporting
  • Loading branch information
Grabt234 authored Jun 9, 2024
2 parents f75fbfa + 4ca837d commit d5d06ba
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 132 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.history/
*.exe
*.txt
*.bak
*.bak
Go_TCP_Websocket_Adapter
13 changes: 6 additions & 7 deletions Config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
"TCPRxConfig": {
"Port": "10010"
},
"WebSocketTxConfig": {
"Port": "10100",
"RegisteredChunks": [
"TimeChunk",
"FFTMagnitudeChunk"
]
"WebSocketReportingTxConfig": {
"Port": "10101"
},
"WebSocketDataTxConfig": {
"Port": "10100"
}
}
}
206 changes: 166 additions & 40 deletions Routines/ChunkRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ package Routines

import (
"sync"
"time"
"encoding/json"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
"github.com/gorilla/websocket"
"time"
"strconv"
"sync/atomic"
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 8096,
WriteBufferSize: 8096,
}

///
/// ROUTINE SAFE MAP FUNCTIONS
///
Expand All @@ -18,46 +26,64 @@ Each string corresponds to channel to send a chunk type to a
routine that shall handle that chunk
*/
type ChunkTypeToChannelMap struct {
mu sync.Mutex // Mutex to protect access to the map
chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs
loggingOutputChannel chan map[zerolog.Level]string // Channel to stream logging messages
reportingOutputChannel chan string // Channel to stream Reporting messages
chunkTypeRoutingMap map[string](chan string) // Map of chunk type string and channel key value pairs
mu sync.Mutex // Mutex to protect access to the map
}

func NewChunkTypeToChannelMap(loggingOutputChannel chan map[zerolog.Level]string, reportingOutputChannel chan string) *ChunkTypeToChannelMap {
p := new(ChunkTypeToChannelMap)
p.loggingOutputChannel = loggingOutputChannel
p.reportingOutputChannel = reportingOutputChannel
return p
}
/*
Data string will be routed in the map given that chunk type key exists
*/
func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeKey string, data string, router *gin.Engine) bool {
func (s *ChunkTypeToChannelMap) SendChunkToWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeKey string, data string, router *gin.Engine) {

// We first check if the channel exists
// And wait to try get it
chunkRoutingChannel, channelExists := s.TryGetChannel(chunkTypeKey)
if channelExists {
// and pass the data if it does
// and try pass the data if it does if there is space in the queue
if len(chunkRoutingChannel) >= cap(chunkRoutingChannel) {

// Note: One should see issues in the status messages. It may be useful to log this but the current
// Implementation does not gaurentee that the UI will service all queues which will lead most of them
// To be overloaded a lot of the time as only the loaded page will service a particular websocket

//loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeKey+" - Routing queue overflowwing")
return
}
chunkRoutingChannel <- data
return channelExists

} else {
// or drop data and return false
// operate on the router itself
loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Here")
// If it does not set up a weboscket connection
// To manage connections for this chunk type
s.RegisterChunkOnWebSocket(loggingChannel, chunkTypeKey, router)
return channelExists
loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "ChunkType - "+chunkTypeKey+" - registered for routing")
}
}

func (s *ChunkTypeToChannelMap) GetChannelData(chunkTypeKey string) (dataString string, success bool) {

// We first check if the channel exists
// And wait to try get it
chunkRoutingChannel, channelExists := s.TryGetChannel(chunkTypeKey)
if channelExists {
// and pass the data if it does
var data = <-chunkRoutingChannel
success = true
return data, success
} else {
// or drop data and return false
success = false
return "", success
if !channelExists {
return "", false
}

var timeoutCh = time.After(250 * time.Millisecond)

select {
case data := <-chunkRoutingChannel:
return data, true
case <-timeoutCh:
return "", false
}
}

/*
Expand All @@ -68,24 +94,36 @@ func (s *ChunkTypeToChannelMap) TryGetChannel(chunkType string) (extractedChanne
for {
var lockAcquired = make(chan struct{}, 1)

// Spin this lambda up and try get the channel
go func() {
s.mu.Lock()
extractedChannel, exists = s.chunkTypeRoutingMap[chunkType]
s.mu.Unlock()
defer close(lockAcquired)
}()

// Wait here until lock aquired channel is closed
select {
case <-lockAcquired:
// Lock was acquired
return extractedChannel, exists
case <-time.After(1 * time.Millisecond):
// Lock was not acquired, sleep and retry
time.Sleep(1 * time.Millisecond) // Sleep for 500 milliseconds, you can adjust the duration as needed.
time.Sleep(1 * time.Millisecond)
}
}
}

func (s *ChunkTypeToChannelMap) GetChannelLengthAndCapacity(chunkTypeString string) (length int, capacity int, exists bool) {
var channel,exits = s.TryGetChannel(chunkTypeString)

if !exits {
return -1, -1, exits
}

return len(channel), cap(channel), exits
}

func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[zerolog.Level]string, chunkTypeString string, router *gin.Engine) {


Expand All @@ -97,33 +135,121 @@ func (s *ChunkTypeToChannelMap)RegisterChunkOnWebSocket(loggingChannel chan map[
s.chunkTypeRoutingMap = chunkTypeChannelMap
}

s.chunkTypeRoutingMap[chunkTypeString] = make(chan string, 100)
s.chunkTypeRoutingMap[chunkTypeString] = make(chan string, 1000)

router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) {
// Upgrade the HTTP request into a websocket
WebSocketConnection, _ := upgrader.Upgrade(c.Writer, c.Request, nil)
// When you get this HTTP request open the websocket
// This permenantly add this to the http
// Router as we are using a reference
router.GET("/DataTypes/"+chunkTypeString, func(c *gin.Context) {

defer WebSocketConnection.Close()
// Upgrade the HTTP request into a websocket
loggingChannel <- CreateLogMessage(zerolog.InfoLevel, "Client calling for upgrade on /DataTypes/"+chunkTypeString)
WebSocketConnection, err := upgrader.Upgrade(c.Writer, c.Request, nil)
defer WebSocketConnection.Close()

currentTime := time.Now()
lastTime := currentTime
if err != nil {
loggingChannel <- CreateLogMessage(zerolog.ErrorLevel, "Error upgrading to WebSocket:"+ err.Error())
return
}

// Then start up
var dataString, _ = s.GetChannelData(chunkTypeString)
// Spin up Routines to manage this websocket upgrade request
// When this socket is closed all management of this queue is
// Stopped leading it to grow to its max capacity and lock up

WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString))
for {

currentTime = time.Now()
timeDiff := currentTime.Sub(lastTime)
var AtomicWebsocketClosed atomic.Bool // Atomic integer used as a flag (0: false, 1: true)
AtomicWebsocketClosed.Store(false)

var dataString, _ = s.GetChannelData(chunkTypeString)
var wg sync.WaitGroup
wg.Add(2)
go s.HandleReceivedSignals(loggingChannel, WebSocketConnection, &wg, &AtomicWebsocketClosed);
go s.HandleSignalTransmissions(loggingChannel ,WebSocketConnection, chunkTypeString, &wg, &AtomicWebsocketClosed )
wg.Wait()

loggingChannel <- CreateLogMessage(zerolog.InfoLevel, chunkTypeString + " Routine shut down")

// Rate limiting
if timeDiff > (time.Millisecond * 1) {
WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(dataString))
lastTime = currentTime
}
}
})
}

func (s *ChunkTypeToChannelMap)HandleReceivedSignals(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, wg *sync.WaitGroup, AtomicWebsocketClosed *atomic.Bool) {

defer wg.Done()

for{
// We now just wait for the close message
_, _, err := WebSocketConnection.ReadMessage()
if err != nil {
loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue reading message from WebSocket:" + err.Error())
AtomicWebsocketClosed.Store(true)
break;
}
}
}

func (s *ChunkTypeToChannelMap)HandleSignalTransmissions(loggingChannel chan map[zerolog.Level]string, WebSocketConnection *websocket.Conn, chunkTypeString string, wg *sync.WaitGroup, AtomicWebsocketClosed *atomic.Bool) {

defer wg.Done()
currentTime := time.Now()

for {
// Unmarshal the JSON string into a map
var bChannelExists = false
var strJSONData string

var chstrJSONData = make(chan string, 1)
var chbCannelExists = make(chan bool, 1)

// Launch a goroutine to try fetch data
go func() {
strJSONDataTemp, bChannelExistsTmp := s.GetChannelData(chunkTypeString)
chbCannelExists <- bChannelExistsTmp
chstrJSONData <- strJSONDataTemp
}()

// While also limiting this with a timeout procedure
var chTimeout = time.After(250 * time.Millisecond)

// And see which finihsed first
select {
case bChannelExists = <-chbCannelExists:
strJSONData = <- chstrJSONData
case <-chTimeout:
// And continure if a timeout occurred
continue
}

// When we have data check the client has not closed out beautiful, stunning and special connection
if AtomicWebsocketClosed.Load() {
loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "RX Websocket closed, exiting write routine")
break
}

// If the connection is open check we successfully got access to the channel and transmit data
if bChannelExists {
err := WebSocketConnection.WriteMessage(websocket.TextMessage, []byte(strJSONData))
if err != nil {
loggingChannel <- CreateLogMessage(zerolog.WarnLevel, "Issue writing message to WebSocket:"+ err.Error())
AtomicWebsocketClosed.Store(true)
break
}
}

// Then check if we should report the length of this chunks data channel
if time.Since(currentTime) > 1000*time.Millisecond {

var len, cap, _ = s.GetChannelLengthAndCapacity(chunkTypeString)
currentTime= time.Now()

// Create the reporting message
QueueLogMessage := SystemInfo{SystemStat:SystemStatistic{
StatEnvironment: "TCP_WS_Adapter",
StatName: chunkTypeString + "_Channel",
StatStaus: strconv.Itoa(len) + "/" + strconv.Itoa(cap),
}}

// And send it to the dedicated reporting routine
data, _ := json.Marshal(QueueLogMessage)
s.reportingOutputChannel <- string(data)
}
}
}
14 changes: 12 additions & 2 deletions Routines/LoggingRoutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@ import (
"github.com/rs/zerolog"
)

func HandleLogging(configJson map[string]interface{}, routineCompleteChannel chan bool, dataChannel chan map[zerolog.Level]string) {
type SystemStatistic struct {
StatEnvironment string `json:"StatEnvironment"`
StatName string `json:"StatName"`
StatStaus string `json:"StatStaus"`
}

type SystemInfo struct {
SystemStat SystemStatistic `json:"SystemInfo"`
}

func HandleLogging(configJson map[string]interface{}, routineCompleteChannel chan bool, incomingDataChannel chan map[zerolog.Level]string) {

// And finally create a logger
var LogLevel = zerolog.DebugLevel
Expand Down Expand Up @@ -79,7 +89,7 @@ func HandleLogging(configJson map[string]interface{}, routineCompleteChannel cha
logger.Info().Msg("Starting logging routine")

for {
levelMessageMap := <-dataChannel
levelMessageMap := <-incomingDataChannel

for logLevelKey, LogMessageString := range levelMessageMap {
if logLevelKey == zerolog.DebugLevel {
Expand Down
Loading

0 comments on commit d5d06ba

Please sign in to comment.