diff --git a/app/sidecar_query_server.go b/app/sidecar_query_server.go index 8f04898e..b0696f95 100644 --- a/app/sidecar_query_server.go +++ b/app/sidecar_query_server.go @@ -203,7 +203,7 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo // HTTP handlers poolsHttpDelivery.NewPoolsHandler(e, poolsUseCase) - passthroughHttpDelivery.NewPassthroughHandler(e, passthroughUseCase, orderBookUseCase) + passthroughHttpDelivery.NewPassthroughHandler(e, passthroughUseCase, orderBookUseCase, logger) systemhttpdelivery.NewSystemHandler(e, config, logger, chainInfoUseCase) if err := tokenshttpdelivery.NewTokensHandler(e, *config.Pricing, tokensUseCase, pricingSimpleRouterUsecase, logger); err != nil { return nil, err diff --git a/delivery/http/event.go b/delivery/http/event.go new file mode 100644 index 00000000..e158e7bc --- /dev/null +++ b/delivery/http/event.go @@ -0,0 +1,98 @@ +package http + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/labstack/echo/v4" +) + +// Event represents Server-Sent Event. +// SSE explanation: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format +type Event struct { + // ID is used to set the EventSource object's last event ID value. + ID []byte + // Data field is for the message. When the EventSource receives multiple consecutive lines + // that begin with data:, it concatenates them, inserting a newline character between each one. + // Trailing newlines are removed. + Data []byte + // Event is a string identifying the type of event described. If this is specified, an event + // will be dispatched on the browser to the listener for the specified event name; the website + // source code should use addEventListener() to listen for named events. The onmessage handler + // is called if no event name is specified for a message. + Event []byte + // Retry is the reconnection time. If the connection to the server is lost, the browser will + // wait for the specified time before attempting to reconnect. This must be an integer, specifying + // the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored. + Retry []byte + // Comment line can be used to prevent connections from timing out; a server can send a comment + // periodically to keep the connection alive. + Comment []byte +} + +// MarshalTo marshals Event to given Writer +func (ev *Event) MarshalTo(w io.Writer) error { + // Marshalling part is taken from: https://github.com/r3labs/sse/blob/c6d5381ee3ca63828b321c16baa008fd6c0b4564/http.go#L16 + if len(ev.Data) == 0 && len(ev.Comment) == 0 { + return nil + } + + if len(ev.Data) > 0 { + if _, err := fmt.Fprintf(w, "id: %s\n", ev.ID); err != nil { + return err + } + + sd := bytes.Split(ev.Data, []byte("\n")) + for i := range sd { + if _, err := fmt.Fprintf(w, "data: %s\n", sd[i]); err != nil { + return err + } + } + + if len(ev.Event) > 0 { + if _, err := fmt.Fprintf(w, "event: %s\n", ev.Event); err != nil { + return err + } + } + + if len(ev.Retry) > 0 { + if _, err := fmt.Fprintf(w, "retry: %s\n", ev.Retry); err != nil { + return err + } + } + } + + if len(ev.Comment) > 0 { + if _, err := fmt.Fprintf(w, ": %s\n", ev.Comment); err != nil { + return err + } + } + + if _, err := fmt.Fprint(w, "\n"); err != nil { + return err + } + + return nil +} + +// WriteEvent writes the given data to the given ResponseWriter as an Event. +func WriteEvent(w *echo.Response, data any) error { + b, err := json.Marshal(data) + if err != nil { + return err + } + + event := Event{ + Data: b, + } + + if err := event.MarshalTo(w); err != nil { + return err + } + + w.Flush() + + return nil +} diff --git a/delivery/http/http.go b/delivery/http/http.go index 15cd837d..82073dc3 100644 --- a/delivery/http/http.go +++ b/delivery/http/http.go @@ -2,6 +2,7 @@ package http import ( "github.com/labstack/echo/v4" + "github.com/osmosis-labs/sqs/validator" ) // RequestUnmarshaler is any type capable to unmarshal data from HTTP request to itself. @@ -13,3 +14,17 @@ type RequestUnmarshaler interface { func UnmarshalRequest(c echo.Context, m RequestUnmarshaler) error { return m.UnmarshalHTTPRequest(c) } + +// ParseRequest encapsulates the request unmarshalling and validation logic. +// It unmarshals the request and validates it if the request implements the Validator interface. +func ParseRequest(c echo.Context, req RequestUnmarshaler) error { + if err := UnmarshalRequest(c, req); err != nil { + return err + } + + v, ok := req.(validator.Validator) + if !ok { + return nil + } + return validator.Validate(v) +} diff --git a/delivery/http/trace.go b/delivery/http/trace.go new file mode 100644 index 00000000..f4c91a18 --- /dev/null +++ b/delivery/http/trace.go @@ -0,0 +1,23 @@ +package http + +import ( + "context" + + "github.com/labstack/echo/v4" + "go.opentelemetry.io/otel/trace" +) + +// Span returns the current span from the context. +func Span(c echo.Context) (context.Context, trace.Span) { + ctx := c.Request().Context() + span := trace.SpanFromContext(ctx) + return ctx, span +} + +// RecordSpanError records an error ( if any ) for the span. +func RecordSpanError(ctx context.Context, span trace.Span, err error) { + if err != nil { + span.RecordError(err) + } + // Note: we do not end the span here as it is ended in the middleware. +} diff --git a/domain/mocks/orderbook_usecase_mock.go b/domain/mocks/orderbook_usecase_mock.go index 3928bbe3..65065d59 100644 --- a/domain/mocks/orderbook_usecase_mock.go +++ b/domain/mocks/orderbook_usecase_mock.go @@ -12,9 +12,10 @@ var _ mvc.OrderBookUsecase = &OrderbookUsecaseMock{} // OrderbookUsecaseMock is a mock implementation of the RouterUsecase interface type OrderbookUsecaseMock struct { - ProcessPoolFunc func(ctx context.Context, pool sqsdomain.PoolI) error - GetAllTicksFunc func(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool) - GetActiveOrdersFunc func(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) + ProcessPoolFunc func(ctx context.Context, pool sqsdomain.PoolI) error + GetAllTicksFunc func(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool) + GetActiveOrdersFunc func(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) + GetActiveOrdersStreamFunc func(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult } func (m *OrderbookUsecaseMock) ProcessPool(ctx context.Context, pool sqsdomain.PoolI) error { @@ -37,3 +38,10 @@ func (m *OrderbookUsecaseMock) GetActiveOrders(ctx context.Context, address stri } panic("unimplemented") } + +func (m *OrderbookUsecaseMock) GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult { + if m.GetActiveOrdersStreamFunc != nil { + return m.GetActiveOrdersStreamFunc(ctx, address) + } + panic("unimplemented") +} diff --git a/domain/mvc/orderbook.go b/domain/mvc/orderbook.go index 51b93f1f..4e6cabf4 100644 --- a/domain/mvc/orderbook.go +++ b/domain/mvc/orderbook.go @@ -16,4 +16,9 @@ type OrderBookUsecase interface { // GetOrder returns all active orderbook orders for a given address. GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) + + // GetActiveOrdersStream returns a channel for streaming limit orderbook orders for a given address. + // The caller should range over the channel, but note that channel is never closed since there may be multiple + // sender goroutines. + GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult } diff --git a/domain/orderbook/order.go b/domain/orderbook/order.go index d5c89df8..2248ab27 100644 --- a/domain/orderbook/order.go +++ b/domain/orderbook/order.go @@ -72,6 +72,7 @@ type Asset struct { Decimals int `json:"-"` } +// LimitOrder represents a limit order in the orderbook. type LimitOrder struct { TickId int64 `json:"tick_id"` OrderId int64 `json:"order_id"` @@ -93,3 +94,11 @@ type LimitOrder struct { BaseAsset Asset `json:"base_asset"` PlacedTx *string `json:"placed_tx,omitempty"` } + +// OrderbookResult represents orderbook orders result. +type OrderbookResult struct { + LimitOrders []LimitOrder // The channel on which the orders are delivered. + PoolID uint64 // The PoolID ID. + IsBestEffort bool + Error error +} diff --git a/orderbook/usecase/export_test.go b/orderbook/usecase/export_test.go index f6c419b3..01d01580 100644 --- a/orderbook/usecase/export_test.go +++ b/orderbook/usecase/export_test.go @@ -2,10 +2,17 @@ package orderbookusecase import ( "context" + "time" + "github.com/osmosis-labs/sqs/domain" orderbookdomain "github.com/osmosis-labs/sqs/domain/orderbook" ) +// SetFetchActiveOrdersEveryDuration overrides the fetchActiveOrdersDuration for testing purposes +func (o *OrderbookUseCaseImpl) SetFetchActiveOrdersEveryDuration(duration time.Duration) { + fetchActiveOrdersDuration = duration +} + // CreateFormattedLimitOrder is an alias of createFormattedLimitOrder for testing purposes func (o *OrderbookUseCaseImpl) CreateFormattedLimitOrder( poolID uint64, diff --git a/orderbook/usecase/orderbook_usecase.go b/orderbook/usecase/orderbook_usecase.go index c5969daa..8bce75e2 100644 --- a/orderbook/usecase/orderbook_usecase.go +++ b/orderbook/usecase/orderbook_usecase.go @@ -137,6 +137,77 @@ func (o *OrderbookUseCaseImpl) ProcessPool(ctx context.Context, pool sqsdomain.P return nil } +var ( + // fetchActiveOrdersEvery is a duration in which orders are pushed to the client periodically + // This is an arbitrary number selected to avoid spamming the client + fetchActiveOrdersDuration = 10 * time.Second + + // getActiveOrdersStreamChanLen is the length of the channel for active orders stream + // length is arbitrary number selected to avoid blocking + getActiveOrdersStreamChanLen = 50 +) + +// GetActiveOrdersStream implements mvc.OrderBookUsecase. +func (o *OrderbookUseCaseImpl) GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult { + // Result channel + c := make(chan orderbookdomain.OrderbookResult, getActiveOrdersStreamChanLen) + + // Function to fetch orders + fetchOrders := func(ctx context.Context) { + orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs() + if err != nil { + c <- orderbookdomain.OrderbookResult{ + Error: types.FailedGetAllCanonicalOrderbookPoolIDsError{Err: err}, + } + return + } + + for _, orderbook := range orderbooks { + go func(orderbook domain.CanonicalOrderBooksResult) { + limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address) + if len(limitOrders) == 0 && err == nil { + return // skip empty orders + } + + if err != nil { + telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc() + o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", orderbook.PoolID), zap.Any("err", err)) + } + + select { + case c <- orderbookdomain.OrderbookResult{ + PoolID: orderbook.PoolID, + IsBestEffort: isBestEffort, + LimitOrders: limitOrders, + Error: err, + }: + case <-ctx.Done(): + return + } + }(orderbook) + } + } + + // Fetch orders immediately on start + go fetchOrders(ctx) + + // Pull orders periodically based on duration + go func() { + ticker := time.NewTicker(fetchActiveOrdersDuration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + fetchOrders(ctx) + case <-ctx.Done(): + return + } + } + }() + + return c +} + // GetActiveOrders implements mvc.OrderBookUsecase. func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) { orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs() @@ -144,25 +215,17 @@ func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri return nil, false, types.FailedGetAllCanonicalOrderbookPoolIDsError{Err: err} } - type orderbookResult struct { - isBestEffort bool - orderbookID uint64 - limitOrders []orderbookdomain.LimitOrder - err error - } - - results := make(chan orderbookResult, len(orderbooks)) + results := make(chan orderbookdomain.OrderbookResult, len(orderbooks)) // Process orderbooks concurrently for _, orderbook := range orderbooks { go func(orderbook domain.CanonicalOrderBooksResult) { limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address) - - results <- orderbookResult{ - isBestEffort: isBestEffort, - orderbookID: orderbook.PoolID, - limitOrders: limitOrders, - err: err, + results <- orderbookdomain.OrderbookResult{ + IsBestEffort: isBestEffort, + PoolID: orderbook.PoolID, + LimitOrders: limitOrders, + Error: err, } }(orderbook) } @@ -174,14 +237,14 @@ func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri for i := 0; i < len(orderbooks); i++ { select { case result := <-results: - if result.err != nil { + if result.Error != nil { telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc() - o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("orderbook_id", result.orderbookID), zap.Any("err", result.err)) + o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", result.PoolID), zap.Any("err", result.Error)) } - isBestEffort = isBestEffort || result.isBestEffort + isBestEffort = isBestEffort || result.IsBestEffort - finalResults = append(finalResults, result.limitOrders...) + finalResults = append(finalResults, result.LimitOrders...) case <-ctx.Done(): return nil, false, ctx.Err() } diff --git a/orderbook/usecase/orderbook_usecase_test.go b/orderbook/usecase/orderbook_usecase_test.go index 24711333..1009d41c 100644 --- a/orderbook/usecase/orderbook_usecase_test.go +++ b/orderbook/usecase/orderbook_usecase_test.go @@ -6,6 +6,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" @@ -238,6 +239,209 @@ func (s *OrderbookUsecaseTestSuite) TestProcessPool() { }) } } +func (s *OrderbookUsecaseTestSuite) TestGetActiveOrdersStream() { + testCases := []struct { + name string + address string + setupMocks func(ctx context.Context, cancel context.CancelFunc, usecase *orderbookusecase.OrderbookUseCaseImpl, orderbookrepository *mocks.OrderbookRepositoryMock, grpcclient *mocks.OrderbookGRPCClientMock, poolsUsecase *mocks.PoolsUsecaseMock, tokensusecase *mocks.TokensUsecaseMock, callcount *int) + tickerDuration time.Duration + expectedCallCount int + expectedOrders []orderbookdomain.OrderbookResult + expectedError error + expectedOrderbooks []domain.CanonicalOrderBooksResult + }{ + + { + name: "failed to get all canonical orderbook pool IDs", + address: "osmo1glq2duq5f4x3m88fqwecfrfcuauy8343amy5fm", + setupMocks: func(ctx context.Context, cancel context.CancelFunc, usecase *orderbookusecase.OrderbookUseCaseImpl, orderbookrepository *mocks.OrderbookRepositoryMock, grpcclient *mocks.OrderbookGRPCClientMock, poolsUsecase *mocks.PoolsUsecaseMock, tokensusecase *mocks.TokensUsecaseMock, callcount *int) { + poolsUsecase.GetAllCanonicalOrderbookPoolIDsFunc = func() ([]domain.CanonicalOrderBooksResult, error) { + return nil, assert.AnError + } + }, + expectedError: &types.FailedGetAllCanonicalOrderbookPoolIDsError{}, + }, + { + name: "skips empty orders", + address: "osmo1npsku4qlqav6udkvgfk9eran4s4edzu69vzdm6", + setupMocks: func(ctx context.Context, cancel context.CancelFunc, usecase *orderbookusecase.OrderbookUseCaseImpl, orderbookrepository *mocks.OrderbookRepositoryMock, grpcclient *mocks.OrderbookGRPCClientMock, poolsUsecase *mocks.PoolsUsecaseMock, tokensusecase *mocks.TokensUsecaseMock, callcount *int) { + poolsUsecase.GetAllCanonicalOrderbookPoolIDsFunc = s.GetAllCanonicalOrderbookPoolIDsFunc( + nil, + s.NewCanonicalOrderBooksResult(8, "A"), // Non-empty orderbook + s.NewCanonicalOrderBooksResult(1, "B"), // Empty orderbook + ) + + grpcclient.GetActiveOrdersCb = func(ctx context.Context, contractAddress string, ownerAddress string) (orderbookdomain.Orders, uint64, error) { + if contractAddress == "A" { + return orderbookdomain.Orders{s.NewOrder().WithOrderID(5).Order}, 1, nil + } + return nil, 0, nil + } + + tokensusecase.GetMetadataByChainDenomFunc = s.GetMetadataByChainDenomFuncEmptyToken() + + tokensusecase.GetSpotPriceScalingFactorByDenomFunc = s.GetSpotPriceScalingFactorByDenomFunc(1, nil) + + orderbookrepository.GetTickByIDFunc = s.GetTickByIDFunc(s.NewTick("500", 100, "bid"), true) + }, + expectedError: nil, + expectedOrders: []orderbookdomain.OrderbookResult{ + { + + PoolID: 8, + LimitOrders: []orderbookdomain.LimitOrder{ + s.NewLimitOrder().WithOrderID(5).WithOrderbookAddress("A").LimitOrder, // Non-empty orderbook + }, + }, + }, + }, + { + name: "canceled context", + address: "osmo1npsku4qlqav6udkvgfk9eran4s4edzu69vzdm6", + setupMocks: func(ctx context.Context, cancel context.CancelFunc, usecase *orderbookusecase.OrderbookUseCaseImpl, orderbookrepository *mocks.OrderbookRepositoryMock, grpcclient *mocks.OrderbookGRPCClientMock, poolsUsecase *mocks.PoolsUsecaseMock, tokensusecase *mocks.TokensUsecaseMock, callcount *int) { + poolsUsecase.GetAllCanonicalOrderbookPoolIDsFunc = s.GetAllCanonicalOrderbookPoolIDsFunc( + nil, + s.NewCanonicalOrderBooksResult(1, "F"), + s.NewCanonicalOrderBooksResult(2, "C"), + ) + + grpcclient.GetActiveOrdersCb = func(ctx context.Context, contractAddress string, ownerAddress string) (orderbookdomain.Orders, uint64, error) { + // cancel the context for the F orderbook + if contractAddress == "F" { + go func() { + time.Sleep(1 * time.Second) + defer cancel() + }() + return orderbookdomain.Orders{ + s.NewOrder().WithOrderID(8).Order, + }, 1, nil + } + return orderbookdomain.Orders{ + s.NewOrder().WithOrderID(3).Order, + }, 1, nil + } + + tokensusecase.GetMetadataByChainDenomFunc = s.GetMetadataByChainDenomFuncEmptyToken() + + tokensusecase.GetSpotPriceScalingFactorByDenomFunc = s.GetSpotPriceScalingFactorByDenomFunc(1, nil) + + orderbookrepository.GetTickByIDFunc = s.GetTickByIDFunc(s.NewTick("500", 100, "bid"), true) + }, + expectedError: nil, + expectedOrders: []orderbookdomain.OrderbookResult{ + { + PoolID: 2, + LimitOrders: []orderbookdomain.LimitOrder{ + s.NewLimitOrder().WithOrderID(3).WithOrderbookAddress("C").LimitOrder, + }, + }, + }, + }, + { + name: "ticker should push orders", + address: "osmo1npsku4qlqav6udkvgfk9eran4s4edzu69vzdm6", + setupMocks: func(ctx context.Context, cancel context.CancelFunc, usecase *orderbookusecase.OrderbookUseCaseImpl, orderbookrepository *mocks.OrderbookRepositoryMock, grpcclient *mocks.OrderbookGRPCClientMock, poolsUsecase *mocks.PoolsUsecaseMock, tokensusecase *mocks.TokensUsecaseMock, callcount *int) { + poolsUsecase.GetAllCanonicalOrderbookPoolIDsFunc = s.GetAllCanonicalOrderbookPoolIDsFunc( + nil, + s.NewCanonicalOrderBooksResult(1, "C"), + ) + + grpcclient.GetActiveOrdersCb = func(ctx context.Context, contractAddress string, ownerAddress string) (orderbookdomain.Orders, uint64, error) { + defer func() { + *callcount++ + }() + return orderbookdomain.Orders{}, 0, nil + } + + tokensusecase.GetMetadataByChainDenomFunc = s.GetMetadataByChainDenomFuncEmptyToken() + + tokensusecase.GetSpotPriceScalingFactorByDenomFunc = s.GetSpotPriceScalingFactorByDenomFunc(1, nil) + + orderbookrepository.GetTickByIDFunc = s.GetTickByIDFunc(s.NewTick("500", 100, "bid"), true) + }, + tickerDuration: time.Second, + expectedCallCount: 2, + expectedError: nil, + }, + { + name: "returns valid orders stream", + address: "osmo1p2pq3dt5xkj39p0420p4mm9l45394xecr00299", + setupMocks: func(ctx context.Context, cancel context.CancelFunc, usecase *orderbookusecase.OrderbookUseCaseImpl, orderbookrepository *mocks.OrderbookRepositoryMock, grpcclient *mocks.OrderbookGRPCClientMock, poolsUsecase *mocks.PoolsUsecaseMock, tokensusecase *mocks.TokensUsecaseMock, callcount *int) { + poolsUsecase.GetAllCanonicalOrderbookPoolIDsFunc = s.GetAllCanonicalOrderbookPoolIDsFunc(nil, s.NewCanonicalOrderBooksResult(1, "A")) + + grpcclient.GetActiveOrdersCb = s.GetActiveOrdersFunc(orderbookdomain.Orders{ + s.NewOrder().WithOrderID(1).Order, + s.NewOrder().WithOrderID(2).Order, + }, 2, nil) + + tokensusecase.GetMetadataByChainDenomFunc = s.GetMetadataByChainDenomFuncEmptyToken() + + tokensusecase.GetSpotPriceScalingFactorByDenomFunc = s.GetSpotPriceScalingFactorByDenomFunc(1, nil) + + orderbookrepository.GetTickByIDFunc = s.GetTickByIDFunc(s.NewTick("500", 100, "bid"), true) + }, + expectedError: nil, + expectedOrders: []orderbookdomain.OrderbookResult{ + { + PoolID: 1, + LimitOrders: []orderbookdomain.LimitOrder{ + s.NewLimitOrder().WithOrderID(1).WithOrderbookAddress("A").LimitOrder, + s.NewLimitOrder().WithOrderID(2).WithOrderbookAddress("A").LimitOrder, + }, + }, + }, + }, + } + + for _, tc := range testCases { + s.Run(tc.name, func() { + // track the number of times the GetActiveOrdersCb is called + var callcount int + + // Create a context with cancellation + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create instances of the mocks + poolsUsecase := mocks.PoolsUsecaseMock{} + orderbookrepositorysitory := mocks.OrderbookRepositoryMock{} + client := mocks.OrderbookGRPCClientMock{} + tokensusecase := mocks.TokensUsecaseMock{} + + // Setup the mocks according to the test case + usecase := orderbookusecase.New(&orderbookrepositorysitory, &client, &poolsUsecase, &tokensusecase, &log.NoOpLogger{}) + if tc.setupMocks != nil { + tc.setupMocks(ctx, cancel, usecase, &orderbookrepositorysitory, &client, &poolsUsecase, &tokensusecase, &callcount) + } + + // Call the method under test + orders := usecase.GetActiveOrdersStream(ctx, tc.address) + + // Wait for the ticker to push the orders + if tc.expectedCallCount > 1 { + usecase.SetFetchActiveOrdersEveryDuration(tc.tickerDuration) + time.Sleep(tc.tickerDuration) + } + + // Collect results from the stream + var actualOrders []orderbookdomain.OrderbookResult + for i := 0; i < len(tc.expectedOrders); i++ { + select { + case <-ctx.Done(): + break + case order := <-orders: + actualOrders = append(actualOrders, order) + } + } + + // Check the expected orders + s.Assert().Equal(tc.expectedOrders, actualOrders) + + // Check expected call count + s.Assert().Equal(tc.expectedCallCount, callcount) + }) + } +} func (s *OrderbookUsecaseTestSuite) TestGetActiveOrders() { testCases := []struct { diff --git a/passthrough/delivery/http/passthrough_handler.go b/passthrough/delivery/http/passthrough_handler.go index f328e2d7..235681a7 100644 --- a/passthrough/delivery/http/passthrough_handler.go +++ b/passthrough/delivery/http/passthrough_handler.go @@ -7,16 +7,19 @@ import ( "github.com/osmosis-labs/sqs/domain" "github.com/osmosis-labs/sqs/domain/mvc" _ "github.com/osmosis-labs/sqs/domain/passthrough" + "github.com/osmosis-labs/sqs/log" "github.com/osmosis-labs/sqs/orderbook/types" "github.com/labstack/echo/v4" - "go.opentelemetry.io/otel/trace" + + "go.uber.org/zap" ) // PassthroughHandler is the http handler for passthrough use case type PassthroughHandler struct { PUsecase mvc.PassthroughUsecase OUsecase mvc.OrderBookUsecase + Logger log.Logger } const resourcePrefix = "/passthrough" @@ -26,14 +29,21 @@ func formatPassthroughResource(resource string) string { } // NewPassthroughHandler will initialize the pools/ resources endpoint -func NewPassthroughHandler(e *echo.Echo, ptu mvc.PassthroughUsecase, ou mvc.OrderBookUsecase) { +func NewPassthroughHandler(e *echo.Echo, ptu mvc.PassthroughUsecase, ou mvc.OrderBookUsecase, logger log.Logger) { handler := &PassthroughHandler{ PUsecase: ptu, OUsecase: ou, + Logger: logger, } e.GET(formatPassthroughResource("/portfolio-assets/:address"), handler.GetPortfolioAssetsByAddress) e.GET(formatPassthroughResource("/active-orders"), handler.GetActiveOrders) + e.GET(formatPassthroughResource("/active-orders"), func(c echo.Context) error { + if c.QueryParam("sse") != "" { + return handler.GetActiveOrdersStream(c) // Server-Sent Events (SSE) + } + return handler.GetActiveOrders(c) + }) } // @Summary Returns portfolio assets associated with the given address by category. @@ -61,6 +71,53 @@ func (a *PassthroughHandler) GetPortfolioAssetsByAddress(c echo.Context) error { return c.JSON(http.StatusOK, portfolioAssetsResult) } +func (a *PassthroughHandler) GetActiveOrdersStream(c echo.Context) error { + var ( + req types.GetActiveOrdersRequest + err error + ) + + ctx, span := deliveryhttp.Span(c) + defer func() { + deliveryhttp.RecordSpanError(ctx, span, err) + }() + + if err := deliveryhttp.ParseRequest(c, &req); err != nil { + return c.JSON(http.StatusBadRequest, domain.ResponseError{Message: err.Error()}) + } + + w := c.Response() + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + ch := a.OUsecase.GetActiveOrdersStream(ctx, req.UserOsmoAddress) + + for { + select { + case <-c.Request().Context().Done(): + return c.NoContent(http.StatusOK) + case orders, ok := <-ch: + if !ok { + return c.NoContent(http.StatusOK) + } + + if orders.Error != nil { + a.Logger.Error("GET "+c.Request().URL.String(), zap.Error(orders.Error)) + } + + err := deliveryhttp.WriteEvent( + w, + types.NewGetAllOrderResponse(orders.LimitOrders, orders.IsBestEffort), + ) + + if err != nil { + a.Logger.Error("GET "+c.Request().URL.String(), zap.Error(err)) + } + } + } +} + // @Summary Returns all active orderbook orders associated with the given address. // @Description The returned data represents all active orders for all orderbooks available for the specified address. // @@ -72,27 +129,18 @@ func (a *PassthroughHandler) GetPortfolioAssetsByAddress(c echo.Context) error { // @Failure 500 {object} domain.ResponseError "Response error" // @Param userOsmoAddress query string true "Osmo wallet address" // @Router /passthrough/active-orders [get] -func (a *PassthroughHandler) GetActiveOrders(c echo.Context) (err error) { - ctx := c.Request().Context() +func (a *PassthroughHandler) GetActiveOrders(c echo.Context) error { + var ( + req types.GetActiveOrdersRequest + err error + ) - span := trace.SpanFromContext(ctx) + ctx, span := deliveryhttp.Span(c) defer func() { - if err != nil { - span.RecordError(err) - // nolint:errcheck // ignore error - c.JSON(domain.GetStatusCode(err), domain.ResponseError{Message: err.Error()}) - } - - // Note: we do not end the span here as it is ended in the middleware. + deliveryhttp.RecordSpanError(ctx, span, err) }() - var req types.GetActiveOrdersRequest - if err := deliveryhttp.UnmarshalRequest(c, &req); err != nil { - return c.JSON(http.StatusBadRequest, domain.ResponseError{Message: err.Error()}) - } - - // Validate the request - if err := req.Validate(); err != nil { + if err = deliveryhttp.ParseRequest(c, &req); err != nil { return c.JSON(http.StatusBadRequest, domain.ResponseError{Message: err.Error()}) } diff --git a/passthrough/delivery/http/passthrough_handler_test.go b/passthrough/delivery/http/passthrough_handler_test.go index c07b910e..6a40766f 100644 --- a/passthrough/delivery/http/passthrough_handler_test.go +++ b/passthrough/delivery/http/passthrough_handler_test.go @@ -1,6 +1,7 @@ package http_test import ( + "bytes" "context" "fmt" "net/http" @@ -8,8 +9,10 @@ import ( "strings" "testing" + deliveryhttp "github.com/osmosis-labs/sqs/delivery/http" "github.com/osmosis-labs/sqs/domain/mocks" orderbookdomain "github.com/osmosis-labs/sqs/domain/orderbook" + "github.com/osmosis-labs/sqs/log" "github.com/osmosis-labs/sqs/orderbook/types" "github.com/osmosis-labs/sqs/orderbook/usecase/orderbooktesting" passthroughdelivery "github.com/osmosis-labs/sqs/passthrough/delivery/http" @@ -115,3 +118,127 @@ func (s *PassthroughHandlerTestSuite) TestGetActiveOrders() { }) } } + +func (s *PassthroughHandlerTestSuite) TestGetActiveOrdersStream() { + eventData := func(data string) string { + data = strings.ReplaceAll(strings.ReplaceAll(data, "\n", ""), " ", "") + + event := deliveryhttp.Event{ + Data: []byte(data), + } + + w := bytes.NewBuffer(nil) + err := event.MarshalTo(w) + s.Assert().NoError(err) + + return w.String() + } + + testCases := []struct { + name string + queryParams map[string]string + setupMocks func(usecase *mocks.OrderbookUsecaseMock) + expectedStatusCode int + expectedResponse string + }{ + { + name: "validation error: missing userOsmoAddress", + queryParams: map[string]string{}, // missing userOsmoAddress + setupMocks: func(usecase *mocks.OrderbookUsecaseMock) {}, + expectedStatusCode: http.StatusBadRequest, + expectedResponse: fmt.Sprintf(`{"message":"%s"}`+"\n", types.ErrUserOsmoAddressInvalid.Error()), + }, + { + name: "validation error: invalid userOsmoAddress", + queryParams: map[string]string{ + "userOsmoAddress": "notvalid", + }, + setupMocks: func(usecase *mocks.OrderbookUsecaseMock) {}, + expectedStatusCode: http.StatusBadRequest, + expectedResponse: fmt.Sprintf(`{"message":"%s"}`+"\n", types.ErrUserOsmoAddressInvalid.Error()), + }, + { + name: "returns active orders stream", + queryParams: map[string]string{ + "userOsmoAddress": "osmo1ugku28hwyexpljrrmtet05nd6kjlrvr9jz6z00", + }, + setupMocks: func(usecase *mocks.OrderbookUsecaseMock) { + usecase.GetActiveOrdersStreamFunc = func(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult { + ordersCh := make(chan orderbookdomain.OrderbookResult) + go func(c chan orderbookdomain.OrderbookResult) { + c <- orderbookdomain.OrderbookResult{ + LimitOrders: []orderbookdomain.LimitOrder{ + s.NewLimitOrder().WithOrderID(1).LimitOrder, + s.NewLimitOrder().WithOrderID(2).LimitOrder, + }, + IsBestEffort: false, + Error: nil, + } + close(c) + }(ordersCh) + return ordersCh + } + }, + expectedStatusCode: http.StatusOK, + expectedResponse: eventData(s.MustReadFile("../../../orderbook/usecase/orderbooktesting/parsing/active_orders_response.json")), + }, + { + name: "internal server error during stream", + queryParams: map[string]string{ + "userOsmoAddress": "osmo1ev0vtddkl7jlwfawlk06yzncapw2x9quva4wzw", + }, + setupMocks: func(usecase *mocks.OrderbookUsecaseMock) { + ordersCh := make(chan orderbookdomain.OrderbookResult) + usecase.GetActiveOrdersStreamFunc = func(ctx context.Context, address string) <-chan orderbookdomain.OrderbookResult { + go func() { + ordersCh <- orderbookdomain.OrderbookResult{ + LimitOrders: nil, + IsBestEffort: false, + Error: assert.AnError, + } + close(ordersCh) + }() + return ordersCh + } + }, + expectedStatusCode: http.StatusOK, + expectedResponse: "id: \ndata: {\"orders\":[],\"is_best_effort\":false}\n\n", + }, + } + + for _, tc := range testCases { + s.Run(tc.name, func() { + e := echo.New() + req := httptest.NewRequest(echo.GET, "/", nil) + req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON) + q := req.URL.Query() + for k, v := range tc.queryParams { + q.Add(k, v) + } + req.URL.RawQuery = q.Encode() + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + c.Request().Context().Done() + // Set up the mocks + usecase := mocks.OrderbookUsecaseMock{} + if tc.setupMocks != nil { + tc.setupMocks(&usecase) + } + + // Initialize the handler with mocked usecase + handler := passthroughdelivery.PassthroughHandler{ + OUsecase: &usecase, + Logger: &log.NoOpLogger{}, + } + + // Call the method under test + err := handler.GetActiveOrdersStream(c) + s.Assert().NoError(err) + + // Check the response + s.Assert().Equal(tc.expectedStatusCode, rec.Code) + s.Assert().Equal(tc.expectedResponse, rec.Body.String()) + }) + } +} diff --git a/validator/validator.go b/validator/validator.go new file mode 100644 index 00000000..c1b24ae1 --- /dev/null +++ b/validator/validator.go @@ -0,0 +1,11 @@ +package validator + +// Validator is any type capable to validate and having Validate method attached. +type Validator interface { + Validate() error +} + +// Validate validates type v. +func Validate(v Validator) error { + return v.Validate() +}