Skip to content

Commit

Permalink
Implement telegraf's own full metric type
Browse files Browse the repository at this point in the history
The main reasons behind this change:
- make adding/removing tags cheap
- make adding/removing fields cheap
- make parsing cheaper
- make parse -> decorate -> write out bytes metric flow much faster

Refactor serializer to use byte buffer
  • Loading branch information
sparrc committed Dec 1, 2016
1 parent 332f678 commit db7a4b2
Show file tree
Hide file tree
Showing 40 changed files with 1,375 additions and 397 deletions.
3 changes: 2 additions & 1 deletion agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -323,7 +324,7 @@ func (tm *TestMetricMaker) MakeMetric(
) telegraf.Metric {
switch mType {
case telegraf.Untyped:
if m, err := telegraf.NewMetric(measurement, tags, fields, t); err == nil {
if m, err := metric.New(measurement, tags, fields, t); err == nil {
return m
}
case telegraf.Counter:
Expand Down
12 changes: 2 additions & 10 deletions internal/models/makemetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

// makemetric is used by both RunningAggregator & RunningInput
Expand Down Expand Up @@ -135,16 +136,7 @@ func makemetric(
}
}

var m telegraf.Metric
var err error
switch mType {
case telegraf.Counter:
m, err = telegraf.NewCounterMetric(measurement, tags, fields, t)
case telegraf.Gauge:
m, err = telegraf.NewGaugeMetric(measurement, tags, fields, t)
default:
m, err = telegraf.NewMetric(measurement, tags, fields, t)
}
m, err := metric.New(measurement, tags, fields, t, mType)
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

type RunningAggregator struct {
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *RunningAggregator) Add(in telegraf.Metric) bool {
return false
}

in, _ = telegraf.NewMetric(name, tags, fields, t)
in, _ = metric.New(name, tags, fields, t)
}

r.metrics <- in
Expand Down
15 changes: 8 additions & 7 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/buffer"
"github.com/influxdata/telegraf/metric"
)

const (
Expand Down Expand Up @@ -56,23 +57,23 @@ func NewRunningOutput(

// AddMetric adds a metric to the output. This function can also write cached
// points if FlushBufferWhenFull is true.
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
// Filter any tagexclude/taginclude parameters before adding metric
if ro.Config.Filter.IsActive() {
// In order to filter out tags, we need to create a new metric, since
// metrics are immutable once created.
name := metric.Name()
tags := metric.Tags()
fields := metric.Fields()
t := metric.Time()
name := m.Name()
tags := m.Tags()
fields := m.Fields()
t := m.Time()
if ok := ro.Config.Filter.Apply(name, fields, tags); !ok {
return
}
// error is not possible if creating from another metric, so ignore.
metric, _ = telegraf.NewMetric(name, tags, fields, t)
m, _ = metric.New(name, tags, fields, t)
}

ro.metrics.Add(metric)
ro.metrics.Add(m)
if ro.metrics.Len() == ro.MetricBatchSize {
batch := ro.metrics.Batch(ro.MetricBatchSize)
err := ro.write(batch)
Expand Down
192 changes: 27 additions & 165 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package telegraf
import (
"time"

// TODO remove
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
)

// ValueType is an enumeration of metric types that represent a simple value.
Expand All @@ -19,178 +19,40 @@ const (
)

type Metric interface {
// Name returns the measurement name of the metric
Name() string
Serialize() []byte
String() string // convenience function for string(Serialize())
Copy() Metric

// Name returns the tags associated with the metric
Tags() map[string]string
// Tag functions
HasTag(key string) bool
AddTag(key, value string)
RemoveTag(key string) bool

// Time return the timestamp for the metric
Time() time.Time
// Field functions
HasField(key string) bool
AddField(key string, value interface{})
RemoveField(key string) bool

// Type returns the metric type. Can be either telegraf.Gauge or telegraf.Counter
Type() ValueType
// Name functions
SetName(name string)
SetPrefix(prefix string)
SetSuffix(suffix string)

// UnixNano returns the unix nano time of the metric
// Getting data structure functions
Name() string
Tags() map[string]string
Fields() map[string]interface{}
Time() time.Time
UnixNano() int64

// HashID returns a non-cryptographic hash of the metric (name + tags)
// NOTE: do not persist & depend on this value to disk.
Type() ValueType
Len() int // returns the length of the serialized metric, including newline
HashID() uint64

// Fields returns the fields for the metric
Fields() map[string]interface{}

// String returns a line-protocol string of the metric
String() string

// PrecisionString returns a line-protocol string of the metric, at precision
PrecisionString(precison string) string

// Point returns a influxdb client.Point object
Point() *client.Point

// SetAggregate sets the metric's aggregate status
// This is so that aggregate metrics don't get re-sent to aggregator plugins
// aggregator things:
SetAggregate(bool)
// IsAggregate returns true if the metric is an aggregate
IsAggregate() bool

// Copy copies the metric
Copy() Metric
}

// metric is a wrapper of the influxdb client.Point struct
// metric wraps an influxdb models.Point and implements the telegraf
// Metric interface.
type metric struct {
	// pt is the underlying influxdb point (name, tags, fields, time).
	pt models.Point

	// mType is the metric's value type: Counter, Gauge, or Untyped.
	mType ValueType

	// isaggregate marks metrics produced by aggregator plugins so they
	// don't get re-sent to aggregator plugins.
	isaggregate bool
}

// NewMetricFromPoint wraps an already-built influxdb models.Point in a
// telegraf Metric with the Untyped value type.
func NewMetricFromPoint(pt models.Point) Metric {
	m := &metric{pt: pt, mType: Untyped}
	return m
}

// NewMetric returns an untyped metric built from the given measurement
// name, tags, fields, and timestamp. It returns an error when the
// underlying influxdb point constructor rejects the input.
func NewMetric(
	name string,
	tags map[string]string,
	fields map[string]interface{},
	t time.Time,
) (Metric, error) {
	pt, err := models.NewPoint(name, models.NewTags(tags), fields, t)
	if err == nil {
		return &metric{pt: pt, mType: Untyped}, nil
	}
	return nil, err
}

// NewGaugeMetric returns a gauge metric.
// Gauge metrics should be used when the metric can arbitrarily go up and
// down. ie, temperature, memory usage, cpu usage, etc.
func NewGaugeMetric(
	name string,
	tags map[string]string,
	fields map[string]interface{},
	t time.Time,
) (Metric, error) {
	pt, err := models.NewPoint(name, models.NewTags(tags), fields, t)
	if err != nil {
		return nil, err
	}
	return &metric{
		pt:    pt,
		mType: Gauge,
	}, nil
}

// NewCounterMetric returns a Counter metric.
// Counter metrics should be used when the metric being created is an
// always-increasing counter. ie, net bytes received, requests served,
// errors, etc.
func NewCounterMetric(
	name string,
	tags map[string]string,
	fields map[string]interface{},
	t time.Time,
) (Metric, error) {
	pt, err := models.NewPoint(name, models.NewTags(tags), fields, t)
	if err != nil {
		return nil, err
	}
	out := &metric{pt: pt, mType: Counter}
	return out, nil
}

// Name returns the measurement name of the metric.
func (m *metric) Name() string {
	return m.pt.Name()
}

// Tags returns the metric's tags as a plain map, as built by the
// underlying point's Tags().Map().
func (m *metric) Tags() map[string]string {
	return m.pt.Tags().Map()
}

// Time returns the timestamp of the metric.
func (m *metric) Time() time.Time {
	return m.pt.Time()
}

// Type returns the metric's value type: Counter, Gauge, or Untyped.
func (m *metric) Type() ValueType {
	return m.mType
}

// HashID delegates to the underlying point's non-cryptographic hash
// (name + tags). Do not persist this value; it is process-local.
func (m *metric) HashID() uint64 {
	return m.pt.HashID()
}

// UnixNano returns the metric timestamp as unix nanoseconds.
func (m *metric) UnixNano() int64 {
	return m.pt.UnixNano()
}

// Fields returns the metric's field map from the underlying point.
func (m *metric) Fields() map[string]interface{} {
	return m.pt.Fields()
}

// String returns a line-protocol representation of the metric.
func (m *metric) String() string {
	return m.pt.String()
}

// PrecisionString returns a line-protocol string of the metric with the
// timestamp rendered at the given precision (as interpreted by the
// underlying models.Point — presumably "s", "ms", "u", "ns"; confirm
// against influxdb docs).
// Note: the parameter was previously misspelled "precison"; Go interface
// satisfaction ignores parameter names, so this rename is safe.
func (m *metric) PrecisionString(precision string) string {
	return m.pt.PrecisionString(precision)
}

// Point converts the metric back into an influxdb client.Point.
func (m *metric) Point() *client.Point {
	return client.NewPointFrom(m.pt)
}

// IsAggregate reports whether the metric was produced by an aggregator
// plugin (see SetAggregate).
func (m *metric) IsAggregate() bool {
	return m.isaggregate
}

// SetAggregate sets the metric's aggregate flag. This is so that
// aggregate metrics don't get re-sent to aggregator plugins.
func (m *metric) SetAggregate(b bool) {
	m.isaggregate = b
}

// Copy returns a copy of the metric with freshly-allocated tag and field
// maps, so mutations of the copy cannot affect the original. Unlike the
// previous implementation, the copy preserves the metric's value type
// and aggregate flag instead of always producing an Untyped,
// non-aggregate metric.
//
// Constructor errors are ignored: the source metric was already accepted
// by the same constructors, so re-building it from its own data cannot
// fail.
// (This block also strips stray diff-residue lines that had been
// interleaved into the function body.)
func (m *metric) Copy() Metric {
	srcTags := m.Tags()
	tags := make(map[string]string, len(srcTags))
	for k, v := range srcTags {
		tags[k] = v
	}

	srcFields := m.Fields()
	fields := make(map[string]interface{}, len(srcFields))
	for k, v := range srcFields {
		fields[k] = v
	}

	var out Metric
	switch m.Type() {
	case Counter:
		out, _ = NewCounterMetric(m.Name(), tags, fields, m.Time())
	case Gauge:
		out, _ = NewGaugeMetric(m.Name(), tags, fields, m.Time())
	default:
		out, _ = NewMetric(m.Name(), tags, fields, m.Time())
	}
	out.SetAggregate(m.IsAggregate())
	return out
}
38 changes: 38 additions & 0 deletions metric/inline_strconv_parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package metric

import (
"reflect"
"strconv"
"unsafe"
)

// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt: it
// parses b as an integer in the given base and bit size without first
// copying the bytes into a fresh string.
func parseIntBytes(b []byte, base int, bitSize int) (int64, error) {
	return strconv.ParseInt(unsafeBytesToString(b), base, bitSize)
}

// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat: it
// parses b as a float of the given bit size without copying the bytes
// into a fresh string.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
	return strconv.ParseFloat(unsafeBytesToString(b), bitSize)
}

// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool: it
// parses b as a boolean without copying the bytes into a fresh string.
func parseBoolBytes(b []byte) (bool, error) {
	s := unsafeBytesToString(b)
	return strconv.ParseBool(s)
}

// unsafeBytesToString converts a []byte to a string without a heap allocation,
// by aliasing the slice's backing array instead of copying it.
//
// It is unsafe: the returned string shares memory with in, so the caller
// must not mutate the byte slice while the string is still in use. It is
// intended to prepare input to short-lived functions
// that require strings (e.g. the strconv wrappers in this file).
//
// NOTE(review): reflect.SliceHeader/StringHeader are deprecated in modern
// Go; on Go 1.20+ this should become unsafe.String(&in[0], len(in)).
func unsafeBytesToString(in []byte) string {
	// A string header {Data, Len} is a prefix of the slice header
	// {Data, Len, Cap}, so copying Data and Len reinterprets the same
	// backing array as a string.
	src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
	dst := reflect.StringHeader{
		Data: src.Data,
		Len:  src.Len,
	}
	s := *(*string)(unsafe.Pointer(&dst))
	return s
}
Loading

0 comments on commit db7a4b2

Please sign in to comment.