-
Notifications
You must be signed in to change notification settings - Fork 27
/
handler.go
104 lines (90 loc) · 2.85 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package graphqlws
import (
"net/http"
"sync"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
// HandlerConfig stores the configuration of a GraphQL WebSocket handler.
type HandlerConfig struct {
SubscriptionManager SubscriptionManager
Authenticate AuthenticateFunc
}
// NewHandler creates a WebSocket handler for GraphQL WebSocket connections.
// This handler takes a SubscriptionManager and adds/removes subscriptions
// as they are started/stopped by the client.
func NewHandler(config HandlerConfig) http.Handler {
// Create a WebSocket upgrader that requires clients to implement
// the "graphql-ws" protocol
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
Subprotocols: []string{"graphql-ws"},
}
logger := NewLogger("handler")
subscriptionManager := config.SubscriptionManager
// Create a map (used like a set) to manage client connections
var connections = make(map[Connection]bool)
connlock := sync.Mutex{}
return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
// Establish a WebSocket connection
var ws, err = upgrader.Upgrade(w, r, nil)
// Bail out if the WebSocket connection could not be established
if err != nil {
logger.Warn("Failed to establish WebSocket connection", err)
return
}
// Close the connection early if it doesn't implement the graphql-ws protocol
if ws.Subprotocol() != "graphql-ws" {
logger.Warn("Connection does not implement the GraphQL WS protocol")
ws.Close()
return
}
// Establish a GraphQL WebSocket connection
conn := NewConnection(ws, ConnectionConfig{
Authenticate: config.Authenticate,
EventHandlers: ConnectionEventHandlers{
Close: func(conn Connection) {
logger.WithFields(log.Fields{
"conn": conn.ID(),
"user": conn.User(),
}).Debug("Closing connection")
subscriptionManager.RemoveSubscriptions(conn)
connlock.Lock()
defer connlock.Unlock()
delete(connections, conn)
},
StartOperation: func(
conn Connection,
opID string,
data *StartMessagePayload,
) []error {
logger.WithFields(log.Fields{
"conn": conn.ID(),
"op": opID,
"user": conn.User(),
}).Debug("Start operation")
return subscriptionManager.AddSubscription(conn, &Subscription{
ID: opID,
Query: data.Query,
Variables: data.Variables,
OperationName: data.OperationName,
Connection: conn,
SendData: func(data *DataMessagePayload) {
conn.SendData(opID, data)
},
})
},
StopOperation: func(conn Connection, opID string) {
subscriptionManager.RemoveSubscription(conn, &Subscription{
ID: opID,
})
},
},
})
connlock.Lock()
defer connlock.Unlock()
connections[conn] = true
},
)
}