-
Notifications
You must be signed in to change notification settings - Fork 1
/
ForwarderElasticsearch.go
137 lines (126 loc) · 3.91 KB
/
ForwarderElasticsearch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package canarytools
import (
"bytes"
"context"
"encoding/json"
"errors"
"io/ioutil"
"strconv"
"time"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
log "github.com/sirupsen/logrus"
)
// ElasticForwarder sends alerts to elasticsearch
type ElasticForwarder struct {
index string
client *elasticsearch.Client
l *log.Logger
// TODO: TLS!
}
// NewElasticForwarder creates a new ElasticForwarder,
// it verifies configurations and tries to ping the cluster
func NewElasticForwarder(cfg elasticsearch.Config, index string, l *log.Logger) (elasticforwarder *ElasticForwarder, err error) {
elasticforwarder = &ElasticForwarder{}
elasticforwarder.index = index
elasticforwarder.l = l
elasticforwarder.client, err = elasticsearch.NewClient(cfg)
if err != nil {
l.WithFields(log.Fields{
"source": "NewElasticForwarder",
"stage": "forward",
"err": err,
}).Error("NewElasticForwarder error creating client")
return
}
p, err := elasticforwarder.client.Info()
if err != nil || p.IsError() {
l.WithFields(log.Fields{
"source": "NewElasticForwarder",
"stage": "forward",
"err": err,
"status": p.Status(),
}).Error("NewElasticForwarder error getting cluster info")
return nil, errors.New(p.String())
}
defer p.Body.Close() // freakin' leak!
l.WithFields(log.Fields{
"source": "NewElasticForwarder",
"stage": "forward",
"mesage": p,
}).Info("elasticsearch cluster info")
return
}
func (ef ElasticForwarder) Forward(outChan <-chan []byte, incidentAckerChan chan<- []byte) {
for i := range outChan {
var indexname = ef.index + "-" // preparing index name canarychirps-yyyy.MM.dd
var indexsuffix = time.Now().UTC().Format("2006.01.02") // preparing index suffix canarychirps-yyyy.MM.dd
var incidentTime = time.Now().UTC()
// we'll have to unmarshal the incident to extract timestamp,
// then add "@timestamp" to the event, then marshal it again
// we could've done that earlier in the pipeline, but to maintain
// consitency.
var j map[string]interface{} // temp
err := json.Unmarshal(i, &j)
if err != nil {
ef.l.WithFields(log.Fields{
"source": "ElasticForwarder",
"stage": "forward",
"err": err,
}).Error("Forward unmarshaling incident")
}
// getting updated_time
ut, ok := j["updated_time"] // updated_time: "1588805467"
if ok { // we found it
utstring, ok := ut.(string) // prevent panic if not string
if ok { // it's a string
utint, err := strconv.Atoi(utstring) // "1588805467" -> 1588805467
if err == nil { // error converting to int?
incidentTime = time.Unix(int64(utint), 0).UTC()
indexsuffix = incidentTime.Format("2006.01.02")
}
indexname = indexname + indexsuffix // we now have index name
}
}
// add "@timestamp"
j["@timestamp"] = incidentTime.Format("2006-01-02T15:04:05.999Z")
b, err := json.Marshal(j) // we got a json back
if err != nil {
ef.l.WithFields(log.Fields{
"source": "ElasticForwarder",
"stage": "forward",
"err": err,
}).Error("Forward error marshaling incident")
}
buf := bytes.NewReader(b)
if err != nil {
ef.l.WithFields(log.Fields{
"source": "ElasticForwarder",
"stage": "forward",
"err": err,
}).Error("Forward error writing to buffer")
}
// Set up the request object.
req := esapi.IndexRequest{
Index: indexname,
Body: buf,
Refresh: "true",
}
// Perform the request with the client.
res, err := req.Do(context.Background(), ef.client)
if err != nil || res.IsError() {
ef.l.WithFields(log.Fields{
"source": "ElasticForwarder",
"stage": "forward",
"err": err,
"status": res.Status(),
}).Error("Forward error indexing document")
continue
}
defer res.Body.Close()
// add to incident acker
buf.Seek(0, 0)
i, _ := ioutil.ReadAll(buf)
incidentAckerChan <- i
}
}