-
Notifications
You must be signed in to change notification settings - Fork 12
/
eager_router.go
148 lines (128 loc) · 4.37 KB
/
eager_router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package fiber
import (
"context"
"github.com/gojek/fiber/errors"
"github.com/gojek/fiber/util"
)
// EagerRouter implements the Router interface and routes incoming requests
// according to a routing strategy.
//
// It is called 'eager' because it dispatches every incoming request to all of
// its routes in parallel and then returns either the response from the primary
// route (as defined by the routing strategy) or, failing that, a response from
// one of the fallback routes.
//
// In a sense, EagerRouter is a Combiner that reduces the responses from all of
// its routes to a single response, selected according to the provided
// RoutingStrategy.
type EagerRouter struct {
*Combiner
}
// NewEagerRouter builds an EagerRouter on top of a new Combiner with the given id.
// If id is empty, an id of the form "eager-router_" followed by util.UID() is
// used instead.
func NewEagerRouter(id string) *EagerRouter {
	routerID := id
	if routerID == "" {
		routerID = "eager-router_" + util.UID()
	}

	router := new(EagerRouter)
	router.Combiner = NewCombiner(routerID)
	return router
}
// SetStrategy installs the given routing strategy on this router.
// The strategy is wrapped into a baseRoutingStrategy and handed to a new
// eagerRouterFanIn, which is then registered as the router's fan-in.
func (router *EagerRouter) SetStrategy(strategy RoutingStrategy) {
	fanIn := &eagerRouterFanIn{
		BaseFanIn: BaseFanIn{},
		strategy:  &baseRoutingStrategy{RoutingStrategy: strategy},
		router:    router,
	}
	router.WithFanIn(fanIn)
}
// eagerRouterFanIn is the FanIn implementation specific to EagerRouter.
// It receives the channel with responses from all of the router's routes and
// asynchronously retrieves the primary route and the order of fallbacks to be used.
//
// This FanIn doesn't wait for responses from all of the routes: it returns a
// response as soon as the preferred order of routes is known and a successful
// response from the primary route has been received.
// If the response from the primary route is not successful, the first successful
// response from the fallback routes, taken in their priority order, is sent back.
// If the primary route AND all fallback routes responded with non-successful
// responses, an error response is created and sent back.
type eagerRouterFanIn struct {
BaseFanIn
// strategy wraps the RoutingStrategy given to SetStrategy; Aggregate asks it
// for the order of routes.
strategy *baseRoutingStrategy
// router is the EagerRouter this fan-in was created for; Aggregate reads the
// available routes from it via GetRoutes.
router *EagerRouter
}
// Aggregate picks a single response for req out of the responses that arrive
// on queue.
//
// The routing strategy publishes the preferred order of routes (primary route
// first, then fallbacks) into a channel. Responses read from queue are stored
// by backend name until that order has been received; from then on the routes
// are examined in priority order and the first successful response is returned.
// A route with a non-successful response is skipped in favour of the next one.
// A route that has not responded yet is waited for, unless no more responses
// can arrive (queue is drained or ctx is done), in which case it is skipped too.
//
// An error response is returned when:
//   - the routing strategy reports an error;
//   - ctx is done before the routing strategy has delivered the order of routes;
//   - the routing strategy delivers an order that contains no routes;
//   - none of the ordered routes produced a successful response.
//
// The labels delivered by the routing strategy are attached to the returned
// response; if the strategy never delivered anything, a fresh labels map is used.
func (fanIn *eagerRouterFanIn) Aggregate(
	ctx context.Context,
	req Request,
	queue ResponseQueue,
) Response {
	// use routing strategy to fetch primary route and fallbacks;
	// the ordered routes are published into a channel
	routesOrderCh := fanIn.strategy.getRoutesOrder(ctx, req, fanIn.router.GetRoutes())

	out := make(chan Response, 1)
	go func() {
		defer close(out)

		var (
			// responses received from the routes so far, keyed by backend name
			responses = make(map[string]Response)

			// routes, ordered according to their priority;
			// initialized from the routesOrderCh channel
			routes []Component

			// routesKnown reports whether the routing strategy has delivered
			// the order of routes. A dedicated flag is required because the
			// delivered slice may itself be nil, and that case must be reported
			// as "strategy returned empty routes" rather than mistaken for
			// "strategy has not answered yet".
			routesKnown bool

			// response labels
			labels Labels = NewLabelsMap()

			// index of the route currently treated as the preferred one
			currentRouteIdx int

			// set to nil once no more responses should be read from the queue
			responseCh = queue.Iter()

			masterResponse Response
		)

		for masterResponse == nil {
			select {
			case resp, ok := <-responseCh:
				if ok {
					responses[resp.BackendName()] = resp
				} else {
					// queue is drained; a nil channel is never selected again
					responseCh = nil
				}
			case order, ok := <-routesOrderCh:
				switch {
				case !ok:
					routesOrderCh = nil
				case order.Err != nil:
					labels = order.Labels
					masterResponse = NewErrorResponse(errors.NewFiberError(req.Protocol(), order.Err))
				default:
					labels = order.Labels
					routes = order.Components
					routesKnown = true
				}
			case <-ctx.Done():
				if !routesKnown {
					// timeout exceeded, but no routes received. Sending error response
					masterResponse = NewErrorResponse(errors.ErrRouterStrategyTimeoutExceeded(req.Protocol()))
				} else {
					// timeout exceeded: stop waiting for further responses and
					// decide below based on what has been received so far
					responseCh = nil
				}
			}

			if !routesKnown {
				// nothing can be selected before the order of routes is known
				continue
			}

			for ; currentRouteIdx < len(routes); currentRouteIdx++ {
				candidate, received := responses[routes[currentRouteIdx].ID()]
				if !received {
					if responseCh != nil {
						// response from the preferred route is not ready;
						// continue listening for new responses
						break
					}
					// no more responses will be read: treat the route as failed
					continue
				}
				if candidate.IsSuccess() {
					// preferred response found
					masterResponse = candidate
					break
				}
			}

			// all expected routes tried, no OK response received from any of them
			if currentRouteIdx >= len(routes) {
				if len(routes) == 0 {
					masterResponse = NewErrorResponse(errors.ErrRouterStrategyReturnedEmptyRoutes(req.Protocol()))
				} else {
					masterResponse = NewErrorResponse(errors.ErrNoValidResponseFromRoutes(req.Protocol()))
				}
			}
		}

		out <- masterResponse.WithLabels(labels)
	}()

	return <-out
}