Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rfq-relayer): relayer supports active quoting #3198

Merged
merged 50 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d93e20f
Feat: add active rfq subscription on quoter
dwasse Sep 26, 2024
481f043
Feat: relayer subscribes to active quotes upon starting
dwasse Sep 26, 2024
fdbc865
[goreleaser]
dwasse Sep 26, 2024
77c51e8
Feat: specify ws url in relayer
dwasse Sep 26, 2024
d10d56d
[goreleaser]
dwasse Sep 26, 2024
d2b1701
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
f06a64b
[goreleaser]
dwasse Sep 27, 2024
f6300a1
Fix: build
dwasse Sep 27, 2024
2646149
[goreleaser]
dwasse Sep 27, 2024
ea61286
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
dcd264a
Feat: relayer tracing
dwasse Sep 27, 2024
02bf53c
[goreleaser]
dwasse Sep 27, 2024
a83253f
Feat: use supports_active_quoting instead of ws url
dwasse Sep 27, 2024
6febf7b
[goreleaser]
dwasse Sep 27, 2024
3ce1bd3
WIP: add logs
dwasse Sep 27, 2024
460f5ac
[goreleaser]
dwasse Sep 27, 2024
9ec49cb
WIP: more logs
dwasse Sep 27, 2024
92b49ec
[goreleaser]
dwasse Sep 30, 2024
e85ff62
More logs
dwasse Sep 30, 2024
f4ed5b5
[goreleaser]
dwasse Sep 30, 2024
0c0b562
More logs
dwasse Sep 30, 2024
1742fe3
[goreleaser]
dwasse Sep 30, 2024
0d7a7c4
More logs
dwasse Sep 30, 2024
4828dfc
[goreleaser]
dwasse Sep 30, 2024
ab9513d
Close conn when encountering write err
dwasse Sep 30, 2024
a2a2079
[goreleaser]
dwasse Sep 30, 2024
69ed171
More logs
dwasse Sep 30, 2024
fbccdf8
[goreleaser]
dwasse Sep 30, 2024
4b098f9
More logs
dwasse Sep 30, 2024
135f1ac
[goreleaser]
dwasse Sep 30, 2024
b4969e9
More logs
dwasse Sep 30, 2024
bf3adaa
[goreleaser]
dwasse Sep 30, 2024
82ed6bb
More logs
dwasse Sep 30, 2024
896d52d
[goreleaser]
dwasse Sep 30, 2024
0018269
Logs with ts
dwasse Sep 30, 2024
c3f0eb3
[goreleaser]
dwasse Sep 30, 2024
50b969e
More tracing
dwasse Sep 30, 2024
64389c4
[goreleaser]
dwasse Sep 30, 2024
0c07fe4
Fix: send to reqChan
dwasse Sep 30, 2024
719361a
[goreleaser]
dwasse Sep 30, 2024
26c9174
Check for zero pong time
dwasse Sep 30, 2024
ccd24b3
Fix: make close_at and closed_quote_id optional
dwasse Sep 30, 2024
9688fa7
[goreleaser]
dwasse Sep 30, 2024
b98ca8c
Feat: remove extra fields from responses
dwasse Sep 30, 2024
739472d
[goreleaser]
dwasse Sep 30, 2024
7d7f6df
Fix: skip passive quote
dwasse Sep 30, 2024
6a5590f
[goreleaser]
dwasse Sep 30, 2024
56afeff
Cleanup: remove logs
dwasse Sep 30, 2024
bdb7539
Fix: use correct span
dwasse Sep 30, 2024
ab15e5d
Cleanup: remove logs
dwasse Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/rfq/e2e/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (i *IntegrationSuite) getRelayerConfig() relconfig.Config {
},
// generated ex-post facto
QuotableTokens: map[string][]string{},
RfqAPIURL: i.apiServer,
RFQAPIURL: i.apiServer,
Signer: signerConfig.SignerConfig{
Type: signerConfig.FileType.String(),
File: filet.TmpFile(i.T(), "", i.relayerWallet.PrivateKeyHex()).Name(),
Expand Down
81 changes: 81 additions & 0 deletions services/rfq/relayer/quoter/quoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quoter

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -31,6 +32,7 @@ import (
"github.com/synapsecns/sanguine/ethergo/signer/signer"
rfqAPIClient "github.com/synapsecns/sanguine/services/rfq/api/client"
"github.com/synapsecns/sanguine/services/rfq/api/model"
"github.com/synapsecns/sanguine/services/rfq/api/rest"
"github.com/synapsecns/sanguine/services/rfq/relayer/inventory"
)

Expand All @@ -42,6 +44,8 @@ var logger = log.Logger("quoter")
type Quoter interface {
// SubmitAllQuotes submits all quotes to the RFQ API.
SubmitAllQuotes(ctx context.Context) (err error)
// SubscribeActiveRFQ subscribes to the RFQ websocket API.
SubscribeActiveRFQ(ctx context.Context) (err error)
Comment on lines +47 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Adding a new method to public interface Quoter may introduce breaking changes

Adding SubscribeActiveRFQ(ctx context.Context) (err error) to the Quoter interface can break existing implementations. Any external packages implementing Quoter will now fail to compile until they implement this new method. Consider the impact on external dependencies and possibly introduce a new interface to extend Quoter.

// ShouldProcess determines if a quote should be processed.
// We do this by either saving all quotes in-memory, and refreshing via GetSelfQuotes() through the API
// The first comparison is does bridge transaction OriginChainID+TokenAddr match with a quote + DestChainID+DestTokenAddr, then we look to see if we have enough amount to relay it + if the price fits our bounds (based on that the Relayer is relaying the destination token for the origin)
Expand Down Expand Up @@ -251,6 +255,83 @@ func (m *Manager) SubmitAllQuotes(ctx context.Context) (err error) {
return m.prepareAndSubmitQuotes(ctx, inv)
}

// SubscribeActiveRFQ subscribes to the RFQ websocket API.
// This function is blocking and will run until the context is cancelled.
func (m *Manager) SubscribeActiveRFQ(ctx context.Context) (err error) {
chainIDs := []int{}
for chainID := range m.config.Chains {
chainIDs = append(chainIDs, chainID)
}
req := model.SubscribeActiveRFQRequest{
ChainIDs: chainIDs,
}
reqChan := make(chan *model.ActiveRFQMessage)
respChan, err := m.rfqClient.SubscribeActiveQuotes(ctx, &req, reqChan)
if err != nil {
return fmt.Errorf("error subscribing to active quotes: %w", err)
}

for {
select {
case <-ctx.Done():
return
case msg := <-respChan:
resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
return fmt.Errorf("error generating active RFQ message: %w", err)
}
Comment on lines +294 to +296
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle errors without terminating the subscription loop

Returning an error here will exit the SubscribeActiveRFQ method, terminating the subscription loop. Consider logging the error and continuing to process other messages to ensure that a single failure does not stop the entire subscription.

Apply this diff to adjust error handling:

 			resp, err := m.generateActiveRFQ(ctx, msg)
 			if err != nil {
-				return fmt.Errorf("error generating active RFQ message: %w", err)
+				span.RecordError(err)
+				logger.Error("error generating active RFQ message", "error", err)
+				continue
 			}

Committable suggestion was skipped due to low confidence.

respChan <- resp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential deadlock: Sending responses to respChan instead of reqChan

In the select loop, you are sending responses back to respChan (line 283), which is the channel from which you receive messages. This could cause a deadlock or unexpected behavior. You should send responses to reqChan instead.

Apply this diff to fix the issue:

     case msg := <-respChan:
         resp, err := m.generateActiveRFQ(ctx, msg)
         if err != nil {
             return fmt.Errorf("error generating active RFQ message: %w", err)
         }
-        respChan <- resp
+        reqChan <- resp
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case msg := <-respChan:
resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
return fmt.Errorf("error generating active RFQ message: %w", err)
}
respChan <- resp
case msg := <-respChan:
resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
return fmt.Errorf("error generating active RFQ message: %w", err)
}
reqChan <- resp

}
}
}

// getActiveRFQ handles an active RFQ message.
func (m *Manager) generateActiveRFQ(ctx context.Context, msg *model.ActiveRFQMessage) (resp *model.ActiveRFQMessage, err error) {
if msg.Op != rest.RequestQuoteOp {
return nil, nil
}

inv, err := m.inventoryManager.GetCommittableBalances(ctx, inventory.SkipDBCache())
if err != nil {
return nil, fmt.Errorf("error getting committable balances: %w", err)
}

var rfqRequest model.WsRFQRequest
err = json.Unmarshal(msg.Content, &rfqRequest)
if err != nil {
return nil, fmt.Errorf("error unmarshalling quote data: %w", err)
}

quoteInput := QuoteInput{
OriginChainID: rfqRequest.Data.OriginChainID,
DestChainID: rfqRequest.Data.DestChainID,
OriginTokenAddr: common.HexToAddress(rfqRequest.Data.OriginTokenAddr),
DestTokenAddr: common.HexToAddress(rfqRequest.Data.DestTokenAddr),
OriginBalance: inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)],
DestBalance: inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)],
Comment on lines +334 to +335
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Possible nil map access without existence checks

Accessing inv maps without checking if the keys exist can lead to a panic if the keys are missing. Before accessing inv[rfqRequest.Data.OriginChainID][...] and inv[rfqRequest.Data.DestChainID][...], ensure that both the chain ID and token address keys exist in the maps.

Apply this diff to safely access the map entries:

     quoteInput := QuoteInput{
         OriginChainID:   rfqRequest.Data.OriginChainID,
         DestChainID:     rfqRequest.Data.DestChainID,
         OriginTokenAddr: common.HexToAddress(rfqRequest.Data.OriginTokenAddr),
         DestTokenAddr:   common.HexToAddress(rfqRequest.Data.DestTokenAddr),
-        OriginBalance:   inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)],
-        DestBalance:     inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)],
+        OriginBalance: func() *big.Int {
+            if chainBalances, ok := inv[rfqRequest.Data.OriginChainID]; ok {
+                return chainBalances[common.HexToAddress(rfqRequest.Data.OriginTokenAddr)]
+            }
+            return nil
+        }(),
+        DestBalance: func() *big.Int {
+            if chainBalances, ok := inv[rfqRequest.Data.DestChainID]; ok {
+                return chainBalances[common.HexToAddress(rfqRequest.Data.DestTokenAddr)]
+            }
+            return nil
+        }(),
     }
     if quoteInput.OriginBalance == nil || quoteInput.DestBalance == nil {
         return nil, fmt.Errorf("insufficient inventory balances for the provided chain IDs and token addresses")
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
OriginBalance: inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)],
DestBalance: inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)],
OriginBalance: func() *big.Int {
if chainBalances, ok := inv[rfqRequest.Data.OriginChainID]; ok {
return chainBalances[common.HexToAddress(rfqRequest.Data.OriginTokenAddr)]
}
return nil
}(),
DestBalance: func() *big.Int {
if chainBalances, ok := inv[rfqRequest.Data.DestChainID]; ok {
return chainBalances[common.HexToAddress(rfqRequest.Data.DestTokenAddr)]
}
return nil
}(),

}

rawQuote, err := m.generateQuote(ctx, quoteInput)
if err != nil {
return nil, fmt.Errorf("error generating quote: %w", err)
}

rfqResp := model.WsRFQResponse{
RequestID: rfqRequest.RequestID,
DestAmount: rawQuote.DestAmount,
}
respBytes, err := json.Marshal(rfqResp)
if err != nil {
return nil, fmt.Errorf("error serializing response: %w", err)
}
resp = &model.ActiveRFQMessage{
Op: rest.SendQuoteOp,
Content: respBytes,
}

return resp, nil
}

// GetPrice gets the price of a token.
func (m *Manager) GetPrice(parentCtx context.Context, tokenName string) (_ float64, err error) {
ctx, span := m.metricsHandler.Tracer().Start(parentCtx, "GetPrice")
Expand Down
6 changes: 4 additions & 2 deletions services/rfq/relayer/relconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ type Config struct {
BaseChainConfig ChainConfig `yaml:"base_chain_config"`
// OmniRPCURL is the URL of the OmniRPC server.
OmniRPCURL string `yaml:"omnirpc_url"`
// RfqAPIURL is the URL of the RFQ API.
RfqAPIURL string `yaml:"rfq_url"`
// RFQAPIURL is the URL of the RFQ API.
RFQAPIURL string `yaml:"rfq_url"`
// RFQWsURL is the URL of the RFQ websocket.
RFQWsURL *string `yaml:"rfq_ws_url"`
// RelayerAPIPort is the port of the relayer API.
RelayerAPIPort string `yaml:"relayer_api_port"`
// Database is the database config.
Expand Down
11 changes: 8 additions & 3 deletions services/rfq/relayer/relconfig/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,14 @@ func (c Config) GetOmniRPCURL() string {
return c.OmniRPCURL
}

// GetRfqAPIURL returns the RFQ API URL.
func (c Config) GetRfqAPIURL() string {
return c.RfqAPIURL
// GetRFQAPIURL returns the RFQ API URL.
func (c Config) GetRFQAPIURL() string {
return c.RFQAPIURL
}

// GetRFQWsURL returns the RFQ Ws URL.
func (c Config) GetRFQWsURL() *string {
return c.RFQWsURL
}

// GetDatabase returns the database config.
Expand Down
12 changes: 11 additions & 1 deletion services/rfq/relayer/service/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi
priceFetcher := pricer.NewCoingeckoPriceFetcher(cfg.GetHTTPTimeout())
fp := pricer.NewFeePricer(cfg, omniClient, priceFetcher, metricHandler)

apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRfqAPIURL(), nil, sg)
apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRFQAPIURL(), cfg.GetRFQWsURL(), sg)
if err != nil {
return nil, fmt.Errorf("error creating RFQ API client: %w", err)
}
Expand Down Expand Up @@ -219,6 +219,16 @@ func (r *Relayer) Start(ctx context.Context) (err error) {
}
})

if r.cfg.GetRFQWsURL() != nil {
g.Go(func() error {
err = r.quoter.SubscribeActiveRFQ(ctx)
if err != nil {
return fmt.Errorf("could not subscribe to active RFQ: %w", err)
}
return nil
})
}

g.Go(func() error {
for {
select {
Expand Down
Loading