Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quieten enhanced telem #15793

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,11 @@ func hexStringToDecimal(hexString string) (decimal.Decimal, bool) {
func (e *EnhancedTelemetryService[T]) getObservation(finalResult *pipeline.FinalResult) int64 {
singularResult, err := finalResult.SingularResult()
if err != nil {
e.lggr.Warnf("cannot get singular result, job %d", e.job.ID)
return 0
}

finalResultDecimal, err := utils.ToDecimal(singularResult.Value)
if err != nil {
e.lggr.Warnf("cannot parse singular result from bridge task, job %d", e.job.ID)
return 0
}

Expand All @@ -277,7 +275,6 @@ func (e *EnhancedTelemetryService[T]) getObservation(finalResult *pipeline.Final
func (e *EnhancedTelemetryService[T]) getParsedValue(trrs *pipeline.TaskRunResults, trr pipeline.TaskRunResult) float64 {
parsedValue := getJsonParsedValue(trr, trrs)
if parsedValue == nil {
e.lggr.Warnf("cannot get json parse value, job %d, id %s", e.job.ID, trr.Task.DotID())
return 0
}
return *parsedValue
Expand All @@ -302,23 +299,16 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul
if trr.Task.Type() != pipeline.TaskTypeBridge {
continue
}
var bridgeName string
if b, is := trr.Task.(*pipeline.BridgeTask); is {
bridgeName = b.Name
}

if trr.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", trr.Result.Error, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
bridgeRawResponse, ok := trr.Result.Value.(string)
if !ok {
e.lggr.Warnw(fmt.Sprintf("cannot parse bridge response from bridge task, job=%d, id=%s, name=%q: expected string, got: %v (type %T)", e.job.ID, trr.Task.DotID(), bridgeName, trr.Result.Value, trr.Result.Value), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
value := e.getParsedValue(trrs, trr)
Expand Down Expand Up @@ -635,12 +625,11 @@ func getPricesFromBridgeTaskByTelemetryField(lggr logger.Logger, bridgeTask pipe
func parsePriceFromTask(lggr logger.Logger, trr pipeline.TaskRunResult) float64 {
var val float64
if trr.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, id %s: %s", trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error)
return 0
}
val, err := getResultFloat64(&trr)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err)
return 0
}
return val
}
Expand Down
15 changes: 4 additions & 11 deletions core/services/ocrcommon/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,7 @@ func TestGetObservation(t *testing.T) {

obs := e.getObservation(&pipeline.FinalResult{})
assert.Equal(t, obs, int64(0))
assert.Equal(t, logs.Len(), 1)
assert.Contains(t, logs.All()[0].Message, "cannot get singular result")
assert.Equal(t, 0, logs.Len())

finalResult := &pipeline.FinalResult{
Values: []interface{}{"123456"},
Expand Down Expand Up @@ -457,8 +456,7 @@ func TestCollectAndSend(t *testing.T) {
}

wg.Wait()
assert.Equal(t, logs.Len(), 2)
assert.Contains(t, logs.All()[0].Message, "cannot parse bridge response from bridge task")
assert.Equal(t, 0, logs.Len())

badTrrs = &pipeline.TaskRunResults{
pipeline.TaskRunResult{
Expand All @@ -475,9 +473,7 @@ func TestCollectAndSend(t *testing.T) {
RepTimestamp: observationTimestamp,
}
wg.Wait()
assert.Equal(t, 2, logs.Len())
assert.Contains(t, logs.All()[0].Message, "cannot parse bridge response from bridge task")
assert.Contains(t, logs.All()[1].Message, "cannot get json parse value")
assert.Equal(t, 0, logs.Len())
doneCh <- struct{}{}
}

Expand Down Expand Up @@ -707,10 +703,7 @@ func TestGetPricesFromBridgeTaskByOrder(t *testing.T) {
require.Equal(t, benchmarkPrice, float64(0))
require.Equal(t, bid, float64(0))
require.Equal(t, ask, float64(0))
require.Equal(t, 3, logs.Len())
require.Contains(t, logs.All()[0].Message, "cannot parse EA telemetry price to float64, DOT id ds1_benchmark")
require.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry price to float64, DOT id ds2_bid")
require.Contains(t, logs.All()[2].Message, "cannot parse EA telemetry price to float64, DOT id ds3_ask")
require.Equal(t, 0, logs.Len())

benchmarkPrice, bid, ask = getPricesFromBridgeTask(lggr, trrsMercuryV1[0], trrsMercuryV2, 2)
require.Equal(t, 123456.123456, benchmarkPrice)
Expand Down
Loading