Skip to content

Commit

Permalink
ifname processor: expire old cached entries (influxdata#7838)
Browse files Browse the repository at this point in the history
  • Loading branch information
reimda authored Jul 16, 2020
1 parent 9a5fd65 commit 9b58590
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 35 deletions.
5 changes: 5 additions & 0 deletions plugins/processors/ifname/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ Telegraf minimum version: Telegraf 1.15.0
## stay in order set this to true. keeping the metrics ordered may
## be slightly slower
# ordered = false

## cache_ttl is the amount of time interface names are cached for a
## given agent. After this period elapses if names are needed they
## will be retrieved again.
# cache_ttl = "8h"
```

### Example processing:
Expand Down
19 changes: 12 additions & 7 deletions plugins/processors/ifname/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"container/list"
)

// Type aliases let the implementation be more generic
type keyType = string
type valType = nameMap
type LRUValType = TTLValType

type hashType map[keyType]*list.Element

Expand All @@ -21,7 +19,7 @@ type LRUCache struct {
// Pair is the value of a list node.
type Pair struct {
key keyType
value valType
value LRUValType
}

// initializes a new LRUCache.
Expand All @@ -34,19 +32,19 @@ func NewLRUCache(capacity uint) LRUCache {
}

// Get a list node from the hash map.
func (c *LRUCache) Get(key keyType) (valType, bool) {
func (c *LRUCache) Get(key keyType) (LRUValType, bool) {
// check if list node exists
if node, ok := c.m[key]; ok {
val := node.Value.(*list.Element).Value.(Pair).value
// move node to front
c.l.MoveToFront(node)
return val, true
}
return valType{}, false
return LRUValType{}, false
}

// Put key and value in the LRUCache
func (c *LRUCache) Put(key keyType, value valType) {
func (c *LRUCache) Put(key keyType, value LRUValType) {
// check if list node exists
if node, ok := c.m[key]; ok {
// move the node to front
Expand Down Expand Up @@ -76,3 +74,10 @@ func (c *LRUCache) Put(key keyType, value valType) {
c.m[key] = ptr
}
}

func (c *LRUCache) Delete(key keyType) {
if node, ok := c.m[key]; ok {
c.l.Remove(node)
delete(c.m, key)
}
}
6 changes: 3 additions & 3 deletions plugins/processors/ifname/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
func TestCache(t *testing.T) {
c := NewLRUCache(2)

c.Put("ones", nameMap{1: "one"})
twoMap := nameMap{2: "two"}
c.Put("ones", LRUValType{val: nameMap{1: "one"}})
twoMap := LRUValType{val: nameMap{2: "two"}}
c.Put("twos", twoMap)
c.Put("threes", nameMap{3: "three"})
c.Put("threes", LRUValType{val: nameMap{3: "three"}})

_, ok := c.Get("ones")
require.False(t, ok)
Expand Down
85 changes: 61 additions & 24 deletions plugins/processors/ifname/ifname.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/snmp"
si "github.com/influxdata/telegraf/plugins/inputs/snmp"
Expand Down Expand Up @@ -66,11 +67,18 @@ var sampleConfig = `
## stay in order set this to true. keeping the metrics ordered may
## be slightly slower
# ordered = false
## cache_ttl is the amount of time interface names are cached for a
## given agent. After this period elapses if names are needed they
## will be retrieved again.
# cache_ttl = "8h"
`

type nameMap map[uint64]string
type mapFunc func(agent string) (nameMap, error)
type keyType = string
type valType = nameMap

type mapFunc func(agent string) (nameMap, error)
type makeTableFunc func(string) (*si.Table, error)

type sigMap map[string](chan struct{})
Expand All @@ -82,17 +90,18 @@ type IfName struct {

snmp.ClientConfig

CacheSize uint `toml:"max_cache_entries"`
MaxParallelLookups int `toml:"max_parallel_lookups"`
Ordered bool `toml:"ordered"`
CacheSize uint `toml:"max_cache_entries"`
MaxParallelLookups int `toml:"max_parallel_lookups"`
Ordered bool `toml:"ordered"`
CacheTTL config.Duration `toml:"cache_ttl"`

Log telegraf.Logger `toml:"-"`

ifTable *si.Table `toml:"-"`
ifXTable *si.Table `toml:"-"`

rwLock sync.RWMutex `toml:"-"`
cache *LRUCache `toml:"-"`
cache *TTLCache `toml:"-"`

parallel parallel.Parallel `toml:"-"`
acc telegraf.Accumulator `toml:"-"`
Expand All @@ -106,6 +115,8 @@ type IfName struct {
sigsLock sync.Mutex `toml:"-"`
}

const minRetry time.Duration = 5 * time.Minute

func (d *IfName) SampleConfig() string {
return sampleConfig
}
Expand All @@ -118,7 +129,7 @@ func (d *IfName) Init() error {
d.getMapRemote = d.getMapRemoteNoMock
d.makeTable = makeTableNoMock

c := NewLRUCache(d.CacheSize)
c := NewTTLCache(time.Duration(d.CacheTTL), d.CacheSize)
d.cache = &c

d.sigs = make(sigMap)
Expand All @@ -144,17 +155,42 @@ func (d *IfName) addTag(metric telegraf.Metric) error {
return fmt.Errorf("couldn't parse source tag as uint")
}

m, err := d.getMap(agent)
if err != nil {
return fmt.Errorf("couldn't retrieve the table of interface names: %w", err)
}
firstTime := true
for {
m, age, err := d.getMap(agent)
if err != nil {
return fmt.Errorf("couldn't retrieve the table of interface names: %w", err)
}

name, found := m[num]
if !found {
return fmt.Errorf("interface number %d isn't in the table of interface names", num)
name, found := m[num]
if found {
// success
metric.AddTag(d.DestTag, name)
return nil
}

// We have the agent's interface map but it doesn't contain
// the interface we're interested in. If the entry is old
// enough, retrieve it from the agent once more.
if age < minRetry {
return fmt.Errorf("interface number %d isn't in the table of interface names", num)
}

if firstTime {
d.invalidate(agent)
firstTime = false
continue
}

// not found, cache hit, retrying
return fmt.Errorf("missing interface but couldn't retrieve table")
}
metric.AddTag(d.DestTag, name)
return nil
}

func (d *IfName) invalidate(agent string) {
d.rwLock.RLock()
d.cache.Delete(agent)
d.rwLock.RUnlock()
}

func (d *IfName) Start(acc telegraf.Accumulator) error {
Expand Down Expand Up @@ -203,15 +239,15 @@ func (d *IfName) Stop() error {

// getMap gets the interface names map either from cache or from the SNMP
// agent
func (d *IfName) getMap(agent string) (nameMap, error) {
func (d *IfName) getMap(agent string) (entry nameMap, age time.Duration, err error) {
var sig chan struct{}

// Check cache
d.rwLock.RLock()
m, ok := d.cache.Get(agent)
m, ok, age := d.cache.Get(agent)
d.rwLock.RUnlock()
if ok {
return m, nil
return m, age, nil
}

// Is this the first request for this agent?
Expand All @@ -229,28 +265,28 @@ func (d *IfName) getMap(agent string) (nameMap, error) {
<-sig
// Check cache again
d.rwLock.RLock()
m, ok := d.cache.Get(agent)
m, ok, age := d.cache.Get(agent)
d.rwLock.RUnlock()
if ok {
return m, nil
return m, age, nil
} else {
return nil, fmt.Errorf("getting remote table from cache")
return nil, 0, fmt.Errorf("getting remote table from cache")
}
}

// The cache missed and this is the first request for this
// agent.

// Make the SNMP request
m, err := d.getMapRemote(agent)
m, err = d.getMapRemote(agent)
if err != nil {
//failure. signal without saving to cache
d.sigsLock.Lock()
close(sig)
delete(d.sigs, agent)
d.sigsLock.Unlock()

return nil, fmt.Errorf("getting remote table: %w", err)
return nil, 0, fmt.Errorf("getting remote table: %w", err)
}

// Cache it
Expand All @@ -264,7 +300,7 @@ func (d *IfName) getMap(agent string) (nameMap, error) {
delete(d.sigs, agent)
d.sigsLock.Unlock()

return m, nil
return m, 0, nil
}

func (d *IfName) getMapRemoteNoMock(agent string) (nameMap, error) {
Expand Down Expand Up @@ -310,6 +346,7 @@ func init() {
Version: 2,
Community: "public",
},
CacheTTL: config.Duration(8 * time.Hour),
}
})
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/processors/ifname/ifname_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/snmp"
si "github.com/influxdata/telegraf/plugins/inputs/snmp"
Expand Down Expand Up @@ -100,6 +101,7 @@ func TestGetMap(t *testing.T) {
Version: 2,
Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout
},
CacheTTL: config.Duration(10 * time.Second),
}

// This test mocks the snmp transaction so don't run net-snmp
Expand Down Expand Up @@ -137,7 +139,7 @@ func TestGetMap(t *testing.T) {
wgReq.Add(1)
go func() {
defer wgReq.Done()
m, err := d.getMap("agent")
m, _, err := d.getMap("agent")
require.NoError(t, err)
require.Equal(t, expected, m)
}()
Expand Down
52 changes: 52 additions & 0 deletions plugins/processors/ifname/ttl_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ifname

import (
"time"
)

type TTLValType struct {
time time.Time // when entry was added
val valType
}

type timeFunc func() time.Time

type TTLCache struct {
validDuration time.Duration
lru LRUCache
now timeFunc
}

func NewTTLCache(valid time.Duration, capacity uint) TTLCache {
return TTLCache{
lru: NewLRUCache(capacity),
validDuration: valid,
now: time.Now,
}
}

func (c *TTLCache) Get(key keyType) (valType, bool, time.Duration) {
v, ok := c.lru.Get(key)
if !ok {
return valType{}, false, 0
}
age := c.now().Sub(v.time)
if age < c.validDuration {
return v.val, ok, age
} else {
c.lru.Delete(key)
return valType{}, false, 0
}
}

func (c *TTLCache) Put(key keyType, value valType) {
v := TTLValType{
val: value,
time: c.now(),
}
c.lru.Put(key, v)
}

func (c *TTLCache) Delete(key keyType) {
c.lru.Delete(key)
}
43 changes: 43 additions & 0 deletions plugins/processors/ifname/ttl_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package ifname

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestTTLCacheExpire(t *testing.T) {
c := NewTTLCache(1*time.Second, 100)

c.now = func() time.Time {
return time.Unix(0, 0)
}

c.Put("ones", nameMap{1: "one"})
require.Len(t, c.lru.m, 1)

c.now = func() time.Time {
return time.Unix(1, 0)
}

_, ok, _ := c.Get("ones")
require.False(t, ok)
require.Len(t, c.lru.m, 0)
require.Equal(t, c.lru.l.Len(), 0)
}

func TestTTLCache(t *testing.T) {
c := NewTTLCache(1*time.Second, 100)

c.now = func() time.Time {
return time.Unix(0, 0)
}

expected := nameMap{1: "one"}
c.Put("ones", expected)

actual, ok, _ := c.Get("ones")
require.True(t, ok)
require.Equal(t, expected, actual)
}

0 comments on commit 9b58590

Please sign in to comment.