Skip to content

Commit

Permalink
It works
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeckhart committed May 15, 2024
1 parent fbc27eb commit 697fa2a
Show file tree
Hide file tree
Showing 16 changed files with 814 additions and 361 deletions.
2 changes: 2 additions & 0 deletions pkg/config/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const AwsSdkV2 = "aws-sdk-v2"
// AlwaysReturnInfoMetrics is a feature flag used to enable the return of info metrics even when there are no corresponding CloudWatch metrics
const AlwaysReturnInfoMetrics = "always-return-info-metrics"

const UnifiedJobRunner = "unified-job-runner"

// FeatureFlags is an interface all objects that can tell wether or not a feature flag is enabled can implement.
type FeatureFlags interface {
// IsFeatureEnabled tells if the feature flag identified by flag is enabled.
Expand Down
4 changes: 4 additions & 0 deletions pkg/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ func UpdateMetrics(
options.taggingAPIConcurrency,
)

logger.Debug("AWS Data scraped", "resource_results", len(tagsData), "metric_results", len(cloudwatchData))

metrics, observedMetricLabels, err := promutil.BuildMetrics(cloudwatchData, options.labelsSnakeCase, logger)
if err != nil {
logger.Error(err, "Error migrating cloudwatch metrics to prometheus metrics")
Expand All @@ -199,5 +201,7 @@ func UpdateMetrics(
metrics = promutil.EnsureLabelConsistencyAndRemoveDuplicates(metrics, observedMetricLabels)

registry.MustRegister(promutil.NewPrometheusCollector(metrics))

logger.Debug("Registered collected metrics", "metrics_registered", len(metrics))
return nil
}
108 changes: 0 additions & 108 deletions pkg/job/appender/accumulatingappender.go

This file was deleted.

55 changes: 55 additions & 0 deletions pkg/job/appender/accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package appender

import (
"context"
"sync"
"sync/atomic"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type Accumulator struct {
mux sync.Mutex
batches [][]*model.CloudwatchData
flattened []*model.CloudwatchData
done atomic.Bool
}

func NewAccumulator() *Accumulator {
return &Accumulator{
mux: sync.Mutex{},
batches: [][]*model.CloudwatchData{},
done: atomic.Bool{},
}
}

func (a *Accumulator) Append(_ context.Context, batch []*model.CloudwatchData) {
if a.done.Load() {
return
}

a.mux.Lock()
defer a.mux.Unlock()
a.batches = append(a.batches, batch)
}

func (a *Accumulator) Done() {
a.done.CompareAndSwap(false, true)
flattenedLength := 0
for _, batch := range a.batches {
flattenedLength += len(batch)
}

a.flattened = make([]*model.CloudwatchData, 0, flattenedLength)
for _, batch := range a.batches {
a.flattened = append(a.flattened, batch...)
}
}

func (a *Accumulator) ListAll() []*model.CloudwatchData {
if !a.done.Load() {
return nil
}

return a.flattened
}
43 changes: 11 additions & 32 deletions pkg/job/appender/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,28 @@ package appender
import (
"context"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type Resource struct {
// Name is an identifiable value for the resource and is variable dependent on the match made
// It will be the AWS ARN (Amazon Resource Name) if a unique resource was found
// It will be "global" if a unique resource was not found
// CustomNamespaces will have the custom namespace Name
Name string
// Tags is a set of tags associated to the resource
Tags []model.Tag
}

type Resources struct {
staticResource *Resource
associatedResources []*Resource
}

type metricResourceEnricher interface {
Enrich(ctx context.Context, metrics []*model.Metric) ([]*model.Metric, Resources)
}

type ResourceEnrichmentStrategy interface {
new(logger logging.Logger) metricResourceEnricher
resourceTagsOnMetrics() []string
func New(enricher resourcemetadata.MetricResourceEnricher) *Appender {
return &Appender{
resourceEnricher: enricher,
mapper: cloudwatchDataMapper{},
accumulator: NewAccumulator(),
}
}

type Appender struct {
resourceEnricher metricResourceEnricher
resourceEnricher resourcemetadata.MetricResourceEnricher
mapper cloudwatchDataMapper
accumulator *Accumulator
}

func (a Appender) Append(ctx context.Context, namespace string, metricConfig *model.MetricConfig, metrics []*model.Metric) {
metrics, metricResources := a.resourceEnricher.Enrich(ctx, metrics)
a.accumulator.Append(ctx, namespace, metricConfig, metrics, metricResources)
cloudwatchData := a.mapper.Map(ctx, namespace, metricConfig, metrics, metricResources)
a.accumulator.Append(ctx, cloudwatchData)
}

func (a Appender) Done() {
Expand All @@ -48,10 +34,3 @@ func (a Appender) Done() {
func (a Appender) ListAll() []*model.CloudwatchData {
return a.accumulator.ListAll()
}

func New(logger logging.Logger, strategy ResourceEnrichmentStrategy) *Appender {
return &Appender{
resourceEnricher: strategy.new(logger),
accumulator: NewAccumulator(strategy.resourceTagsOnMetrics()),
}
}
48 changes: 48 additions & 0 deletions pkg/job/appender/cloudwatchdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package appender

import (
"context"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type cloudwatchDataMapper struct {
resourceTagsOnMetrics []string
}

func (c cloudwatchDataMapper) Map(_ context.Context, namespace string, metricConfig *model.MetricConfig, metrics []*model.Metric, resources resourcemetadata.Resources) []*model.CloudwatchData {
batch := make([]*model.CloudwatchData, 0, len(metrics)*len(metricConfig.Statistics))
for i, metric := range metrics {
var resource *resourcemetadata.Resource
if resources.StaticResource != nil {
resource = resources.StaticResource
} else {
resource = resources.AssociatedResources[i]
}

for _, stat := range metricConfig.Statistics {
data := &model.CloudwatchData{
MetricName: metricConfig.Name,
Namespace: namespace,
Dimensions: metric.Dimensions,
GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{
Period: metricConfig.Period,
Length: metricConfig.Length,
Delay: metricConfig.Delay,
Statistic: stat,
},
MetricMigrationParams: model.MetricMigrationParams{
NilToZero: metricConfig.NilToZero,
AddCloudwatchTimestamp: metricConfig.AddCloudwatchTimestamp,
},
ResourceName: resource.Name,
Tags: resource.Tags,
GetMetricDataResult: nil,
GetMetricStatisticsResult: nil,
}
batch = append(batch, data)
}
}
return batch
}
92 changes: 0 additions & 92 deletions pkg/job/appender/resourceassociation.go

This file was deleted.

Loading

0 comments on commit 697fa2a

Please sign in to comment.