-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathhttp.go
222 lines (195 loc) · 5.31 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package toolbelt
import (
"bytes"
"context"
"fmt"
"log"
"log/slog"
"net/http"
"strings"
"time"
"github.com/CAFxX/httpcompression"
"github.com/cenkalti/backoff"
"github.com/go-rod/rod"
"github.com/go-rod/rod/lib/launcher"
)
// RunHotReload returns a CtxErrFunc that waits until the local server on port
// answers an HTTP GET for onStartPath and then shows that page in a browser.
//
// A leading "/" on onStartPath is ignored. The readiness probe retries with
// exponential backoff; it stops early when ctx is cancelled and gives up with
// an error once the backoff policy reports that no further retries are due.
// Any HTTP response (regardless of status code) counts as "ready".
//
// The browser is launched in user mode so an existing session is reused. If a
// tab whose URL starts with http://localhost:<port> is already open it is
// activated and reloaded; otherwise a new tab is opened. Note that the rod
// Must* helpers used for launching and navigation panic on failure rather
// than returning an error.
func RunHotReload(port int, onStartPath string) CtxErrFunc {
	return func(ctx context.Context) error {
		startPath := strings.TrimPrefix(onStartPath, "/")
		localHost := fmt.Sprintf("http://localhost:%d", port)
		localURLToLoad := fmt.Sprintf("%s/%s", localHost, startPath)

		// Make sure page is ready before we start
		retry := backoff.NewExponentialBackOff()
		for {
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, localURLToLoad, nil)
			if err != nil {
				return fmt.Errorf("failed to build readiness request: %w", err)
			}
			resp, err := http.DefaultClient.Do(req)
			if err == nil {
				// Only reachability matters; release the connection right away.
				_ = resp.Body.Close()
				break
			}
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}

			d := retry.NextBackOff()
			if d == backoff.Stop {
				// The policy's time budget is exhausted. Sleeping on the
				// negative Stop value would return immediately and spin.
				return fmt.Errorf("server at %s never became ready: %w", localURLToLoad, err)
			}
			log.Printf("Server not ready. Retrying in %v", d)

			timer := time.NewTimer(d)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}

		// Launch browser in user mode, so we can reuse the same browser session
		wsURL := launcher.NewUserMode().MustLaunch()
		browser := rod.New().ControlURL(wsURL).MustConnect().NoDefaultDevice()

		// Get the current pages
		pages, err := browser.Pages()
		if err != nil {
			return fmt.Errorf("failed to get pages: %w", err)
		}

		var page *rod.Page
		for _, p := range pages {
			info, err := p.Info()
			if err != nil {
				return fmt.Errorf("failed to get page info: %w", err)
			}

			// If we already have the page open, just reload it
			if strings.HasPrefix(info.URL, localHost) {
				p.MustActivate().MustReload()
				page = p
				break
			}
		}
		if page == nil {
			// Otherwise, open a new page
			page = browser.MustPage(localURLToLoad)
		}

		slog.Info("page loaded", "url", localURLToLoad, "page", page.TargetID)
		return nil
	}
}
// CompressMiddleware returns the default httpcompression adapter as a
// middleware constructor. It panics if the adapter cannot be built.
func CompressMiddleware() func(next http.Handler) http.Handler {
	adapter, adapterErr := httpcompression.DefaultAdapter()
	if adapterErr == nil {
		return adapter
	}
	panic(adapterErr)
}
// ServerSentEventsHandler writes Server-Sent Events to a single HTTP response
// and flushes after each event. Instances are created with NewSSE.
type ServerSentEventsHandler struct {
// w is the response the event stream is written to.
w http.ResponseWriter
// flusher is w viewed as an http.Flusher; it is flushed after every event.
flusher http.Flusher
// usingCompression is set by NewSSE when the request carried a non-empty
// Accept-Encoding header.
usingCompression bool
// compressionMinBytes is the byte threshold that SendMultiData consults when
// padding events while usingCompression is set. NewSSE sets it to 256.
compressionMinBytes int
// shouldLogPanics is consulted when a panic is recovered during a send.
// NewSSE sets it to true.
shouldLogPanics bool
// hasPanicked records that a send recovered from a panic; subsequent sends
// that carry data return immediately.
hasPanicked bool
}
// NewSSE prepares w for a Server-Sent Events stream and returns a handler
// bound to it.
//
// It sets the Cache-Control, Connection and Content-Type headers, flushes them
// to the client immediately, and records whether the request sent a non-empty
// Accept-Encoding header. It panics when w does not implement http.Flusher.
func NewSSE(w http.ResponseWriter, r *http.Request) *ServerSentEventsHandler {
	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		panic("response writer does not support flushing")
	}

	hdr := w.Header()
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("Content-Type", "text/event-stream")
	flusher.Flush()

	acceptsEncoding := r.Header.Get("Accept-Encoding") != ""
	handler := &ServerSentEventsHandler{
		w:                   w,
		flusher:             flusher,
		usingCompression:    acceptsEncoding,
		compressionMinBytes: 256,
		shouldLogPanics:     true,
	}
	return handler
}
// SSEEvent describes one Server-Sent Event before SendMultiData serializes it.
type SSEEvent struct {
// Id is written as the "id:" field; the field is omitted when Id is empty.
Id string
// Event is written as the "event:" field; the field is omitted when empty.
Event string
// Data is the payload; each element is written as its own "data:" line.
Data []string
// Retry is written in whole milliseconds as the "retry:" field; the field is
// omitted when that value is not positive.
Retry time.Duration
// SkipMinBytesCheck disables the compression-related padding for this event.
SkipMinBytesCheck bool
}
// SSEEventOption customizes an SSEEvent. SendMultiData applies the options in
// the order given, after it has filled in its defaults.
type SSEEventOption func(*SSEEvent)
// WithSSEId returns an option that sets the event's Id to id.
func WithSSEId(id string) SSEEventOption {
	return SSEEventOption(func(evt *SSEEvent) {
		evt.Id = id
	})
}
// WithSSEEvent returns an option that sets the event's Event name to event.
func WithSSEEvent(event string) SSEEventOption {
	setName := func(evt *SSEEvent) {
		evt.Event = event
	}
	return setName
}
// WithSSERetry returns an option that sets the event's Retry interval to retry.
func WithSSERetry(retry time.Duration) SSEEventOption {
	return SSEEventOption(func(evt *SSEEvent) {
		evt.Retry = retry
	})
}
// WithSSESkipMinBytesCheck returns an option that sets the event's
// SkipMinBytesCheck flag to skip.
func WithSSESkipMinBytesCheck(skip bool) SSEEventOption {
	setSkip := func(evt *SSEEvent) {
		evt.SkipMinBytesCheck = skip
	}
	return setSkip
}
// Send emits one event whose payload is the single data line data. It is a
// convenience wrapper that forwards to SendMultiData with the same options.
func (sse *ServerSentEventsHandler) Send(data string, opts ...SSEEventOption) {
	single := []string{data}
	sse.SendMultiData(single, opts...)
}
// SendMultiData serializes one Server-Sent Event and flushes it to the client.
//
// The event starts from defaults (Id from NextID, Retry of one second, Data set
// to dataArr) and is then customized by opts in order. The wire format is:
//
//	event: <Event>   (only when Event is non-empty)
//	id: <Id>         (only when Id is non-empty)
//	retry: <ms>      (only when Retry is at least one millisecond)
//	data: <Data[i]>  (one line per element)
//
// followed by two further newlines after the last data line.
//
// When the request advertised an Accept-Encoding and the event does not set
// SkipMinBytesCheck, the final data line is padded with spaces so the whole
// event is at least compressionMinBytes long; shorter events may otherwise be
// held back by a compressing middleware instead of being flushed.
//
// The method never panics and returns nothing: a failed write, or a panic
// raised by the writer/flusher (for example after the client disconnects), is
// recovered, optionally logged, and remembered. After that, calls that carry
// data are dropped.
func (sse *ServerSentEventsHandler) SendMultiData(dataArr []string, opts ...SSEEventOption) {
	if sse.hasPanicked && len(dataArr) > 0 {
		return
	}
	defer func() {
		// Can happen if the client closes the connection or
		// other middleware panics during flush (such as compression)
		// Not ideal, but we can't do much about it
		if r := recover(); r != nil {
			// Record the failure even when logging is disabled; the panic is
			// swallowed either way and the stream is no longer usable.
			sse.hasPanicked = true
			if sse.shouldLogPanics {
				log.Printf("recovered from panic: %v", r)
			}
		}
	}()

	evt := SSEEvent{
		Id:    fmt.Sprintf("%d", NextID()),
		Event: "",
		Data:  dataArr,
		Retry: time.Second,
	}
	for _, opt := range opts {
		opt(&evt)
	}

	// Assemble the whole event first so its size is known exactly and it
	// reaches the writer in a single Write.
	var buf bytes.Buffer
	if evt.Event != "" {
		fmt.Fprintf(&buf, "event: %s\n", evt.Event)
	}
	if evt.Id != "" {
		fmt.Fprintf(&buf, "id: %s\n", evt.Id)
	}
	if ms := evt.Retry.Milliseconds(); ms > 0 {
		fmt.Fprintf(&buf, "retry: %d\n", ms)
	}

	// Bytes that still follow the last data line: its own newline plus the
	// two-newline event terminator.
	const newlineSuffixCount = 3
	padLast := sse.usingCompression && !evt.SkipMinBytesCheck
	lastDataIdx := len(evt.Data) - 1
	for i, d := range evt.Data {
		buf.WriteString("data: ")
		buf.WriteString(d)
		if padLast && i == lastDataIdx {
			if pad := sse.compressionMinBytes - buf.Len() - newlineSuffixCount; pad > 0 {
				buf.Write(bytes.Repeat([]byte(" "), pad))
			}
		}
		buf.WriteByte('\n')
	}
	buf.WriteString("\n\n")

	if _, err := sse.w.Write(buf.Bytes()); err != nil {
		panic(fmt.Sprintf("failed to write event: %v", err))
	}
	sse.flusher.Flush()
}