-
Notifications
You must be signed in to change notification settings - Fork 22
/
queryerMultiOp.go
executable file
·133 lines (113 loc) · 3.51 KB
/
queryerMultiOp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package graphql
import (
"context"
"encoding/json"
"errors"
"net/http"
"time"
"github.com/graph-gophers/dataloader"
"github.com/mitchellh/mapstructure"
)
// MultiOpQueryer is a Queryer that funnels the inputs handed to Query through
// a dataloader so that several operations can be sent to one target URL as a
// single network request.
type MultiOpQueryer struct {
	// MaxBatchSize holds the batch capacity that NewMultiOpQueryer passes to
	// the dataloader when it builds the loader.
	MaxBatchSize int
	// BatchInterval holds the wait duration that NewMultiOpQueryer passes to
	// the dataloader when it builds the loader.
	BatchInterval time.Duration

	// queryer sends the serialized batch over the network and extracts the
	// errors from each individual response.
	queryer *NetworkQueryer
	// loader gathers the inputs given to Query and invokes loadQuery with them.
	loader *dataloader.Loader
}
// NewMultiOpQueryer builds a MultiOpQueryer that sends its requests to url.
// interval is used as the dataloader wait time and maxBatchSize as its batch
// capacity; both values are also recorded on the returned struct.
func NewMultiOpQueryer(url string, interval time.Duration, maxBatchSize int) *MultiOpQueryer {
	// the network queryer that will eventually carry each batch to the target
	multiOp := &MultiOpQueryer{
		MaxBatchSize:  maxBatchSize,
		BatchInterval: interval,
		queryer:       &NetworkQueryer{URL: url},
	}

	// the loader is attached after the struct exists because its batch function
	// is a method bound to that struct; caching is disabled with NoCache
	multiOp.loader = dataloader.NewBatchedLoader(
		multiOp.loadQuery,
		dataloader.WithCache(&dataloader.NoCache{}),
		dataloader.WithWait(interval),
		dataloader.WithBatchCapacity(maxBatchSize),
	)

	return multiOp
}
// WithMiddlewares replaces the middlewares of the underlying network queryer
// with mwares and returns the receiver so that calls can be chained.
func (q *MultiOpQueryer) WithMiddlewares(mwares []NetworkMiddleware) Queryer {
	q.queryer.Middlewares = mwares

	return q
}
// WithHTTPClient sets the HTTP client on the underlying network queryer to
// client and returns the receiver so that calls can be chained.
func (q *MultiOpQueryer) WithHTTPClient(client *http.Client) Queryer {
	q.queryer.Client = client

	return q
}
// Query hands input to the dataloader, which groups it with the other inputs
// of the same batch into one request whose body is the list of operation
// payloads. The "data" entry of this input's response is decoded into receiver
// using the `json` struct tags.
//
// It returns the load error, an error if the loaded value is not a JSON
// object, a decoder error, or otherwise whatever ExtractErrors reports for the
// response.
func (q *MultiOpQueryer) Query(ctx context.Context, input *QueryInput, receiver interface{}) error {
	// queue the input and block until the batch it belongs to has resolved
	thunk := q.loader.Load(ctx, input)
	raw, loadErr := thunk()
	if loadErr != nil {
		return loadErr
	}

	// every entry of the batched response has to be an object
	response, isObject := raw.(map[string]interface{})
	if !isObject {
		return errors.New("Result from dataloader was not an object")
	}

	// copy whatever sits under the data key onto the receiver
	config := &mapstructure.DecoderConfig{
		TagName: "json",
		Result:  receiver,
	}
	decoder, configErr := mapstructure.NewDecoder(config)
	if configErr != nil {
		return configErr
	}
	if decodeErr := decoder.Decode(response["data"]); decodeErr != nil {
		return decodeErr
	}

	// the data has been delivered, so surface the errors of the response last
	return q.queryer.ExtractErrors(response)
}
// loadQuery is the batch function given to the dataloader. It marshals keys
// into one JSON payload, sends it with the network queryer and decodes the
// response as a JSON list of objects, returning one result per decoded object.
// If marshaling, sending or decoding fails, the same error is returned once
// for every key.
func (q *MultiOpQueryer) loadQuery(ctx context.Context, keys dataloader.Keys) []*dataloader.Result {
	// failAll reports the same failure for each key of the batch
	failAll := func(cause error) []*dataloader.Result {
		failed := make([]*dataloader.Result, len(keys))
		for i := range failed {
			failed[i] = &dataloader.Result{Error: cause}
		}
		return failed
	}

	// the keys serialize to the list of operation payloads
	body, err := json.Marshal(keys)
	if err != nil {
		return failAll(err)
	}

	// a single request carries the whole batch
	raw, err := q.queryer.SendQuery(ctx, body)
	if err != nil {
		return failAll(err)
	}

	// the response is expected to be a list with one object per operation
	var decoded []map[string]interface{}
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return failAll(err)
	}

	// wrap each response object in the shape the dataloader hands back
	loaded := make([]*dataloader.Result, len(decoded))
	for i, entry := range decoded {
		loaded[i] = &dataloader.Result{Data: entry}
	}

	return loaded
}