-
Notifications
You must be signed in to change notification settings - Fork 11
/
batch.go
96 lines (81 loc) · 1.68 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package cwlogger
import (
"sort"
"time"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)
// Limits applied while assembling a batch of log events.
// NOTE(review): these values appear to mirror the CloudWatch Logs
// PutLogEvents quotas — confirm against the AWS documentation.
const (
	// maxBatchByteSize caps the accounted size of a single batch (1 MiB).
	maxBatchByteSize = 1 << 20

	// maxBatchLength caps the number of events held by a single batch.
	maxBatchLength = 10000

	// logEventOverhead is the fixed number of bytes charged for every event
	// in addition to the length of its message.
	logEventOverhead = 26
)
// batch accumulates log events together with a running total of their
// accounted byte size. It implements sort.Interface (Len, Less, Swap) so the
// events can be ordered by timestamp.
type batch struct {
// logEvents holds the events accepted by add, in insertion order until the
// batch is sorted.
logEvents []*cloudwatchlogs.InputLogEvent
// size is the sum, over all events in logEvents, of the message length plus
// logEventOverhead.
size int
}
// newBatch returns an empty batch whose event slice is non-nil and whose
// accounted size is zero.
func newBatch() *batch {
	fresh := new(batch)
	fresh.logEvents = make([]*cloudwatchlogs.InputLogEvent, 0)
	return fresh
}
// add tries to append logEvent to the batch.
//
// The event is charged its message length plus logEventOverhead. It is
// accepted only if the batch holds fewer than maxBatchLength events and the
// new total size does not exceed maxBatchByteSize. It returns true when the
// event was appended and false when the batch was left untouched.
//
// logEvent and logEvent.Message must be non-nil; they are dereferenced
// unconditionally.
func (b *batch) add(logEvent *cloudwatchlogs.InputLogEvent) (ok bool) {
	cost := logEventOverhead + len(*logEvent.Message)

	if len(b.logEvents) >= maxBatchLength {
		return false
	}
	if b.size+cost > maxBatchByteSize {
		return false
	}

	b.size += cost
	b.logEvents = append(b.logEvents, logEvent)
	return true
}
// Len reports how many events the batch currently holds (sort.Interface).
func (b *batch) Len() int {
	count := len(b.logEvents)
	return count
}
// Less reports whether the event at index i has a strictly smaller timestamp
// than the event at index j (sort.Interface). Both timestamps must be
// non-nil; they are dereferenced unconditionally.
func (b *batch) Less(i, j int) bool {
	tsI := *b.logEvents[i].Timestamp
	tsJ := *b.logEvents[j].Timestamp
	return tsI < tsJ
}
// Swap exchanges the events at indexes i and j (sort.Interface).
func (b *batch) Swap(i, j int) {
	events := b.logEvents
	events[i], events[j] = events[j], events[i]
}
// batcher groups individual log events received on input into slices that are
// delivered on output. The grouping is done by the worker goroutine started
// in newBatcher.
type batcher struct {
// input receives single log events; closing it (see flush) makes the worker
// emit any pending batch and stop.
input chan *cloudwatchlogs.InputLogEvent
// output delivers each completed batch, sorted by timestamp; the worker
// closes it when it exits.
output chan []*cloudwatchlogs.InputLogEvent
}
// newBatcher creates a batcher with unbuffered input and output channels and
// starts its worker goroutine before returning it. The goroutine runs until
// the input channel is closed.
func newBatcher() *batcher {
	br := new(batcher)
	br.input = make(chan *cloudwatchlogs.InputLogEvent)
	br.output = make(chan []*cloudwatchlogs.InputLogEvent)

	go br.worker()
	return br
}
// flush closes the input channel. The worker reacts to the close by emitting
// any pending batch, closing the output channel and returning.
//
// It must be called at most once, and no event may be sent on input
// afterwards: closing a closed channel or sending on a closed channel panics.
func (br *batcher) flush() {
close(br.input)
}
// worker is the batching loop run in its own goroutine by newBatcher.
//
// It collects events from br.input into a batch and sends the batch, sorted
// by timestamp, on br.output when one of the following happens:
//   - the next event does not fit into the current batch,
//   - one second has elapsed since the previous flush attempt,
//   - br.input is closed, in which case br.output is closed afterwards and
//     the goroutine returns.
//
// Empty batches are never sent. Sending on br.output blocks until a receiver
// takes the batch, and no input is consumed while it is blocked.
func (br *batcher) worker() {
	b := newBatch()
	timeout := time.After(time.Second)

	// flush emits the current batch if it holds any events, starts a new one,
	// and re-arms the one-second timeout in every case.
	flush := func() {
		if len(b.logEvents) > 0 {
			// A stable sort keeps events that share a timestamp in the order
			// they arrived; sort.Sort gives no such guarantee and could
			// shuffle log lines produced within the same timestamp tick.
			sort.Stable(b)
			br.output <- b.logEvents
			b = newBatch()
		}
		timeout = time.After(time.Second)
	}

	for {
		select {
		case logEvent, ok := <-br.input:
			if !ok {
				// Input closed: emit what is pending and shut down.
				flush()
				close(br.output)
				return
			}
			if b.add(logEvent) {
				continue
			}
			// The batch is full: emit it and retry on the fresh, empty batch.
			flush()
			// The batch is empty at this point, so a second failure means the
			// event on its own exceeds maxBatchByteSize. There is no error
			// path from here, so such an event is dropped.
			_ = b.add(logEvent)
		case <-timeout:
			flush()
		}
	}
}