-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rfq-relayer): relayer supports active quoting #3198
Changes from 40 commits
d93e20f
481f043
fdbc865
77c51e8
d10d56d
d2b1701
f06a64b
f6300a1
2646149
ea61286
dcd264a
02bf53c
a83253f
6febf7b
3ce1bd3
460f5ac
9ec49cb
92b49ec
e85ff62
f4ed5b5
0c0b562
1742fe3
0d7a7c4
4828dfc
ab9513d
a2a2079
69ed171
fbccdf8
4b098f9
135f1ac
b4969e9
bf3adaa
82ed6bb
896d52d
0018269
c3f0eb3
50b969e
64389c4
0c07fe4
719361a
26c9174
ccd24b3
9688fa7
b98ca8c
739472d
7d7f6df
6a5590f
56afeff
bdb7539
ab15e5d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -90,13 +90,14 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request * | |||||||||||||||||||||
wg.Add(1) | ||||||||||||||||||||||
go func(client WsClient) { | ||||||||||||||||||||||
var respStatus db.ActiveQuoteResponseStatus | ||||||||||||||||||||||
var err error | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix variable shadowing of The use of Apply this diff to fix the issue: - resp, err := client.ReceiveQuoteResponse(collectionCtx, requestID)
+ resp, err = client.ReceiveQuoteResponse(collectionCtx, requestID)
|
||||||||||||||||||||||
_, clientSpan := r.handler.Tracer().Start(collectionCtx, "collectRelayerResponses", trace.WithAttributes( | ||||||||||||||||||||||
attribute.String("relayer_address", relayerAddr), | ||||||||||||||||||||||
attribute.String("request_id", requestID), | ||||||||||||||||||||||
)) | ||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||
clientSpan.SetAttributes(attribute.String("status", respStatus.String())) | ||||||||||||||||||||||
metrics.EndSpan(clientSpan) | ||||||||||||||||||||||
metrics.EndSpanWithErr(clientSpan, err) | ||||||||||||||||||||||
}() | ||||||||||||||||||||||
|
||||||||||||||||||||||
defer wg.Done() | ||||||||||||||||||||||
|
@@ -105,6 +106,11 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request * | |||||||||||||||||||||
logger.Errorf("Error receiving quote response: %v", err) | ||||||||||||||||||||||
return | ||||||||||||||||||||||
} | ||||||||||||||||||||||
span.AddEvent("received quote response", trace.WithAttributes( | ||||||||||||||||||||||
attribute.String("relayer_address", relayerAddr), | ||||||||||||||||||||||
attribute.String("request_id", requestID), | ||||||||||||||||||||||
attribute.String("dest_amount", resp.DestAmount), | ||||||||||||||||||||||
)) | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use Accessing the parent Apply this diff to fix the issue: - span.AddEvent("received quote response", trace.WithAttributes(
+ clientSpan.AddEvent("received quote response", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
attribute.String("dest_amount", resp.DestAmount),
)) 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||
|
||||||||||||||||||||||
// validate the response | ||||||||||||||||||||||
respStatus = getQuoteResponseStatus(expireCtx, resp) | ||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -211,6 +211,7 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error { | |
wsRoute := engine.Group(RFQStreamRoute) | ||
wsRoute.Use(r.AuthMiddleware()) | ||
wsRoute.GET("", func(c *gin.Context) { | ||
fmt.Println("GET /rfq_stream") | ||
r.GetActiveRFQWebsocket(ctx, c) | ||
}) | ||
|
||
|
@@ -275,7 +276,9 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { | |
loggedRequest = &req | ||
} | ||
case RFQRoute, RFQStreamRoute: | ||
fmt.Println("RFQRoute, RFQStreamRoute") | ||
chainsHeader := c.GetHeader(ChainsHeader) | ||
fmt.Printf("got chains header: %s\n", chainsHeader) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider using a proper logging framework and removing debug logs. While adding logs can be helpful for debugging, using
Example using the existing if debugMode {
logger.Debug("RFQRoute or RFQStreamRoute hit")
logger.Debugf("Got chains header: %s", chainsHeader)
} |
||
if chainsHeader != "" { | ||
var chainIDs []int | ||
err = json.Unmarshal([]byte(chainsHeader), &chainIDs) | ||
|
@@ -285,10 +288,12 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { | |
} | ||
} | ||
} | ||
fmt.Printf("dest chain ids: %v\n", destChainIDs) | ||
default: | ||
err = fmt.Errorf("unexpected request path: %s", c.Request.URL.Path) | ||
} | ||
if err != nil { | ||
fmt.Printf("error: %v\n", err) | ||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) | ||
c.Abort() | ||
return | ||
|
@@ -299,13 +304,16 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { | |
for _, destChainID := range destChainIDs { | ||
addr, err := r.checkRole(c, destChainID) | ||
if err != nil { | ||
fmt.Printf("error checking role: %v\n", err) | ||
c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()}) | ||
c.Abort() | ||
return | ||
} | ||
if addressRecovered == nil { | ||
fmt.Printf("no address recovered, setting to %s\n", addr.Hex()) | ||
addressRecovered = &addr | ||
} else if *addressRecovered != addr { | ||
fmt.Printf("relayer address mismatch: %s != %s\n", addressRecovered.Hex(), addr.Hex()) | ||
c.JSON(http.StatusBadRequest, gin.H{"msg": "relayer address mismatch"}) | ||
c.Abort() | ||
return | ||
|
@@ -316,6 +324,7 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { | |
// Store the request in context after binding and validation | ||
c.Set("putRequest", loggedRequest) | ||
c.Set("relayerAddr", addressRecovered.Hex()) | ||
fmt.Println("auth succeeded") | ||
c.Next() | ||
} | ||
} | ||
|
@@ -434,28 +443,34 @@ func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { | |
// @Header 101 {string} X-Api-Version "API Version Number - See docs for more info" | ||
// @Router /quote_requests [get]. | ||
func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Context) { | ||
fmt.Println("GetActiveRFQWebsocket") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance logging with proper framework and more context. The current debug print statement can be improved for better logging practices:
Example improvement: logger.WithFields(log.Fields{
"relayerAddr": relayerAddr,
"remoteAddr": c.Request.RemoteAddr,
}).Debug("Handling WebSocket connection for active quote requests") Also, consider wrapping this log in a debug flag check to avoid unnecessary logging in production environments. |
||
ctx, span := r.handler.Tracer().Start(ctx, "GetActiveRFQWebsocket") | ||
defer func() { | ||
metrics.EndSpan(span) | ||
}() | ||
|
||
fmt.Println("upgrading websocket") | ||
ws, err := r.upgrader.Upgrade(c.Writer, c.Request, nil) | ||
if err != nil { | ||
logger.Error("Failed to set websocket upgrade", "error", err) | ||
return | ||
} | ||
fmt.Println("upgraded websocket") | ||
|
||
// use the relayer address as the ID for the connection | ||
rawRelayerAddr, exists := c.Get("relayerAddr") | ||
if !exists { | ||
fmt.Println("no relayer address recovered from signature") | ||
c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"}) | ||
return | ||
} | ||
relayerAddr, ok := rawRelayerAddr.(string) | ||
if !ok { | ||
fmt.Println("invalid relayer address type") | ||
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"}) | ||
return | ||
} | ||
fmt.Printf("relayer address: %s\n", relayerAddr) | ||
|
||
span.SetAttributes( | ||
attribute.String("relayer_address", relayerAddr), | ||
|
@@ -464,6 +479,7 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont | |
// only one connection per relayer allowed | ||
_, ok = r.wsClients.Load(relayerAddr) | ||
if ok { | ||
fmt.Println("relayer already connected") | ||
c.JSON(http.StatusBadRequest, gin.H{"error": "relayer already connected"}) | ||
return | ||
} | ||
|
@@ -474,12 +490,16 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont | |
}() | ||
|
||
client := newWsClient(relayerAddr, ws, r.pubSubManager, r.handler) | ||
fmt.Println("registered ws client") | ||
r.wsClients.Store(relayerAddr, client) | ||
span.AddEvent("registered ws client") | ||
fmt.Println("running ws client") | ||
err = client.Run(ctx) | ||
if err != nil { | ||
fmt.Printf("error running ws client: %v\n", err) | ||
logger.Error("Error running websocket client", "error", err) | ||
} | ||
fmt.Println("ws client done") | ||
} | ||
|
||
const ( | ||
|
@@ -533,16 +553,23 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) { | |
var activeQuote *model.QuoteData | ||
if isActiveRFQ { | ||
activeQuote = r.handleActiveRFQ(ctx, &req, requestID) | ||
if activeQuote != nil && activeQuote.DestAmount != nil { | ||
span.SetAttributes(attribute.String("active_quote_dest_amount", *activeQuote.DestAmount)) | ||
} | ||
} | ||
passiveQuote, err := r.handlePassiveRFQ(ctx, &req) | ||
if err != nil { | ||
logger.Error("Error handling passive RFQ", "error", err) | ||
} | ||
if passiveQuote != nil && passiveQuote.DestAmount != nil { | ||
span.SetAttributes(attribute.String("passive_quote_dest_amount", *passiveQuote.DestAmount)) | ||
} | ||
quote, _ := getBestQuote(activeQuote, passiveQuote) | ||
|
||
// construct the response | ||
var resp model.PutUserQuoteResponse | ||
if quote == nil { | ||
span.AddEvent("no quotes found") | ||
resp = model.PutUserQuoteResponse{ | ||
Success: false, | ||
Reason: "no quotes found", | ||
|
@@ -552,6 +579,10 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) { | |
if activeQuote == nil { | ||
quoteType = quoteTypePassive | ||
} | ||
span.SetAttributes( | ||
attribute.String("quote_type", quoteType), | ||
attribute.String("quote_dest_amount", *quote.DestAmount), | ||
) | ||
resp = model.PutUserQuoteResponse{ | ||
Success: true, | ||
Data: *quote, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve logging approach for WebSocket connection details
While the added print statements can be helpful for debugging, using
fmt.Printf
directly is not ideal for production code. Consider the following improvements:logger
variable (which is an instance oflog.Logger
) instead offmt.Printf
. This allows for better log management and potential log level control.Here's a suggested refactor:
Also, consider adding a helper function to sanitize headers before logging to ensure sensitive information is not exposed.
📝 Committable suggestion