Skip to content

Commit

Permalink
Add Retransmission and FEC to TrackLocal
Browse files Browse the repository at this point in the history
If the MediaEngine contains support for them a SSRC will be generated
appropriately

Co-authored-by: aggresss <[email protected]>
Co-authored-by: Kevin Wang <[email protected]>
  • Loading branch information
3 people committed Oct 3, 2024
1 parent bd2309f commit abd134e
Show file tree
Hide file tree
Showing 14 changed files with 413 additions and 89 deletions.
6 changes: 3 additions & 3 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,18 @@ func Test_Interceptor_BindUnbind(t *testing.T) {
if cnt := atomic.LoadUint32(&cntUnbindLocalStream); cnt != 1 {
t.Errorf("UnbindLocalStreamFn is expected to be called once, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntBindRemoteStream); cnt != 1 {
if cnt := atomic.LoadUint32(&cntBindRemoteStream); cnt != 2 {
t.Errorf("BindRemoteStreamFn is expected to be called once, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntUnbindRemoteStream); cnt != 1 {
if cnt := atomic.LoadUint32(&cntUnbindRemoteStream); cnt != 2 {
t.Errorf("UnbindRemoteStreamFn is expected to be called once, but called %d times", cnt)
}

// BindRTCPWriter/Reader and Close should be called from both side.
if cnt := atomic.LoadUint32(&cntBindRTCPWriter); cnt != 2 {
t.Errorf("BindRTCPWriterFn is expected to be called twice, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntBindRTCPReader); cnt != 2 {
if cnt := atomic.LoadUint32(&cntBindRTCPReader); cnt != 3 {
t.Errorf("BindRTCPReaderFn is expected to be called twice, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntClose); cnt != 2 {
Expand Down
48 changes: 37 additions & 11 deletions mediaengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ const (
// MimeTypePCMA PCMA MIME type
// Note: Matching should be case insensitive.
MimeTypePCMA = "audio/PCMA"
// MimeTypeRTX RTX MIME type
// Note: Matching should be case insensitive.
MimeTypeRTX = "video/rtx"
// MimeTypeFlexFEC FEC MIME Type
// Note: Matching should be case insensitive.
MimeTypeFlexFEC = "video/flexfec"
)

type mediaEngineHeaderExtension struct {
Expand Down Expand Up @@ -106,7 +112,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 96,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=96", nil},
PayloadType: 97,
},

Expand All @@ -115,7 +121,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 102,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=102", nil},
PayloadType: 103,
},

Expand All @@ -124,7 +130,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 104,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=104", nil},
PayloadType: 105,
},

Expand All @@ -133,7 +139,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 106,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=106", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=106", nil},
PayloadType: 107,
},

Expand All @@ -142,7 +148,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 108,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=108", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=108", nil},
PayloadType: 109,
},

Expand All @@ -151,7 +157,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 127,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=127", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=127", nil},
PayloadType: 125,
},

Expand All @@ -160,7 +166,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 39,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=39", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=39", nil},
PayloadType: 40,
},

Expand All @@ -169,7 +175,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 45,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=45", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=45", nil},
PayloadType: 46,
},

Expand All @@ -178,7 +184,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 98,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=98", nil},
PayloadType: 99,
},

Expand All @@ -187,7 +193,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 100,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=100", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=100", nil},
PayloadType: 101,
},

Expand All @@ -196,7 +202,7 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
PayloadType: 112,
},
{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=112", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=112", nil},
PayloadType: 113,
},
} {
Expand Down Expand Up @@ -702,3 +708,23 @@ func payloaderForCodec(codec RTPCodecCapability) (rtp.Payloader, error) {
return nil, ErrNoPayloaderForCodec
}
}

func (m *MediaEngine) isRTXEnabled(typ RTPCodecType, directions []RTPTransceiverDirection) bool {
for _, p := range m.getRTPParametersByKind(typ, directions).Codecs {
if p.MimeType == MimeTypeRTX {
return true
}
}

return false
}

func (m *MediaEngine) isFECEnabled(typ RTPCodecType, directions []RTPTransceiverDirection) bool {
for _, p := range m.getRTPParametersByKind(typ, directions).Codecs {
if strings.Contains(p.MimeType, MimeTypeFlexFEC) {
return true
}
}

return false
}
16 changes: 8 additions & 8 deletions mediaengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,31 +364,31 @@ a=fmtp:97 apt=96
PayloadType: 96,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=96", nil},
PayloadType: 97,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", nil},
PayloadType: 102,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=102", nil},
PayloadType: 103,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", nil},
PayloadType: 104,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=104", nil},
PayloadType: 105,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "profile-id=2", nil},
PayloadType: 98,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=98", nil},
PayloadType: 99,
}, RTPCodecTypeVideo))
assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels)))
Expand All @@ -400,15 +400,15 @@ a=fmtp:97 apt=96
assert.Equal(t, vp9Codec.MimeType, MimeTypeVP9)
vp9RTX, _, err := m.getCodecByPayload(97)
assert.NoError(t, err)
assert.Equal(t, vp9RTX.MimeType, "video/rtx")
assert.Equal(t, vp9RTX.MimeType, MimeTypeRTX)

h264P1Codec, _, err := m.getCodecByPayload(106)
assert.NoError(t, err)
assert.Equal(t, h264P1Codec.MimeType, MimeTypeH264)
assert.Equal(t, h264P1Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f")
h264P1RTX, _, err := m.getCodecByPayload(107)
assert.NoError(t, err)
assert.Equal(t, h264P1RTX.MimeType, "video/rtx")
assert.Equal(t, h264P1RTX.MimeType, MimeTypeRTX)
assert.Equal(t, h264P1RTX.SDPFmtpLine, "apt=106")

h264P0Codec, _, err := m.getCodecByPayload(108)
Expand All @@ -417,7 +417,7 @@ a=fmtp:97 apt=96
assert.Equal(t, h264P0Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f")
h264P0RTX, _, err := m.getCodecByPayload(109)
assert.NoError(t, err)
assert.Equal(t, h264P0RTX.MimeType, "video/rtx")
assert.Equal(t, h264P0RTX.MimeType, MimeTypeRTX)
assert.Equal(t, h264P0RTX.SDPFmtpLine, "apt=108")
})

Expand All @@ -443,7 +443,7 @@ a=fmtp:97 apt=96
PayloadType: 96,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil},
RTPCodecCapability: RTPCodecCapability{MimeTypeRTX, 90000, 0, "apt=96", nil},
PayloadType: 97,
}, RTPCodecTypeVideo))
assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels)))
Expand Down
13 changes: 13 additions & 0 deletions rtpcodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package webrtc

import (
"fmt"
"strings"

"github.com/pion/webrtc/v4/internal/fmtp"
Expand Down Expand Up @@ -123,3 +124,15 @@ func codecParametersFuzzySearch(needle RTPCodecParameters, haystack []RTPCodecPa

return RTPCodecParameters{}, codecMatchNone
}

// Given a CodecParameters find the RTX CodecParameters if one exists
func findRTXCodecParameters(needle PayloadType, haystack []RTPCodecParameters) (RTPCodecParameters, bool) {
aptStr := fmt.Sprintf("apt=%d", needle)
for _, c := range haystack {
if aptStr == c.SDPFmtpLine {
return c, true

Check warning on line 133 in rtpcodec.go

View check run for this annotation

Codecov / codecov/patch

rtpcodec.go#L129-L133

Added lines #L129 - L133 were not covered by tests
}
}

return RTPCodecParameters{}, false

Check warning on line 137 in rtpcodec.go

View check run for this annotation

Codecov / codecov/patch

rtpcodec.go#L137

Added line #L137 was not covered by tests
}
7 changes: 7 additions & 0 deletions rtpcodingparameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ type RTPRtxParameters struct {
SSRC SSRC `json:"ssrc"`
}

// RTPFecParameters dictionary contains information relating to forward error correction (FEC) settings.
// https://draft.ortc.org/#dom-rtcrtpfecparameters
type RTPFecParameters struct {
SSRC SSRC `json:"ssrc"`
}

// RTPCodingParameters provides information relating to both encoding and decoding.
// This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself
// http://draft.ortc.org/#dom-rtcrtpcodingparameters
Expand All @@ -17,4 +23,5 @@ type RTPCodingParameters struct {
SSRC SSRC `json:"ssrc"`
PayloadType PayloadType `json:"payloadType"`
RTX RTPRtxParameters `json:"rtx"`
FEC RTPFecParameters `json:"fec"`
}
52 changes: 9 additions & 43 deletions rtpreceiver_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,13 @@
package webrtc

import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"strconv"
"strings"
"testing"
"time"

"github.com/pion/randutil"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/transport/v3/test"
Expand Down Expand Up @@ -86,19 +81,18 @@ func TestSetRTPParameters(t *testing.T) {
func Test_RTX_Read(t *testing.T) {
defer test.TimeOut(time.Second * 30).Stop()

var ssrc *uint32
ssrcLines := ""
rtxSsrc := randutil.NewMathRandomGenerator().Uint32()

pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)

track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
assert.NoError(t, err)

_, err = pcOffer.AddTrack(track)
rtpSender, err := pcOffer.AddTrack(track)
assert.NoError(t, err)

rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
ssrc := rtpSender.GetParameters().Encodings[0].SSRC

rtxRead, rtxReadCancel := context.WithCancel(context.Background())
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
for {
Expand All @@ -111,7 +105,7 @@ func Test_RTX_Read(t *testing.T) {

assert.NoError(t, readRTPErr)
assert.NotNil(t, pkt)
assert.Equal(t, pkt.SSRC, *ssrc)
assert.Equal(t, pkt.SSRC, uint32(ssrc))
assert.Equal(t, pkt.PayloadType, uint8(96))
assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD})

Expand All @@ -120,50 +114,22 @@ func Test_RTX_Read(t *testing.T) {
rtxSSRC := attributes.Get(AttributeRtxSsrc)
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
assert.Equal(t, rtxPayloadType, uint8(97))
assert.Equal(t, rtxSSRC, rtxSsrc)
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500)

rtxReadCancel()
}
}
})

assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(offer string) (modified string) {
scanner := bufio.NewScanner(strings.NewReader(offer))
for scanner.Scan() {
l := scanner.Text()

if strings.HasPrefix(l, "a=ssrc") {
if ssrc == nil {
lineSplit := strings.Split(l, " ")[0]
parsed, atoiErr := strconv.ParseUint(strings.TrimPrefix(lineSplit, "a=ssrc:"), 10, 32)
assert.NoError(t, atoiErr)

parsedSsrc := uint32(parsed)
ssrc = &parsedSsrc

modified += fmt.Sprintf("a=ssrc-group:FID %d %d\r\n", *ssrc, rtxSsrc)
}

ssrcLines += l + "\n"
} else if ssrcLines != "" {
ssrcLines = strings.ReplaceAll(ssrcLines, fmt.Sprintf("%d", *ssrc), fmt.Sprintf("%d", rtxSsrc))
modified += ssrcLines
ssrcLines = ""
}

modified += l + "\n"
}

return modified
}))
assert.NoError(t, signalPair(pcOffer, pcAnswer))

func() {
for i := uint16(0); ; i++ {
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
SSRC: *ssrc,
SSRC: uint32(ssrc),
PayloadType: 96,
SequenceNumber: i,
},
Expand All @@ -182,7 +148,7 @@ func Test_RTX_Read(t *testing.T) {
// Send the RTX
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
Version: 2,
SSRC: rtxSsrc,
SSRC: uint32(rtxSsrc),
PayloadType: 97,
SequenceNumber: i + 500,
}, rtxPayload)
Expand Down
Loading

0 comments on commit abd134e

Please sign in to comment.