Skip to content

Commit

Permalink
Clean up floating non-sense params
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeckhart committed Jun 3, 2024
1 parent 955217d commit 163ac99
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 62 deletions.
2 changes: 1 addition & 1 deletion pkg/config/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const AwsSdkV2 = "aws-sdk-v2"
// AlwaysReturnInfoMetrics is a feature flag used to enable the return of info metrics even when there are no corresponding CloudWatch metrics
const AlwaysReturnInfoMetrics = "always-return-info-metrics"

const UnifiedJobRunner = "unified-job-runner"
const UnifiedScraper = "unified-scraper"

// FeatureFlags is an interface all objects that can tell wether or not a feature flag is enabled can implement.
type FeatureFlags interface {
Expand Down
1 change: 0 additions & 1 deletion pkg/job/cloudwatchrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type listMetricsProcessor interface {
type Params struct {
Region string
Role model.Role
AccountID string
CloudwatchConcurrency cloudwatch.ConcurrencyConfig
GetMetricDataMetricsPerQuery int
}
Expand Down
27 changes: 22 additions & 5 deletions pkg/job/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,35 @@ package job

import (
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/account"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/cloudwatchrunner"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type RunnerFactory struct{}
type RunnerFactory struct {
clientFactory clients.Factory
resourceMetadataConcurrency int
cloudwatchConcurrency cloudwatch.ConcurrencyConfig
getMetricDataMetricsPerQuery int
}

func (r *RunnerFactory) GetAccountClient(region string, role model.Role) account.Client {
return r.clientFactory.GetAccountClient(region, role)
}

func (r RunnerFactory) NewResourceMetadataRunner(logger logging.Logger, clientFactory clients.Factory, region string, role model.Role, concurrency int) *resourcemetadata.Runner {
return resourcemetadata.NewDefaultRunner(logger, clientFactory, region, role, concurrency)
func (r *RunnerFactory) NewResourceMetadataRunner(logger logging.Logger, region string, role model.Role) *resourcemetadata.Runner {
return resourcemetadata.NewDefaultRunner(logger, r.clientFactory, region, role, r.resourceMetadataConcurrency)
}

func (r RunnerFactory) NewCloudWatchRunner(logger logging.Logger, factory clients.Factory, params cloudwatchrunner.Params, job cloudwatchrunner.Job) *cloudwatchrunner.Runner {
return cloudwatchrunner.NewDefault(logger, factory, params, job)
func (r *RunnerFactory) NewCloudWatchRunner(logger logging.Logger, region string, role model.Role, job cloudwatchrunner.Job) *cloudwatchrunner.Runner {
params := cloudwatchrunner.Params{
Region: region,
Role: role,
CloudwatchConcurrency: r.cloudwatchConcurrency,
GetMetricDataMetricsPerQuery: r.getMetricDataMetricsPerQuery,
}
return cloudwatchrunner.NewDefault(logger, r.clientFactory, params, job)
}
14 changes: 10 additions & 4 deletions pkg/job/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ func ScrapeAwsData(
cloudwatchConcurrency cloudwatch.ConcurrencyConfig,
taggingAPIConcurrency int,
) ([]model.TaggedResourceResult, []model.CloudwatchMetricResult) {
if config.FlagsFromCtx(ctx).IsFeatureEnabled(config.UnifiedJobRunner) {
if config.FlagsFromCtx(ctx).IsFeatureEnabled(config.UnifiedScraper) {
if len(jobsCfg.StaticJobs) > 0 {
logger.Error(nil, "Static jobs are not supported by the unified job runner at this time")
logger.Error(nil, "Static jobs are not supported by the unified scraper at this time")
return nil, nil
}
runner := NewScrapeRunner(logger, jobsCfg, factory, RunnerFactory{}, metricsPerQuery, cloudwatchConcurrency, taggingAPIConcurrency)
return runner.Run(ctx)
rf := &RunnerFactory{
clientFactory: factory,
resourceMetadataConcurrency: taggingAPIConcurrency,
cloudwatchConcurrency: cloudwatchConcurrency,
getMetricDataMetricsPerQuery: metricsPerQuery,
}
scraper := NewScraper(logger, jobsCfg, rf)
return scraper.Scrape(ctx)
}

mux := &sync.Mutex{}
Expand Down
82 changes: 31 additions & 51 deletions pkg/job/scraperunner.go → pkg/job/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,30 @@ import (
"fmt"
"sync"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/account"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/cloudwatchrunner"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/resourcemetadata"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type ScrapeRunner struct {
clientFactory clients.Factory
jobsCfg model.JobsConfig
metricsPerQuery int
cloudwatchConcurrency cloudwatch.ConcurrencyConfig
taggingAPIConcurrency int

type Scraper struct {
jobsCfg model.JobsConfig
roleRegionToAccount map[model.Role]map[string]string
logger logging.Logger
runnerFactory runnerFactory
}

type runnerFactory interface {
NewResourceMetadataRunner(logger logging.Logger, clientFactory clients.Factory, region string, role model.Role, concurrency int) *resourcemetadata.Runner
NewCloudWatchRunner(logger logging.Logger, factory clients.Factory, params cloudwatchrunner.Params, job cloudwatchrunner.Job) *cloudwatchrunner.Runner
GetAccountClient(region string, role model.Role) account.Client
NewResourceMetadataRunner(logger logging.Logger, region string, role model.Role) *resourcemetadata.Runner
NewCloudWatchRunner(logger logging.Logger, region string, role model.Role, job cloudwatchrunner.Job) *cloudwatchrunner.Runner
}

func NewScrapeRunner(logger logging.Logger,
func NewScraper(logger logging.Logger,
jobsCfg model.JobsConfig,
clientFactory clients.Factory,
runnerFactory runnerFactory,
metricsPerQuery int,
cloudwatchConcurrency cloudwatch.ConcurrencyConfig,
taggingAPIConcurrency int,
) *ScrapeRunner {
) *Scraper {
roleRegionToAccount := map[model.Role]map[string]string{}
jobConfigVisitor(jobsCfg, func(_ any, role model.Role, region string) {
if _, exists := roleRegionToAccount[role]; !exists {
Expand All @@ -46,56 +37,52 @@ func NewScrapeRunner(logger logging.Logger,
roleRegionToAccount[role] = map[string]string{region: ""}
})

return &ScrapeRunner{
clientFactory: clientFactory,
runnerFactory: runnerFactory,
logger: logger,
jobsCfg: jobsCfg,
metricsPerQuery: metricsPerQuery,
cloudwatchConcurrency: cloudwatchConcurrency,
taggingAPIConcurrency: taggingAPIConcurrency,
roleRegionToAccount: roleRegionToAccount,
return &Scraper{
runnerFactory: runnerFactory,
logger: logger,
jobsCfg: jobsCfg,
roleRegionToAccount: roleRegionToAccount,
}
}

func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, []model.CloudwatchMetricResult) {
func (s Scraper) Scrape(ctx context.Context) ([]model.TaggedResourceResult, []model.CloudwatchMetricResult) {
var wg sync.WaitGroup
mux := &sync.Mutex{}
sr.logger.Debug("Starting account initialization")
for role, regions := range sr.roleRegionToAccount {
s.logger.Debug("Starting account initialization")
for role, regions := range s.roleRegionToAccount {
for region := range regions {
wg.Add(1)
go func(role model.Role, region string) {
defer wg.Done()
accountID, err := sr.clientFactory.GetAccountClient(region, role).GetAccount(ctx)
accountID, err := s.runnerFactory.GetAccountClient(region, role).GetAccount(ctx)
if err != nil {
sr.logger.Error(err, "Failed to get Account", "region", region, "role_arn", role.RoleArn)
s.logger.Error(err, "Failed to get Account", "region", region, "role_arn", role.RoleArn)
} else {
sr.roleRegionToAccount[role][region] = accountID
s.roleRegionToAccount[role][region] = accountID
}
}(role, region)
}
}
wg.Wait()
sr.logger.Debug("Finished account initialization")
s.logger.Debug("Finished account initialization")

metricResults := make([]model.CloudwatchMetricResult, 0)
resourceResults := make([]model.TaggedResourceResult, 0)
sr.logger.Debug("Starting job runs")
jobConfigVisitor(sr.jobsCfg, func(job any, role model.Role, region string) {
s.logger.Debug("Starting job runs")
jobConfigVisitor(s.jobsCfg, func(job any, role model.Role, region string) {
wg.Add(1)
go func() {
defer wg.Done()

var namespace string
jobAction(sr.logger, job, func(job model.DiscoveryJob) {
jobAction(s.logger, job, func(job model.DiscoveryJob) {
namespace = job.Type
}, func(job model.CustomNamespaceJob) {
namespace = job.Namespace
})
jobLogger := sr.logger.With("namespace", namespace, "region", region, "arn", role.RoleArn)
jobLogger := s.logger.With("namespace", namespace, "region", region, "arn", role.RoleArn)

accountID := sr.roleRegionToAccount[role][region]
accountID := s.roleRegionToAccount[role][region]
if accountID == "" {
jobLogger.Error(nil, "Account for job was not found see previous errors")
return
Expand All @@ -105,10 +92,9 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
var jobToRun cloudwatchrunner.Job
jobAction(jobLogger, job,
func(job model.DiscoveryJob) {
rmProcessor := sr.runnerFactory.NewResourceMetadataRunner(jobLogger, sr.clientFactory, region, role, sr.taggingAPIConcurrency)

jobLogger.Debug("Starting resource discovery")
resources, err := rmProcessor.Run(ctx, region, job)
rmRunner := s.runnerFactory.NewResourceMetadataRunner(jobLogger, region, role)
resources, err := rmRunner.Run(ctx, region, job)
if err != nil {
jobLogger.Error(err, "Resource metadata processor failed")
return
Expand Down Expand Up @@ -136,20 +122,14 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
},
)
jobLogger.Debug("Starting cloudwatch metrics runner")
runnerParams := cloudwatchrunner.Params{
Region: region,
Role: role,
CloudwatchConcurrency: sr.cloudwatchConcurrency,
GetMetricDataMetricsPerQuery: sr.metricsPerQuery,
}
runner := sr.runnerFactory.NewCloudWatchRunner(jobLogger, sr.clientFactory, runnerParams, jobToRun)
metricResult, err := runner.Run(ctx)
cwRunner := s.runnerFactory.NewCloudWatchRunner(jobLogger, region, role, jobToRun)
metricResult, err := cwRunner.Run(ctx)
if err != nil {
jobLogger.Error(err, "Failed to run job")
return
}

if metricResult == nil {
if len(metricResult) == 0 {
jobLogger.Debug("No metrics data found")
return
}
Expand All @@ -171,7 +151,7 @@ func (sr ScrapeRunner) Run(ctx context.Context) ([]model.TaggedResourceResult, [
}()
})
wg.Wait()
sr.logger.Debug("Finished job runs", "resource_results", len(resourceResults), "metric_results", len(metricResults))
s.logger.Debug("Finished job runs", "resource_results", len(resourceResults), "metric_results", len(metricResults))
return resourceResults, metricResults
}

Expand Down

0 comments on commit 163ac99

Please sign in to comment.