From 4197426a734d901f7a13d0bdbb29d6555a340f18 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 3 Jun 2019 17:34:48 -0700 Subject: [PATCH] Ignore errors serializing single metrics (#5943) --- plugins/outputs/amqp/amqp.go | 3 ++- plugins/outputs/cloud_pubsub/pubsub.go | 6 ++++-- plugins/outputs/file/file.go | 3 ++- plugins/outputs/instrumental/instrumental.go | 3 ++- plugins/outputs/kafka/kafka.go | 6 +++--- plugins/outputs/kinesis/kinesis.go | 6 +++--- plugins/outputs/mqtt/mqtt.go | 5 +++-- plugins/outputs/nats/nats.go | 7 ++++--- plugins/outputs/nsq/nsq.go | 7 ++++--- plugins/outputs/socket_writer/socket_writer.go | 7 +++---- plugins/serializers/influx/influx.go | 3 +++ plugins/serializers/influx/reader.go | 9 +++++---- plugins/serializers/registry.go | 5 ++++- 13 files changed, 42 insertions(+), 28 deletions(-) diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index f82faef64d29e..cb4cc45017abf 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -301,7 +301,8 @@ func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) { for _, metric := range metrics { octets, err := q.serializer.Serialize(metric) if err != nil { - return nil, err + log.Printf("D! [outputs.amqp] Could not serialize metric: %v", err) + continue } _, err = buf.Write(octets) if err != nil { diff --git a/plugins/outputs/cloud_pubsub/pubsub.go b/plugins/outputs/cloud_pubsub/pubsub.go index c8fbf242da21c..5abb04afbaa6a 100644 --- a/plugins/outputs/cloud_pubsub/pubsub.go +++ b/plugins/outputs/cloud_pubsub/pubsub.go @@ -2,11 +2,12 @@ package cloud_pubsub import ( "context" + "encoding/base64" "fmt" + "log" "sync" "cloud.google.com/go/pubsub" - "encoding/base64" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" @@ -229,7 +230,8 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro for i, m := range metrics { b, err := ps.serializer.Serialize(m) if err != nil { - return nil, err + log.Printf("D! [outputs.cloud_pubsub] Could not serialize metric: %v", err) + continue } if ps.Base64Data { diff --git a/plugins/outputs/file/file.go b/plugins/outputs/file/file.go index 89380ae7e265e..a5eb422b72fb7 100644 --- a/plugins/outputs/file/file.go +++ b/plugins/outputs/file/file.go @@ -3,6 +3,7 @@ package file import ( "fmt" "io" + "log" "os" "time" @@ -101,7 +102,7 @@ func (f *File) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { b, err := f.serializer.Serialize(metric) if err != nil { - return fmt.Errorf("failed to serialize message: %s", err) + log.Printf("D! [outputs.file] Could not serialize metric: %v", err) } _, err = f.writer.Write(b) diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index 117c9d4348c3e..f142705a575c4 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -110,7 +110,8 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { buf, err := s.Serialize(m) if err != nil { - log.Printf("E! Error serializing a metric to Instrumental: %s", err) + log.Printf("D! [outputs.instrumental] Could not serialize metric: %v", err) + continue } switch metricType { diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index f2951e6d5eab8..3df5a3a67efab 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -6,13 +6,12 @@ import ( "log" "strings" + "github.com/Shopify/sarama" "github.com/influxdata/telegraf" tlsint "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" uuid "github.com/satori/go.uuid" - - "github.com/Shopify/sarama" ) var ValidTopicSuffixMethods = []string{ @@ -294,7 +293,8 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { buf, err := k.serializer.Serialize(metric) if err != nil { - return err + log.Printf("D! [outputs.kafka] Could not serialize metric: %v", err) + continue } m := &sarama.ProducerMessage{ diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 497676486293c..1b7b747e96b46 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -6,12 +6,11 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/satori/go.uuid" - "github.com/influxdata/telegraf" internalaws "github.com/influxdata/telegraf/internal/config/aws" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" + "github.com/satori/go.uuid" ) type ( @@ -221,7 +220,8 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { values, err := k.serializer.Serialize(metric) if err != nil { - return err + log.Printf("D! [outputs.kinesis] Could not serialize metric: %v", err) + continue } partitionKey := k.getPartitionKey(metric) diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index bacdd3b0e85d1..f6fba5501c556 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -2,6 +2,7 @@ package mqtt import ( "fmt" + "log" "strings" "sync" "time" @@ -150,9 +151,9 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { metricsmap[topic] = append(metricsmap[topic], metric) } else { buf, err := m.serializer.Serialize(metric) - if err != nil { - return err + log.Printf("D! [outputs.mqtt] Could not serialize metric: %v", err) + continue } err = m.publish(topic, buf) diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index d9fdb0e885f09..ef2c4bbf2813e 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -2,13 +2,13 @@ package nats import ( "fmt" - - nats_client "github.com/nats-io/go-nats" + "log" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" + nats_client "github.com/nats-io/go-nats" ) type NATS struct { @@ -108,7 +108,8 @@ func (n *NATS) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { buf, err := n.serializer.Serialize(metric) if err != nil { - return err + log.Printf("D! [outputs.nats] Could not serialize metric: %v", err) + continue } err = n.conn.Publish(n.Subject, buf) diff --git a/plugins/outputs/nsq/nsq.go b/plugins/outputs/nsq/nsq.go index c826ab6485cf2..a9e2d94ac0bc0 100644 --- a/plugins/outputs/nsq/nsq.go +++ b/plugins/outputs/nsq/nsq.go @@ -2,12 +2,12 @@ package nsq import ( "fmt" - - "github.com/nsqio/go-nsq" + "log" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" + "github.com/nsqio/go-nsq" ) type NSQ struct { @@ -68,7 +68,8 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { buf, err := n.serializer.Serialize(metric) if err != nil { - return err + log.Printf("D! [outputs.nsq] Could not serialize metric: %v", err) + continue } err = n.producer.Publish(n.Topic, buf) diff --git a/plugins/outputs/socket_writer/socket_writer.go b/plugins/outputs/socket_writer/socket_writer.go index 8b0f56accbdea..833122dfcf7d2 100644 --- a/plugins/outputs/socket_writer/socket_writer.go +++ b/plugins/outputs/socket_writer/socket_writer.go @@ -1,13 +1,12 @@ package socket_writer import ( + "crypto/tls" "fmt" "log" "net" "strings" - "crypto/tls" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" tlsint "github.com/influxdata/telegraf/internal/tls" @@ -128,8 +127,8 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error { for _, m := range metrics { bs, err := sw.Serialize(m) if err != nil { - //TODO log & keep going with remaining metrics - return err + log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err) + continue } if _, err := sw.Conn.Write(bs); err != nil { //TODO log & keep going with remaining strings diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index e7063cbd2f62a..a675add4b6d2c 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -113,6 +113,9 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { for _, m := range metrics { _, err := s.Write(&s.buf, m) if err != nil { + if _, ok := err.(*MetricError); ok { + continue + } return nil, err } } diff --git a/plugins/serializers/influx/reader.go b/plugins/serializers/influx/reader.go index d0dad8eebb984..55b6c2b4130ec 100644 --- a/plugins/serializers/influx/reader.go +++ b/plugins/serializers/influx/reader.go @@ -53,12 +53,13 @@ func (r *reader) Read(p []byte) (int, error) { r.offset += 1 if err != nil { r.buf.Reset() - if err != nil { - // Since we are serializing multiple metrics, don't fail the - // the entire batch just because of one unserializable metric. - log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err) + if _, ok := err.(*MetricError); ok { continue } + // Since we are serializing multiple metrics, don't fail the + // the entire batch just because of one unserializable metric. + log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err) + continue } break } diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index e21e9205ca1ba..cfdb784ccfe73 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -30,6 +30,9 @@ type Serializer interface { // Serialize takes a single telegraf metric and turns it into a byte buffer. // separate metrics should be separated by a newline, and there should be // a newline at the end of the buffer. + // + // New plugins should use SerializeBatch instead to allow for non-line + // delimited metrics. Serialize(metric telegraf.Metric) ([]byte, error) // SerializeBatch takes an array of telegraf metric and serializes it into @@ -41,7 +44,7 @@ type Serializer interface { // Config is a struct that covers the data types needed for all serializer types, // and can be used to instantiate _any_ of the serializers. type Config struct { - // Dataformat can be one of: influx, graphite, or json + // Dataformat can be one of the serializer types listed in NewSerializer. DataFormat string // Support tags in graphite protocol