Skip to content

Commit

Permalink
Add listener support to gather metrics & logging #3
Browse files Browse the repository at this point in the history
  • Loading branch information
kjs001127 authored Apr 24, 2023
2 parents c20b0be + a55f744 commit 41cfeb9
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 6 deletions.
31 changes: 28 additions & 3 deletions pkg/jitter/jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import (

type Factory struct {
minLatency, maxLatency, window, defaultTickInterval int64
listener Listener
}

func NewFactory(minLatency, maxLatency, window, defaultTickInterval int64) *Factory {
func NewFactory(minLatency, maxLatency, window, defaultTickInterval int64, listener Listener) *Factory {
return &Factory{
minLatency: minLatency,
maxLatency: maxLatency,
window: window,
defaultTickInterval: defaultTickInterval,
listener: listener,
}
}

func (f *Factory) CreateBuffer() Buffer {
return NewJitter(f.minLatency, f.maxLatency, f.window, f.defaultTickInterval)
return NewJitter(f.minLatency, f.maxLatency, f.window, f.defaultTickInterval, f.listener)
}

type deltaWithSampleCnt struct {
Expand Down Expand Up @@ -50,9 +52,14 @@ type Jitter struct {
window int64 // 2000ms

defaultTickInterval int64

listener Listener
}

func NewJitter(minLatency, maxLatency, window, defaultTickInterval int64) *Jitter {
func NewJitter(minLatency, maxLatency, window, defaultTickInterval int64, listener Listener) *Jitter {
if listener == nil {
listener = &NullListener{}
}
b := &Jitter{
normal: skiplist.New(skiplist.Int64),
list: skiplist.New(skiplist.Int64),
Expand All @@ -64,6 +71,7 @@ func NewJitter(minLatency, maxLatency, window, defaultTickInterval int64) *Jitte
maxLatency: maxLatency,
window: window,
defaultTickInterval: defaultTickInterval,
listener: listener,
}
return b
}
Expand All @@ -79,6 +87,8 @@ func (b *Jitter) Put(p *Packet) {
b.Lock()
defer b.Unlock()

b.listener.OnPacketEnqueue(b.current, b.sumRemainingTs(), p)

if !b.marked || math.Abs(float64(p.Timestamp-b.targetTime())) > 100_000 {
b.init(p.Timestamp)
}
Expand Down Expand Up @@ -115,13 +125,15 @@ func (b *Jitter) Get() ([]*Packet, bool) {
if len(ret) == 0 {
b.loss.Set(targetTime, nil)
b.current += b.defaultTickInterval
b.listener.OnPacketLoss(b.current, b.sumRemainingTs())
return nil, false
}

lastPkt := ret[len(ret)-1]
newTargetTime := lastPkt.Timestamp + lastPkt.SampleCnt
incr := newTargetTime - targetTime
b.current += incr
b.listener.OnPacketDequeue(b.current, b.sumRemainingTs(), ret)

return ret, true
}
Expand Down Expand Up @@ -154,12 +166,14 @@ func (b *Jitter) adaptive() {
if b.sumTsOfLatePackets() > b.window*2/100 { // late 패킷들의 ptime 합이 윈도우의 2% 를 초과시
candidate := b.latency + maxInList(b.late)
b.latency = lo.Min([]int64{candidate, b.maxLatency})
b.listener.OnLatencyChanged(b.latency)
b.late.Init()
}

if b.loss.Len() == 0 && b.late.Len() == 0 { // loss 와 late 가 모두 없으면
candidate := b.latency - minInList(b.normal)
b.latency = lo.Max([]int64{candidate, b.minLatency})
b.listener.OnLatencyChanged(b.latency)
b.late.Init()
}
}
Expand All @@ -176,6 +190,17 @@ func (b *Jitter) sumTsOfLatePackets() int64 {
return ret
}

func (b *Jitter) sumRemainingTs() int64 {
ret := int64(0)
for el := b.list.Front(); el != nil; el = el.Next() {
pkt := el.Value.(*Packet)
if pkt.Timestamp >= b.current {
ret += pkt.SampleCnt
}
}
return ret
}

func maxInList(list *skiplist.SkipList) int64 {
var res int64 = math.MinInt64
for el := list.Front(); el != nil; el = el.Next() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/jitter/jitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func assertLoss(t *testing.T, b *Jitter, expectedTs int64) {
}

func Test_basic(t *testing.T) {
b := NewJitter(100, 400, 20*50, 20)
b := NewJitter(100, 400, 20*50, 20, nil)

b.Put(packet(1))
b.Put(packet(2))
Expand All @@ -54,7 +54,7 @@ func Test_basic(t *testing.T) {
}

func Test_basic2(t *testing.T) {
b := NewJitter(100, 400, 20*50, 20)
b := NewJitter(100, 400, 20*50, 20, nil)

b.Put(packet(1))

Expand Down
23 changes: 23 additions & 0 deletions pkg/jitter/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package jitter

type Listener interface {
OnPacketLoss(currentTs, remainingTs int64)
OnPacketEnqueue(currentTs, remainingTs int64, pkt *Packet)
OnPacketDequeue(currentTs, remainingTs int64, pkt []*Packet)
OnLatencyChanged(new int64)
}

type NullListener struct {
}

func (n NullListener) OnPacketLoss(currentTs, remainingTs int64) {
}

func (n NullListener) OnLatencyChanged(new int64) {
}

func (n NullListener) OnPacketEnqueue(currentTs, remainingTs int64, pkt *Packet) {
}

func (n NullListener) OnPacketDequeue(currentTs, remainingTs int64, pkt []*Packet) {
}
2 changes: 1 addition & 1 deletion pkg/jitter/packet_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestOverflow(t *testing.T) {
factory := NewFactory(100, 400, 20*50, 20)
factory := NewFactory(100, 400, 20*50, 20, nil)
packetBuffer := NewPacketBuffer(factory)

packetBuffer.Put(&Packet{Timestamp: 1<<32 - 20, Data: []byte{1}, SampleCnt: 20})
Expand Down

0 comments on commit 41cfeb9

Please sign in to comment.