Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS-412 | Active Orders Query: SSE #518

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/sidecar_query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo

// HTTP handlers
poolsHttpDelivery.NewPoolsHandler(e, poolsUseCase)
passthroughHttpDelivery.NewPassthroughHandler(e, passthroughUseCase, orderBookUseCase)
passthroughHttpDelivery.NewPassthroughHandler(e, passthroughUseCase, orderBookUseCase, logger)
systemhttpdelivery.NewSystemHandler(e, config, logger, chainInfoUseCase)
if err := tokenshttpdelivery.NewTokensHandler(e, *config.Pricing, tokensUseCase, pricingSimpleRouterUsecase, logger); err != nil {
return nil, err
Expand Down
98 changes: 98 additions & 0 deletions delivery/http/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package http

import (
"bytes"
"encoding/json"
"fmt"
"io"

"github.com/labstack/echo/v4"
)

// Event represents Server-Sent Event.
// SSE explanation: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
type Event struct {
// ID is used to set the EventSource object's last event ID value.
ID []byte
// Data field is for the message. When the EventSource receives multiple consecutive lines
// that begin with data:, it concatenates them, inserting a newline character between each one.
// Trailing newlines are removed.
Data []byte
// Event is a string identifying the type of event described. If this is specified, an event
// will be dispatched on the browser to the listener for the specified event name; the website
// source code should use addEventListener() to listen for named events. The onmessage handler
// is called if no event name is specified for a message.
Event []byte
// Retry is the reconnection time. If the connection to the server is lost, the browser will
// wait for the specified time before attempting to reconnect. This must be an integer, specifying
// the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored.
Retry []byte
// Comment line can be used to prevent connections from timing out; a server can send a comment
// periodically to keep the connection alive.
Comment []byte
}
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved

// MarshalTo marshals Event to given Writer
func (ev *Event) MarshalTo(w io.Writer) error {
// Marshalling part is taken from: https://github.com/r3labs/sse/blob/c6d5381ee3ca63828b321c16baa008fd6c0b4564/http.go#L16
if len(ev.Data) == 0 && len(ev.Comment) == 0 {
return nil
}

if len(ev.Data) > 0 {
if _, err := fmt.Fprintf(w, "id: %s\n", ev.ID); err != nil {
return err
}

sd := bytes.Split(ev.Data, []byte("\n"))
for i := range sd {
if _, err := fmt.Fprintf(w, "data: %s\n", sd[i]); err != nil {
return err
}
}

if len(ev.Event) > 0 {
if _, err := fmt.Fprintf(w, "event: %s\n", ev.Event); err != nil {
return err
}
}

if len(ev.Retry) > 0 {
if _, err := fmt.Fprintf(w, "retry: %s\n", ev.Retry); err != nil {
return err
}
}
}

if len(ev.Comment) > 0 {
if _, err := fmt.Fprintf(w, ": %s\n", ev.Comment); err != nil {
return err
}
}

if _, err := fmt.Fprint(w, "\n"); err != nil {
return err
}

return nil
}
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved

// WriteEvent writes the given data to the given ResponseWriter as an Event.
func WriteEvent(w *echo.Response, data any) error {
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved
b, err := json.Marshal(data)
if err != nil {
return err
}

event := Event{
Data: b,
}

if err := event.MarshalTo(w); err != nil {
return err
}

w.Flush()

return nil
}
15 changes: 15 additions & 0 deletions delivery/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"github.com/labstack/echo/v4"
"github.com/osmosis-labs/sqs/validator"
)

// RequestUnmarshaler is any type capable to unmarshal data from HTTP request to itself.
Expand All @@ -13,3 +14,17 @@ type RequestUnmarshaler interface {
func UnmarshalRequest(c echo.Context, m RequestUnmarshaler) error {
return m.UnmarshalHTTPRequest(c)
}

// ParseRequest encapsulates the request unmarshalling and validation logic.
// It unmarshals the request and validates it if the request implements the Validator interface.
func ParseRequest(c echo.Context, req RequestUnmarshaler) error {
if err := UnmarshalRequest(c, req); err != nil {
return err
}

v, ok := req.(validator.Validator)
if !ok {
return nil
}
return validator.Validate(v)
}
23 changes: 23 additions & 0 deletions delivery/http/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package http

import (
"context"

"github.com/labstack/echo/v4"
"go.opentelemetry.io/otel/trace"
)

// Span returns the current span from the context.
func Span(c echo.Context) (context.Context, trace.Span) {
ctx := c.Request().Context()
span := trace.SpanFromContext(ctx)
return ctx, span
}

// RecordSpanError records an error ( if any ) for the span.
func RecordSpanError(ctx context.Context, span trace.Span, err error) {
if err != nil {
span.RecordError(err)
}
// Note: we do not end the span here as it is ended in the middleware.
}
14 changes: 11 additions & 3 deletions domain/mocks/orderbook_usecase_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ var _ mvc.OrderBookUsecase = &OrderbookUsecaseMock{}

// OrderbookUsecaseMock is a mock implementation of the RouterUsecase interface
type OrderbookUsecaseMock struct {
ProcessPoolFunc func(ctx context.Context, pool sqsdomain.PoolI) error
GetAllTicksFunc func(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool)
GetActiveOrdersFunc func(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error)
ProcessPoolFunc func(ctx context.Context, pool sqsdomain.PoolI) error
GetAllTicksFunc func(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool)
GetActiveOrdersFunc func(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error)
GetActiveOrdersStreamFunc func(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult
}

func (m *OrderbookUsecaseMock) ProcessPool(ctx context.Context, pool sqsdomain.PoolI) error {
Expand All @@ -37,3 +38,10 @@ func (m *OrderbookUsecaseMock) GetActiveOrders(ctx context.Context, address stri
}
panic("unimplemented")
}

func (m *OrderbookUsecaseMock) GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult {
if m.GetActiveOrdersStreamFunc != nil {
return m.GetActiveOrdersStreamFunc(ctx, address)
}
panic("unimplemented")
}
5 changes: 5 additions & 0 deletions domain/mvc/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ type OrderBookUsecase interface {

// GetOrder returns all active orderbook orders for a given address.
GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error)

// GetActiveOrdersStream returns a channel for streaming limit orderbook orders for a given address.
// The caller should range over the channel, but note that channel is never closed since there may by multiple
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is then responsible for closing it to prevent leakage?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since channels are not like files it's fine to keep those open. Closing is necessary when we want to inform client that there will be no values send, which is simple to achieve with one sender, but difficult with mulitple senders, because sending to close channel would cause a panic. In our scenario sender will have always some values to sent, thus I think adding synchronization to close a channel would only introduce unecessary complexity without clear benefit.

// sender goroutines.
GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult
}
9 changes: 9 additions & 0 deletions domain/orderbook/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Asset struct {
Decimals int `json:"-"`
}

// LimitOrder represents a limit order in the orderbook.
type LimitOrder struct {
TickId int64 `json:"tick_id"`
OrderId int64 `json:"order_id"`
Expand All @@ -93,3 +94,11 @@ type LimitOrder struct {
BaseAsset Asset `json:"base_asset"`
PlacedTx *string `json:"placed_tx,omitempty"`
}

// OrderbookResult represents orderbook orders result.
type OrderbookResult struct {
LimitOrders []LimitOrder // The channel on which the orders are delivered.
PoolID uint64 // The PoolID ID.
IsBestEffort bool
Error error
}
7 changes: 7 additions & 0 deletions orderbook/usecase/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package orderbookusecase

import (
"context"
"time"

"github.com/osmosis-labs/sqs/domain"
orderbookdomain "github.com/osmosis-labs/sqs/domain/orderbook"
)

// SetFetchActiveOrdersEveryDuration overrides the fetchActiveOrdersDuration for testing purposes
func (o *OrderbookUseCaseImpl) SetFetchActiveOrdersEveryDuration(duration time.Duration) {
fetchActiveOrdersDuration = duration
}

// CreateFormattedLimitOrder is an alias of createFormattedLimitOrder for testing purposes
func (o *OrderbookUseCaseImpl) CreateFormattedLimitOrder(
poolID uint64,
Expand Down
99 changes: 81 additions & 18 deletions orderbook/usecase/orderbook_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,32 +137,95 @@ func (o *OrderbookUseCaseImpl) ProcessPool(ctx context.Context, pool sqsdomain.P
return nil
}

var (
// fetchActiveOrdersEvery is a duration in which orders are pushed to the client periodically
// This is an arbitrary number selected to avoid spamming the client
fetchActiveOrdersDuration = 5 * time.Second
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved

// getActiveOrdersStreamChanLen is the length of the channel for active orders stream
// length is arbitrary number selected to avoid blocking
getActiveOrdersStreamChanLen = 50
)

// GetActiveOrdersStream implements mvc.OrderBookUsecase.
func (o *OrderbookUseCaseImpl) GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult {
// Result channel
c := make(chan orderbookdomain.OrderbookResult, getActiveOrdersStreamChanLen)

// Function to fetch orders
fetchOrders := func(ctx context.Context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way we could reuse this in GetActiveOrders to reduce code duplication?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think atm code is quite opimised it for duplication: we're reusing common data structures; I think further optmisations would add not significant improvements vs time spent finding right abstractions due smalll differences between fetchOrders and GetActiveOrders. For example in fetchOrders we skip empty results, have additional logging specifc for this method.

orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs()
if err != nil {
c <- orderbookdomain.OrderbookResult{
Error: types.FailedGetAllCanonicalOrderbookPoolIDsError{Err: err},
}
return
}

for _, orderbook := range orderbooks {
go func(orderbook domain.CanonicalOrderBooksResult) {
limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address)
if len(limitOrders) == 0 && err == nil {
return // skip empty orders
}

if err != nil {
telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc()
o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", orderbook.PoolID), zap.Any("err", err))
}

select {
case c <- orderbookdomain.OrderbookResult{
PoolID: orderbook.PoolID,
IsBestEffort: isBestEffort,
LimitOrders: limitOrders,
Error: err,
}:
case <-ctx.Done():
return
}
}(orderbook)
}
}

// Fetch orders immediately on start
go fetchOrders(ctx)

// Pull orders periodically based on duration
go func() {
ticker := time.NewTicker(fetchActiveOrdersDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fetchOrders(ctx)
case <-ctx.Done():
return
}
}
}()
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved

return c
}
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved

// GetActiveOrders implements mvc.OrderBookUsecase.
func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) {
orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs()
if err != nil {
return nil, false, types.FailedGetAllCanonicalOrderbookPoolIDsError{Err: err}
}

type orderbookResult struct {
isBestEffort bool
orderbookID uint64
limitOrders []orderbookdomain.LimitOrder
err error
}

results := make(chan orderbookResult, len(orderbooks))
results := make(chan orderbookdomain.OrderbookResult, len(orderbooks))

// Process orderbooks concurrently
for _, orderbook := range orderbooks {
go func(orderbook domain.CanonicalOrderBooksResult) {
limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address)

results <- orderbookResult{
isBestEffort: isBestEffort,
orderbookID: orderbook.PoolID,
limitOrders: limitOrders,
err: err,
results <- orderbookdomain.OrderbookResult{
IsBestEffort: isBestEffort,
PoolID: orderbook.PoolID,
LimitOrders: limitOrders,
Error: err,
}
}(orderbook)
}
Expand All @@ -174,14 +237,14 @@ func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri
for i := 0; i < len(orderbooks); i++ {
select {
case result := <-results:
if result.err != nil {
if result.Error != nil {
telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc()
o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("orderbook_id", result.orderbookID), zap.Any("err", result.err))
o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", result.PoolID), zap.Any("err", result.Error))
}

isBestEffort = isBestEffort || result.isBestEffort
isBestEffort = isBestEffort || result.IsBestEffort

finalResults = append(finalResults, result.limitOrders...)
finalResults = append(finalResults, result.LimitOrders...)
case <-ctx.Done():
return nil, false, ctx.Err()
}
Expand Down
Loading
Loading