From 2681bd2fe32c5beb9edf94f1268f3e69a94a6c49 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Sun, 7 Jan 2024 20:20:51 +0100 Subject: [PATCH] hot fix: keep track of main and sub stream separately (one of them might block) --- machinery/src/capture/Gortsplib.go | 32 +++++++++++----- machinery/src/capture/RTSPClient.go | 2 +- machinery/src/cloud/Cloud.go | 2 +- machinery/src/components/Kerberos.go | 55 +++++++++++++++++++++++++-- machinery/src/models/Communication.go | 4 ++ machinery/src/onvif/main.go | 2 +- 6 files changed, 81 insertions(+), 16 deletions(-) diff --git a/machinery/src/capture/Gortsplib.go b/machinery/src/capture/Gortsplib.go index 55a1cfa..a9e5ce2 100644 --- a/machinery/src/capture/Gortsplib.go +++ b/machinery/src/capture/Gortsplib.go @@ -351,7 +351,7 @@ func (g *Golibrtsp) ConnectBackChannel(ctx context.Context) (err error) { } // Start the RTSP client, and start reading packets. -func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) { +func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) { log.Log.Debug("capture.golibrtsp.Start(): started") // called when a MULAW audio RTP packet arrives @@ -527,10 +527,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati if idrPresent { // Increment packets, so we know the device // is not blocking. 
- r := communication.PackageCounter.Load().(int64) - log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data))) - communication.PackageCounter.Store((r + 1) % 1000) - communication.LastPacketTimer.Store(time.Now().Unix()) + if streamType == "main" { + r := communication.PackageCounter.Load().(int64) + log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data))) + communication.PackageCounter.Store((r + 1) % 1000) + communication.LastPacketTimer.Store(time.Now().Unix()) + } else if streamType == "sub" { + r := communication.PackageCounterSub.Load().(int64) + log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data))) + communication.PackageCounterSub.Store((r + 1) % 1000) + communication.LastPacketTimerSub.Store(time.Now().Unix()) + } } } @@ -637,10 +644,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati if isRandomAccess { // Increment packets, so we know the device // is not blocking. 
- r := communication.PackageCounter.Load().(int64) - log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data))) - communication.PackageCounter.Store((r + 1) % 1000) - communication.LastPacketTimer.Store(time.Now().Unix()) + if streamType == "main" { + r := communication.PackageCounter.Load().(int64) + log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data))) + communication.PackageCounter.Store((r + 1) % 1000) + communication.LastPacketTimer.Store(time.Now().Unix()) + } else if streamType == "sub" { + r := communication.PackageCounterSub.Load().(int64) + log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data))) + communication.PackageCounterSub.Store((r + 1) % 1000) + communication.LastPacketTimerSub.Store(time.Now().Unix()) + } } } diff --git a/machinery/src/capture/RTSPClient.go b/machinery/src/capture/RTSPClient.go index 1c94f54..bb6527d 100644 --- a/machinery/src/capture/RTSPClient.go +++ b/machinery/src/capture/RTSPClient.go @@ -44,7 +44,7 @@ type RTSPClient interface { ConnectBackChannel(ctx context.Context) error // Start the RTSP client, and start reading packets. - Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error + Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error // Start the RTSP client, and start reading packets. 
StartBackChannel(ctx context.Context) (err error) diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index 26748e5..cefed4e 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -324,7 +324,7 @@ loop: // Try again pullPointAddress, err = onvif.CreatePullPointSubscription(device) if err != nil { - log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) + log.Log.Debug("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } } diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index e262517..9c19d9b 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -36,12 +36,20 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm packageCounter.Store(int64(0)) communication.PackageCounter = &packageCounter + var packageCounterSub atomic.Value + packageCounterSub.Store(int64(0)) + communication.PackageCounterSub = &packageCounterSub + // This is used when the last packet was received (timestamp), // this metric is used to determine if the camera is still online/connected. var lastPacketTimer atomic.Value packageCounter.Store(int64(0)) communication.LastPacketTimer = &lastPacketTimer + var lastPacketTimerSub atomic.Value + lastPacketTimerSub.Store(int64(0)) + communication.LastPacketTimerSub = &lastPacketTimerSub + // This is used to understand if we have a working Kerberos Hub connection // cloudTimestamp will be updated when successfully sending heartbeats. 
var cloudTimestamp atomic.Value @@ -245,7 +253,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu log.Log.Info("components.Kerberos.RunAgent(): SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)+1)) queue.SetMaxGopCount(int(config.Capture.PreRecording) + 1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room). queue.WriteHeader(videoStreams) - go rtspClient.Start(context.Background(), queue, configuration, communication) + go rtspClient.Start(context.Background(), "main", queue, configuration, communication) + + // Main stream is connected and ready to go. + communication.MainStreamConnected = true // Try to create backchannel rtspBackChannelClient := captureDevice.SetBackChannelClient(rtspUrl) @@ -261,7 +272,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu communication.SubQueue = subQueue subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room). subQueue.WriteHeader(videoSubStreams) - go rtspSubClient.Start(context.Background(), subQueue, configuration, communication) + go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication) + + // Sub stream is connected and ready to go. + communication.SubStreamConnected = true } // Handle livestream SD (low resolution over MQTT) @@ -320,6 +334,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu // If we reach this point, we are stopping the stream. communication.CameraConnected = false + communication.MainStreamConnected = false + communication.SubStreamConnected = false // Cancel the main context, this will stop all the other goroutines. 
(*communication.CancelContext)() @@ -397,14 +413,19 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu func ControlAgent(communication *models.Communication) { log.Log.Debug("components.Kerberos.ControlAgent(): started") packageCounter := communication.PackageCounter + packageSubCounter := communication.PackageCounterSub go func() { // A channel to check the camera activity var previousPacket int64 = 0 + var previousPacketSub int64 = 0 var occurence = 0 + var occurenceSub = 0 for { // If camera is connected, we'll check if we are still receiving packets. if communication.CameraConnected { + + // First we'll check the main stream. packetsR := packageCounter.Load().(int64) if packetsR == previousPacket { // If we are already reconfiguring, @@ -416,16 +437,42 @@ func ControlAgent(communication *models.Communication) { occurence = 0 } - log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read " + strconv.FormatInt(packetsR, 10)) + log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from main stream: " + strconv.FormatInt(packetsR, 10)) // After 15 seconds without activity this is thrown.. if occurence == 3 { - log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery.") + log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking main stream.") communication.HandleBootstrap <- "restart" time.Sleep(2 * time.Second) occurence = 0 } + + // Now we'll check the sub stream. + packetsSubR := packageSubCounter.Load().(int64) + if communication.SubStreamConnected { + if packetsSubR == previousPacketSub { + // If we are already reconfiguring, + // we dont need to check if the stream is blocking. 
+ if !communication.IsConfiguring.IsSet() { + occurenceSub = occurenceSub + 1 + } + } else { + occurenceSub = 0 + } + + log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from sub stream: " + strconv.FormatInt(packetsSubR, 10)) + + // After 15 seconds without activity this is thrown.. + if occurenceSub == 3 { + log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking sub stream.") + communication.HandleBootstrap <- "restart" + time.Sleep(2 * time.Second) + occurenceSub = 0 + } + } + previousPacket = packageCounter.Load().(int64) + previousPacketSub = packageSubCounter.Load().(int64) } time.Sleep(5 * time.Second) diff --git a/machinery/src/models/Communication.go b/machinery/src/models/Communication.go index e5e46db..262a86f 100644 --- a/machinery/src/models/Communication.go +++ b/machinery/src/models/Communication.go @@ -15,6 +15,8 @@ type Communication struct { CancelContext *context.CancelFunc PackageCounter *atomic.Value LastPacketTimer *atomic.Value + PackageCounterSub *atomic.Value + LastPacketTimerSub *atomic.Value CloudTimestamp *atomic.Value HandleBootstrap chan string HandleStream chan string @@ -33,5 +35,7 @@ type Communication struct { SubQueue *packets.Queue Image string CameraConnected bool + MainStreamConnected bool + SubStreamConnected bool HasBackChannel bool } diff --git a/machinery/src/onvif/main.go b/machinery/src/onvif/main.go index 2e0951e..3175a32 100644 --- a/machinery/src/onvif/main.go +++ b/machinery/src/onvif/main.go @@ -992,7 +992,7 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) { stringBody := string(b2) decodedXML, et, err := getXMLNode(stringBody, "CreatePullPointSubscriptionResponse") if err != nil { - log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error()) + log.Log.Debug("onvif.main.CreatePullPointSubscription(): " + err.Error()) } else { if err := decodedXML.DecodeElement(&createPullPointSubscriptionResponse, et); err != nil { 
log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error())