Skip to content

Commit

Permalink
Add custom CodecV2
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Sep 30, 2024
1 parent b9a6e38 commit 554a41b
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 19 deletions.
16 changes: 4 additions & 12 deletions pkg/ingester/client/buffering_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,24 +265,16 @@ func (ms *mockServer) Push(_ context.Context, r *mimirpb.WriteRequest) (*mimirpb
ms.mu.Lock()
defer ms.mu.Unlock()

d, err := r.Marshal()
if err != nil {
return nil, fmt.Errorf("marshal WriteRequest: %w", err)
}
var c mimirpb.WriteRequest
if err := c.Unmarshal(d); err != nil {
return nil, fmt.Errorf("unmarshal WriteRequest: %w", err)
}
// Clear unmarshal data, to ensure equality.
c.ClearTimeseriesUnmarshalData()
ms.reqs = append(ms.reqs, &c)
// Clear unmarshal data. We don't need it and it breaks equality check in test.
r.ClearTimeseriesUnmarshalData()
ms.reqs = append(ms.reqs, r)

if ms.trackSamples {
if ms.samplesPerSeries == nil {
ms.samplesPerSeries = map[string][]mimirpb.Sample{}
}

for _, ts := range c.Timeseries {
for _, ts := range r.Timeseries {
ser := mimirpb.FromLabelAdaptersToLabels(ts.Labels).String()
ms.samplesPerSeries[ser] = append(ms.samplesPerSeries[ser], ts.Samples...)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"

"github.com/grafana/mimir/pkg/ingester/activeseries"
asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
Expand Down Expand Up @@ -3835,7 +3837,11 @@ func (i *Ingester) checkAvailableForPush() error {

// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
err := i.PushWithCleanup(ctx, req, func() {
codec := encoding.GetCodecV2(proto.Name).(*mimirpb.CodecV2)
codec.FreeWriteRequest(req)
mimirpb.ReuseSlice(req.Timeseries)
})
if err != nil {
return mapPushErrorToErrorWithStatus(err)
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"

asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
"github.com/grafana/mimir/pkg/ingester/client"
Expand Down Expand Up @@ -3211,8 +3213,11 @@ func TestIngester_Push(t *testing.T) {

// Push timeseries
for idx, req := range testData.reqs {
// Push metrics to the ingester. Override the default cleanup method of mimirpb.ReuseSlice with a no-op one.
err := i.PushWithCleanup(ctx, req, func() {})
// Push metrics to the ingester.
err := i.PushWithCleanup(ctx, req, func() {
codec := encoding.GetCodecV2(proto.Name).(*mimirpb.CodecV2)
codec.FreeWriteRequest(req)
})

// We expect no error on any request except the last one
// which may error (and in that case we assert on it)
Expand Down
104 changes: 104 additions & 0 deletions pkg/mimirpb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,114 @@ import (
"bytes"
"fmt"
"math"
"sync"
"unsafe"

"github.com/prometheus/prometheus/model/histogram"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/mem"
)

func init() {
c := encoding.GetCodecV2(proto.Name)
encoding.RegisterCodecV2(&CodecV2{
c: c,
refs: map[uintptr]mem.BufferSlice{},
})
}

type CodecV2 struct {
c encoding.CodecV2

mtx sync.Mutex
refs map[uintptr]mem.BufferSlice
}

func (*CodecV2) Name() string {
return proto.Name
}

func (c *CodecV2) Marshal(v any) (out mem.BufferSlice, err error) {
return c.c.Marshal(v)
}

func (c *CodecV2) Unmarshal(data mem.BufferSlice, v any) error {
if err := c.c.Unmarshal(data, v); err != nil {
return err
}

switch fmt.Sprintf("%T", v) {
case "*alertmanagerpb.ReadStateRequest":
case "*client.QueryRequest":
case "*etcdserverpb.RangeResponse":
case "*emptypb.Empty":
case "*frontendv1pb.FrontendToClient":
case "*frontendv1pb.NotifyClientShutdownRequest":
case "*httpgrpc.HTTPRequest":
case "*mimirpb.WriteResponse":
case "*ruler.RulesRequest":
case "*schedulerpb.FrontendToScheduler":
case "*schedulerpb.QuerierToScheduler":
case "*storepb.SeriesRequest":
default:
panic(fmt.Errorf("unrecognizd protobuf message %T", v))
}

/*
switch x := v.(type) {
case *WriteRequest:
data.Ref()
c.mtx.Lock()
defer c.mtx.Unlock()
c.refs[uintptr(unsafe.Pointer(x))] = data
case *PreallocTimeseries:
panic("unmarshaling PreallocTimeseries")
case *LabelAdapter:
panic("unmarshaling label adapter")
}
*/

return nil
}

// FreeWriteRequest frees a previously unmarshaled WriteRequest.
func (c *CodecV2) FreeWriteRequest(req *WriteRequest) {
c.mtx.Lock()
defer c.mtx.Unlock()

ptr := uintptr(unsafe.Pointer(&req))
c.refs[ptr].Free()
delete(c.refs, ptr)

for i, ts := range req.Timeseries {
ptr := uintptr(unsafe.Pointer(&req.Timeseries[i]))
c.refs[ptr].Free()
delete(c.refs, ptr)

for j := range ts.Labels {
ptr := uintptr(unsafe.Pointer(&ts.Labels[j]))
c.refs[ptr].Free()
delete(c.refs, ptr)
}

for _, e := range ts.Exemplars {
for j := range e.Labels {
ptr := uintptr(unsafe.Pointer(&e.Labels[j]))
c.refs[ptr].Free()
delete(c.refs, ptr)
}
}
}
}

type UnmarshalerV2 interface {
UnmarshalV2(encoding.CodecV2, mem.BufferSlice) error
Free()
}

// var _ UnmarshalerV2 = &WriteRequest{}

// MinTimestamp returns the minimum timestamp (milliseconds) among all series
// in the WriteRequest. Returns math.MaxInt64 if the request is empty.
func (m *WriteRequest) MinTimestamp() int64 {
Expand Down
7 changes: 3 additions & 4 deletions pkg/mimirpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ var TimeseriesUnmarshalCachingEnabled = true
// if p.skipUnmarshalingExemplars is false.
func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
if TimeseriesUnmarshalCachingEnabled {
p.marshalledData = make([]byte, len(dAtA))
copy(p.marshalledData, dAtA)
p.marshalledData = dAtA
}
p.TimeSeries = TimeseriesFromPool()
p.TimeSeries.SkipUnmarshalingExemplars = p.skipUnmarshalingExemplars
Expand Down Expand Up @@ -351,7 +350,7 @@ func (bs *LabelAdapter) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
bs.Name = string(dAtA[iNdEx:postIndex])
bs.Name = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
Expand Down Expand Up @@ -382,7 +381,7 @@ func (bs *LabelAdapter) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
bs.Value = string(dAtA[iNdEx:postIndex])
bs.Value = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
Expand Down

0 comments on commit 554a41b

Please sign in to comment.