Skip to content

Commit

Permalink
[Heartbeat] Fix summarizer (#36519)
Browse files Browse the repository at this point in the history
* Cleanup summarizer code

* Separate concerns in summarizer

* Checkpoint

* Fix failing tests

* FMT

* Tweaks

* Make linter happy

* progress

* cleanup docs

* Bring back wrappers tests (partial)

* Restore wrapper tests

* Fix failing tests

* Fix err handling

* Re-init plugins on retry

* Fix error field handling across retries

* Incorporate PR feedback

* Type fix

* URLs now work, tests passing

* Improved err handling

* Test fixes

* Cleanup naming

* Fix handling of step counts / journey/end missing and also fix continuity test

* Fix failing tests around logging / logging behavior

* Rename OnRetry to BeforeRetry

* Move monitor.status calculation for browsers into summarizer

* Cleanup status logic

* More status consolidation

* Fixed failing tests

* Make monitor logger errors more understandable

* Fix retry delay

* Fix retry delay

* Remove spurious 'wrapped:' in logs

* Incorporate pr feedback

* Fix dur

* Fix cmd status

* Fix tests

* Fmt

* Integrate PR feedback

---------

Co-authored-by: Vignesh Shanmugam <[email protected]>
  • Loading branch information
andrewvc and vigneshshanmugam authored Sep 19, 2023
1 parent aa5b983 commit ccfa54a
Show file tree
Hide file tree
Showing 38 changed files with 1,033 additions and 632 deletions.
7 changes: 5 additions & 2 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/ecserr"
"github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"

"github.com/elastic/beats/v7/heartbeat/hbtestllext"

Expand All @@ -49,7 +50,6 @@ import (
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/validator"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/common/x509util"
)

Expand Down Expand Up @@ -172,6 +172,7 @@ func BaseChecks(ip string, status string, typ string) validator.Validator {
}

return lookslike.Compose(
hbtestllext.MaybeHasEventType,
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"ip": ipCheck,
Expand Down Expand Up @@ -223,8 +224,10 @@ func SimpleURLChecks(t *testing.T, scheme string, host string, port uint16) vali

// URLChecks returns a validator for the given URL's fields
func URLChecks(t *testing.T, u *url.URL) validator.Validator {
t.Helper()
require.NotNil(t, u)
return lookslike.MustCompile(map[string]interface{}{
"url": wrappers.URLFields(u),
"url": wraputil.URLFields(u),
})
}

Expand Down
12 changes: 12 additions & 0 deletions heartbeat/hbtestllext/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package hbtestllext

import (
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
)

// MonitorTimespanValidator tests for the `next_run` and `next_run_in.us` keys.
Expand All @@ -30,3 +31,14 @@ var MonitorTimespanValidator = lookslike.MustCompile(map[string]interface{}{
},
},
})

// MaybeHasEventType is a compiled lookslike validator covering the nested
// `event.type` field and the dotted `synthetics.type` key. Both checks are
// isdef.IsNonEmptyString wrapped in isdef.Optional, so presumably an event
// that lacks either field still validates — confirm against go-lookslike.
var MaybeHasEventType = lookslike.MustCompile(map[string]interface{}{
"event": map[string]interface{}{
"type": isdef.Optional(isdef.IsNonEmptyString),
},
"synthetics.type": isdef.Optional(isdef.IsNonEmptyString),
})

// MaybeHasDuration is a compiled lookslike validator that checks the
// `monitor.duration.us` key with IsInt64.
// NOTE(review): despite the "Maybe" prefix, the check is not wrapped in
// isdef.Optional the way MaybeHasEventType's checks are — confirm whether
// the field is meant to be required here.
var MaybeHasDuration = lookslike.MustCompile(map[string]interface{}{
"monitor.duration.us": IsInt64,
})
15 changes: 10 additions & 5 deletions heartbeat/look/look.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@ import (
// RTT formats a round-trip-time given as time.Duration into an
// event field. The duration is stored in `{"us": rtt}`.
func RTT(rtt time.Duration) mapstr.M {
if rtt < 0 {
rtt = 0
}

return mapstr.M{
// cast to int64 since a go duration is a nano, but we want micros
// This makes the types less confusing because otherwise the duration
// we get back has the wrong unit
"us": rtt.Microseconds(),
"us": RTTMS(rtt),
}
}

// RTTMS converts rtt to a whole number of microseconds and clamps negative
// durations to 0. Note that, despite the "MS" suffix, the unit of the
// returned int64 is microseconds, not milliseconds.
func RTTMS(rtt time.Duration) int64 {
	micros := rtt.Microseconds()
	if micros < 0 {
		// Negative round-trip times are reported as zero.
		return 0
	}
	return micros
}

// Reason formats an error into an error event field.
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/libbeat/version"
conf "github.com/elastic/elastic-agent-libs/config"

"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
"github.com/elastic/elastic-agent-libs/useragent"
Expand Down Expand Up @@ -116,7 +116,7 @@ func create(

// Assign any execution errors to the error field and
// assign the url field
js[i] = wrappers.WithURLField(u, job)
js[i] = wraputil.WithURLField(u, job)
}

return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil
Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/file"
Expand Down Expand Up @@ -110,7 +111,7 @@ func checkServer(t *testing.T, handlerFunc http.HandlerFunc, useUrls bool) (*htt
func urlChecks(urlStr string) validator.Validator {
u, _ := url.Parse(urlStr)
return lookslike.MustCompile(map[string]interface{}{
"url": wrappers.URLFields(u),
"url": wraputil.URLFields(u),
})
}

Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -107,7 +107,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
return plugin.Plugin{}, err
}

j = append(j, wrappers.WithURLField(u, job))
j = append(j, wraputil.WithURLField(u, job))
}

return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil
Expand Down
6 changes: 3 additions & 3 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/heartbeat/reason"
"github.com/elastic/beats/v7/libbeat/beat"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (jf *jobFactory) makeJobs() ([]jobs.Job, error) {
if err != nil {
return nil, err
}
jobs = append(jobs, wrappers.WithURLField(url, endpointJob))
jobs = append(jobs, wraputil.WithURLField(url, endpointJob))
}

}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (jf *jobFactory) makeDirectEndpointJob(endpointURL *url.URL) (jobs.Job, err

// makeSocksLookupEndpointJob makes jobs that use a Socks5 proxy to perform DNS lookups
func (jf *jobFactory) makeSocksLookupEndpointJob(endpointURL *url.URL) jobs.Job {
return wrappers.WithURLField(endpointURL,
return wraputil.WithURLField(endpointURL,
jobs.MakeSimpleJob(func(event *beat.Event) error {
hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port())
return jf.dial(event, hostPort, endpointURL)
Expand Down
24 changes: 19 additions & 5 deletions heartbeat/monitors/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -44,6 +45,7 @@ type MonitorRunInfo struct {
Duration int64 `json:"-"`
Steps *int `json:"steps,omitempty"`
Status string `json:"status"`
Attempt int `json:"attempt"`
}

func (m *MonitorRunInfo) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -78,22 +80,33 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
errors := []error{}
monitorID, err := event.GetValue("monitor.id")
if err != nil {
errors = append(errors, err)
errors = append(errors, fmt.Errorf("could not extract monitor.id: %w", err))
}

durationUs, err := event.GetValue("monitor.duration.us")
if err != nil {
errors = append(errors, err)
durationUs = int64(0)
}

monType, err := event.GetValue("monitor.type")
if err != nil {
errors = append(errors, err)
errors = append(errors, fmt.Errorf("could not extract monitor.type: %w", err))
}

status, err := event.GetValue("monitor.status")
if err != nil {
errors = append(errors, err)
errors = append(errors, fmt.Errorf("could not extract monitor.status: %w", err))
}

jsIface, err := event.GetValue("summary")
var attempt int
if err != nil {
errors = append(errors, fmt.Errorf("could not extract summary to add attempt info: %w", err))
} else {
js, ok := jsIface.(*jobsummary.JobSummary)
if ok && js != nil {
attempt = int(js.Attempt)
}
}

if len(errors) > 0 {
Expand All @@ -105,6 +118,7 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
Type: monType.(string),
Duration: durationUs.(int64),
Status: status.(string),
Attempt: attempt,
}

sc, _ := event.Meta.GetValue(META_STEP_COUNT)
Expand All @@ -119,7 +133,7 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
func LogRun(event *beat.Event) {
monitor, err := extractRunInfo(event)
if err != nil {
getLogger().Errorw("error gathering information to log event: ", err)
getLogger().Error(fmt.Errorf("error gathering information to log event: %w", err))
return
}

Expand Down
3 changes: 3 additions & 0 deletions heartbeat/monitors/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand All @@ -47,6 +48,7 @@ func TestLogRun(t *testing.T) {
"monitor.duration.us": durationUs,
"monitor.type": "browser",
"monitor.status": "down",
"summary": jobsummary.NewJobSummary(1, 1, "abc"),
}

event := beat.Event{Fields: fields}
Expand All @@ -64,6 +66,7 @@ func TestLogRun(t *testing.T) {
Duration: durationUs,
Status: "down",
Steps: &steps,
Attempt: 1,
}

assert.ElementsMatch(t, []zap.Field{
Expand Down
1 change: 1 addition & 0 deletions heartbeat/monitors/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func baseMockEventMonitorValidator(id string, name string, status string) valida

func mockEventMonitorValidator(id string, name string) validator.Validator {
return lookslike.Strict(lookslike.Compose(
hbtestllext.MaybeHasEventType,
baseMockEventMonitorValidator(id, name, "up"),
hbtestllext.MonitorTimespanValidator,
hbtest.SummaryStateChecks(1, 0),
Expand Down
8 changes: 4 additions & 4 deletions heartbeat/monitors/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -114,7 +114,7 @@ func MakeByIPJob(
"monitor": mapstr.M{"ip": addr.String()},
}

return wrappers.WithFields(fields, pingFactory(addr)), nil
return wraputil.WithFields(fields, pingFactory(addr)), nil
}

// MakeByHostJob creates a new Job including host lookup. The pingFactory will be used to
Expand Down Expand Up @@ -165,7 +165,7 @@ func makeByHostAnyIPJob(
resolveRTT := resolveEnd.Sub(resolveStart)

ipFields := resolveIPEvent(ip.String(), resolveRTT)
return wrappers.WithFields(ipFields, pingFactory(ip))(event)
return wraputil.WithFields(ipFields, pingFactory(ip))(event)
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ func makeByHostAllIPJob(
for i, ip := range ips {
addr := &net.IPAddr{IP: ip}
ipFields := resolveIPEvent(ip.String(), resolveRTT)
cont[i] = wrappers.WithFields(ipFields, pingFactory(addr))
cont[i] = wraputil.WithFields(ipFields, pingFactory(addr))
}
// Ideally we would test this invocation. This function, however, is really hard to test given all the extra context it takes in.
// In a future refactor we could perhaps test that this is correctly invoked.
Expand Down
57 changes: 57 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jobsummary

import (
"fmt"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
)

// JobSummary is the struct that is serialized in the `summary` field in the emitted event.
// Instances are created with NewJobSummary; BumpAttempt overwrites one in place
// for the next attempt.
type JobSummary struct {
// Attempt is the attempt number passed to NewJobSummary; BumpAttempt increments it by one.
Attempt uint16 `json:"attempt"`
// MaxAttempts is the attempt limit; NewJobSummary raises values below 1 to 1.
MaxAttempts uint16 `json:"max_attempts"`
// FinalAttempt is not set by anything in this file.
// NOTE(review): presumably true once no further retries will happen — confirm against the summarizer.
FinalAttempt bool `json:"final_attempt"`
// Up and Down are not set by anything in this file.
// NOTE(review): presumably counts of up/down results for the run — confirm against the summarizer.
Up uint16 `json:"up"`
Down uint16 `json:"down"`
// Status is a monitorstate.StateStatus; it is left at its zero value by NewJobSummary.
Status monitorstate.StateStatus `json:"status"`
// RetryGroup is the identifier passed to NewJobSummary; BumpAttempt carries it over unchanged.
RetryGroup string `json:"retry_group"`
}

// NewJobSummary builds a JobSummary for the given attempt number and retry
// group. A maxAttempts value below 1 is raised to 1; every other field is
// left at its zero value.
func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary {
	summary := &JobSummary{
		Attempt:     attempt,
		MaxAttempts: 1, // floor; replaced below when the caller asked for more
		RetryGroup:  retryGroup,
	}
	if maxAttempts > 1 {
		summary.MaxAttempts = maxAttempts
	}
	return summary
}

// BumpAttempt overwrites the receiver in place with a fresh summary built by
// NewJobSummary: Attempt is incremented by one, MaxAttempts and RetryGroup
// are carried over, and all remaining fields are reset to their zero values.
func (js *JobSummary) BumpAttempt() {
	next := NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup)
	*js = *next
}

// String returns a single-line, human-readable rendering of the summary
// containing its status, attempt/max attempts, final-attempt flag, up/down
// values and retry group.
func (js *JobSummary) String() string {
	const layout = "<JobSummary status=%s attempt=%d/%d, final=%t, up=%d/%d retryGroup=%s>"
	return fmt.Sprintf(
		layout,
		js.Status,
		js.Attempt, js.MaxAttempts,
		js.FinalAttempt,
		js.Up, js.Down,
		js.RetryGroup,
	)
}
Loading

0 comments on commit ccfa54a

Please sign in to comment.