From 1180cb6afba8fffb8ba8707b039d188a43f1b970 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Sun, 29 Sep 2024 17:17:47 +0200 Subject: [PATCH] Add custom CodecV2 Signed-off-by: Arve Knudsen --- pkg/ingester/client/buffering_client_test.go | 16 +--- pkg/ingester/ingester.go | 8 +- pkg/ingester/ingester_test.go | 9 ++- pkg/mimirpb/custom.go | 82 ++++++++++++++++++++ pkg/mimirpb/timeseries.go | 7 +- 5 files changed, 103 insertions(+), 19 deletions(-) diff --git a/pkg/ingester/client/buffering_client_test.go b/pkg/ingester/client/buffering_client_test.go index c7f703cdca1..08056f6a0a9 100644 --- a/pkg/ingester/client/buffering_client_test.go +++ b/pkg/ingester/client/buffering_client_test.go @@ -265,24 +265,16 @@ func (ms *mockServer) Push(_ context.Context, r *mimirpb.WriteRequest) (*mimirpb ms.mu.Lock() defer ms.mu.Unlock() - d, err := r.Marshal() - if err != nil { - return nil, fmt.Errorf("marshal WriteRequest: %w", err) - } - var c mimirpb.WriteRequest - if err := c.Unmarshal(d); err != nil { - return nil, fmt.Errorf("unmarshal WriteRequest: %w", err) - } - // Clear unmarshal data, to ensure equality. - c.ClearTimeseriesUnmarshalData() - ms.reqs = append(ms.reqs, &c) + // Clear unmarshal data. We don't need it and it breaks equality check in test. + r.ClearTimeseriesUnmarshalData() + ms.reqs = append(ms.reqs, r) if ms.trackSamples { if ms.samplesPerSeries == nil { ms.samplesPerSeries = map[string][]mimirpb.Sample{} } - for _, ts := range c.Timeseries { + for _, ts := range r.Timeseries { ser := mimirpb.FromLabelAdaptersToLabels(ts.Labels).String() ms.samplesPerSeries[ser] = append(ms.samplesPerSeries[ser], ts.Samples...) } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 97a1f83011e..74a73dda62e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -50,6 +50,8 @@ import ( "go.uber.org/atomic" "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" "github.com/grafana/mimir/pkg/ingester/activeseries" asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" @@ -3835,7 +3837,11 @@ func (i *Ingester) checkAvailableForPush() error { // PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage. func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error { - err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) }) + err := i.PushWithCleanup(ctx, req, func() { + codec := encoding.GetCodecV2(proto.Name).(*mimirpb.CodecV2) + codec.FreeWriteRequest(req) + mimirpb.ReuseSlice(req.Timeseries) + }) if err != nil { return mapPushErrorToErrorWithStatus(err) } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5b8db4c663f..9eec7b45da7 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -58,6 +58,8 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" "github.com/grafana/mimir/pkg/ingester/client" @@ -3211,8 +3213,11 @@ func TestIngester_Push(t *testing.T) { // Push timeseries for idx, req := range testData.reqs { - // Push metrics to the ingester. Override the default cleanup method of mimirpb.ReuseSlice with a no-op one. - err := i.PushWithCleanup(ctx, req, func() {}) + // Push metrics to the ingester. + err := i.PushWithCleanup(ctx, req, func() { + codec := encoding.GetCodecV2(proto.Name).(*mimirpb.CodecV2) + codec.FreeWriteRequest(req) + }) // We expect no error on any request except the last one // which may error (and in that case we assert on it) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 55e17374fda..355c2d2b9c7 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -6,10 +6,92 @@ import ( "bytes" "fmt" "math" + "sync" + "unsafe" "github.com/prometheus/prometheus/model/histogram" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" + "google.golang.org/grpc/mem" ) +func init() { + c := encoding.GetCodecV2(proto.Name) + encoding.RegisterCodecV2(&CodecV2{c: c}) +} + +type CodecV2 struct { + c encoding.CodecV2 + + mtx sync.Mutex + refs map[uintptr]mem.BufferSlice +} + +func (*CodecV2) Name() string { + return proto.Name +} + +func (c *CodecV2) Marshal(v any) (out mem.BufferSlice, err error) { + return c.c.Marshal(v) +} + +func (c *CodecV2) Unmarshal(data mem.BufferSlice, v any) error { + if err := c.c.Unmarshal(data, v); err != nil { + return err + } + + switch x := v.(type) { + case *WriteRequest: + data.Ref() + c.mtx.Lock() + defer c.mtx.Unlock() + c.refs[uintptr(unsafe.Pointer(x))] = data + case *PreallocTimeseries: + panic("unmarshaling PreallocTimeseries") + case *LabelAdapter: + panic("unmarshaling label adapter") + } + + return nil +} + +// FreeWriteRequest frees a previously unmarshaled WriteRequest. +func (c *CodecV2) FreeWriteRequest(req *WriteRequest) { + c.mtx.Lock() + defer c.mtx.Unlock() + + ptr := uintptr(unsafe.Pointer(&req)) + c.refs[ptr].Free() + delete(c.refs, ptr) + + for i, ts := range req.Timeseries { + ptr := uintptr(unsafe.Pointer(&req.Timeseries[i])) + c.refs[ptr].Free() + delete(c.refs, ptr) + + for j := range ts.Labels { + ptr := uintptr(unsafe.Pointer(&ts.Labels[j])) + c.refs[ptr].Free() + delete(c.refs, ptr) + } + + for _, e := range ts.Exemplars { + for j := range e.Labels { + ptr := uintptr(unsafe.Pointer(&e.Labels[j])) + c.refs[ptr].Free() + delete(c.refs, ptr) + } + } + } +} + +type UnmarshalerV2 interface { + UnmarshalV2(encoding.CodecV2, mem.BufferSlice) error + Free() +} + +// var _ UnmarshalerV2 = &WriteRequest{} + // MinTimestamp returns the minimum timestamp (milliseconds) among all series // in the WriteRequest. Returns math.MaxInt64 if the request is empty. func (m *WriteRequest) MinTimestamp() int64 { diff --git a/pkg/mimirpb/timeseries.go b/pkg/mimirpb/timeseries.go index 2c18e69665b..093358009e2 100644 --- a/pkg/mimirpb/timeseries.go +++ b/pkg/mimirpb/timeseries.go @@ -212,8 +212,7 @@ var TimeseriesUnmarshalCachingEnabled = true // if p.skipUnmarshalingExemplars is false. func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { if TimeseriesUnmarshalCachingEnabled { - p.marshalledData = make([]byte, len(dAtA)) - copy(p.marshalledData, dAtA) + p.marshalledData = dAtA } p.TimeSeries = TimeseriesFromPool() p.TimeSeries.SkipUnmarshalingExemplars = p.skipUnmarshalingExemplars @@ -351,7 +350,7 @@ func (bs *LabelAdapter) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - bs.Name = string(dAtA[iNdEx:postIndex]) + bs.Name = yoloString(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: if wireType != 2 { @@ -382,7 +381,7 @@ func (bs *LabelAdapter) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - bs.Value = string(dAtA[iNdEx:postIndex]) + bs.Value = yoloString(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex