-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.go
98 lines (77 loc) · 2.37 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package xfeed
import (
"context"
"github.com/pkg/errors"
"github.com/x-feed/x-feed-sdk-golang/pkg/logging"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// Client represents X-feed client
type Client struct {
conn *grpc.ClientConn
cfg Config
// context and cancel func used to cancel all operations and gracefully stop client
ctx context.Context
cancel context.CancelFunc
logger logging.Logger
session *Session
}
// NewClient creates a Client for the X-feed server at cfg.ServerURI.
//
// It dials the server over gRPC using the keepalive and message-size settings
// from cfg, starts a background goroutine that closes the connection once the
// client's internal context is cancelled, and prepares a Session whose stream
// statuses both start as StatusRed.
//
// cfg.StatusChangeHandler is optional; when it is nil a no-op handler is
// installed so the session always has a handler to call.
//
// logger is used as-is and is not checked for nil.
//
// It returns a non-nil error when the gRPC dial fails. The original dial error
// is kept as the cause of the returned error.
func NewClient(cfg Config, logger logging.Logger) (*Client, error) {
	client := &Client{
		cfg:    cfg,
		logger: logger,
	}

	keepaliveCfg := keepalive.ClientParameters{
		Time:                cfg.InactiveTimeout,
		Timeout:             cfg.KeepAliveTimeout,
		PermitWithoutStream: cfg.PermitWithoutStream,
	}

	opts := []grpc.DialOption{
		grpc.WithInsecure(), // TODO: discuss with x-feed team and fix security
		grpc.WithKeepaliveParams(keepaliveCfg),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(cfg.MaxMessageSize),
			grpc.MaxCallSendMsgSize(cfg.MaxMessageSize),
		),
	}

	client.ctx, client.cancel = context.WithCancel(context.Background())

	conn, err := grpc.DialContext(client.ctx, cfg.ServerURI, opts...)
	if err != nil {
		// Release the context created above; nothing else will ever cancel it
		// because the client is not returned to the caller.
		client.cancel()
		// Wrap renders the same "grpc dial err: ..." text as before while
		// keeping err reachable as the cause.
		return nil, errors.Wrap(err, "grpc dial err")
	}
	client.conn = conn

	// NOTE(review): no grpc.WithBlock option is passed, so per the grpc-go
	// documentation the dial returns without waiting for the connection to be
	// established; this message does not prove the server is reachable.
	client.logger.Debugf("connection successful to host %s", cfg.ServerURI)

	// Tie the connection lifetime to the client context: cancelling the
	// context closes the connection and ends this goroutine.
	go func() {
		<-client.ctx.Done()
		if err := client.conn.Close(); err != nil {
			client.logger.Errorf("connection close error %v", err)
		}
	}()

	// Fall back to a no-op so the session never holds a nil handler.
	var statusChangeHandler = func(ConnectionStatus) {}
	if cfg.StatusChangeHandler != nil {
		statusChangeHandler = cfg.StatusChangeHandler
	}

	client.session = &Session{
		clientID:            cfg.ClientID,
		statusChangeHandler: statusChangeHandler,
		currentStatus: ConnectionStatus{
			EventsStreamStatus: StatusRed,
			SettlementsStream:  StatusRed,
		},
		clientConn:     client.conn,
		requestTimeout: cfg.RequestDeadline,
		logger:         client.logger,
		limiter:        rate.NewLimiter(rate.Limit(cfg.RequestRateLimit), cfg.RequestRateLimitBurst),
	}

	return client, nil
}
// Session hands out the session that belongs to this client.
//
// It is safe to call on a nil *Client: when the receiver is nil, or when no
// session has been attached to it, a "client is not initialized" error is
// returned together with a nil session. The state of the underlying gRPC
// connection is not inspected here.
func (c *Client) Session() (*Session, error) {
	if c != nil && c.session != nil {
		return c.session, nil
	}

	return nil, errors.New("client is not initialized")
}