From b4369ea9322128b8113bc609c3f1b5baa34545db Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 11 Jan 2024 22:35:56 +0100 Subject: [PATCH] improve non-blocking approve for agents tend to restart for some strange reason --- machinery/src/capture/Gortsplib.go | 3 +-- machinery/src/components/Kerberos.go | 30 +++++++++++++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/machinery/src/capture/Gortsplib.go b/machinery/src/capture/Gortsplib.go index f7a6a2d..ad9ed07 100644 --- a/machinery/src/capture/Gortsplib.go +++ b/machinery/src/capture/Gortsplib.go @@ -25,7 +25,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format/rtph265" "github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4audio" - "github.com/bluenviron/gortsplib/v4/pkg/format/rtpsimpleaudio" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" @@ -63,7 +62,7 @@ type Golibrtsp struct { AudioG711Index int8 AudioG711Media *description.Media AudioG711Forma *format.G711 - AudioG711Decoder *rtpsimpleaudio.Decoder + AudioG711Decoder *rtplpcm.Decoder HasBackChannel bool AudioG711IndexBackChannel int8 diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index 6c57ab6..dd3a37f 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -314,9 +314,20 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu // Here we are cleaning up everything! if configuration.Config.Offline != "true" { - communication.HandleUpload <- "stop" + select { + case communication.HandleUpload <- "stop": + log.Log.Info("components.Kerberos.RunAgent(): stopping upload") + case <-time.After(1 * time.Second): + log.Log.Info("components.Kerberos.RunAgent(): stopping upload timed out") + } + } + + select { + case communication.HandleStream <- "stop": + log.Log.Info("components.Kerberos.RunAgent(): stopping stream") + case <-time.After(1 * time.Second): + log.Log.Info("components.Kerberos.RunAgent(): stopping stream timed out") } - communication.HandleStream <- "stop" // We use the steam channel to stop both main and sub stream. //if subStreamEnabled { // communication.HandleSubStream <- "stop" @@ -408,7 +419,12 @@ func ControlAgent(communication *models.Communication) { // After 15 seconds without activity this is thrown.. if occurence == 3 { log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking mainstream.") - communication.HandleBootstrap <- "restart" + select { + case communication.HandleBootstrap <- "restart": + log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking substream.") + case <-time.After(1 * time.Second): + log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking substream timed out") + } time.Sleep(2 * time.Second) occurence = 0 } @@ -430,8 +446,12 @@ func ControlAgent(communication *models.Communication) { // After 15 seconds without activity this is thrown.. if occurenceSub == 3 { - log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking substream.") - communication.HandleBootstrap <- "restart" + select { + case communication.HandleBootstrap <- "restart": + log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking substream.") + case <-time.After(1 * time.Second): + log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking substream timed out") + } time.Sleep(2 * time.Second) occurenceSub = 0 }