-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector-kafka.go
98 lines (81 loc) · 2.55 KB
/
collector-kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package zipkintracer
import (
"context"
"github.com/Shopify/sarama"
"github.com/apache/thrift/lib/go/thrift"
"github.com/cyril1929/zipkin-go-opentracing/thrift/gen-go/zipkincore"
)
// defaultKafkaTopic is the Kafka topic a KafkaCollector publishes to unless
// the KafkaTopic option overrides it. "zipkin" is the default topic for
// zipkin-receiver-kafka, see:
// https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka
const defaultKafkaTopic = "zipkin"
// KafkaCollector implements Collector by publishing spans to a Kafka
// broker. Instances are built by NewKafkaCollector and configured through
// KafkaOption values.
type KafkaCollector struct {
// producer is the asynchronous sarama producer that serialized spans are
// handed to.
producer sarama.AsyncProducer
// logger receives a record for every error reported by the producer.
logger Logger
// topic is the Kafka topic set on every produced message.
topic string
}
// KafkaOption sets a parameter for the KafkaCollector. Options are applied
// in order by NewKafkaCollector, after the defaults have been filled in.
type KafkaOption func(c *KafkaCollector)
// KafkaLogger returns an option that installs logger as the destination for
// errors raised while publishing spans. Without this option the collector
// keeps the no-op logger set up by NewKafkaCollector, so produce failures
// are silently dropped; setting it is strongly recommended.
func KafkaLogger(logger Logger) KafkaOption {
	useLogger := func(kc *KafkaCollector) {
		kc.logger = logger
	}
	return useLogger
}
// KafkaProducer returns an option that makes the collector publish through
// the supplied producer p instead of creating one of its own.
func KafkaProducer(p sarama.AsyncProducer) KafkaOption {
	useProducer := func(kc *KafkaCollector) {
		kc.producer = p
	}
	return useProducer
}
// KafkaTopic returns an option that makes the collector publish its spans
// on Kafka topic t rather than on the default topic.
func KafkaTopic(t string) KafkaOption {
	useTopic := func(kc *KafkaCollector) {
		kc.topic = t
	}
	return useTopic
}
// NewKafkaCollector builds a Collector that publishes spans to Kafka.
//
// addrs is a slice of broker TCP endpoints of the form "host:port"; it is
// only used when no producer has been supplied through the KafkaProducer
// option, in which case a new sarama async producer is created for those
// addresses with a nil (library default) configuration.
//
// The collector starts with a no-op logger and the default topic; options
// are then applied in the order given. A background goroutine is started to
// forward producer errors to the logger. An error is returned only when the
// producer cannot be created.
func NewKafkaCollector(addrs []string, options ...KafkaOption) (Collector, error) {
	kc := new(KafkaCollector)
	kc.logger, kc.topic = NewNopLogger(), defaultKafkaTopic

	for i := range options {
		options[i](kc)
	}

	// Create a producer ourselves only if the caller did not inject one.
	if kc.producer == nil {
		var err error
		if kc.producer, err = sarama.NewAsyncProducer(addrs, nil); err != nil {
			return nil, err
		}
	}

	go kc.logErrors()
	return kc, nil
}
// logErrors drains the producer's error channel, writing one log record per
// failed message. It returns once that channel has been closed, so it is
// meant to run in its own goroutine for the lifetime of the producer.
func (c *KafkaCollector) logErrors() {
	failures := c.producer.Errors()
	for {
		failure, open := <-failures
		if !open {
			return
		}
		// The logger's own error is deliberately discarded: there is nowhere
		// else to report it.
		_ = c.logger.Log("msg", failure.Msg, "err", failure.Err, "result", "failed to produce msg")
	}
}
// Collect implements Collector. It serializes span s and hands the result
// to the producer's input channel as a key-less message on the collector's
// topic. It always returns nil: delivery failures are reported
// asynchronously on the producer's error channel and logged by logErrors.
// A serialization failure panics (see kafkaSerialize).
func (c *KafkaCollector) Collect(s *zipkincore.Span) error {
	input := c.producer.Input()
	// Key is left unset (nil).
	message := &sarama.ProducerMessage{
		Topic: c.topic,
		Value: sarama.ByteEncoder(kafkaSerialize(s)),
	}
	input <- message
	return nil
}
// Close implements Collector by closing the underlying producer and
// returning whatever error that close reports.
func (c *KafkaCollector) Close() error {
	closeErr := c.producer.Close()
	return closeErr
}
// kafkaSerialize encodes span s with the Thrift binary protocol into an
// in-memory buffer and returns the resulting bytes. It panics if the span
// cannot be written.
func kafkaSerialize(s *zipkincore.Span) []byte {
	memory := thrift.NewTMemoryBuffer()
	protocol := thrift.NewTBinaryProtocolTransport(memory)

	err := s.Write(context.Background(), protocol)
	if err != nil {
		panic(err)
	}
	return memory.Buffer.Bytes()
}