Skip to content

Commit

Permalink
Pass incomplete alerts through analyzer proxy routine. (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickeskov authored Jan 4, 2023
1 parent 4cff8f4 commit 788df8d
Showing 1 changed file with 42 additions and 38 deletions.
80 changes: 42 additions & 38 deletions pkg/analysis/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,44 +57,7 @@ func (a *Analyzer) analyze(alerts chan<- entities.Alert, pollingResult entities.
if err != nil {
return errors.Wrap(err, "failed to analyze nodes statements")
}
statusSplit := statements.SplitByNodeStatus()
for _, nodeStatements := range statusSplit {
nodeStatements.SortByNodeAsc()
}

routines := [...]func(in chan<- entities.Alert) error{
func(in chan<- entities.Alert) error {
criterion := criteria.NewIncompleteCriterion(a.es, a.opts.IncompleteCriteriaOpts, a.zap)
return criterion.Analyze(alerts, statusSplit[entities.Incomplete])
},
func(in chan<- entities.Alert) error {
for _, statement := range statusSplit[entities.InvalidHeight] {
in <- &entities.InvalidHeightAlert{NodeStatement: statement}
}
return nil
},
func(in chan<- entities.Alert) error {
criterion := criteria.NewUnreachableCriterion(a.es, a.opts.UnreachableCriteriaOpts, a.zap)
return criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.Unreachable])
},
func(in chan<- entities.Alert) error {
criterion := criteria.NewHeightCriterion(a.opts.HeightCriteriaOpts, a.zap)
criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.OK])
return nil
},
func(in chan<- entities.Alert) error {
criterion := criteria.NewStateHashCriterion(a.es, a.opts.StateHashCriteriaOpts, a.zap)
return criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.OK])
},
func(in chan<- entities.Alert) error {
criterion, err := criteria.NewBaseTargetCriterion(a.opts.BaseTargetCriterionOpts)
if err != nil {
return err
}
criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.OK])
return nil
},
}
var (
wg = new(sync.WaitGroup)
criteriaOut = make(chan entities.Alert)
Expand All @@ -108,8 +71,9 @@ func (a *Analyzer) analyze(alerts chan<- entities.Alert, pollingResult entities.
}()

// run criterion routines
routines := a.criteriaRoutines(statements, pollingResult.Timestamp())
wg.Add(len(routines))
for _, f := range &routines {
for _, f := range routines {
go func(f func(in chan<- entities.Alert) error) {
defer wg.Done()
if err := f(criteriaOut); err != nil {
Expand Down Expand Up @@ -148,6 +112,46 @@ func (a *Analyzer) analyze(alerts chan<- entities.Alert, pollingResult entities.
return nil
}

func (a *Analyzer) criteriaRoutines(statements entities.NodeStatements, timestamp int64) []func(in chan<- entities.Alert) error {
statusSplit := statements.SplitByNodeStatus()
for _, nodeStatements := range statusSplit {
nodeStatements.SortByNodeAsc()
}
return []func(in chan<- entities.Alert) error{
func(in chan<- entities.Alert) error {
criterion := criteria.NewIncompleteCriterion(a.es, a.opts.IncompleteCriteriaOpts, a.zap)
return criterion.Analyze(in, statusSplit[entities.Incomplete])
},
func(in chan<- entities.Alert) error {
for _, statement := range statusSplit[entities.InvalidHeight] {
in <- &entities.InvalidHeightAlert{NodeStatement: statement}
}
return nil
},
func(in chan<- entities.Alert) error {
criterion := criteria.NewUnreachableCriterion(a.es, a.opts.UnreachableCriteriaOpts, a.zap)
return criterion.Analyze(in, timestamp, statusSplit[entities.Unreachable])
},
func(in chan<- entities.Alert) error {
criterion := criteria.NewHeightCriterion(a.opts.HeightCriteriaOpts, a.zap)
criterion.Analyze(in, timestamp, statusSplit[entities.OK])
return nil
},
func(in chan<- entities.Alert) error {
criterion := criteria.NewStateHashCriterion(a.es, a.opts.StateHashCriteriaOpts, a.zap)
return criterion.Analyze(in, timestamp, statusSplit[entities.OK])
},
func(in chan<- entities.Alert) error {
criterion, err := criteria.NewBaseTargetCriterion(a.opts.BaseTargetCriterionOpts)
if err != nil {
return err
}
criterion.Analyze(in, timestamp, statusSplit[entities.OK])
return nil
},
}
}

func (a *Analyzer) Start(notifications <-chan entities.NodesGatheringNotification) <-chan entities.Alert {
out := make(chan entities.Alert)
go func(alerts chan<- entities.Alert) {
Expand Down

0 comments on commit 788df8d

Please sign in to comment.