Skip to content

Commit

Permalink
Revert "Upgrade to google.golang.org/grpc v1.66.2 / modify certain pr…
Browse files Browse the repository at this point in the history
…otobuf messages to retain their unmarshaling buffer (#9401)"

This reverts commit accccf4.
  • Loading branch information
leizor committed Nov 1, 2024
1 parent ac26447 commit 8a4faff
Show file tree
Hide file tree
Showing 95 changed files with 2,501 additions and 3,979 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
golang.org/x/net v0.30.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
google.golang.org/grpc v1.66.2
google.golang.org/grpc v1.66.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -316,3 +316,7 @@ replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-aler
// - https://github.com/grafana/franz-go/pull/3
// - https://github.com/grafana/franz-go/pull/4
replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937

// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
// Following https://github.com/grafana/dskit/pull/581
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
1,182 changes: 1,121 additions & 61 deletions go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,6 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
pushReq := NewParsedRequest(req)
pushReq.AddCleanup(func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})

Expand Down
3 changes: 0 additions & 3 deletions pkg/frontend/querymiddleware/model.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions pkg/frontend/querymiddleware/model.pb.go.expdiff

This file was deleted.

23 changes: 3 additions & 20 deletions pkg/ingester/client/buffering_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -70,12 +68,7 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
require.Equal(t, requestsToSend, reqs)
})

t.Run("push with pooling", func(t *testing.T) {
Expand All @@ -92,12 +85,7 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
require.Equal(t, requestsToSend, reqs)

// Verify that pool was used.
require.Greater(t, pool.Gets.Load(), int64(0))
Expand Down Expand Up @@ -161,12 +149,7 @@ func TestWriteRequestBufferingClient_Push_WithMultipleMarshalCalls(t *testing.T)
_, err := bufferingClient.Push(ctx, req)
require.NoError(t, err)

diff := cmp.Diff([]*mimirpb.WriteRequest{req}, serv.requests(), cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
require.Equal(t, serv.requests(), []*mimirpb.WriteRequest{req})

// Verify that all buffers from the pool were returned.
require.Greater(t, pool.Gets.Load(), int64(0))
Expand Down
15 changes: 0 additions & 15 deletions pkg/ingester/client/ingester.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 0 additions & 54 deletions pkg/ingester/client/ingester.pb.go.expdiff

This file was deleted.

5 changes: 1 addition & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3869,10 +3869,7 @@ func (i *Ingester) checkAvailableForPush() error {

// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
err := i.PushWithCleanup(ctx, req, func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
if err != nil {
return mapPushErrorToErrorWithStatus(err)
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3292,10 +3292,8 @@ func TestIngester_Push(t *testing.T) {

// Push timeseries
for idx, req := range testData.reqs {
// Push metrics to the ingester.
err := i.PushWithCleanup(ctx, req, func() {
req.FreeBuffer()
})
// Push metrics to the ingester. Override the default cleanup method of mimirpb.ReuseSlice with a no-op one.
err := i.PushWithCleanup(ctx, req, func() {})

// We expect no error on any request except the last one
// which may error (and in that case we assert on it)
Expand Down Expand Up @@ -5532,7 +5530,7 @@ func TestIngester_QueryStream_StreamingWithManySamples(t *testing.T) {
IsEndOfSeriesStream: true,
}

require.EqualExportedValues(t, seriesLabelsMsg, *resp)
require.Equal(t, seriesLabelsMsg, *resp)

recvMsgs := 0
series := 0
Expand Down
76 changes: 0 additions & 76 deletions pkg/mimirpb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,84 +8,8 @@ import (
"math"

"github.com/prometheus/prometheus/model/histogram"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/mem"
protobufproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)

func init() {
c := encoding.GetCodecV2(proto.Name)
encoding.RegisterCodecV2(&codecV2{codec: c})
}

// codecV2 customizes gRPC unmarshalling.
type codecV2 struct {
codec encoding.CodecV2
}

var _ encoding.CodecV2 = &codecV2{}

func messageV2Of(v any) protobufproto.Message {
switch v := v.(type) {
case protoadapt.MessageV1:
return protoadapt.MessageV2Of(v)
case protoadapt.MessageV2:
return v
default:
panic(fmt.Errorf("unrecognized message type %T", v))
}
}

func (c *codecV2) Marshal(v any) (mem.BufferSlice, error) {
return c.codec.Marshal(v)
}

// Unmarshal customizes gRPC unmarshalling.
// If v wraps BufferHolder, its SetBuffer method is called with the unmarshalling buffer.
func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
vv := messageV2Of(v)
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
// Decrement buf's reference count. Note though that if v wraps BufferHolder,
// we increase buf's reference count first so it doesn't go to zero.
defer buf.Free()

if err := protobufproto.Unmarshal(buf.ReadOnlyData(), vv); err != nil {
return err
}

if holder, ok := v.(interface {
SetBuffer(mem.Buffer)
}); ok {
buf.Ref()
holder.SetBuffer(buf)
}

return nil
}

func (c *codecV2) Name() string {
return c.codec.Name()
}

// BufferHolder is a base type for protobuf messages that keep unsafe references to the unmarshalling buffer.
// Implementations of this interface should keep a reference to said buffer.
type BufferHolder struct {
buffer mem.Buffer
}

func (m *BufferHolder) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *BufferHolder) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

// MinTimestamp returns the minimum timestamp (milliseconds) among all series
// in the WriteRequest. Returns math.MaxInt64 if the request is empty.
func (m *WriteRequest) MinTimestamp() int64 {
Expand Down
25 changes: 0 additions & 25 deletions pkg/mimirpb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/mem"
)

func TestWriteRequest_MinTimestamp(t *testing.T) {
Expand Down Expand Up @@ -162,25 +159,3 @@ func TestIsFloatHistogram(t *testing.T) {
})
}
}

func TestCodecV2_Unmarshal(t *testing.T) {
c := codecV2{codec: fakeCodecV2{}}

var origReq WriteRequest
data, err := c.Marshal(&origReq)
require.NoError(t, err)

var req WriteRequest
require.NoError(t, c.Unmarshal(data, &req))

require.NotNil(t, req.buffer)
req.FreeBuffer()
}

type fakeCodecV2 struct {
encoding.CodecV2
}

func (c fakeCodecV2) Marshal(v any) (mem.BufferSlice, error) {
return encoding.GetCodecV2(proto.Name).Marshal(v)
}
3 changes: 0 additions & 3 deletions pkg/mimirpb/mimir.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8a4faff

Please sign in to comment.