Better mocking
kgeckhart committed May 31, 2024
1 parent a6c2b2e commit 955217d
Showing 9 changed files with 65 additions and 32 deletions.
4 changes: 1 addition & 3 deletions pkg/job/appender/cloudwatchdata.go
@@ -7,9 +7,7 @@ import (
 	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
 )
 
-type cloudwatchDataMapper struct {
-	resourceTagsOnMetrics []string
-}
+type cloudwatchDataMapper struct{}
 
 func (c cloudwatchDataMapper) Map(_ context.Context, namespace string, metricConfig *model.MetricConfig, metrics []*model.Metric, resources resourcemetadata.Resources) []*model.CloudwatchData {
 	batch := make([]*model.CloudwatchData, 0, len(metrics)*len(metricConfig.Statistics))
10 changes: 5 additions & 5 deletions pkg/job/cloudwatchrunner/discoveryjob.go
@@ -6,20 +6,20 @@ import (
 	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
 )
 
-type Discovery struct {
+type DiscoveryJob struct {
 	Job       model.DiscoveryJob
 	Resources []*model.TaggedResource
 }
 
-func (d Discovery) Namespace() string {
+func (d DiscoveryJob) Namespace() string {
 	return d.Job.Type
 }
 
-func (d Discovery) CustomTags() []model.Tag {
+func (d DiscoveryJob) CustomTags() []model.Tag {
 	return d.Job.CustomTags
 }
 
-func (d Discovery) listMetricsParams() listmetrics.ProcessingParams {
+func (d DiscoveryJob) listMetricsParams() listmetrics.ProcessingParams {
 	return listmetrics.ProcessingParams{
 		Namespace: d.Job.Type,
 		Metrics:   d.Job.Metrics,
@@ -28,6 +28,6 @@ func (d Discovery) listMetricsParams() listmetrics.ProcessingParams {
 	}
 }
 
-func (d Discovery) resourceEnrichment() ResourceEnrichment {
+func (d DiscoveryJob) resourceEnrichment() ResourceEnrichment {
 	return resourcemetadata.NewResourceAssociation(d.Job.DimensionsRegexps, d.Job.ExportedTagsOnMetrics, nil)
 }
14 changes: 8 additions & 6 deletions pkg/job/cloudwatchrunner/runner.go
@@ -45,7 +45,9 @@ func NewDefault(logger logging.Logger, factory clients.Factory, params Params, j
 	cloudwatchClient := factory.GetCloudwatchClient(params.Region, params.Role, params.CloudwatchConcurrency)
 	lmProcessor := listmetrics.NewDefaultProcessor(logger, cloudwatchClient)
 	gmdProcessor := getmetricdata.NewDefaultProcessor(logger, cloudwatchClient, params.GetMetricDataMetricsPerQuery, params.CloudwatchConcurrency.GetMetricData)
-	return New(logger, lmProcessor, gmdProcessor, params, job)
+	a := appender.New(job.resourceEnrichment().Create(logger))
+
+	return New(logger, lmProcessor, gmdProcessor, a, params, job)
 }
 
 type Runner struct {
@@ -54,28 +56,28 @@ type Runner struct {
 	listMetrics   listMetricsProcessor
 	getMetricData getMetricDataProcessor
 	params        Params
+	appender      *appender.Appender
 }
 
 // New allows an injection point for interfaces
-func New(logger logging.Logger, listMetrics listMetricsProcessor, getMetricData getMetricDataProcessor, params Params, job Job) *Runner {
+func New(logger logging.Logger, listMetrics listMetricsProcessor, getMetricData getMetricDataProcessor, a *appender.Appender, params Params, job Job) *Runner {
 	return &Runner{
 		logger:        logger,
 		job:           job,
 		listMetrics:   listMetrics,
 		getMetricData: getMetricData,
+		appender:      a,
 		params:        params,
 	}
 }
 
 func (r *Runner) Run(ctx context.Context) ([]*model.CloudwatchData, error) {
-	a := appender.New(r.job.resourceEnrichment().Create(r.logger))
-
-	err := r.listMetrics.Run(ctx, r.job.listMetricsParams(), a)
+	err := r.listMetrics.Run(ctx, r.job.listMetricsParams(), r.appender)
 	if err != nil {
 		return nil, fmt.Errorf("failed to list metric data: %w", err)
 	}
 
-	getMetricDatas := a.ListAll()
+	getMetricDatas := r.appender.ListAll()
 	if len(getMetricDatas) == 0 {
 		return nil, nil
 	}
3 changes: 1 addition & 2 deletions pkg/job/discovery.go
@@ -92,7 +92,7 @@ func getMetricDataForQueries(
 			defer wg.Done()
 
 			err := clientCloudwatch.ListMetrics(ctx, svc.Namespace, metric, discoveryJob.RecentlyActiveOnly, func(page []*model.Metric) {
-				data := getFilteredMetricDatas(logger, discoveryJob.Type, discoveryJob.ExportedTagsOnMetrics, page, discoveryJob.DimensionNameRequirements, metric, assoc)
+				data := getFilteredMetricDatas(discoveryJob.Type, discoveryJob.ExportedTagsOnMetrics, page, discoveryJob.DimensionNameRequirements, metric, assoc)
 
 				mux.Lock()
 				getMetricDatas = append(getMetricDatas, data...)
@@ -116,7 +116,6 @@ func (ns nopAssociator) AssociateMetricToResource(_ *model.Metric) (*model.Tagge
 }
 
 func getFilteredMetricDatas(
-	logger logging.Logger,
 	namespace string,
 	tagsOnMetrics []string,
 	metricsList []*model.Metric,
2 changes: 1 addition & 1 deletion pkg/job/discovery_test.go
@@ -412,7 +412,7 @@ func Test_getFilteredMetricDatas(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			assoc := maxdimassociator.NewAssociator(logging.NewNopLogger(), tt.args.dimensionRegexps, tt.args.resources)
-			metricDatas := getFilteredMetricDatas(logging.NewNopLogger(), tt.args.namespace, tt.args.tagsOnMetrics, tt.args.metricsList, tt.args.dimensionNameRequirements, tt.args.m, assoc)
+			metricDatas := getFilteredMetricDatas(tt.args.namespace, tt.args.tagsOnMetrics, tt.args.metricsList, tt.args.dimensionNameRequirements, tt.args.m, assoc)
 			if len(metricDatas) != len(tt.wantGetMetricsData) {
 				t.Errorf("len(getFilteredMetricDatas()) = %v, want %v", len(metricDatas), len(tt.wantGetMetricsData))
 			}
@@ -5,24 +5,30 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
 	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/tagging"
 	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
 	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
 )
 
-type Processor struct {
+type Runner struct {
 	client tagging.Client
 	logger logging.Logger
 }
 
-func NewProcessor(logger logging.Logger, client tagging.Client) *Processor {
-	return &Processor{
+func NewDefaultRunner(logger logging.Logger, clientFactory clients.Factory, region string, role model.Role, concurrency int) *Runner {
+	taggingClient := clientFactory.GetTaggingClient(region, role, concurrency)
+	return NewRunner(logger, taggingClient)
+}
+
+func NewRunner(logger logging.Logger, client tagging.Client) *Runner {
+	return &Runner{
 		logger: logger,
 		client: client,
 	}
 }
 
-func (p Processor) Run(ctx context.Context, region string, job model.DiscoveryJob) ([]*model.TaggedResource, error) {
+func (p Runner) Run(ctx context.Context, region string, job model.DiscoveryJob) ([]*model.TaggedResource, error) {
 	resources, err := p.client.GetResources(ctx, job, region)
 	if err != nil {
 		if errors.Is(err, tagging.ErrExpectedToFindResources) {
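
The split above — NewDefaultRunner building a real tagging client from the clients.Factory, NewRunner accepting any tagging.Client — is what gives tests an injection point. A minimal sketch of how a test might use it with a fake client follows; fakeTaggingClient and the test itself are hypothetical, and the sketch assumes tagging.Client is the interface whose GetResources the runner calls, which is why the fake embeds the interface as a safety net for any other methods it may declare.

package resourcemetadata_test

import (
	"context"
	"testing"

	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/tagging"
	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata"
	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

// fakeTaggingClient is a hypothetical test double; only GetResources matters here.
type fakeTaggingClient struct {
	tagging.Client // embedded so any methods beyond GetResources are still satisfied
	resources      []*model.TaggedResource
}

func (f fakeTaggingClient) GetResources(_ context.Context, _ model.DiscoveryJob, _ string) ([]*model.TaggedResource, error) {
	return f.resources, nil
}

func TestRunner_ReturnsResourcesFromClient(t *testing.T) {
	runner := resourcemetadata.NewRunner(logging.NewNopLogger(), fakeTaggingClient{
		resources: []*model.TaggedResource{{}}, // contents elided; a real test would set ARN, tags, etc.
	})

	resources, err := runner.Run(context.Background(), "us-east-1", model.DiscoveryJob{Type: "AWS/SQS"})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(resources) != 1 {
		t.Fatalf("expected 1 resource, got %d", len(resources))
	}
}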
19 changes: 19 additions & 0 deletions pkg/job/runnerfactory.go
@@ -0,0 +1,19 @@
+package job
+
+import (
+	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
+	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/cloudwatchrunner"
+	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata"
+	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
+	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
+)
+
+type RunnerFactory struct{}
+
+func (r RunnerFactory) NewResourceMetadataRunner(logger logging.Logger, clientFactory clients.Factory, region string, role model.Role, concurrency int) *resourcemetadata.Runner {
+	return resourcemetadata.NewDefaultRunner(logger, clientFactory, region, role, concurrency)
+}
+
+func (r RunnerFactory) NewCloudWatchRunner(logger logging.Logger, factory clients.Factory, params cloudwatchrunner.Params, job cloudwatchrunner.Job) *cloudwatchrunner.Runner {
+	return cloudwatchrunner.NewDefault(logger, factory, params, job)
+}
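
This new RunnerFactory is the production implementation of the unexported runnerFactory interface that ScrapeRunner consumes (declared in pkg/job/scraperunner.go below), and that indirection is what the commit title points at: a test in package job can hand NewScrapeRunner its own factory instead of RunnerFactory{}. The sketch below is illustrative only — stubRunnerFactory and its fields are hypothetical names, the imports are the same ones runnerfactory.go uses, and the runners it hands back would still have to be built with fake clients via resourcemetadata.NewRunner and cloudwatchrunner.New.

// Hypothetical test double living in an internal test file of package job.
type stubRunnerFactory struct {
	metadataRunner   *resourcemetadata.Runner // e.g. resourcemetadata.NewRunner(logger, fakeTaggingClient{})
	cloudwatchRunner *cloudwatchrunner.Runner // e.g. cloudwatchrunner.New(...) wired with fake processors
}

func (s stubRunnerFactory) NewResourceMetadataRunner(_ logging.Logger, _ clients.Factory, _ string, _ model.Role, _ int) *resourcemetadata.Runner {
	return s.metadataRunner
}

func (s stubRunnerFactory) NewCloudWatchRunner(_ logging.Logger, _ clients.Factory, _ cloudwatchrunner.Params, _ cloudwatchrunner.Job) *cloudwatchrunner.Runner {
	return s.cloudwatchRunner
}

Because Go interfaces are satisfied structurally, this stub has the same method set as runnerFactory and can be passed straight to NewScrapeRunner; the remaining dependency, the clients.Factory used for the account lookup, would still need its own fake.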
2 changes: 1 addition & 1 deletion pkg/job/scrape.go
@@ -26,7 +26,7 @@ func ScrapeAwsData(
 		logger.Error(nil, "Static jobs are not supported by the unified job runner at this time")
 		return nil, nil
 	}
-	runner := NewScrapeRunner(logger, jobsCfg, factory, metricsPerQuery, cloudwatchConcurrency, taggingAPIConcurrency)
+	runner := NewScrapeRunner(logger, jobsCfg, factory, RunnerFactory{}, metricsPerQuery, cloudwatchConcurrency, taggingAPIConcurrency)
 	return runner.Run(ctx)
 }
 
29 changes: 19 additions & 10 deletions pkg/job/scraperunner.go
@@ -14,19 +14,26 @@ import (
 )
 
 type ScrapeRunner struct {
-	factory               clients.Factory
+	clientFactory         clients.Factory
 	jobsCfg               model.JobsConfig
 	metricsPerQuery       int
 	cloudwatchConcurrency cloudwatch.ConcurrencyConfig
 	taggingAPIConcurrency int
 
 	roleRegionToAccount map[model.Role]map[string]string
 	logger              logging.Logger
+	runnerFactory       runnerFactory
 }
 
+type runnerFactory interface {
+	NewResourceMetadataRunner(logger logging.Logger, clientFactory clients.Factory, region string, role model.Role, concurrency int) *resourcemetadata.Runner
+	NewCloudWatchRunner(logger logging.Logger, factory clients.Factory, params cloudwatchrunner.Params, job cloudwatchrunner.Job) *cloudwatchrunner.Runner
+}
+
 func NewScrapeRunner(logger logging.Logger,
 	jobsCfg model.JobsConfig,
-	factory clients.Factory,
+	clientFactory clients.Factory,
+	runnerFactory runnerFactory,
 	metricsPerQuery int,
 	cloudwatchConcurrency cloudwatch.ConcurrencyConfig,
 	taggingAPIConcurrency int,
@@ -40,7 +47,8 @@ func NewScrapeRunner(logger logging.Logger,
 	})
 
 	return &ScrapeRunner{
-		factory:               factory,
+		clientFactory:         clientFactory,
+		runnerFactory:         runnerFactory,
 		logger:                logger,
 		jobsCfg:               jobsCfg,
 		metricsPerQuery:       metricsPerQuery,
@@ -58,7 +66,8 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
 	for region := range regions {
 		wg.Add(1)
 		go func(role model.Role, region string) {
-			accountID, err := sr.factory.GetAccountClient(region, role).GetAccount(ctx)
+			defer wg.Done()
+			accountID, err := sr.clientFactory.GetAccountClient(region, role).GetAccount(ctx)
 			if err != nil {
 				sr.logger.Error(err, "Failed to get Account", "region", region, "role_arn", role.RoleArn)
 			} else {
@@ -96,8 +105,7 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
 			var jobToRun cloudwatchrunner.Job
 			jobAction(jobLogger, job,
 				func(job model.DiscoveryJob) {
-					taggingClient := sr.factory.GetTaggingClient(region, role, sr.taggingAPIConcurrency)
-					rmProcessor := resourcemetadata.NewProcessor(jobLogger, taggingClient)
+					rmProcessor := sr.runnerFactory.NewResourceMetadataRunner(jobLogger, sr.clientFactory, region, role, sr.taggingAPIConcurrency)
 
 					jobLogger.Debug("Starting resource discovery")
 					resources, err := rmProcessor.Run(ctx, region, job)
@@ -122,7 +130,7 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
 					}
 					jobLogger.Debug("Resource discovery finished", "number_of_discovered_resources", len(resources))
 
-					jobToRun = cloudwatchrunner.Discovery{Job: job, Resources: resources}
+					jobToRun = cloudwatchrunner.DiscoveryJob{Job: job, Resources: resources}
 				}, func(job model.CustomNamespaceJob) {
 					jobToRun = cloudwatchrunner.CustomNamespaceJob{Job: job}
 				},
@@ -134,15 +142,16 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
 				CloudwatchConcurrency:        sr.cloudwatchConcurrency,
 				GetMetricDataMetricsPerQuery: sr.metricsPerQuery,
 			}
-			runner := cloudwatchrunner.NewDefault(jobLogger, sr.factory, runnerParams, jobToRun)
+			runner := sr.runnerFactory.NewCloudWatchRunner(jobLogger, sr.clientFactory, runnerParams, jobToRun)
 			metricResult, err := runner.Run(ctx)
 			if err != nil {
 				jobLogger.Error(err, "Failed to run job")
 				return
 			}
 
 			if metricResult == nil {
-				jobLogger.Info("No metrics data found")
+				jobLogger.Debug("No metrics data found")
+				return
 			}
 
 			jobLogger.Debug("Job run finished", "number_of_metrics", len(metricResult))
@@ -161,8 +170,8 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
 				metricResults = append(metricResults, result)
 			}()
 		})
-	sr.logger.Debug("Finished job runs", "resource_results", len(resourceResults), "metric_results", len(metricResults))
 	wg.Wait()
+	sr.logger.Debug("Finished job runs", "resource_results", len(resourceResults), "metric_results", len(metricResults))
 	return resourceResults, metricResults
 }
