Skip to content
This repository has been archived by the owner on May 4, 2022. It is now read-only.

Commit

Permalink
Ceilometer events support (#81)
Browse files Browse the repository at this point in the history
* Ceilometer events support

* Add comment to pass linting

* Typo in comment fix

* Update ES and QDR versions

Update the ElasticSearch and QDR versions to better reflect our production environment
with Service Telemetry Framework

* Fix edge case bug where single word event types would not be included in index name

* Test for multi-dimensional payload

* Parse traits as a map instead of []interface{}

ES client does not support []interface{} so we need to transform traits
to a proper map.

* Add lint fix back I put in 5 days ago

* Pass pre-commit checks

* Utilize map object for ES storage instead of string

* Use event_type from payload primarily

This patch makes it to use event_type value from payload for ceilometer's
events index name.

* Test for storing ceilometer events to Elastic Search (#82)

* Simple smoketest for storing Ceilometer events

Co-authored-by: Martin Magr <[email protected]>
Co-authored-by: pleimer <[email protected]>
Co-authored-by: pleimer <[email protected]>
  • Loading branch information
4 people authored Mar 10, 2020
1 parent e16c555 commit ac6bb96
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 80 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ services:

env:
global:
- ELASTIC_VERSION=6.0.1
- QDROUTERD_VERSION=1.7.0
- ELASTIC_VERSION=7.5.1
- QDROUTERD_VERSION=1.8.0
- secure: biIvVwFaTdbJythty17sCNsKKU7R9m4cusvV/jiOTwPMTaXuxMgC4XWFsvwFcLO0Nze2RewswxOg5i6hkLpqPTcZLR9GZdXpRG24nuwImHPpgdbpwX35kUnnK4jLY5kOcwndTVHZoqpetxejBxgtKo839dhA7sfFKx4HI+xti3FJvjZgdc/7A2QABGG5wu08XZDo50p1vx7BR9IxgEnK7i34Gq9qXbeeltYChqXIZ4M1rCpevWgvUcvgtyRkcNAtEXjiVTzb+XocHvyjuLBAwu5xzg5xPjiLHJvo3CvxdPb3VsYNHFmxv1g0vJUMR5dELLydhmcIcWAJQGUcDm3FnYQbd8EEuRHvefSubjG+aagNc/8rDAP0yIvTGpIZ9mwDb5x/ps0urOOGLC34ik33sYz2IGlazn2cM4RaGv78TPgibEdQEBCyY0E0zuKo3QejOlG0qkqKF3joZfS90PuKBBPWSEgFrXp357BRBtz/e45FfrwOUwqDYTlj0ivJ99Cnam+GUnlgkn0a2aVQhwAE9XTtrj+Q8JtnhUeTIskN+/OTsYFn8wVWePRK9LhoosFsOoMzA0QQTRn+O2EAN8V5LWY44ghJkGoLNMrvla4Lwc/oPyMqQvDbwJRXeoJ3Fz38TIcTw6HnxRj5DiBlidbl6GxOy/9K8iohkDQ6Z5OzvuI=

# setup dependencies required for testing
Expand Down
17 changes: 3 additions & 14 deletions internal/pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,26 +310,15 @@ func StartEvents() {
case finishCase:
break processingLoop
default:
var event incoming.EventDataFormat
switch amqpServers[index].DataSource.String() {
case "collectd":
event = &incoming.CollectdEvent{}
case "ceilometer":
// noop for now, gonna panic if configured
//event = incoming.CeilometerEvent{}
log.Printf("Received Ceilometer event:\n%s\n", msg)
case "generic":
// noop for now, gonna panic if configured
//event = incoming.GenericEvent{}
log.Printf("Received generic event:\n%s\n", msg)
}
// NOTE: below will panic for generic data source until the appropriate logic will be implemented
event := incoming.NewFromDataSource(amqpServers[index].DataSource)
amqpServers[index].Server.GetHandler().IncTotalMsgProcessed()
err := event.ParseEvent(msg.String())
if err != nil {
log.Printf("Failed to parse received event:\n- error: %s\n- event: %s\n", err, event)
}

record, err := elasticClient.Create(event.GetIndexName(), EVENTSINDEXTYPE, event.GetSanitized())
record, err := elasticClient.Create(event.GetIndexName(), EVENTSINDEXTYPE, event.GetRawData())
if err != nil {
applicationHealth.ElasticSearchState = 0
log.Printf("Failed to save event to Elasticsearch DB:\n- error: %s\n- event: %s\n", err, event)
Expand Down
216 changes: 216 additions & 0 deletions internal/pkg/events/incoming/ceilometer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package incoming

import (
"encoding/json"
"fmt"
"log"
"regexp"
"strings"
"time"
)

//ceilomterGenericIndex value represents ElasticSearch index name for data from which it
// is not possible to clearly construct indexs name
const ceilometerGenericIndex = "ceilometer_generic"

var (
rexForPayload = regexp.MustCompile(`\"payload\"\s*:\s*\[(.*)\]`)
rexForOsloMessage = regexp.MustCompile(`"oslo.message"\s*:\s*"({.*})"`)
ceilometerAlertSeverity = map[string]string{
"audit": "info",
"info": "info",
"sample": "info",
"warn": "warning",
"warning": "warning",
"critical": "critical",
"error": "critical",
"AUDIT": "info",
"INFO": "info",
"SAMPLE": "info",
"WARN": "warning",
"WARNING": "warning",
"CRITICAL": "critical",
"ERROR": "critical",
}
)

//AlertKeySurrogate translates case for fields for AlertManager
type AlertKeySurrogate struct {
Parsed string
Label string
}

//CeilometerEvent implements EventDataFormat interface and holds event message data from collectd.
type CeilometerEvent struct {
sanitized string
parsed map[string]interface{}
indexName string
}

//GetIndexName returns Elasticsearch index to which this event is or should be saved.
func (evt *CeilometerEvent) GetIndexName() string {
if evt.indexName == "" {
result := ceilometerGenericIndex
// use event_type from payload or fallback to message's event_type if N/A
if payload, ok := evt.parsed["payload"]; ok {
if typedPayload, ok := payload.(map[string]interface{}); ok {
if val, ok := typedPayload["event_type"]; ok {
if strVal, ok := val.(string); ok {
result = strVal
}
}
}
}
if result == ceilometerGenericIndex {
if val, ok := evt.parsed["event_type"]; ok {
if strVal, ok := val.(string); ok {
result = strVal
}
}
}
// replace dotted notation and dashes with underscores
parts := strings.Split(result, ".")
if len(parts) > 1 {
result = strings.Join(parts[:len(parts)-1], "_")
}
result = strings.ReplaceAll(result, "-", "_")
if !strings.HasPrefix(result, "ceilometer_") {
result = fmt.Sprintf("ceilometer_%s", result)
}
evt.indexName = result
}
return evt.indexName
}

//GetRawData returns sanitized and umarshalled event data.
func (evt *CeilometerEvent) GetRawData() interface{} {
return evt.parsed
}

//GetSanitized returns sanitized event data
func (evt *CeilometerEvent) GetSanitized() string {
return evt.sanitized
}

//sanitize search and removes all known issues in received data.
func (evt *CeilometerEvent) sanitize(jsondata string) string {
sanitized := jsondata
// parse only relevant data
sub := rexForOsloMessage.FindStringSubmatch(sanitized)
if len(sub) == 2 {
sanitized = rexForNestedQuote.ReplaceAllString(sub[1], `"`)
} else {
log.Printf("Failed to find oslo.message in Ceilometer event: %s\n", jsondata)
}
// avoid getting payload data wrapped in array
item := rexForPayload.FindStringSubmatch(sanitized)
if len(item) == 2 {
sanitized = rexForPayload.ReplaceAllString(sanitized, fmt.Sprintf(`"payload":%s`, item[1]))
}
return sanitized
}

//ParseEvent sanitizes and unmarshals received event data.
func (evt *CeilometerEvent) ParseEvent(data string) error {
evt.sanitized = evt.sanitize(data)
err := json.Unmarshal([]byte(evt.sanitized), &evt.parsed)
if err != nil {
log.Fatal(err)
return err
}
// transforms traits key into map[string]interface{}
if payload, ok := evt.parsed["payload"]; ok {
newPayload := make(map[string]interface{})
if typedPayload, ok := payload.(map[string]interface{}); ok {
if traitData, ok := typedPayload["traits"]; ok {
if traits, ok := traitData.([]interface{}); ok {
newTraits := make(map[string]interface{})
for _, value := range traits {
if typedValue, ok := value.([]interface{}); ok {
if len(typedValue) != 3 {
return fmt.Errorf("parsed invalid trait (%v) in event: %s", value, data)
}
if traitType, ok := typedValue[1].(float64); ok {
switch traitType {
case 2:
newTraits[typedValue[0].(string)] = typedValue[2].(float64)
default:
newTraits[typedValue[0].(string)] = typedValue[2].(string)
}
} else {
return fmt.Errorf("parsed invalid trait (%v) in event: %s", value, data)
}
} else {
return fmt.Errorf("parsed invalid trait (%v) in event: %s", value, data)
}
}
newPayload["traits"] = newTraits
}
}
for key, value := range typedPayload {
if key != "traits" {
newPayload[key] = value
}
}
fmt.Printf("newPayload: %v\n", newPayload)
}
(*evt).parsed["payload"] = newPayload
}

return nil
}

//GeneratePrometheusAlert generates PrometheusAlert from the event data
func (evt *CeilometerEvent) GeneratePrometheusAlert(generatorURL string) PrometheusAlert {
alert := PrometheusAlert{
Labels: make(map[string]string),
Annotations: make(map[string]string),
GeneratorURL: generatorURL,
}
// set labels
alert.Labels["alertname"] = evt.GetIndexName()
surrogates := []AlertKeySurrogate{
AlertKeySurrogate{"message_id", "messageId"},
AlertKeySurrogate{"publisher_id", "instance"},
AlertKeySurrogate{"event_type", "type"},
}
for _, renameCase := range surrogates {
if value, ok := evt.parsed[renameCase.Parsed]; ok {
alert.Labels[renameCase.Label] = value.(string)
}
}
if value, ok := evt.parsed["priority"]; ok {
if severity, ok := ceilometerAlertSeverity[value.(string)]; ok {
alert.Labels["severity"] = severity
} else {
alert.Labels["severity"] = unknownSeverity
}
} else {
alert.Labels["severity"] = unknownSeverity
}
if value, ok := evt.parsed["publisher_id"].(string); ok {
alert.Labels["sourceName"] = strings.Join([]string{"ceilometer", value}, "@")
}
assimilateMap(evt.parsed["payload"].(map[string]interface{}), &alert.Annotations)
// set timestamp
if value, ok := evt.parsed["timestamp"].(string); ok {
// ensure timestamp is in RFC3339
for _, layout := range []string{time.RFC3339, time.RFC3339Nano, time.ANSIC, isoTimeLayout} {
stamp, err := time.Parse(layout, value)
if err == nil {
alert.StartsAt = stamp.Format(time.RFC3339)
break
}
}
}
// generate SG-relevant data
alert.SetName()
alert.SetSummary()
alert.Labels["alertsource"] = "SmartGateway"
return alert
}

//GeneratePrometheusAlertBody generates alert body for Prometheus Alert manager API
func (evt *CeilometerEvent) GeneratePrometheusAlertBody(generatorURL string) ([]byte, error) {
return json.Marshal(evt.GeneratePrometheusAlert(generatorURL))
}
13 changes: 9 additions & 4 deletions internal/pkg/events/incoming/collectd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"time"
)

//GENERICINDEX value represents ElasticSearch index name for data from which it
//collectdGenericIndex value represents ElasticSearch index name for data from which it
// is not possible to clearly construct indexs name
const GENERICINDEX = "collectd_generic"
const collectdGenericIndex = "collectd_generic"

var (
rexForNestedQuote = regexp.MustCompile(`\\\"`)
Expand All @@ -33,7 +33,7 @@ type CollectdEvent struct {
//GetIndexName returns Elasticsearch index to which this event is or should be saved.
func (evt *CollectdEvent) GetIndexName() string {
if evt.indexName == "" {
result := GENERICINDEX
result := collectdGenericIndex
if val, ok := evt.parsed["labels"]; ok {
switch rec := val.(type) {
case map[string]interface{}:
Expand Down Expand Up @@ -120,7 +120,7 @@ func (evt *CollectdEvent) GeneratePrometheusAlert(generatorURL string) Prometheu
assimilateMap(evt.parsed["annotations"].(map[string]interface{}), &alert.Annotations)
if value, ok := evt.parsed["startsAt"].(string); ok {
// ensure timestamps is in RFC3339
for _, layout := range []string{time.RFC3339, time.RFC3339Nano, time.ANSIC} {
for _, layout := range []string{time.RFC3339, time.RFC3339Nano, time.ANSIC, isoTimeLayout} {
stamp, err := time.Parse(layout, value)
if err == nil {
alert.StartsAt = stamp.Format(time.RFC3339)
Expand All @@ -132,8 +132,13 @@ func (evt *CollectdEvent) GeneratePrometheusAlert(generatorURL string) Prometheu
if value, ok := alert.Labels["severity"]; ok {
if severity, ok := collectdAlertSeverity[value]; ok {
alert.Labels["severity"] = severity
} else {
alert.Labels["severity"] = unknownSeverity
}
} else {
alert.Labels["severity"] = unknownSeverity
}

alert.SetName()
assimilateMap(evt.parsed["annotations"].(map[string]interface{}), &alert.Labels)
alert.SetSummary()
Expand Down
20 changes: 20 additions & 0 deletions internal/pkg/events/incoming/formats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ package incoming
import (
"sort"
"strings"

"github.com/infrawatch/smart-gateway/internal/pkg/saconfig"
)

const (
isoTimeLayout = "2006-01-02 15:04:05.000000"
unknownSeverity = "unknown"
)

//EventDataFormat interface for storing event data from various sources
Expand All @@ -17,6 +24,8 @@ type EventDataFormat interface {
ParseEvent(string) error
//GeneratePrometheusAlertBody generates alert body for Prometheus Alert manager API
GeneratePrometheusAlertBody(string) ([]byte, error)
//GeneratePrometheusAlertBody generates alert struct
GeneratePrometheusAlert(string) PrometheusAlert
}

//PrometheusAlert represents data structure used for sending alerts to Prometheus Alert Manager
Expand Down Expand Up @@ -75,3 +84,14 @@ func (alert *PrometheusAlert) SetSummary() {
}
}
}

//NewFromDataSource creates empty EventDataFormat according to given DataSource
func NewFromDataSource(source saconfig.DataSource) EventDataFormat {
switch source {
case saconfig.DataSourceCollectd:
return &CollectdEvent{}
case saconfig.DataSourceCeilometer:
return &CeilometerEvent{}
}
return nil
}
2 changes: 1 addition & 1 deletion internal/pkg/metrics/incoming/formats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type MetricDataFormat interface {
ISNew() bool
}

//NewFromDataSource creates empty DataType accorging to given DataSource
//NewFromDataSource creates empty DataType according to given DataSource
func NewFromDataSource(source saconfig.DataSource) MetricDataFormat {
switch source {
case saconfig.DataSourceCollectd:
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/saelastic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ func genUUIDv4() string {
}

//Create ... it can be BodyJson or BodyString.. BodyJson needs struct defined
func (ec *ElasticClient) Create(indexname string, indextype string, jsondata string) (string, error) {
func (ec *ElasticClient) Create(indexname string, indextype string, jsondata interface{}) (string, error) {
ctx := ec.ctx
id := genUUIDv4()
debuges("Debug:Printing body %s\n", jsondata)
result, err := ec.client.Index().
Index(string(indexname)).
Type(string(indextype)).
Id(id).
BodyString(jsondata).
BodyJson(jsondata).
Do(ctx)
if err != nil {
// Handle error
Expand Down
Loading

0 comments on commit ac6bb96

Please sign in to comment.