-
Notifications
You must be signed in to change notification settings - Fork 239
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2742 from aws-observability/backport/decompression
Backport changes to `confighttp` and `configgrpc`
- Loading branch information
Showing
3 changed files
with
199 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
diff --git a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go | ||
index 87e7b83d7..e64b87142 100644 | ||
--- a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go | ||
+++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go | ||
@@ -12,7 +12,6 @@ import ( | ||
"time" | ||
|
||
"github.com/mostynb/go-grpc-compression/nonclobbering/snappy" | ||
- "github.com/mostynb/go-grpc-compression/nonclobbering/zstd" | ||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" | ||
"go.opentelemetry.io/otel" | ||
"google.golang.org/grpc" | ||
@@ -28,6 +27,7 @@ import ( | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config/configauth" | ||
"go.opentelemetry.io/collector/config/configcompression" | ||
+ grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal" | ||
"go.opentelemetry.io/collector/config/confignet" | ||
"go.opentelemetry.io/collector/config/configopaque" | ||
"go.opentelemetry.io/collector/config/configtelemetry" | ||
@@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err | ||
case configcompression.TypeSnappy: | ||
return snappy.Name, nil | ||
case configcompression.TypeZstd: | ||
- return zstd.Name, nil | ||
+ return grpcInternal.ZstdName, nil | ||
default: | ||
return "", fmt.Errorf("unsupported compression type %q", compressionType) | ||
} | ||
diff --git /dev/null b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go | ||
new file mode 100644 | ||
index 000000000..0718b7353 | ||
--- /dev/null | ||
+++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go | ||
@@ -0,0 +1,83 @@ | ||
+// Copyright The OpenTelemetry Authors | ||
+// Copyright 2017 gRPC authors | ||
+// SPDX-License-Identifier: Apache-2.0 | ||
+ | ||
+package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal" | ||
+ | ||
+import ( | ||
+ "errors" | ||
+ "io" | ||
+ "sync" | ||
+ | ||
+ "github.com/klauspost/compress/zstd" | ||
+ "google.golang.org/grpc/encoding" | ||
+) | ||
+ | ||
+const ZstdName = "zstd" | ||
+ | ||
+func init() { | ||
+ encoding.RegisterCompressor(NewZstdCodec()) | ||
+} | ||
+ | ||
+type writer struct { | ||
+ *zstd.Encoder | ||
+ pool *sync.Pool | ||
+} | ||
+ | ||
+func NewZstdCodec() encoding.Compressor { | ||
+ c := &compressor{} | ||
+ c.poolCompressor.New = func() any { | ||
+ zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024)) | ||
+ return &writer{Encoder: zw, pool: &c.poolCompressor} | ||
+ } | ||
+ return c | ||
+} | ||
+ | ||
+func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { | ||
+ z := c.poolCompressor.Get().(*writer) | ||
+ z.Encoder.Reset(w) | ||
+ return z, nil | ||
+} | ||
+ | ||
+func (z *writer) Close() error { | ||
+ defer z.pool.Put(z) | ||
+ return z.Encoder.Close() | ||
+} | ||
+ | ||
+type reader struct { | ||
+ *zstd.Decoder | ||
+ pool *sync.Pool | ||
+} | ||
+ | ||
+func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { | ||
+ z, inPool := c.poolDecompressor.Get().(*reader) | ||
+ if !inPool { | ||
+ newZ, err := zstd.NewReader(r) | ||
+ if err != nil { | ||
+ return nil, err | ||
+ } | ||
+ return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil | ||
+ } | ||
+ if err := z.Reset(r); err != nil { | ||
+ c.poolDecompressor.Put(z) | ||
+ return nil, err | ||
+ } | ||
+ return z, nil | ||
+} | ||
+ | ||
+func (z *reader) Read(p []byte) (n int, err error) { | ||
+ n, err = z.Decoder.Read(p) | ||
+ if errors.Is(err, io.EOF) { | ||
+ z.pool.Put(z) | ||
+ } | ||
+ return n, err | ||
+} | ||
+ | ||
+func (c *compressor) Name() string { | ||
+ return ZstdName | ||
+} | ||
+ | ||
+type compressor struct { | ||
+ poolCompressor sync.Pool | ||
+ poolDecompressor sync.Pool | ||
+} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go | ||
index 88ecafe78..a700bec84 100644 | ||
--- a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go | ||
+++ b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go | ||
@@ -67,24 +67,26 @@ func (r *compressRoundTripper) RoundTrip(req *http.Request) (*http.Response, err | ||
} | ||
|
||
type decompressor struct { | ||
- errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) | ||
- base http.Handler | ||
- decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error) | ||
+ errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) | ||
+ base http.Handler | ||
+ decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error) | ||
+ maxRequestBodySize int64 | ||
} | ||
|
||
// httpContentDecompressor offloads the task of handling compressed HTTP requests | ||
// by identifying the compression format in the "Content-Encoding" header and re-writing | ||
// request body so that the handlers further in the chain can work on decompressed data. | ||
// It supports gzip and deflate/zlib compression. | ||
-func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler { | ||
+func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler { | ||
errHandler := defaultErrorHandler | ||
if eh != nil { | ||
errHandler = eh | ||
} | ||
|
||
d := &decompressor{ | ||
- errHandler: errHandler, | ||
- base: h, | ||
+ maxRequestBodySize: maxRequestBodySize, | ||
+ errHandler: errHandler, | ||
+ base: h, | ||
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){ | ||
"": func(io.ReadCloser) (io.ReadCloser, error) { | ||
// Not a compressed payload. Nothing to do. | ||
@@ -155,7 +157,7 @@ func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
// "Content-Length" is set to -1 as the size of the decompressed body is unknown. | ||
r.Header.Del("Content-Length") | ||
r.ContentLength = -1 | ||
- r.Body = newBody | ||
+ r.Body = http.MaxBytesReader(w, newBody, d.maxRequestBodySize) | ||
} | ||
d.base.ServeHTTP(w, r) | ||
} | ||
diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go | ||
index b210fa0dd..71b2f17ee 100644 | ||
--- a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go | ||
+++ b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go | ||
@@ -30,6 +30,7 @@ import ( | ||
) | ||
|
||
const headerContentEncoding = "Content-Encoding" | ||
+const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB | ||
|
||
// ClientConfig defines settings for creating an HTTP client. | ||
type ClientConfig struct { | ||
@@ -269,7 +270,7 @@ type ServerConfig struct { | ||
// Auth for this receiver | ||
Auth *configauth.Authentication `mapstructure:"auth"` | ||
|
||
- // MaxRequestBodySize sets the maximum request body size in bytes | ||
+ // MaxRequestBodySize sets the maximum request body size in bytes. Default: 20MiB. | ||
MaxRequestBodySize int64 `mapstructure:"max_request_body_size"` | ||
|
||
// IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers | ||
@@ -340,7 +341,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin | ||
o(serverOpts) | ||
} | ||
|
||
- handler = httpContentDecompressor(handler, serverOpts.errHandler, serverOpts.decoders) | ||
+ if hss.MaxRequestBodySize <= 0 { | ||
+ hss.MaxRequestBodySize = defaultMaxRequestBodySize | ||
+ } | ||
+ | ||
+ handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders) | ||
|
||
if hss.MaxRequestBodySize > 0 { | ||
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize) |