Skip to content

Commit

Permalink
hot fix: keep track of main and sub stream separately (one of them mi…
Browse files Browse the repository at this point in the history
…ght block)
  • Loading branch information
cedricve committed Jan 7, 2024
1 parent 93adb3d commit 2681bd2
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 16 deletions.
32 changes: 23 additions & 9 deletions machinery/src/capture/Gortsplib.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (g *Golibrtsp) ConnectBackChannel(ctx context.Context) (err error) {
}

// Start the RTSP client, and start reading packets.
func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) {
func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) {
log.Log.Debug("capture.golibrtsp.Start(): started")

// called when a MULAW audio RTP packet arrives
Expand Down Expand Up @@ -527,10 +527,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati
if idrPresent {
// Increment packets, so we know the device
// is not blocking.
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
if streamType == "main" {
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
} else if streamType == "sub" {
r := communication.PackageCounterSub.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounterSub.Store((r + 1) % 1000)
communication.LastPacketTimerSub.Store(time.Now().Unix())
}
}
}

Expand Down Expand Up @@ -637,10 +644,17 @@ func (g *Golibrtsp) Start(ctx context.Context, queue *packets.Queue, configurati
if isRandomAccess {
// Increment packets, so we know the device
// is not blocking.
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
if streamType == "main" {
r := communication.PackageCounter.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounter.Store((r + 1) % 1000)
communication.LastPacketTimer.Store(time.Now().Unix())
} else if streamType == "sub" {
r := communication.PackageCounterSub.Load().(int64)
log.Log.Debug("capture.golibrtsp.Start(): packet size " + strconv.Itoa(len(pkt.Data)))
communication.PackageCounterSub.Store((r + 1) % 1000)
communication.LastPacketTimerSub.Store(time.Now().Unix())
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion machinery/src/capture/RTSPClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type RTSPClient interface {
ConnectBackChannel(ctx context.Context) error

// Start the RTSP client, and start reading packets.
Start(ctx context.Context, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error
Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) error

// Start the RTSP client, and start reading packets.
StartBackChannel(ctx context.Context) (err error)
Expand Down
2 changes: 1 addition & 1 deletion machinery/src/cloud/Cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ loop:
// Try again
pullPointAddress, err = onvif.CreatePullPointSubscription(device)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
log.Log.Debug("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
}

Expand Down
55 changes: 51 additions & 4 deletions machinery/src/components/Kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,20 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm
packageCounter.Store(int64(0))
communication.PackageCounter = &packageCounter

var packageCounterSub atomic.Value
packageCounterSub.Store(int64(0))
communication.PackageCounterSub = &packageCounterSub

// This is used when the last packet was received (timestamp),
// this metric is used to determine if the camera is still online/connected.
var lastPacketTimer atomic.Value
packageCounter.Store(int64(0))
communication.LastPacketTimer = &lastPacketTimer

var lastPacketTimerSub atomic.Value
packageCounterSub.Store(int64(0))
communication.LastPacketTimerSub = &lastPacketTimerSub

// This is used to understand if we have a working Kerberos Hub connection
// cloudTimestamp will be updated when successfully sending heartbeats.
var cloudTimestamp atomic.Value
Expand Down Expand Up @@ -245,7 +253,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
log.Log.Info("components.Kerberos.RunAgent(): SetMaxGopCount was set with: " + strconv.Itoa(int(config.Capture.PreRecording)+1))
queue.SetMaxGopCount(int(config.Capture.PreRecording) + 1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
queue.WriteHeader(videoStreams)
go rtspClient.Start(context.Background(), queue, configuration, communication)
go rtspClient.Start(context.Background(), "main", queue, configuration, communication)

// Main stream is connected and ready to go.
communication.MainStreamConnected = true

// Try to create backchannel
rtspBackChannelClient := captureDevice.SetBackChannelClient(rtspUrl)
Expand All @@ -261,7 +272,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
communication.SubQueue = subQueue
subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
subQueue.WriteHeader(videoSubStreams)
go rtspSubClient.Start(context.Background(), subQueue, configuration, communication)
go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication)

// Sub stream is connected and ready to go.
communication.SubStreamConnected = true
}

// Handle livestream SD (low resolution over MQTT)
Expand Down Expand Up @@ -320,6 +334,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu

// If we reach this point, we are stopping the stream.
communication.CameraConnected = false
communication.MainStreamConnected = false
communication.SubStreamConnected = false

// Cancel the main context, this will stop all the other goroutines.
(*communication.CancelContext)()
Expand Down Expand Up @@ -397,14 +413,19 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
func ControlAgent(communication *models.Communication) {
log.Log.Debug("components.Kerberos.ControlAgent(): started")
packageCounter := communication.PackageCounter
packageSubCounter := communication.PackageCounterSub
go func() {
// A channel to check the camera activity
var previousPacket int64 = 0
var previousPacketSub int64 = 0
var occurence = 0
var occurenceSub = 0
for {

// If camera is connected, we'll check if we are still receiving packets.
if communication.CameraConnected {

// First we'll check the main stream.
packetsR := packageCounter.Load().(int64)
if packetsR == previousPacket {
// If we are already reconfiguring,
Expand All @@ -416,16 +437,42 @@ func ControlAgent(communication *models.Communication) {
occurence = 0
}

log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read " + strconv.FormatInt(packetsR, 10))
log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from main stream: " + strconv.FormatInt(packetsR, 10))

// After 15 seconds without activity this is thrown..
if occurence == 3 {
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery.")
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking main stream.")
communication.HandleBootstrap <- "restart"
time.Sleep(2 * time.Second)
occurence = 0
}

// Now we'll check the sub stream.
packetsSubR := packageSubCounter.Load().(int64)
if communication.SubStreamConnected {
if packetsSubR == previousPacketSub {
// If we are already reconfiguring,
// we dont need to check if the stream is blocking.
if !communication.IsConfiguring.IsSet() {
occurenceSub = occurenceSub + 1
}
} else {
occurenceSub = 0
}

log.Log.Info("components.Kerberos.ControlAgent(): Number of packets read from sub stream: " + strconv.FormatInt(packetsSubR, 10))

// After 15 seconds without activity this is thrown..
if occurenceSub == 3 {
log.Log.Info("components.Kerberos.ControlAgent(): Restarting machinery because of blocking sub stream.")
communication.HandleBootstrap <- "restart"
time.Sleep(2 * time.Second)
occurenceSub = 0
}
}

previousPacket = packageCounter.Load().(int64)
previousPacketSub = packageSubCounter.Load().(int64)
}

time.Sleep(5 * time.Second)
Expand Down
4 changes: 4 additions & 0 deletions machinery/src/models/Communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Communication struct {
CancelContext *context.CancelFunc
PackageCounter *atomic.Value
LastPacketTimer *atomic.Value
PackageCounterSub *atomic.Value
LastPacketTimerSub *atomic.Value
CloudTimestamp *atomic.Value
HandleBootstrap chan string
HandleStream chan string
Expand All @@ -33,5 +35,7 @@ type Communication struct {
SubQueue *packets.Queue
Image string
CameraConnected bool
MainStreamConnected bool
SubStreamConnected bool
HasBackChannel bool
}
2 changes: 1 addition & 1 deletion machinery/src/onvif/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) {
stringBody := string(b2)
decodedXML, et, err := getXMLNode(stringBody, "CreatePullPointSubscriptionResponse")
if err != nil {
log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error())
log.Log.Debug("onvif.main.CreatePullPointSubscription(): " + err.Error())
} else {
if err := decodedXML.DecodeElement(&createPullPointSubscriptionResponse, et); err != nil {
log.Log.Error("onvif.main.CreatePullPointSubscription(): " + err.Error())
Expand Down

0 comments on commit 2681bd2

Please sign in to comment.