From 79f56771e3ee5086ec85a90dd13e39679e3ab471 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 2 Jan 2025 16:34:22 +0100 Subject: [PATCH 1/5] align with pts2 --- machinery/src/capture/gortsplib.go | 16 ++++++++++++++-- machinery/src/capture/main.go | 10 ++++++---- machinery/src/webrtc/main.go | 18 ++++++++++-------- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/machinery/src/capture/gortsplib.go b/machinery/src/capture/gortsplib.go index ad7be5d..d44d309 100644 --- a/machinery/src/capture/gortsplib.go +++ b/machinery/src/capture/gortsplib.go @@ -480,6 +480,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets // called when a video RTP packet arrives for H264 var filteredAU [][]byte if g.VideoH264Media != nil && g.VideoH264Forma != nil { + + dtsExtractor := h264.NewDTSExtractor2() + g.Client.OnPacketRTP(g.VideoH264Media, g.VideoH264Forma, func(rtppkt *rtp.Packet) { // This will check if we need to stop the thread, @@ -493,7 +496,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if len(rtppkt.Payload) > 0 { // decode timestamp + pts2, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt) pts, ok := g.Client.PacketPTS(g.VideoH264Media, rtppkt) + fmt.Println(convertPTS(pts), convertPTS2(pts2)) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -571,12 +576,19 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets return } + // Extract DTS from RTP packets + dts2, err := dtsExtractor.Extract(originalAU, pts2) + if err != nil { + log.Log.Error("capture.golibrtsp.Start(): " + err.Error()) + return + } + pkt := packets.Packet{ IsKeyFrame: idrPresent, Packet: rtppkt, Data: enc, - Time: pts, - CompositionTime: pts, + Time: pts2, + CompositionTime: dts2, Idx: g.VideoH264Index, IsVideo: true, IsAudio: false, diff --git a/machinery/src/capture/main.go b/machinery/src/capture/main.go index dfd3914..4196a39 100644 --- a/machinery/src/capture/main.go +++ b/machinery/src/capture/main.go @@ -242,9 +242,10 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } - ttime := convertPTS(pkt.Time) + ttime := convertPTS2(pkt.Time) + dts := convertPTS2(pkt.CompositionTime) if pkt.IsVideo { - if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { + if err := myMuxer.Write(videoTrack, pkt.Data, ttime, dts); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } } else if pkt.IsAudio { @@ -261,9 +262,10 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat recordingStatus = "started" } else if start { - ttime := convertPTS(pkt.Time) + ttime := convertPTS2(pkt.Time) + dts := convertPTS2(pkt.CompositionTime) if pkt.IsVideo { - if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { + if err := myMuxer.Write(videoTrack, pkt.Data, ttime, dts); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } } else if pkt.IsAudio { diff --git a/machinery/src/webrtc/main.go b/machinery/src/webrtc/main.go index bc12c41..cec7425 100644 --- a/machinery/src/webrtc/main.go +++ b/machinery/src/webrtc/main.go @@ -341,8 +341,8 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C var cursorError error var pkt packets.Packet - var previousTimeVideo time.Duration - var previousTimeAudio time.Duration + //var previousTimeVideo int64 + //var previousTimeAudio int64 start := false receivedKeyFrame := false @@ -401,15 +401,16 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C if pkt.IsVideo { // Calculate the difference - bufferDuration := pkt.Time - previousTimeVideo - previousTimeVideo = pkt.Time + //bufferDuration := pkt.Time - previousTimeVideo + //previousTimeVideo = pkt.Time // Start at the first keyframe if pkt.IsKeyFrame { start = true } if start { - sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration} + //bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond + sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)} if config.Capture.ForwardWebRTC == "true" { // We will send the video to a remote peer // TODO.. @@ -432,11 +433,12 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C } // Calculate the difference - bufferDuration := pkt.Time - previousTimeAudio - previousTimeAudio = pkt.Time + //bufferDuration := pkt.Time - previousTimeAudio + //previousTimeAudio = pkt.Time // We will send the audio - sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration} + //bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond + sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)} if err := audioTrack.WriteSample(sample); err != nil && err != io.ErrClosedPipe { log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error()) } From 27e7d98c68f4e3071a71e0de002c5c9cfd909116 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 2 Jan 2025 16:40:24 +0100 Subject: [PATCH 2/5] align with PTS2 --- machinery/src/capture/gortsplib.go | 8 ++++---- machinery/src/capture/main.go | 10 +++++++--- machinery/src/packets/packet.go | 18 ++++++++---------- machinery/src/packets/queue.go | 3 +-- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/machinery/src/capture/gortsplib.go b/machinery/src/capture/gortsplib.go index d44d309..fc28dcb 100644 --- a/machinery/src/capture/gortsplib.go +++ b/machinery/src/capture/gortsplib.go @@ -410,7 +410,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if g.AudioG711Media != nil && g.AudioG711Forma != nil { g.Client.OnPacketRTP(g.AudioG711Media, g.AudioG711Forma, func(rtppkt *rtp.Packet) { // decode timestamp - pts, ok := g.Client.PacketPTS(g.AudioG711Media, rtppkt) + pts, ok := g.Client.PacketPTS2(g.AudioG711Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -442,7 +442,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if g.AudioMPEG4Media != nil && g.AudioMPEG4Forma != nil { g.Client.OnPacketRTP(g.AudioMPEG4Media, g.AudioMPEG4Forma, func(rtppkt *rtp.Packet) { // decode timestamp - pts, ok := g.Client.PacketPTS(g.AudioMPEG4Media, rtppkt) + pts, ok := g.Client.PacketPTS2(g.AudioMPEG4Media, rtppkt) if !ok { log.Log.Error("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -577,7 +577,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets } // Extract DTS from RTP packets - dts2, err := dtsExtractor.Extract(originalAU, pts2) + dts2, err := dtsExtractor.Extract(filteredAU, pts2) if err != nil { log.Log.Error("capture.golibrtsp.Start(): " + err.Error()) return @@ -650,7 +650,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if len(rtppkt.Payload) > 0 { // decode timestamp - pts, ok := g.Client.PacketPTS(g.VideoH265Media, rtppkt) + pts, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return diff --git a/machinery/src/capture/main.go b/machinery/src/capture/main.go index 4196a39..cce8bc5 100644 --- a/machinery/src/capture/main.go +++ b/machinery/src/capture/main.go @@ -120,7 +120,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat nextPkt.IsKeyFrame && (timestamp+recordingPeriod-now <= 0 || now-startRecording >= maxRecordingPeriod) { // Write the last packet - ttime := convertPTS(pkt.Time) + ttime := convertPTS2(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -339,7 +339,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat log.Log.Info("capture.main.HandleRecordStream(motiondetection): Start motion based recording ") - var lastDuration time.Duration + var lastDuration int64 var lastRecordingTime int64 //var cws *cacheWriterSeeker @@ -447,7 +447,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat } if start { - ttime := convertPTS(pkt.Time) + ttime := convertPTS2(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error()) @@ -697,3 +697,7 @@ func JpegImage(captureDevice *Capture, communication *models.Communication) imag func convertPTS(v time.Duration) uint64 { return uint64(v.Milliseconds()) } + +func convertPTS2(v int64) uint64 { + return uint64(v) / 100 +} diff --git a/machinery/src/packets/packet.go b/machinery/src/packets/packet.go index d385bbc..68b673c 100644 --- a/machinery/src/packets/packet.go +++ b/machinery/src/packets/packet.go @@ -1,20 +1,18 @@ package packets import ( - "time" - "github.com/pion/rtp" ) // Packet represents an RTP Packet type Packet struct { Packet *rtp.Packet - IsAudio bool // packet is audio - IsVideo bool // packet is video - IsKeyFrame bool // video packet is key frame - Idx int8 // stream index in container format - Codec string // codec name - CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame - Time time.Duration // packet decode time - Data []byte // packet data + IsAudio bool // packet is audio + IsVideo bool // packet is video + IsKeyFrame bool // video packet is key frame + Idx int8 // stream index in container format + Codec string // codec name + CompositionTime int64 // packet presentation time minus decode time for H264 B-Frame + Time int64 // packet decode time + Data []byte // packet data } diff --git a/machinery/src/packets/queue.go b/machinery/src/packets/queue.go index a0de723..a250295 100644 --- a/machinery/src/packets/queue.go +++ b/machinery/src/packets/queue.go @@ -4,7 +4,6 @@ package packets import ( "io" "sync" - "time" ) // time @@ -145,7 +144,7 @@ func (self *Queue) Oldest() *QueueCursor { } // Create cursor position at specific time in buffered packets. -func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { +func (self *Queue) DelayedTime(dur int64) *QueueCursor { cursor := self.newCursor() cursor.init = func(buf *Buf, videoidx int) BufPos { i := buf.Tail - 1 From 185135ed94e2b0f8238931b1184b36af7d1a8775 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 2 Jan 2025 16:55:58 +0100 Subject: [PATCH 3/5] add legacy timing for MP4 --- machinery/src/capture/gortsplib.go | 28 +++++++++++++++++----------- machinery/src/capture/main.go | 14 ++++++-------- machinery/src/packets/packet.go | 3 +++ 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/machinery/src/capture/gortsplib.go b/machinery/src/capture/gortsplib.go index fc28dcb..c5c851f 100644 --- a/machinery/src/capture/gortsplib.go +++ b/machinery/src/capture/gortsplib.go @@ -409,8 +409,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets // called when a MULAW audio RTP packet arrives if g.AudioG711Media != nil && g.AudioG711Forma != nil { g.Client.OnPacketRTP(g.AudioG711Media, g.AudioG711Forma, func(rtppkt *rtp.Packet) { + pts, ok := g.Client.PacketPTS(g.AudioG711Media, rtppkt) // decode timestamp - pts, ok := g.Client.PacketPTS2(g.AudioG711Media, rtppkt) + pts2, ok := g.Client.PacketPTS2(g.AudioG711Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -427,8 +428,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets IsKeyFrame: false, Packet: rtppkt, Data: op, - Time: pts, - CompositionTime: pts, + Time: pts2, + TimeLegacy: pts, + CompositionTime: pts2, Idx: g.AudioG711Index, IsVideo: false, IsAudio: true, @@ -442,7 +444,8 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if g.AudioMPEG4Media != nil && g.AudioMPEG4Forma != nil { g.Client.OnPacketRTP(g.AudioMPEG4Media, g.AudioMPEG4Forma, func(rtppkt *rtp.Packet) { // decode timestamp - pts, ok := g.Client.PacketPTS2(g.AudioMPEG4Media, rtppkt) + pts, ok := g.Client.PacketPTS(g.AudioMPEG4Media, rtppkt) + pts2, ok := g.Client.PacketPTS2(g.AudioMPEG4Media, rtppkt) if !ok { log.Log.Error("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -466,8 +469,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets IsKeyFrame: false, Packet: rtppkt, Data: enc, - Time: pts, - CompositionTime: pts, + Time: pts2, + TimeLegacy: pts, + CompositionTime: pts2, Idx: g.AudioG711Index, IsVideo: false, IsAudio: true, @@ -496,9 +500,8 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if len(rtppkt.Payload) > 0 { // decode timestamp - pts2, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt) pts, ok := g.Client.PacketPTS(g.VideoH264Media, rtppkt) - fmt.Println(convertPTS(pts), convertPTS2(pts2)) + pts2, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -588,6 +591,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets Packet: rtppkt, Data: enc, Time: pts2, + TimeLegacy: pts, CompositionTime: dts2, Idx: g.VideoH264Index, IsVideo: true, @@ -650,7 +654,8 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if len(rtppkt.Payload) > 0 { // decode timestamp - pts, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt) + pts, ok := g.Client.PacketPTS(g.VideoH265Media, rtppkt) + pts2, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -714,8 +719,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets IsKeyFrame: isRandomAccess, Packet: rtppkt, Data: enc, - Time: pts, - CompositionTime: pts, + Time: pts2, + TimeLegacy: pts, + CompositionTime: pts2, Idx: g.VideoH265Index, IsVideo: true, IsAudio: false, diff --git a/machinery/src/capture/main.go b/machinery/src/capture/main.go index cce8bc5..ddf840a 100644 --- a/machinery/src/capture/main.go +++ b/machinery/src/capture/main.go @@ -120,7 +120,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat nextPkt.IsKeyFrame && (timestamp+recordingPeriod-now <= 0 || now-startRecording >= maxRecordingPeriod) { // Write the last packet - ttime := convertPTS2(pkt.Time) + ttime := convertPTS(pkt.TimeLegacy) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -242,10 +242,10 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } - ttime := convertPTS2(pkt.Time) - dts := convertPTS2(pkt.CompositionTime) + ttime := convertPTS(pkt.TimeLegacy) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, dts); err != nil { + if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } } else if pkt.IsAudio { @@ -262,10 +262,9 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat recordingStatus = "started" } else if start { - ttime := convertPTS2(pkt.Time) - dts := convertPTS2(pkt.CompositionTime) + ttime := convertPTS(pkt.TimeLegacy) if pkt.IsVideo { - if err := myMuxer.Write(videoTrack, pkt.Data, ttime, dts); err != nil { + if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } } else if pkt.IsAudio { @@ -446,8 +445,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat start = true } if start { - - ttime := convertPTS2(pkt.Time) + ttime := convertPTS(pkt.TimeLegacy) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error()) diff --git a/machinery/src/packets/packet.go b/machinery/src/packets/packet.go index 68b673c..3f170ac 100644 --- a/machinery/src/packets/packet.go +++ b/machinery/src/packets/packet.go @@ -1,6 +1,8 @@ package packets import ( + "time" + "github.com/pion/rtp" ) @@ -14,5 +16,6 @@ type Packet struct { Codec string // codec name CompositionTime int64 // packet presentation time minus decode time for H264 B-Frame Time int64 // packet decode time + TimeLegacy time.Duration Data []byte // packet data } From 9f2ec9168885afba2df28e15d9f372702ca7a6d9 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 2 Jan 2025 16:56:58 +0100 Subject: [PATCH 4/5] Update main.go --- machinery/src/capture/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/machinery/src/capture/main.go b/machinery/src/capture/main.go index ddf840a..ddd4aac 100644 --- a/machinery/src/capture/main.go +++ b/machinery/src/capture/main.go @@ -244,7 +244,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat ttime := convertPTS(pkt.TimeLegacy) if pkt.IsVideo { - if err := myMuxer.Write(videoTrack, pkt.Data, ttime, dts); err != nil { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } From 10047319035243de961c3e1ebfc811d6f3b0758f Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 2 Jan 2025 17:18:06 +0100 Subject: [PATCH 5/5] increase gop size --- machinery/src/components/Kerberos.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index f6eec60..d363f82 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -236,7 +236,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu if subStreamEnabled && rtspSubClient != nil { subQueue = packets.NewQueue() communication.SubQueue = subQueue - subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room). + subQueue.SetMaxGopCount(3) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room). subQueue.WriteHeader(videoSubStreams) go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication)