Skip to content

Commit

Permalink
feat: added spot ws service, moved client to common, refactoring (#633)
Browse files Browse the repository at this point in the history
Co-authored-by: Artur Abelian <[email protected]>
  • Loading branch information
vigodsky and Artur Abelian authored Nov 25, 2024
1 parent fce4cf2 commit 0d0d7ba
Show file tree
Hide file tree
Showing 15 changed files with 1,126 additions and 275 deletions.
151 changes: 56 additions & 95 deletions v2/futures/client_ws.go → v2/common/websocket/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package futures
package websocket

import (
"context"
Expand All @@ -13,11 +13,9 @@ import (

"github.com/gorilla/websocket"
"github.com/jpillora/backoff"

"github.com/adshao/go-binance/v2/common"
)

//go:generate mockgen -source client_ws.go -destination mock/client_ws.go -package mock
//go:generate mockgen -source client.go -destination mock/client.go -package mock

const (
// reconnectMinInterval define reconnect min interval
Expand All @@ -33,22 +31,21 @@ var (

// ErrorWsIdAlreadySent defines that request with the same id was already sent
ErrorWsIdAlreadySent = errors.New("ws error: request with same id already sent")

// KeepAlivePingDeadline defines deadline to send ping frame
KeepAlivePingDeadline = 10 * time.Second
)

// messageId define id field of request/response
type messageId struct {
Id string `json:"id"`
}

// ClientWs define API websocket client
type ClientWs struct {
APIKey string
SecretKey string
// client define API websocket client
type client struct {
Debug bool
KeyType string
TimeOffset int64
logger *log.Logger
conn wsConnection
conn Connection
connMu sync.Mutex
reconnectSignal chan struct{}
connectionEstablishedSignal chan struct{}
Expand All @@ -58,18 +55,15 @@ type ClientWs struct {
reconnectCount int64
}

func (c *ClientWs) debug(format string, v ...interface{}) {
func (c *client) debug(format string, v ...interface{}) {
if c.Debug {
c.logger.Println(fmt.Sprintf(format, v...))
}
}

// NewClientWs init ClientWs
func NewClientWs(conn wsConnection, apiKey, secretKey string) (*ClientWs, error) {
client := &ClientWs{
APIKey: apiKey,
SecretKey: secretKey,
KeyType: common.KeyTypeHmac,
// NewClient init client
func NewClient(conn Connection) (Client, error) {
client := &client{
logger: log.New(os.Stderr, "Binance-golang ", log.LstdFlags),
conn: conn,
connMu: sync.Mutex{},
Expand All @@ -86,21 +80,17 @@ func NewClientWs(conn wsConnection, apiKey, secretKey string) (*ClientWs, error)
return client, nil
}

type wsClient interface {
type Client interface {
Write(id string, data []byte) error
WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error)
GetReadChannel() <-chan []byte
GetReadErrorChannel() <-chan error
GetApiKey() string
GetSecretKey() string
GetTimeOffset() int64
GetKeyType() string
GetReconnectCount() int64
Wait(timeout time.Duration)
}

// Write sends data into websocket connection
func (c *ClientWs) Write(id string, data []byte) error {
func (c *client) Write(id string, data []byte) error {
c.connMu.Lock()
defer c.connMu.Unlock()

Expand All @@ -120,7 +110,7 @@ func (c *ClientWs) Write(id string, data []byte) error {

// WriteSync sends data to the websocket connection and waits for a response synchronously
// Should be used separately from the asynchronous Write method (do not send anything in parallel)
func (c *ClientWs) WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error) {
func (c *client) WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error) {
c.connMu.Lock()
defer c.connMu.Unlock()

Expand Down Expand Up @@ -157,36 +147,20 @@ func (c *ClientWs) WriteSync(id string, data []byte, timeout time.Duration) ([]b
}
}

func (c *ClientWs) GetReadChannel() <-chan []byte {
func (c *client) GetReadChannel() <-chan []byte {
return c.readC
}

func (c *ClientWs) GetReadErrorChannel() <-chan error {
func (c *client) GetReadErrorChannel() <-chan error {
return c.readErrChan
}

func (c *ClientWs) GetApiKey() string {
return c.APIKey
}

func (c *ClientWs) GetSecretKey() string {
return c.SecretKey
}

func (c *ClientWs) GetTimeOffset() int64 {
return c.TimeOffset
}

func (c *ClientWs) GetKeyType() string {
return c.KeyType
}

func (c *ClientWs) Wait(timeout time.Duration) {
func (c *client) Wait(timeout time.Duration) {
c.wait(timeout)
}

// read data from connection
func (c *ClientWs) read() {
func (c *client) read() {
defer func() {
// reading from closed connection 1000 times caused panic
// prevent panic for any case
Expand Down Expand Up @@ -231,7 +205,7 @@ func (c *ClientWs) read() {

// wait until all responses received
// make sure that you are not sending requests
func (c *ClientWs) wait(timeout time.Duration) {
func (c *client) wait(timeout time.Duration) {
doneC := make(chan struct{})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -260,7 +234,7 @@ func (c *ClientWs) wait(timeout time.Duration) {
}

// handleReconnect waits for reconnect signal and starts reconnect
func (c *ClientWs) handleReconnect() {
func (c *client) handleReconnect() {
for _ = range c.reconnectSignal {
c.debug("reconnect: received signal")

Expand All @@ -285,10 +259,10 @@ func (c *ClientWs) handleReconnect() {
}

// startReconnect starts reconnect loop with increasing delay
func (c *ClientWs) startReconnect(b *backoff.Backoff) *connection {
func (c *client) startReconnect(b *backoff.Backoff) Connection {
for {
atomic.AddInt64(&c.reconnectCount, 1)
conn, err := newConnection()
conn, err := c.conn.RestoreConnection()
if err != nil {
delay := b.Duration()
c.debug("reconnect: error while reconnecting. try in %s", delay.Round(time.Millisecond))
Expand All @@ -301,7 +275,9 @@ func (c *ClientWs) startReconnect(b *backoff.Backoff) *connection {
}

// GetReconnectCount returns reconnect counter value
func (c *ClientWs) GetReconnectCount() int64 { return atomic.LoadInt64(&c.reconnectCount) }
func (c *client) GetReconnectCount() int64 {
return atomic.LoadInt64(&c.reconnectCount)
}

// NewRequestList creates request list
func NewRequestList() RequestList {
Expand Down Expand Up @@ -356,37 +332,47 @@ func (l *RequestList) IsAlreadyInList(id string) bool {
return false
}

// constructor for connection
func newConnection() (*connection, error) {
conn, err := WsApiInitReadWriteConn()
// NewConnection constructor for connection
func NewConnection(
initUnderlyingWsConnFn func() (*websocket.Conn, error),
isKeepAliveNeeded bool,
keepaliveTimeout time.Duration,
) (Connection, error) {
underlyingWsConn, err := initUnderlyingWsConnFn()
if err != nil {
return nil, err
}

wsConn := &connection{
conn: conn,
connectionMu: sync.Mutex{},
lastResponseMu: sync.Mutex{},
conn: underlyingWsConn,
connectionMu: sync.Mutex{},
lastResponseMu: sync.Mutex{},
initUnderlyingWsConnFn: initUnderlyingWsConnFn,
keepaliveTimeout: keepaliveTimeout,
}

if WebsocketKeepalive {
go wsConn.keepAlive(WebsocketTimeoutReadWriteConnection)
if isKeepAliveNeeded {
go wsConn.keepAlive(keepaliveTimeout)
}

return wsConn, nil
}

// instance of single connection with keepalive handler
// connection is an instance of single ws connection with keepalive handler
type connection struct {
conn *websocket.Conn
connectionMu sync.Mutex
lastResponse time.Time
lastResponseMu sync.Mutex
conn *websocket.Conn
connectionMu sync.Mutex
lastResponse time.Time
lastResponseMu sync.Mutex
initUnderlyingWsConnFn func() (*websocket.Conn, error)
keepaliveTimeout time.Duration
isKeepAliveNeeded bool
}

type wsConnection interface {
type Connection interface {
WriteMessage(messageType int, data []byte) error
ReadMessage() (messageType int, p []byte, err error)
RestoreConnection() (Connection, error)
}

// WriteMessage is a thread-safe method for conn.WriteMessage
Expand All @@ -401,6 +387,11 @@ func (c *connection) ReadMessage() (int, []byte, error) {
return c.conn.ReadMessage()
}

// RestoreConnection recreates ws connection with the same underlying connection callback and keepalive timeout
func (c *connection) RestoreConnection() (Connection, error) {
return NewConnection(c.initUnderlyingWsConnFn, c.isKeepAliveNeeded, c.keepaliveTimeout)
}

// keepAlive handles ping-pong for connection
func (c *connection) keepAlive(timeout time.Duration) {
ticker := time.NewTicker(timeout)
Expand Down Expand Up @@ -455,41 +446,11 @@ func (c *connection) ping() error {
c.connectionMu.Lock()
defer c.connectionMu.Unlock()

deadline := time.Now().Add(10 * time.Second)
deadline := time.Now().Add(KeepAlivePingDeadline)
err := c.conn.WriteControl(websocket.PingMessage, []byte{}, deadline)
if err != nil {
return err
}

return nil
}

// NewOrderPlaceWsService init OrderPlaceWsService
func NewOrderPlaceWsService(apiKey, secretKey string) (*OrderPlaceWsService, error) {
conn, err := newConnection()
if err != nil {
return nil, err
}

client, err := NewClientWs(conn, apiKey, secretKey)
if err != nil {
return nil, err
}

return &OrderPlaceWsService{c: client}, nil
}

// NewOrderCancelWsService init OrderCancelWsService
func NewOrderCancelWsService(apiKey, secretKey string) (*OrderCancelWsService, error) {
conn, err := newConnection()
if err != nil {
return nil, err
}

client, err := NewClientWs(conn, apiKey, secretKey)
if err != nil {
return nil, err
}

return &OrderCancelWsService{c: client}, nil
}
Loading

0 comments on commit 0d0d7ba

Please sign in to comment.