Skip to content

Commit

Permalink
Merge branch 'master' into metrics-framework-churner
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Nov 26, 2024
2 parents af7284d + c28f42f commit 53bbb85
Show file tree
Hide file tree
Showing 91 changed files with 3,220 additions and 906 deletions.
10 changes: 0 additions & 10 deletions common/aws/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ type ClientConfig struct {
// FragmentParallelismConstant helps determine the size of the pool of workers to help upload/download files.
// A non-zero value for this parameter adds a constant number of workers. Default is 0.
FragmentParallelismConstant int
// FragmentReadTimeout is used to bound the maximum time to wait for a single fragmented read.
// Default is 30 seconds.
FragmentReadTimeout time.Duration
// FragmentWriteTimeout is used to bound the maximum time to wait for a single fragmented write.
// Default is 30 seconds.
FragmentWriteTimeout time.Duration
}

func ClientFlags(envPrefix string, flagPrefix string) []cli.Flag {
Expand Down Expand Up @@ -120,8 +114,6 @@ func ReadClientConfig(ctx *cli.Context, flagPrefix string) ClientConfig {
EndpointURL: ctx.GlobalString(common.PrefixFlag(flagPrefix, EndpointURLFlagName)),
FragmentParallelismFactor: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismFactorFlagName)),
FragmentParallelismConstant: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismConstantFlagName)),
FragmentReadTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentReadTimeoutFlagName)),
FragmentWriteTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentWriteTimeoutFlagName)),
}
}

Expand All @@ -131,7 +123,5 @@ func DefaultClientConfig() *ClientConfig {
Region: "us-east-2",
FragmentParallelismFactor: 8,
FragmentParallelismConstant: 0,
FragmentReadTimeout: 30 * time.Second,
FragmentWriteTimeout: 30 * time.Second,
}
}
20 changes: 12 additions & 8 deletions common/aws/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"golang.org/x/sync/errgroup"
)

const (
defaultBlobBufferSizeByte = 128 * 1024
)

var (
once sync.Once
ref *client
Expand Down Expand Up @@ -106,14 +110,20 @@ func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.L
}

func (s *client) DownloadObject(ctx context.Context, bucket string, key string) ([]byte, error) {
objectSize := defaultBlobBufferSizeByte
size, err := s.HeadObject(ctx, bucket, key)
if err == nil {
objectSize = int(*size)
}
buffer := manager.NewWriteAtBuffer(make([]byte, 0, objectSize))

var partMiBs int64 = 10
downloader := manager.NewDownloader(s.s3Client, func(d *manager.Downloader) {
d.PartSize = partMiBs * 1024 * 1024 // 10MB per part
d.Concurrency = 3 //The number of goroutines to spin up in parallel per call to Upload when sending parts
})

buffer := manager.NewWriteAtBuffer([]byte{})
_, err := downloader.Download(ctx, buffer, &s3.GetObjectInput{
_, err = downloader.Download(ctx, buffer, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
Expand Down Expand Up @@ -223,9 +233,6 @@ func (s *client) FragmentedUploadObject(
}
resultChannel := make(chan error, len(fragments))

ctx, cancel := context.WithTimeout(ctx, s.cfg.FragmentWriteTimeout)
defer cancel()

for _, fragment := range fragments {
fragmentCapture := fragment
s.concurrencyLimiter <- struct{}{}
Expand Down Expand Up @@ -283,9 +290,6 @@ func (s *client) FragmentedDownloadObject(
}
resultChannel := make(chan *readResult, len(fragmentKeys))

ctx, cancel := context.WithTimeout(ctx, s.cfg.FragmentWriteTimeout)
defer cancel()

for i, fragmentKey := range fragmentKeys {
boundFragmentKey := fragmentKey
boundI := i
Expand Down
51 changes: 34 additions & 17 deletions common/metrics/count_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package metrics

import (
"fmt"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var _ CountMetric = &countMetric{}
Expand All @@ -11,6 +13,9 @@ var _ CountMetric = &countMetric{}
type countMetric struct {
Metric

// logger is the logger used to log errors.
logger logging.Logger

// name is the name of the metric.
name string

Expand All @@ -25,13 +30,34 @@ type countMetric struct {
}

// newCountMetric creates a new CountMetric instance.
func newCountMetric(name string, description string, vec *prometheus.CounterVec, labeler *labelMaker) CountMetric {
func newCountMetric(
logger logging.Logger,
registry *prometheus.Registry,
namespace string,
name string,
description string,
labelTemplate any) (CountMetric, error) {

labeler, err := newLabelMaker(labelTemplate)
if err != nil {
return nil, err
}

vec := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_count", name),
},
labeler.getKeys(),
)

return &countMetric{
logger: logger,
name: name,
description: description,
vec: vec,
labeler: labeler,
}
}, nil
}

func (m *countMetric) Name() string {
Expand All @@ -54,31 +80,22 @@ func (m *countMetric) LabelFields() []string {
return m.labeler.getKeys()
}

func (m *countMetric) Increment(label ...any) error {
return m.Add(1, label...)
func (m *countMetric) Increment(label ...any) {
m.Add(1, label...)
}

func (m *countMetric) Add(value float64, label ...any) error {
if m.vec == nil {
return nil
}

if len(label) > 1 {
return fmt.Errorf("too many labels provided, expected 1, got %d", len(label))
}

func (m *countMetric) Add(value float64, label ...any) {
var l any
if len(label) == 1 {
if len(label) > 0 {
l = label[0]
}

values, err := m.labeler.extractValues(l)
if err != nil {
return fmt.Errorf("error extracting values from label for metric %s: %v", m.name, err)
m.logger.Errorf("error extracting values from label for metric %s: %v", m.name, err)
return
}

observer := m.vec.WithLabelValues(values...)
observer.Add(value)

return nil
}
44 changes: 28 additions & 16 deletions common/metrics/gauge_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package metrics

import (
"fmt"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var _ GaugeMetric = &gaugeMetric{}
Expand All @@ -11,6 +13,9 @@ var _ GaugeMetric = &gaugeMetric{}
type gaugeMetric struct {
Metric

// logger is the logger used to log errors.
logger logging.Logger

// name is the name of the metric.
name string

Expand All @@ -29,19 +34,35 @@ type gaugeMetric struct {

// newGaugeMetric creates a new GaugeMetric instance.
func newGaugeMetric(
logger logging.Logger,
registry *prometheus.Registry,
namespace string,
name string,
unit string,
description string,
vec *prometheus.GaugeVec,
labeler *labelMaker) GaugeMetric {
labelTemplate any) (GaugeMetric, error) {

labeler, err := newLabelMaker(labelTemplate)
if err != nil {
return nil, err
}

vec := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_%s", name, unit),
},
labeler.getKeys(),
)

return &gaugeMetric{
logger: logger,
name: name,
unit: unit,
description: description,
vec: vec,
labeler: labeler,
}
}, nil
}

func (m *gaugeMetric) Name() string {
Expand All @@ -64,28 +85,19 @@ func (m *gaugeMetric) LabelFields() []string {
return m.labeler.getKeys()
}

func (m *gaugeMetric) Set(value float64, label ...any) error {
if m.vec == nil {
// metric is not enabled
return nil
}

if len(label) > 1 {
return fmt.Errorf("too many labels provided, expected 1, got %d", len(label))
}

func (m *gaugeMetric) Set(value float64, label ...any) {
var l any
if len(label) == 1 {
if len(label) > 0 {
l = label[0]
}

values, err := m.labeler.extractValues(l)
if err != nil {
return fmt.Errorf("error extracting values from label for metric %s: %v", m.name, err)
m.logger.Errorf("failed to extract values from label: %v", err)
return
}

observer := m.vec.WithLabelValues(values...)

observer.Set(value)
return nil
}
17 changes: 9 additions & 8 deletions common/metrics/label_maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type labelMaker struct {
keys []string
emptyValues []string
templateType reflect.Type
labelCount int
}

// newLabelMaker creates a new labelMaker instance given a label template. The label template may be nil.
Expand All @@ -23,6 +24,10 @@ func newLabelMaker(labelTemplate any) (*labelMaker, error) {
}

v := reflect.ValueOf(labelTemplate)
if v.Kind() != reflect.Struct {
return nil, fmt.Errorf("label template must be a struct")
}

t := v.Type()
labeler.templateType = t
for i := 0; i < t.NumField(); i++ {
Expand All @@ -37,6 +42,7 @@ func newLabelMaker(labelTemplate any) (*labelMaker, error) {
}

labeler.emptyValues = make([]string, len(labeler.keys))
labeler.labelCount = len(labeler.keys)

return labeler, nil
}
Expand All @@ -48,13 +54,7 @@ func (l *labelMaker) getKeys() []string {

// extractValues extracts the values from the given label struct.
func (l *labelMaker) extractValues(label any) ([]string, error) {
values := make([]string, 0)

if l.templateType == nil {
return values, nil
}

if label == nil {
if l.templateType == nil || label == nil {
return l.emptyValues, nil
}

Expand All @@ -63,7 +63,8 @@ func (l *labelMaker) extractValues(label any) ([]string, error) {
"label type mismatch, expected %v, got %v", l.templateType, reflect.TypeOf(label))
}

for i := 0; i < l.templateType.NumField(); i++ {
values := make([]string, 0, l.labelCount)
for i := 0; i < l.labelCount; i++ {
v := reflect.ValueOf(label)
values = append(values, v.Field(i).String())
}
Expand Down
Loading

0 comments on commit 53bbb85

Please sign in to comment.