Skip to content

Commit

Permalink
w
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Jan 10, 2025
1 parent 3c9485e commit c6aae18
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 50 deletions.
1 change: 1 addition & 0 deletions core/services/llo/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func NewReportCodecs(lggr logger.Logger, donID uint32) map[llotypes.ReportFormat

codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{}
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr, donID)
codecs[llotypes.ReportFormatEVMABIEncodeUnpacked] = evm.NewReportCodecEVMABIEncodeUnpacked(lggr, donID)

return codecs
}
17 changes: 8 additions & 9 deletions core/services/llo/evm/report_codec_evm_abi_encode_unpacked.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type ReportFormatEVMABIEncodeOpts struct {
//
// EXAMPLE
//
// [{"streamID":123,"multiplier":10000,"type":"uint192"}, ...]
// [{"streamID":123,"multiplier":"10000","type":"uint192"}, ...]
//
// See definition of ABIEncoder struct for more details.
//
Expand All @@ -57,7 +57,6 @@ type ReportFormatEVMABIEncodeOpts struct {
ABI []ABIEncoder `json:"abi"`
}

// TODO: test Decode/Encode
func (r *ReportFormatEVMABIEncodeOpts) Decode(opts []byte) error {
return json.Unmarshal(opts, r)
}
Expand Down Expand Up @@ -111,9 +110,10 @@ func (r ReportCodecEVMABIEncodeUnpacked) Encode(ctx context.Context, report llo.
ExpiresAt: report.ObservationTimestampSeconds + opts.ExpirationWindow,
}

// TODO: Enable with verbose logging?
// r.Logger.Debugw("Encoding report", "report", report, "opts", opts, "nativePrice", nativePrice, "linkPrice", linkPrice, "quote", quote, "multiplier", multiplier, "rf", rf)

baseReport, err := r.buildBaseReport(ctx, rf)
header, err := r.buildHeader(ctx, rf)
if err != nil {
return nil, fmt.Errorf("failed to build base report; %w", err)
}
Expand All @@ -123,14 +123,14 @@ func (r ReportCodecEVMABIEncodeUnpacked) Encode(ctx context.Context, report llo.
return nil, fmt.Errorf("failed to build payload; %w", err)
}

return append(baseReport, payload...), nil
return append(header, payload...), nil
}

// baseSchema represents the fixed base schema that remains unchanged for all
// BaseSchema represents the fixed base schema that remains unchanged for all
// EVMABIEncodeUnpacked reports.
//
// An arbitrary payload will be appended to this.
var baseSchema = getBaseSchema()
var BaseSchema = getBaseSchema()

func getBaseSchema() abi.Arguments {
mustNewType := func(t string) abi.Type {
Expand All @@ -150,8 +150,7 @@ func getBaseSchema() abi.Arguments {
})
}

// TODO: Call it "header"
func (r ReportCodecEVMABIEncodeUnpacked) buildBaseReport(ctx context.Context, rf EVMBaseReportFields) ([]byte, error) {
func (r ReportCodecEVMABIEncodeUnpacked) buildHeader(ctx context.Context, rf EVMBaseReportFields) ([]byte, error) {
var merr error
if rf.LinkFee == nil {
merr = errors.Join(merr, errors.New("linkFee may not be nil"))
Expand All @@ -166,7 +165,7 @@ func (r ReportCodecEVMABIEncodeUnpacked) buildBaseReport(ctx context.Context, rf
if merr != nil {
return nil, merr
}
b, err := baseSchema.Pack(rf.FeedID, rf.ValidFromTimestamp, rf.Timestamp, rf.NativeFee, rf.LinkFee, rf.ExpiresAt)
b, err := BaseSchema.Pack(rf.FeedID, rf.ValidFromTimestamp, rf.Timestamp, rf.NativeFee, rf.LinkFee, rf.ExpiresAt)
if err != nil {
return nil, fmt.Errorf("failed to pack base report blob; %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/llo/keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (okr *onchainKeyring) MaxSignatureLength() (n int) {

func (okr *onchainKeyring) Sign(digest types.ConfigDigest, seqNr uint64, r ocr3types.ReportWithInfo[llotypes.ReportInfo]) (signature []byte, err error) {
switch r.Info.ReportFormat {
case llotypes.ReportFormatEVMPremiumLegacy:
case llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatEVMABIEncodeUnpacked:
rf := r.Info.ReportFormat
if key, exists := okr.keys[rf]; exists {
// NOTE: Must use legacy Sign method for compatibility with v0.3 report verification
Expand All @@ -101,7 +101,7 @@ func (okr *onchainKeyring) Sign(digest types.ConfigDigest, seqNr uint64, r ocr3t

func (okr *onchainKeyring) Verify(key types.OnchainPublicKey, digest types.ConfigDigest, seqNr uint64, r ocr3types.ReportWithInfo[llotypes.ReportInfo], signature []byte) bool {
switch r.Info.ReportFormat {
case llotypes.ReportFormatEVMPremiumLegacy:
case llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatEVMABIEncodeUnpacked:
rf := r.Info.ReportFormat
if verifier, exists := okr.keys[rf]; exists {
// NOTE: Must use legacy Verify method for compatibility with v0.3 report verification
Expand Down
18 changes: 12 additions & 6 deletions core/services/llo/mercurytransmitter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,18 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
defer cancelFn()
return s.transmit(ctx, t)
}(ctx)

lggr := s.lggr.With("transmission", t, "response", res, "transmissionHash", fmt.Sprintf("%x", t.Hash()))
if req != nil {
lggr = s.lggr.With("req.Payload", req.Payload, "req.ReportFormat", req.ReportFormat)
}

if ctx.Err() != nil {
// only canceled on transmitter close so we can exit
return false
} else if err != nil {
s.transmitConnectionErrorCount.Inc()
s.lggr.Errorw("Transmit report failed", "err", err, "req.Payload", req.Payload, "req.ReportFormat", req.ReportFormat, "transmission", t)
lggr.Errorw("Transmit report failed", "err", err)
if ok := s.q.Push(t); !ok {
s.lggr.Error("Failed to push report to transmit queue; queue is closed")
return false
Expand All @@ -276,7 +282,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
b.Reset()
if res.Error == "" {
s.transmitSuccessCount.Inc()
s.lggr.Debugw("Transmit report success", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res)
lggr.Debug("Transmit report success")
} else {
// We don't need to retry here because the mercury server
// has confirmed it received the report. We only need to retry
Expand All @@ -285,17 +291,17 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
case DuplicateReport:
s.transmitSuccessCount.Inc()
s.transmitDuplicateCount.Inc()
s.lggr.Debugw("Transmit report success; duplicate report", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res)
lggr.Debug("Transmit report success; duplicate report")
default:
promTransmitServerErrorCount.WithLabelValues(donIDStr, s.url, strconv.FormatInt(int64(res.Code), 10)).Inc()
s.lggr.Errorw("Transmit report failed; mercury server returned error", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "response", res, "transmission", t, "err", res.Error, "code", res.Code)
lggr.Errorw("Transmit report failed; mercury server returned error", "err", res.Error, "code", res.Code)
}
}

select {
case s.deleteQueue <- t.Hash():
default:
s.lggr.Criticalw("Delete queue is full", "transmission", t, "transmissionHash", fmt.Sprintf("%x", t.Hash()))
lggr.Criticalw("Delete queue is full")
}
return true
}()
Expand All @@ -309,7 +315,7 @@ func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitReq
switch t.Report.Info.ReportFormat {
case llotypes.ReportFormatJSON:
payload, err = s.jsonPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs)
case llotypes.ReportFormatEVMPremiumLegacy:
case llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatEVMABIEncodeUnpacked:
payload, err = s.evmPremiumLegacyPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs)
default:
return nil, nil, fmt.Errorf("Transmit failed; don't know how to Pack unsupported report format: %q", t.Report.Info.ReportFormat)
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ func (d *Delegate) newServicesLLO(
// Also re-use EVM keys for signing the retirement report. This isn't
// required, just seems easiest since it's the only key type available for
// now.
for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement} {
for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement, llotypes.ReportFormatEVMABIEncodeUnpacked} {
if _, exists := kbm[rf]; !exists {
// Use the first if unspecified
kbs, err3 := d.ks.GetAllOfType("evm")
Expand Down
52 changes: 47 additions & 5 deletions core/services/ocr2/plugins/llo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,29 @@ observationSource = """
))
}

func addStreamSpec(
t *testing.T,
node Node,
name string,
streamID *uint32,
observationSource string,
) (id int32) {
optionalStreamID := ""
if streamID != nil {
optionalStreamID = fmt.Sprintf("streamID = %d\n", *streamID)
}
specTOML := fmt.Sprintf(`
type = "stream"
schemaVersion = 1
name = "%s"
%s
observationSource = """
%s
"""
`, name, optionalStreamID, observationSource)
return node.AddStreamJob(t, specTOML)
}

func addQuoteStreamJob(
t *testing.T,
node Node,
Expand Down Expand Up @@ -331,7 +354,7 @@ transmitterID = "%x"
))
}

func createBridge(t *testing.T, name string, i int, p decimal.Decimal, borm bridges.ORM) (bridgeName string) {
func createSingleDecimalBridge(t *testing.T, name string, i int, p decimal.Decimal, borm bridges.ORM) (bridgeName string) {
ctx := testutils.Context(t)
bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
b, err := io.ReadAll(req.Body)
Expand All @@ -355,6 +378,22 @@ func createBridge(t *testing.T, name string, i int, p decimal.Decimal, borm brid
return bridgeName
}

func createBridge(t *testing.T, bridgeName string, resultJSON string, borm bridges.ORM) {
ctx := testutils.Context(t)
bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(http.StatusOK)
resp := fmt.Sprintf(`{"result": %s}`, resultJSON)
_, err := res.Write([]byte(resp))
require.NoError(t, err)
}))
t.Cleanup(bridge.Close)
u, _ := url.Parse(bridge.URL)
require.NoError(t, borm.CreateBridgeType(ctx, &bridges.BridgeType{
Name: bridges.BridgeName(bridgeName),
URL: models.WebURL(*u),
}))
}

func addOCRJobsEVMPremiumLegacy(
t *testing.T,
streams []Stream,
Expand Down Expand Up @@ -386,7 +425,7 @@ func addOCRJobsEVMPremiumLegacy(
name = "linkprice"
}
name = fmt.Sprintf("%s-%d-%d", name, strm.id, j)
bmBridge := createBridge(t, name, i, strm.baseBenchmarkPrice, node.App.BridgeORM())
bmBridge := createSingleDecimalBridge(t, name, i, strm.baseBenchmarkPrice, node.App.BridgeORM())
jobID := addSingleDecimalStreamJob(
t,
node,
Expand All @@ -395,9 +434,9 @@ func addOCRJobsEVMPremiumLegacy(
)
jobIDs[i][strm.id] = jobID
} else {
bmBridge := createBridge(t, fmt.Sprintf("benchmarkprice-%d-%d", strm.id, j), i, strm.baseBenchmarkPrice, node.App.BridgeORM())
bidBridge := createBridge(t, fmt.Sprintf("bid-%d-%d", strm.id, j), i, strm.baseBid, node.App.BridgeORM())
askBridge := createBridge(t, fmt.Sprintf("ask-%d-%d", strm.id, j), i, strm.baseAsk, node.App.BridgeORM())
bmBridge := createSingleDecimalBridge(t, fmt.Sprintf("benchmarkprice-%d-%d", strm.id, j), i, strm.baseBenchmarkPrice, node.App.BridgeORM())
bidBridge := createSingleDecimalBridge(t, fmt.Sprintf("bid-%d-%d", strm.id, j), i, strm.baseBid, node.App.BridgeORM())
askBridge := createSingleDecimalBridge(t, fmt.Sprintf("ask-%d-%d", strm.id, j), i, strm.baseAsk, node.App.BridgeORM())
jobID := addQuoteStreamJob(
t,
node,
Expand All @@ -424,3 +463,6 @@ func addOCRJobsEVMPremiumLegacy(
}
return jobIDs
}

func addDexBasedAssetStreamSpec() {
}
Loading

0 comments on commit c6aae18

Please sign in to comment.