-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
144 lines (109 loc) · 2.23 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package infake
import (
"fmt"
"io"
"log"
"os"
"sync"
"github.com/influxdata/influxdb/client/v2"
)
type Consumer interface {
Consume(<-chan Point) error
}
func NewConsumer(cfg OutputConfig) (Consumer, error) {
switch cfg.Type {
case "", "stdout":
return IoWriterConsumer{os.Stdout}, nil
case "http", "udp":
var c InfluxDBConsumer
var err error
c = InfluxDBConsumer{
BatchPointsConfig: cfg.BatchPoints,
BatchSize: cfg.BatchSize,
MaxConcurrency: cfg.MaxConcurrency,
}
if cfg.Type == "http" {
c.Client, err = client.NewHTTPClient(cfg.HTTP)
} else if cfg.Type == "udp" {
c.Client, err = client.NewUDPClient(cfg.UDP)
}
if err != nil {
return nil, err
}
return c, nil
}
return nil, fmt.Errorf("unknown output type: %q", cfg.Type)
}
type IoWriterConsumer struct {
io.Writer
}
func (c IoWriterConsumer) Consume(pts <-chan Point) error {
for p := range pts {
fmt.Fprintf(c.Writer, "%s\n", p)
}
return nil
}
type InfluxDBConsumer struct {
Client client.Client
BatchPointsConfig client.BatchPointsConfig
BatchSize uint
MaxConcurrency uint
}
func (c InfluxDBConsumer) Consume(pts <-chan Point) error {
bps := make(chan client.BatchPoints)
errc := make(chan error, 1)
defer close(errc)
go func() {
defer close(bps)
var bp client.BatchPoints
var consumed uint
var err error
batchSize := c.BatchSize
if batchSize < 1 {
batchSize = 1
}
for p := range pts {
if bp == nil {
bp, err = client.NewBatchPoints(c.BatchPointsConfig)
if err != nil {
errc <- err
return
}
}
if consumed >= batchSize {
bps <- bp
bp = nil
consumed = 0
} else {
bp.AddPoint(p.Point)
consumed += 1
}
}
if consumed > 0 && bp != nil {
bps <- bp
}
errc <- nil
}()
maxConcurrency := int(c.MaxConcurrency)
if maxConcurrency < 1 {
maxConcurrency = 1
}
var wg sync.WaitGroup
wg.Add(maxConcurrency)
for i := 0; i < maxConcurrency; i++ {
go func() {
c.consumeBatchPoints(bps)
wg.Done()
}()
}
wg.Wait()
return <-errc
}
func (c InfluxDBConsumer) consumeBatchPoints(bps <-chan client.BatchPoints) {
for bp := range bps {
err := c.Client.Write(bp)
if err != nil {
log.Println(err)
}
}
}