diff --git a/services/cd-service/pkg/repository/queue.go b/services/cd-service/pkg/repository/queue.go index 0a567c0e4..baad25e8c 100644 --- a/services/cd-service/pkg/repository/queue.go +++ b/services/cd-service/pkg/repository/queue.go @@ -29,7 +29,6 @@ import ( "context" "fmt" "github.com/freiheit-com/kuberpult/pkg/logger" - "go.uber.org/zap" ) type queue struct { @@ -60,9 +59,10 @@ func (q *queue) add(ctx context.Context, transformers []Transformer) <-chan erro result: resultChannel, } + defer q.GaugeQueueSize(ctx) + select { case q.transformerBatches <- e: - GaugeQueueSize(ctx, len(q.transformerBatches)) return resultChannel default: //Channel is full, we don't want to put anything else there. @@ -78,12 +78,11 @@ func makeQueueN(size uint) queue { } } -func GaugeQueueSize(ctx context.Context, queueSize int) { +func (q *queue) GaugeQueueSize(ctx context.Context) { if ddMetrics != nil { - err := ddMetrics.Gauge("request_queue_size", float64(queueSize), []string{}, 1) - - if err != nil { - logger.FromContext(ctx).Error("Error gauging queue size metric: ", zap.Error(err)) + queueSize := len(q.transformerBatches) + if err := ddMetrics.Gauge("request_queue_size", float64(queueSize), []string{}, 1); err != nil { + logger.FromContext(ctx).Warn("Error gauging queue size datadog metric.") } } } diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index 9cc18ec1d..697629d46 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -554,17 +554,17 @@ func (r *repository) useRemote(callback func(*git.Remote) error) error { } } -func (r *repository) drainQueue() []transformerBatch { +func (r *repository) drainQueue(ctx context.Context) []transformerBatch { if r.config.MaximumCommitsPerPush < 2 { return nil } limit := r.config.MaximumCommitsPerPush - 1 transformerBatches := []transformerBatch{} + defer r.queue.GaugeQueueSize(ctx) for uint(len(transformerBatches)) < limit { select { case f := <-r.queue.transformerBatches: // Check that the item is not already cancelled - GaugeQueueSize(f.ctx, len(r.queue.transformerBatches)) select { case <-f.ctx.Done(): f.finish(f.ctx.Err()) @@ -623,7 +623,7 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, c }() // Try to fetch more items from the queue in order to push more things together - transformerBatches = append(transformerBatches, r.drainQueue()...) + transformerBatches = append(transformerBatches, r.drainQueue(ctx)...) var pushSuccess = true diff --git a/services/cd-service/pkg/repository/transformer_test.go b/services/cd-service/pkg/repository/transformer_test.go index 5be8138c3..5f5a0821f 100644 --- a/services/cd-service/pkg/repository/transformer_test.go +++ b/services/cd-service/pkg/repository/transformer_test.go @@ -5876,12 +5876,13 @@ func setupRepositoryTestWithPath(t *testing.T) (Repository, string) { repo, err := New( testutil.MakeTestContext(), RepositoryConfig{ - URL: remoteDir, - Path: localDir, - CommitterEmail: "kuberpult@freiheit.com", - CommitterName: "kuberpult", - WriteCommitData: true, - ArgoCdGenerateFiles: true, + URL: remoteDir, + Path: localDir, + CommitterEmail: "kuberpult@freiheit.com", + CommitterName: "kuberpult", + WriteCommitData: true, + MaximumCommitsPerPush: 5, + ArgoCdGenerateFiles: true, }, ) if err != nil { @@ -6390,6 +6391,65 @@ func TestUpdateDatadogMetricsInternal(t *testing.T) { } } +func TestDatadogQueueMetric(t *testing.T) { + tcs := []struct { + Name string + changes *TransformerResult + transformers []Transformer + expectedGauges int + }{ + { + Name: "Changes are sent as one event", + transformers: []Transformer{ + &CreateEnvironment{ + Environment: "envA", + Config: config.EnvironmentConfig{Upstream: &config.EnvironmentConfigUpstream{Latest: true}}, + }, + &CreateApplicationVersion{ + Application: "app1", + Manifests: map[string]string{ + "envA": "envA-manifest-1", + }, + WriteCommitData: false, + }, + &CreateApplicationVersion{ + Application: "app2", + Manifests: map[string]string{ + "envA": "envA-manifest-2", + }, + WriteCommitData: false, + }, + }, + expectedGauges: 2, + }, + } + for _, tc := range tcs { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + //t.Parallel() // do not run in parallel because of the global var `ddMetrics`! + ctx := WithTimeNow(testutil.MakeTestContext(), time.Unix(0, 0)) + var mockClient = &MockClient{} + var client statsd.ClientInterface = mockClient + ddMetrics = client + repo := setupRepositoryTest(t) + + err := repo.Apply(ctx, tc.transformers...) + + if err != nil { + t.Fatalf("Expected no error: %v", err) + } + + if tc.expectedGauges != len(mockClient.gauges) { + // Don't compare the value of the gauge, only the number of gauges, + // because we cannot be sure at this point what the size of the queue was during measurement + msg := fmt.Sprintf("expected %d gauges but got %d\n", + tc.expectedGauges, len(mockClient.gauges)) + t.Fatalf(msg) + } + }) + } +} + func TestUpdateDatadogEventsInternal(t *testing.T) { tcs := []struct { Name string