-
Notifications
You must be signed in to change notification settings - Fork 105
/
option.go
647 lines (565 loc) · 22 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
// Copyright 2021-2024 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connect
import (
"compress/gzip"
"context"
"io"
"net/http"
)
// ClientOption is implemented by every option that can configure a [Client].
//
// Beyond the options grouped under this type in the documentation, keep in
// mind that every [Option] is a valid ClientOption too.
type ClientOption interface {
	applyToClient(config *clientConfig)
}
// WithAcceptCompression registers a compression algorithm with a client under
// the given name. Clients advertise every registered algorithm when asking
// servers for compressed responses. Registration order expresses preference:
// the earliest registration is the least preferred and the latest is the most
// preferred.
//
// Registering algorithms liberally is safe, because servers ignore any
// algorithm they don't support. This option concerns responses; pair it with
// [WithSendCompression] to compress requests as well. To drop support for an
// algorithm that was registered earlier, call WithAcceptCompression with the
// same name and nil decompressor and compressor constructors.
//
// By default, clients accept gzipped responses using a compressor backed by
// the standard library's [gzip] package at the default compression level. Use
// [WithSendGzip] to gzip requests.
//
// An empty name turns WithAcceptCompression into a no-op.
func WithAcceptCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) ClientOption {
	pool := newCompressionPool(newDecompressor, newCompressor)
	return &compressionOption{Name: name, CompressionPool: pool}
}
// WithClientOptions bundles any number of ClientOptions into a single
// ClientOption. The bundled options are applied in the order supplied.
func WithClientOptions(options ...ClientOption) ClientOption {
	return &clientOptionsOption{options: options}
}
// WithGRPC switches a client to the HTTP/2 gRPC protocol.
func WithGRPC() ClientOption {
	const web = false // plain gRPC, as opposed to gRPC-Web
	return &grpcOption{web: web}
}
// WithGRPCWeb switches a client to the gRPC-Web protocol.
func WithGRPCWeb() ClientOption {
	const web = true // gRPC-Web, as opposed to plain gRPC
	return &grpcOption{web: web}
}
// WithProtoJSON makes a client send JSON-encoded messages rather than binary
// Protobuf. Encoding follows the standard Protobuf JSON mapping implemented by
// [google.golang.org/protobuf/encoding/protojson]: field names use
// lowerCamelCase, zero values are omitted, a missing required field is an
// error, enums are written as strings, and so on.
func WithProtoJSON() ClientOption {
	jsonCodec := &protoJSONCodec{codecNameJSON}
	return WithCodec(jsonCodec)
}
// WithSendCompression selects the algorithm a client uses to compress request
// messages. The algorithm must have been registered with
// [WithAcceptCompression]; otherwise the client returns errors at runtime.
//
// Not every server supports compression, so by default clients send requests
// uncompressed.
func WithSendCompression(name string) ClientOption {
	option := new(sendCompressionOption)
	option.Name = name
	return option
}
// WithSendGzip makes the client gzip its requests. It is equivalent to
// WithSendCompression with the gzip algorithm name. Clients have a gzip
// compressor available by default, so no separate [WithAcceptCompression]
// registration is needed.
//
// Because some servers don't support gzip, clients send uncompressed requests
// unless this option (or [WithSendCompression]) is used.
func WithSendGzip() ClientOption {
	return &sendCompressionOption{Name: compressionGzip}
}
// HandlerOption is implemented by every option that can configure a
// [Handler].
//
// Beyond the options grouped under this type in the documentation, keep in
// mind that every [Option] is a valid HandlerOption too.
type HandlerOption interface {
	applyToHandler(config *handlerConfig)
}
// WithCompression adds support for a compression algorithm to handlers.
// Clients may then send messages compressed with that algorithm, ask for
// responses compressed with it, or both. The [Compressor] and [Decompressor]
// returned by the supplied constructors must implement the same algorithm.
// Connect pools compressors and decompressors internally.
//
// Handlers support gzip by default, using the standard library's
// [compress/gzip] package at the default compression level. To drop support
// for an algorithm that was registered earlier, call WithCompression with the
// same name and nil decompressor and compressor constructors.
//
// An empty name turns WithCompression into a no-op.
func WithCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) HandlerOption {
	pool := newCompressionPool(newDecompressor, newCompressor)
	return &compressionOption{Name: name, CompressionPool: pool}
}
// WithHandlerOptions bundles any number of HandlerOptions into a single
// HandlerOption. The bundled options are applied in the order supplied.
func WithHandlerOptions(options ...HandlerOption) HandlerOption {
	return &handlerOptionsOption{options: options}
}
// WithRecover installs an interceptor that recovers from panics. The supplied
// function is given the context, the [Spec], the request headers, and the
// recovered value (which may be nil), and must return the error to send back
// to the client. It is also a convenient place to log the panic, emit
// metrics, or run other error-handling logic. The function must be safe to
// call concurrently.
//
// To stay compatible with [net/http]'s semantics, the interceptor does not
// handle panics with [http.ErrAbortHandler].
//
// Handlers don't recover from panics by default. The standard library's
// [http.Server] already recovers from panics, so this option is rarely needed
// just to prevent crashes; its purpose is to let servers collect RPC-specific
// data during a panic and return a more detailed error to clients.
func WithRecover(handle func(context.Context, Spec, http.Header, any) error) HandlerOption {
	interceptor := &recoverHandlerInterceptor{handle: handle}
	return WithInterceptors(interceptor)
}
// WithRequireConnectProtocolHeader makes the Handler reject Connect-protocol
// requests that lack the Connect-Protocol-Version header. Requiring the
// header lets HTTP proxies and net/http middleware recognize valid Connect
// requests easily, even when they carry a common Content-Type such as
// application/json. The trade-off is that ad-hoc requests with tools like
// cURL become more laborious. Streaming requests are not affected by this
// option.
//
// The option has no effect on clients using the gRPC or gRPC-Web protocols.
func WithRequireConnectProtocolHeader() HandlerOption {
	return new(requireConnectProtocolHeaderOption)
}
// WithConditionalHandlerOptions lets procedures of the same service be
// configured differently. For example, a single procedure may need a much
// larger WithReadMaxBytes limit than its siblings.
//
// The supplied function receives each procedure's Spec and returns the
// options to apply to that procedure. Returning a nil slice is safe.
func WithConditionalHandlerOptions(conditional func(spec Spec) []HandlerOption) HandlerOption {
	option := conditionalHandlerOptions{conditional: conditional}
	return &option
}
// Option implements both [ClientOption] and [HandlerOption], so it can be
// applied both client-side and server-side.
type Option interface {
ClientOption
HandlerOption
}
// WithSchema attaches a parsed representation of an RPC's schema to a client
// or handler. The value is made available as [Spec.Schema]. Generated code is
// the usual caller of this option.
//
// Services described by protobuf schemas should supply a
// [protoreflect.MethodDescriptor].
func WithSchema(schema any) Option {
	option := new(schemaOption)
	option.Schema = schema
	return option
}
// WithRequestInitializer supplies a function that initializes a new request
// message, which makes it possible to construct request messages dynamically.
// The function is called when the server receives, to prepare the message
// that will be unmarshaled into. The message is a non-nil pointer to the type
// created by the handler. Use the Schema field of the [Spec] to determine the
// message's type.
func WithRequestInitializer(initializer func(spec Spec, message any) error) HandlerOption {
	option := initializerOption{Initializer: initializer}
	return &option
}
// WithResponseInitializer supplies a function that initializes a new response
// message, which makes it possible to construct response messages
// dynamically. The function is called when the client receives, to prepare
// the message that will be unmarshaled into. The message is a non-nil pointer
// to the type created by the client. Use the Schema field of the [Spec] to
// determine the message's type.
func WithResponseInitializer(initializer func(spec Spec, message any) error) ClientOption {
	option := initializerOption{Initializer: initializer}
	return &option
}
// WithCodec registers a serialization method with a client or handler. A
// handler may hold several codecs and uses whichever one the client chooses;
// a client holds exactly one.
//
// Out of the box, handlers and clients support binary Protocol Buffer data
// via [google.golang.org/protobuf/proto], and handlers additionally support
// JSON using the standard Protobuf JSON mapping. Users with more specialized
// needs may replace these defaults by registering a codec under the "proto"
// or "json" name. A custom "proto" codec must keep in mind that some
// unexported, protocol-specific messages are serialized with Protobuf, so it
// should fall back to the standard Protobuf implementation where necessary.
//
// Registering a codec whose name is empty is a no-op.
func WithCodec(codec Codec) Option {
	option := new(codecOption)
	option.Codec = codec
	return option
}
// WithCompressMinBytes sets the smallest message size that is worth
// compressing. Messages below the threshold are sent uncompressed no matter
// which compressor is configured.
//
// The default threshold is zero. Raising it may improve overall performance,
// since the CPU spent compressing very small messages usually outweighs the
// small saving in network I/O.
func WithCompressMinBytes(minBytes int) Option {
	option := new(compressMinBytesOption)
	option.Min = minBytes
	return option
}
// WithReadMaxBytes bounds the performance impact of pathologically large
// messages sent by the other party. On a handler it caps the size of a
// message the client may send; on a client it caps the size of a message the
// server may respond with. The limit applies to each Protobuf message, not to
// the stream as a whole.
//
// A limit of zero allows messages of any size, which is also the default for
// both clients and handlers.
//
// Handlers may additionally use [http.MaxBytesHandler] to limit the total
// size of the HTTP request stream rather than the per-message size. Connect
// handles [http.MaxBytesError] specially, so clients still receive errors
// with the appropriate error code and an informative message.
func WithReadMaxBytes(maxBytes int) Option {
	option := new(readMaxBytesOption)
	option.Max = maxBytes
	return option
}
// WithSendMaxBytes stops a client or handler from sending messages that are
// too large to handle without significant performance overhead. On a handler
// it caps the size of a message the handler may respond with; on a client it
// caps the size of a message the client may send. The limit applies to each
// message, not to the stream as a whole.
//
// A limit of zero allows messages of any size, which is also the default for
// both clients and handlers.
func WithSendMaxBytes(maxBytes int) Option {
	option := new(sendMaxBytesOption)
	option.Max = maxBytes
	return option
}
// WithIdempotency declares how idempotent a procedure is. The declaration can
// determine whether a call may safely be retried, and may affect which
// request modalities are allowed for a given call.
//
// Setting this by hand is rarely necessary: the code generator for your
// schema normally does it. In a protobuf schema the level is declared like
// this:
//
//	rpc Ping(PingRequest) returns (PingResponse) {
//	  option idempotency_level = NO_SIDE_EFFECTS;
//	}
func WithIdempotency(idempotencyLevel IdempotencyLevel) Option {
	option := new(idempotencyOption)
	option.idempotencyLevel = idempotencyLevel
	return option
}
// WithHTTPGet lets Connect-protocol clients issue HTTP GET requests for
// side-effect free unary RPCs. The service schema usually indicates which
// procedures are idempotent (see [WithIdempotency] for an example protobuf
// schema). Because the gRPC and gRPC-Web protocols are POST-only, this option
// has no effect when combined with [WithGRPC] or [WithGRPCWeb].
//
// GET requests make it easier to benefit from CDNs, caching reverse proxies,
// and the caches built into browsers. Note, however, that servers don't set
// any cache headers automatically; set them from interceptors or in the
// individual procedure implementations.
//
// By default, every request is sent as an HTTP POST.
func WithHTTPGet() ClientOption {
	return new(enableGet)
}
// WithInterceptors sets up the interceptor stack of a client or handler.
// Repeated WithInterceptors options are applied in order, so
//
//	WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C)
//
// Unary interceptors nest like the layers of an onion. The first interceptor
// supplied is the outermost layer: it is the first to act on the context and
// request, and the last to act on the response and error.
//
// Stream interceptors nest the same way: the first interceptor supplied is
// the outermost wrapper around the [StreamingClientConn] or
// [StreamingHandlerConn]. It is the first to see sent messages and the last
// to see received messages.
//
// Applied to both client and handler, WithInterceptors(A, B, ..., Y, Z)
// produces:
//
//	 client.Send()       client.Receive()
//	       |                   ^
//	       v                   |
//	    A ---               --- A
//	    B ---               --- B
//	    : ...               ... :
//	    Y ---               --- Y
//	    Z ---               --- Z
//	       |                   ^
//	       v                   |
//	  = = = = = = = = = = = = = = = =
//	               network
//	  = = = = = = = = = = = = = = = =
//	       |                   ^
//	       v                   |
//	    A ---               --- A
//	    B ---               --- B
//	    : ...               ... :
//	    Y ---               --- Y
//	    Z ---               --- Z
//	       |                   ^
//	       v                   |
//	handler.Receive()   handler.Send()
//	       |                   ^
//	       |                   |
//	       '-> handler logic >-'
//
// Keep in mind that on a client, Send carries the request message(s) and
// Receive carries the response message(s); on a handler the roles are
// reversed. Depending on what your interceptor does, you may need to wrap one
// method on clients and the other on handlers.
func WithInterceptors(interceptors ...Interceptor) Option {
	return &interceptorsOption{Interceptors: interceptors}
}
// WithOptions bundles any number of Options into a single Option. The bundled
// options are applied in the order supplied.
func WithOptions(options ...Option) Option {
	return &optionsOption{options: options}
}
// schemaOption carries the schema value supplied to WithSchema.
type schemaOption struct {
	Schema any
}

// applyToClient copies the schema into the client configuration.
func (s *schemaOption) applyToClient(cfg *clientConfig) {
	schema := s.Schema
	cfg.Schema = schema
}

// applyToHandler copies the schema into the handler configuration.
func (s *schemaOption) applyToHandler(cfg *handlerConfig) {
	schema := s.Schema
	cfg.Schema = schema
}
// initializerOption carries the message-initializing function supplied to
// WithRequestInitializer or WithResponseInitializer.
type initializerOption struct {
	Initializer func(spec Spec, message any) error
}

// applyToHandler wraps the function in a maybeInitializer and stores it in
// the handler configuration.
func (i *initializerOption) applyToHandler(cfg *handlerConfig) {
	var wrapped maybeInitializer
	wrapped.initializer = i.Initializer
	cfg.Initializer = wrapped
}

// applyToClient wraps the function in a maybeInitializer and stores it in the
// client configuration.
func (i *initializerOption) applyToClient(cfg *clientConfig) {
	var wrapped maybeInitializer
	wrapped.initializer = i.Initializer
	cfg.Initializer = wrapped
}
// maybeInitializer wraps an optional message-initializing function so callers
// can invoke it without checking for nil themselves.
type maybeInitializer struct {
	initializer func(spec Spec, message any) error
}

// maybe runs the wrapped function and returns its result. When no function is
// set it does nothing and returns nil.
func (m maybeInitializer) maybe(spec Spec, message any) error {
	if m.initializer == nil {
		return nil
	}
	return m.initializer(spec, message)
}
// clientOptionsOption is the bundle of ClientOptions built by
// WithClientOptions.
type clientOptionsOption struct {
	options []ClientOption
}

// applyToClient applies every bundled option to the client configuration, in
// slice order.
func (c *clientOptionsOption) applyToClient(cfg *clientConfig) {
	for i := range c.options {
		c.options[i].applyToClient(cfg)
	}
}
// codecOption carries the codec supplied to WithCodec.
type codecOption struct {
	Codec Codec
}

// applyToClient makes the codec the client's codec. A nil codec or a codec
// with an empty name leaves the configuration untouched.
func (c *codecOption) applyToClient(cfg *clientConfig) {
	if codec := c.Codec; codec != nil && codec.Name() != "" {
		cfg.Codec = codec
	}
}

// applyToHandler registers the codec with the handler under the codec's name,
// replacing any codec already stored under that name. A nil codec or a codec
// with an empty name leaves the configuration untouched.
func (c *codecOption) applyToHandler(cfg *handlerConfig) {
	if codec := c.Codec; codec != nil && codec.Name() != "" {
		cfg.Codecs[codec.Name()] = codec
	}
}
// compressionOption registers, replaces, or removes the compression algorithm
// called Name on a client or handler. A nil CompressionPool requests removal.
type compressionOption struct {
	Name            string
	CompressionPool *compressionPool
}

// applyToClient updates the client's compression names and pools.
func (o *compressionOption) applyToClient(config *clientConfig) {
	o.apply(&config.CompressionNames, config.CompressionPools)
}

// applyToHandler updates the handler's compression names and pools.
func (o *compressionOption) applyToHandler(config *handlerConfig) {
	o.apply(&config.CompressionNames, config.CompressionPools)
}

// apply updates the ordered name list and the name-to-pool map in tandem.
//
//   - An empty Name is a no-op.
//   - A nil CompressionPool removes Name from both the list and the map.
//   - Otherwise the pool is stored under Name and Name becomes the last entry
//     of the list. Any earlier entry for Name is dropped first, so
//     re-registering an algorithm never leaves a duplicate in the list.
//
// The list is always rebuilt into a fresh slice, so the backing array of the
// slice previously stored in *configuredNames is never modified.
func (o *compressionOption) apply(configuredNames *[]string, configuredPools map[string]*compressionPool) {
	if o.Name == "" {
		return
	}
	// Both removal and (re-)registration start from the list without Name.
	var names []string
	for _, name := range *configuredNames {
		if name != o.Name {
			names = append(names, name)
		}
	}
	if o.CompressionPool == nil {
		delete(configuredPools, o.Name)
		*configuredNames = names
		return
	}
	configuredPools[o.Name] = o.CompressionPool
	*configuredNames = append(names, o.Name)
}
// compressMinBytesOption carries the threshold supplied to
// WithCompressMinBytes.
type compressMinBytesOption struct {
	Min int
}

// applyToClient stores the threshold in the client configuration.
func (m *compressMinBytesOption) applyToClient(cfg *clientConfig) {
	threshold := m.Min
	cfg.CompressMinBytes = threshold
}

// applyToHandler stores the threshold in the handler configuration.
func (m *compressMinBytesOption) applyToHandler(cfg *handlerConfig) {
	threshold := m.Min
	cfg.CompressMinBytes = threshold
}
// readMaxBytesOption carries the limit supplied to WithReadMaxBytes.
type readMaxBytesOption struct {
	Max int
}

// applyToClient stores the limit in the client configuration.
func (r *readMaxBytesOption) applyToClient(cfg *clientConfig) {
	limit := r.Max
	cfg.ReadMaxBytes = limit
}

// applyToHandler stores the limit in the handler configuration.
func (r *readMaxBytesOption) applyToHandler(cfg *handlerConfig) {
	limit := r.Max
	cfg.ReadMaxBytes = limit
}
// sendMaxBytesOption carries the limit supplied to WithSendMaxBytes.
type sendMaxBytesOption struct {
	Max int
}

// applyToClient stores the limit in the client configuration.
func (s *sendMaxBytesOption) applyToClient(cfg *clientConfig) {
	limit := s.Max
	cfg.SendMaxBytes = limit
}

// applyToHandler stores the limit in the handler configuration.
func (s *sendMaxBytesOption) applyToHandler(cfg *handlerConfig) {
	limit := s.Max
	cfg.SendMaxBytes = limit
}
// handlerOptionsOption is the bundle of HandlerOptions built by
// WithHandlerOptions.
type handlerOptionsOption struct {
	options []HandlerOption
}

// applyToHandler applies every bundled option to the handler configuration,
// in slice order.
func (h *handlerOptionsOption) applyToHandler(cfg *handlerConfig) {
	for i := range h.options {
		h.options[i].applyToHandler(cfg)
	}
}
// requireConnectProtocolHeaderOption is the option value built by
// WithRequireConnectProtocolHeader. It carries no data.
type requireConnectProtocolHeaderOption struct{}

// applyToHandler switches on RequireConnectProtocolHeader in the handler
// configuration.
func (*requireConnectProtocolHeaderOption) applyToHandler(cfg *handlerConfig) {
	cfg.RequireConnectProtocolHeader = true
}
// idempotencyOption carries the level supplied to WithIdempotency.
type idempotencyOption struct {
	idempotencyLevel IdempotencyLevel
}

// applyToClient stores the level in the client configuration.
func (i *idempotencyOption) applyToClient(cfg *clientConfig) {
	level := i.idempotencyLevel
	cfg.IdempotencyLevel = level
}

// applyToHandler stores the level in the handler configuration.
func (i *idempotencyOption) applyToHandler(cfg *handlerConfig) {
	level := i.idempotencyLevel
	cfg.IdempotencyLevel = level
}
// grpcOption is the option value built by WithGRPC (web == false) and
// WithGRPCWeb (web == true).
type grpcOption struct {
	web bool
}

// applyToClient replaces the client's protocol with a protocolGRPC that
// carries the same web flag.
func (g *grpcOption) applyToClient(cfg *clientConfig) {
	selected := &protocolGRPC{web: g.web}
	cfg.Protocol = selected
}
// enableGet is the option value built by WithHTTPGet. It carries no data.
type enableGet struct{}

// applyToClient switches on EnableGet in the client configuration.
func (*enableGet) applyToClient(cfg *clientConfig) {
	cfg.EnableGet = true
}
// WithHTTPGetMaxURLSize caps the length of the URL used for GET requests made
// with the Connect protocol. gRPC and gRPC-Web clients are unaffected, since
// those protocols are POST-only.
//
// A cap is useful because most user agents, proxies, and servers limit how
// long a URL may be. Apache and Nginx, for example, limit the request line to
// around 8 KiB, so the longest usable URL is a bit shorter than that. If your
// network infrastructure imposes URL size limits and you don't know the
// maximum allowable size, or if you simply prefer to be cautious from the
// start, a limit of 4096 bytes (4 KiB) works with most common proxies and
// CDNs.
//
// When the URL would exceed the configured maximum, the outcome depends on
// fallback: if true, the request is sent as an HTTP POST instead; if false,
// the request fails with [CodeResourceExhausted].
//
// By default, Connect-protocol clients with GET requests enabled may send a
// URL of any size.
func WithHTTPGetMaxURLSize(bytes int, fallback bool) ClientOption {
	limit := getURLMaxBytes{Max: bytes, Fallback: fallback}
	return &limit
}
// getURLMaxBytes carries the URL size limit and the fallback flag supplied to
// WithHTTPGetMaxURLSize.
type getURLMaxBytes struct {
	Max      int
	Fallback bool
}

// applyToClient stores the limit and the fallback flag in the client
// configuration.
func (g *getURLMaxBytes) applyToClient(cfg *clientConfig) {
	cfg.GetURLMaxBytes, cfg.GetUseFallback = g.Max, g.Fallback
}
// interceptorsOption carries the interceptors supplied to WithInterceptors.
type interceptorsOption struct {
	Interceptors []Interceptor
}

// applyToClient appends the interceptors to the client's interceptor stack.
func (i *interceptorsOption) applyToClient(cfg *clientConfig) {
	cfg.Interceptor = i.chainWith(cfg.Interceptor)
}

// applyToHandler appends the interceptors to the handler's interceptor stack.
func (i *interceptorsOption) applyToHandler(cfg *handlerConfig) {
	cfg.Interceptor = i.chainWith(cfg.Interceptor)
}

// chainWith combines the already-configured interceptor, if any, with the
// interceptors held by this option and returns the result:
//
//   - no new interceptors: current is returned unchanged;
//   - current is set: a chain of current followed by the new interceptors;
//   - current is nil and there is one new interceptor: that interceptor
//     itself, without a wrapping chain;
//   - current is nil and there are several: a chain of the new interceptors.
func (i *interceptorsOption) chainWith(current Interceptor) Interceptor {
	added := i.Interceptors
	switch {
	case len(added) == 0:
		return current
	case current != nil:
		return newChain(append([]Interceptor{current}, added...))
	case len(added) == 1:
		return added[0]
	default:
		return newChain(added)
	}
}
// optionsOption is the bundle of Options built by WithOptions.
type optionsOption struct {
	options []Option
}

// applyToClient applies every bundled option to the client configuration, in
// slice order.
func (b *optionsOption) applyToClient(cfg *clientConfig) {
	for i := range b.options {
		b.options[i].applyToClient(cfg)
	}
}

// applyToHandler applies every bundled option to the handler configuration,
// in slice order.
func (b *optionsOption) applyToHandler(cfg *handlerConfig) {
	for i := range b.options {
		b.options[i].applyToHandler(cfg)
	}
}
// sendCompressionOption carries the algorithm name supplied to
// WithSendCompression.
type sendCompressionOption struct {
	Name string
}

// applyToClient stores the name as the client's request compression name.
func (s *sendCompressionOption) applyToClient(cfg *clientConfig) {
	algorithm := s.Name
	cfg.RequestCompressionName = algorithm
}
// withGzip returns the option that registers gzip, backed by the standard
// library's compress/gzip package, under the compressionGzip name.
func withGzip() Option {
	// Readers start out as zero values; writers start out targeting
	// io.Discard.
	newReader := func() Decompressor { return &gzip.Reader{} }
	newWriter := func() Compressor { return gzip.NewWriter(io.Discard) }
	return &compressionOption{
		Name:            compressionGzip,
		CompressionPool: newCompressionPool(newReader, newWriter),
	}
}
// withProtoBinaryCodec returns the option that registers a protoBinaryCodec.
func withProtoBinaryCodec() Option {
	return WithCodec(new(protoBinaryCodec))
}
// withProtoJSONCodecs returns a handler option that registers two
// protoJSONCodecs: one named codecNameJSON and one named
// codecNameJSONCharsetUTF8.
func withProtoJSONCodecs() HandlerOption {
	plain := WithCodec(&protoJSONCodec{codecNameJSON})
	withCharset := WithCodec(&protoJSONCodec{codecNameJSONCharsetUTF8})
	return WithHandlerOptions(plain, withCharset)
}
// conditionalHandlerOptions carries the function supplied to
// WithConditionalHandlerOptions.
type conditionalHandlerOptions struct {
	conditional func(spec Spec) []HandlerOption
}

// applyToHandler builds the handler's Spec, passes it to the conditional
// function, and applies the returned options to config in order.
//
// It does nothing when no conditional function was supplied or when the Spec
// has an empty Procedure.
func (o *conditionalHandlerOptions) applyToHandler(config *handlerConfig) {
	if o.conditional == nil {
		return // calling a nil function would panic; treat it as "no options"
	}
	spec := config.newSpec()
	if spec.Procedure == "" {
		return // ignore empty specs
	}
	for _, option := range o.conditional(spec) {
		option.applyToHandler(config)
	}
}