Skip to content

Commit

Permalink
Merge pull request #226 from plane-watch/tracker-producer-input-handling
Browse files Browse the repository at this point in the history
Input handling and sync.Pool speedups
  • Loading branch information
bluntelk authored Nov 11, 2023
2 parents 8f12b07 + b545721 commit 381c0ea
Show file tree
Hide file tree
Showing 21 changed files with 418 additions and 217 deletions.
15 changes: 7 additions & 8 deletions cmd/pw_ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func main() {
func commonSetup(c *cli.Context) (*tracker.Tracker, error) {
monitoring.RunWebServer(c)

// let's parse our URL forms
producers, err := setup.HandleSourceFlags(c)
if nil != err {
return nil, err
}

trackerOpts := make([]tracker.Option, 0)
trackerOpts = append(trackerOpts, tracker.WithPrometheusCounters(prometheusGaugeCurrentPlanes, prometheusCounterFramesDecoded))
Expand All @@ -126,10 +129,6 @@ func commonSetup(c *cli.Context) (*tracker.Tracker, error) {
}
}

producers, err := setup.HandleSourceFlags(c)
if nil != err {
return nil, err
}
for _, p := range producers {
trk.AddProducer(p)
}
Expand All @@ -138,9 +137,9 @@ func commonSetup(c *cli.Context) (*tracker.Tracker, error) {
}

func runSimple(c *cli.Context) error {
defer func() {
recover()
}()
//defer func() {
// recover()
//}()
logging.ConfigureForCli()

trk, err := commonSetup(c)
Expand Down
3 changes: 2 additions & 1 deletion cmd/website_decode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func runHttpServer(c *cli.Context) error {
_, _ = fmt.Fprintln(w, "Not an AVR Frame", html.EscapeString(err.Error()))
return
}
pt.GetPlane(frame.Icao()).HandleModeSFrame(frame, refLat, refLon)
source := tracker.FrameSource{RefLat: refLat, RefLon: refLon}
pt.GetPlane(frame.Icao()).HandleModeSFrame(frame, &source)
icaoList[frame.Icao()] = frame.Icao()
frame.Describe(w)
}
Expand Down
15 changes: 8 additions & 7 deletions lib/dedupe/btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestBTreeSweep1(t *testing.T) {

frame, _ := beast.NewFrame(beastModeSShort, false)

if nil == filter.Handle(&frame) {
if nil == filter.Handle(frame) {
t.Errorf("Expected to add a frame")
}

Expand All @@ -33,13 +33,13 @@ func TestFilterBTree_Handle(t *testing.T) {

frame, _ := beast.NewFrame(beastModeSShort, false)

resp := filter.Handle(&frame)
resp := filter.Handle(frame)

if resp == nil {
t.Errorf("Expected the same frame back")
}

if nil != filter.Handle(&frame) {
if nil != filter.Handle(frame) {
t.Errorf("Got a duplicated frame back")
}
}
Expand Down Expand Up @@ -109,10 +109,10 @@ func BenchmarkFilterBTree_HandleDuplicates(b *testing.B) {
filter := NewFilterBTree(WithSweeperInterval(0), WithDedupeMaxAge(time.Minute))

frame, _ := beast.NewFrame(beastModeSShort, false)
filter.Handle(&frame)
filter.Handle(frame)

for n := 0; n < b.N; n++ {
if nil != filter.Handle(&frame) {
if nil != filter.Handle(frame) {
b.Error("Should not have gotten a non empty response - duplicate handled incorrectly!?")
}
}
Expand All @@ -126,12 +126,13 @@ func BenchmarkFilterBTree_HandleUnique(b *testing.B) {
for n := 0; n < b.N; n++ {
beastModeSTest := []byte{0x1a, 0x32, 0x22, 0x1b, 0x54, 0xf0, 0x81, 0x2b, 0x26, byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n), 0, 0, byte(degree)}
msg, _ := beast.NewFrame(beastModeSTest, false)
if nil == filter.Handle(&msg) {
if nil == filter.Handle(msg) {
b.Fatalf("Expected to insert new message %0X", beastModeSTest)
}
if nil != filter.Handle(&msg) {
if nil != filter.Handle(msg) {
b.Fatalf("Failed duplicate insert of %0X", beastModeSTest)
}
beast.Release(msg)
}
if filter.btree.Len() != b.N {
b.Errorf("Did not get the same number of items as tested. expected %d, got %d", b.N, filter.btree.Len())
Expand Down
8 changes: 4 additions & 4 deletions lib/dedupe/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package dedupe
import "plane.watch/lib/tracker/beast"

func makeBeastMessages(iterMax int) []*beast.Frame {
//max := 0x00FFFFFF
max := iterMax * iterMax * iterMax
messages := make([]*beast.Frame, 0, max)
//maxVal := 0x00FFFFFF
maxVal := iterMax * iterMax * iterMax
messages := make([]*beast.Frame, 0, maxVal)

// setup our test data
template := make([]byte, len(beastModeSShort))
Expand All @@ -22,7 +22,7 @@ func makeBeastMessages(iterMax int) []*beast.Frame {
shrt[14] = byte(y)
shrt[15] = byte(z)
frame, _ := beast.NewFrame(shrt, false)
messages = append(messages, &frame)
messages = append(messages, frame)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions lib/dedupe/dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestFilter_Handle(t *testing.T) {
if nil != err {
t.Error(err)
}
fe := tracker.NewFrameEvent(&frame, nil)
fe := tracker.NewFrameEvent(frame, nil)

resp := filter.Handle(&fe)

Expand All @@ -35,7 +35,7 @@ func BenchmarkFilter_HandleDuplicates(b *testing.B) {
filter := NewFilter()

frame, _ := beast.NewFrame(beastModeSShort, false)
fe := tracker.NewFrameEvent(&frame, nil)
fe := tracker.NewFrameEvent(frame, nil)
filter.Handle(&fe)

for n := 0; n < b.N; n++ {
Expand All @@ -52,14 +52,15 @@ func BenchmarkFilter_HandleUnique(b *testing.B) {
for n := 0; n < b.N; n++ {
beastModeSTest := []byte{0x1a, 0x32, 0x22, 0x1b, 0x54, 0xf0, 0x81, 0x2b, 0x26, byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n), 0, 0, 0}
msg, _ := beast.NewFrame(beastModeSTest, false)
fe := tracker.NewFrameEvent(&msg, nil)
fe := tracker.NewFrameEvent(msg, nil)

if nil == filter.Handle(&fe) {
b.Fatal("Expected to insert new message")
}
if nil != filter.Handle(&fe) {
b.Fatal("Failed duplicate insert")
}
beast.Release(msg)
}

if int(filter.list.Len()) != b.N {
Expand Down
58 changes: 45 additions & 13 deletions lib/dedupe/forgetfulmap/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type (
forgettable ForgettableFunc

itemCounter prometheus.Gauge

useSyncPool bool
}

// a generic wrapper for things that can be lost
Expand All @@ -31,11 +33,27 @@ type (
Option func(*ForgetfulSyncMap)
)

var (
marbleBag *sync.Pool
)

func init() {
marbleBag = &sync.Pool{
New: func() any {
return &marble{
added: time.Time{},
value: nil,
}
},
}
}

func NewForgetfulSyncMap(opts ...Option) *ForgetfulSyncMap {
f := &ForgetfulSyncMap{
lookup: &sync.Map{},
sweepInterval: 10 * time.Second,
oldAfter: 60 * time.Second,
useSyncPool: true,
}
for _, opt := range opts {
opt(f)
Expand All @@ -51,6 +69,12 @@ func NewForgetfulSyncMap(opts ...Option) *ForgetfulSyncMap {
return f
}

func UseMemSyncPool(use bool) Option {
return func(syncMap *ForgetfulSyncMap) {
syncMap.useSyncPool = use
}
}

func WithPrometheusCounters(numItems prometheus.Gauge) Option {
return func(syncMap *ForgetfulSyncMap) {
syncMap.itemCounter = numItems
Expand Down Expand Up @@ -159,20 +183,20 @@ func (f *ForgetfulSyncMap) AddKey(key interface{}) {
return
}
if kb, ok := key.([]byte); ok {
if 0 == len(kb) {
if len(kb) == 0 {
return
}
}
if ks, ok := key.(string); ok {
if "" == ks {
if ks == "" {
return
}
}
f.Store(key, nil)
}
func (f *ForgetfulSyncMap) AddKeyStr(key string) {
// avoid storing empty things
if "" == key {
if key == "" {
return
}
f.Store(key, nil)
Expand All @@ -188,22 +212,30 @@ func (f *ForgetfulSyncMap) Load(key any) (any, bool) {
return t.value, retBool
}
return nil, false
} else {
return retVal, retBool
}
return retVal, retBool
}

// Store remembers an item
func (f *ForgetfulSyncMap) Store(key, value interface{}) {
f.lookup.Store(key, &marble{
added: time.Now(),
value: value,
})
var m *marble
if f.useSyncPool {
m = marbleBag.Get().(*marble)
} else {
m = &marble{}
}
m.added = time.Now()
m.value = value
f.lookup.Store(key, m)
}

// Delete Removes an item from the list
func (f *ForgetfulSyncMap) Delete(key interface{}) {
f.lookup.Delete(key)
m, ok := f.lookup.Load(key)
if ok {
marbleBag.Put(m)
f.lookup.Delete(key)
}
}

// Len returns a count of the number of items in the list
Expand All @@ -219,11 +251,11 @@ func (f *ForgetfulSyncMap) Len() (entries int32) {
// Range Iterates over the underlying sync.Map and calls the user function once per item
func (f *ForgetfulSyncMap) Range(rangeFunc func(key, value interface{}) bool) {
f.lookup.Range(func(key, value interface{}) bool {
if m, ok := value.(*marble); ok {
m, ok := value.(*marble)
if ok {
return rangeFunc(key, m.value)
} else {
return rangeFunc(key, value)
}
return rangeFunc(key, value)
})
}

Expand Down
56 changes: 49 additions & 7 deletions lib/dedupe/forgetfulmap/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestForgetfulSyncMap_DontSweepNewPlane(t *testing.T) {
t.Error("Test plane not added.")
}

//this shouldn't sweep our new plane.
// this shouldn't sweep our new plane.
testMap.sweep()

if testMap.Len() != 1 {
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestForgetfulSyncMap_Range(t *testing.T) {
item := testItem{value: "item 222"}
testMap.Store("test", item)

if 1 != testMap.Len() {
if testMap.Len() != 1 {
t.Error("Failed to store test item")
}

Expand All @@ -226,7 +226,7 @@ func TestForgetfulSyncMap_Range(t *testing.T) {
if !tOk {
t.Error("Failed to get our test item out unmolested")
}
if "item 222" != typedLoadedItem.value {
if typedLoadedItem.value != "item 222" {
t.Errorf("item came out changed?! - %+v", item)
}

Expand All @@ -237,14 +237,14 @@ func TestForgetfulSyncMap_Range(t *testing.T) {
if !tOk {
t.Error("Failed to get our test item out unmolested")
}
if "item 222" != typedLoadedItem.value {
if typedLoadedItem.value != "item 222" {
t.Errorf("item came out changed?! - %+v", item)
}

return true
})

if 1 != counter {
if counter != 1 {
t.Error("Failed to range correctly through the map")
}
}
Expand All @@ -268,12 +268,12 @@ func TestForgetfulSyncMap_SweepWithCustomExpiryFunc(t *testing.T) {
item2 := testItem{value: "item 222", remove: true}
testMap.Store("item2", item2)

if 2 != testMap.Len() {
if testMap.Len() != 2 {
t.Error("Failed to store test items")
}

testMap.sweep()
if 1 != testMap.Len() {
if testMap.Len() != 1 {
t.Error("Failed to remove our test2 item on sweep")
}

Expand All @@ -283,3 +283,45 @@ func TestForgetfulSyncMap_SweepWithCustomExpiryFunc(t *testing.T) {
t.Error("Failed load our item1")
}
}

func BenchmarkForgetfulSyncMap(b *testing.B) {
m := NewForgetfulSyncMap(
UseMemSyncPool(false),
WithSweepInterval(time.Second*300),
WithOldAgeAfterSeconds(300),
WithForgettableAction(func(key, value any, added time.Time) bool {
return false
}),
)

var i, j int64
for n := 0; n < b.N; n++ {
for j = 0; j < 100; j++ {
m.AddKey((i * 100) + j)
}
for j = 0; j < 100; j++ {
m.Delete((i * 100) + j)
}
}
}

func BenchmarkForgetfulSyncMapPool(b *testing.B) {
m := NewForgetfulSyncMap(
UseMemSyncPool(true),
WithSweepInterval(time.Second*300),
WithOldAgeAfterSeconds(300),
WithForgettableAction(func(key, value any, added time.Time) bool {
return false
}),
)

var i, j int64
for n := 0; n < b.N; n++ {
for j = 0; j < 100; j++ {
m.AddKey((i * 100) + j)
}
for j = 0; j < 100; j++ {
m.Delete((i * 100) + j)
}
}
}
Loading

0 comments on commit 381c0ea

Please sign in to comment.