From 788df8d2968d1315ed5717a858c56df51eebd246 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Wed, 4 Jan 2023 12:01:46 +0300 Subject: [PATCH] Pass incomplete alerts through analyzer proxy routine. (#145) --- pkg/analysis/analyzer.go | 80 +++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/pkg/analysis/analyzer.go b/pkg/analysis/analyzer.go index 06c19247..eaa1e0d9 100644 --- a/pkg/analysis/analyzer.go +++ b/pkg/analysis/analyzer.go @@ -57,44 +57,7 @@ func (a *Analyzer) analyze(alerts chan<- entities.Alert, pollingResult entities. if err != nil { return errors.Wrap(err, "failed to analyze nodes statements") } - statusSplit := statements.SplitByNodeStatus() - for _, nodeStatements := range statusSplit { - nodeStatements.SortByNodeAsc() - } - routines := [...]func(in chan<- entities.Alert) error{ - func(in chan<- entities.Alert) error { - criterion := criteria.NewIncompleteCriterion(a.es, a.opts.IncompleteCriteriaOpts, a.zap) - return criterion.Analyze(alerts, statusSplit[entities.Incomplete]) - }, - func(in chan<- entities.Alert) error { - for _, statement := range statusSplit[entities.InvalidHeight] { - in <- &entities.InvalidHeightAlert{NodeStatement: statement} - } - return nil - }, - func(in chan<- entities.Alert) error { - criterion := criteria.NewUnreachableCriterion(a.es, a.opts.UnreachableCriteriaOpts, a.zap) - return criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.Unreachable]) - }, - func(in chan<- entities.Alert) error { - criterion := criteria.NewHeightCriterion(a.opts.HeightCriteriaOpts, a.zap) - criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.OK]) - return nil - }, - func(in chan<- entities.Alert) error { - criterion := criteria.NewStateHashCriterion(a.es, a.opts.StateHashCriteriaOpts, a.zap) - return criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.OK]) - }, - func(in chan<- entities.Alert) error { - criterion, err := criteria.NewBaseTargetCriterion(a.opts.BaseTargetCriterionOpts) - if err != nil { - return err - } - criterion.Analyze(in, pollingResult.Timestamp(), statusSplit[entities.OK]) - return nil - }, - } var ( wg = new(sync.WaitGroup) criteriaOut = make(chan entities.Alert) @@ -108,8 +71,9 @@ func (a *Analyzer) analyze(alerts chan<- entities.Alert, pollingResult entities. }() // run criterion routines + routines := a.criteriaRoutines(statements, pollingResult.Timestamp()) wg.Add(len(routines)) - for _, f := range &routines { + for _, f := range routines { go func(f func(in chan<- entities.Alert) error) { defer wg.Done() if err := f(criteriaOut); err != nil { @@ -148,6 +112,46 @@ func (a *Analyzer) analyze(alerts chan<- entities.Alert, pollingResult entities. return nil } +func (a *Analyzer) criteriaRoutines(statements entities.NodeStatements, timestamp int64) []func(in chan<- entities.Alert) error { + statusSplit := statements.SplitByNodeStatus() + for _, nodeStatements := range statusSplit { + nodeStatements.SortByNodeAsc() + } + return []func(in chan<- entities.Alert) error{ + func(in chan<- entities.Alert) error { + criterion := criteria.NewIncompleteCriterion(a.es, a.opts.IncompleteCriteriaOpts, a.zap) + return criterion.Analyze(in, statusSplit[entities.Incomplete]) + }, + func(in chan<- entities.Alert) error { + for _, statement := range statusSplit[entities.InvalidHeight] { + in <- &entities.InvalidHeightAlert{NodeStatement: statement} + } + return nil + }, + func(in chan<- entities.Alert) error { + criterion := criteria.NewUnreachableCriterion(a.es, a.opts.UnreachableCriteriaOpts, a.zap) + return criterion.Analyze(in, timestamp, statusSplit[entities.Unreachable]) + }, + func(in chan<- entities.Alert) error { + criterion := criteria.NewHeightCriterion(a.opts.HeightCriteriaOpts, a.zap) + criterion.Analyze(in, timestamp, statusSplit[entities.OK]) + return nil + }, + func(in chan<- entities.Alert) error { + criterion := criteria.NewStateHashCriterion(a.es, a.opts.StateHashCriteriaOpts, a.zap) + return criterion.Analyze(in, timestamp, statusSplit[entities.OK]) + }, + func(in chan<- entities.Alert) error { + criterion, err := criteria.NewBaseTargetCriterion(a.opts.BaseTargetCriterionOpts) + if err != nil { + return err + } + criterion.Analyze(in, timestamp, statusSplit[entities.OK]) + return nil + }, + } +} + func (a *Analyzer) Start(notifications <-chan entities.NodesGatheringNotification) <-chan entities.Alert { out := make(chan entities.Alert) go func(alerts chan<- entities.Alert) {