Skip to content

Commit

Permalink
feat: queue size datadog tracking (#1491)
Browse files Browse the repository at this point in the history
Kuberpults queue size is now tracked on datadog.
New metric: kuberpult.request_queue_size

---------

Co-authored-by: Sven Urbanski <[email protected]>
  • Loading branch information
1 parent 5cb4e21 commit 0da192f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 16 deletions.
13 changes: 6 additions & 7 deletions services/cd-service/pkg/repository/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"context"
"fmt"
"github.com/freiheit-com/kuberpult/pkg/logger"
"go.uber.org/zap"
)

type queue struct {
Expand Down Expand Up @@ -60,9 +59,10 @@ func (q *queue) add(ctx context.Context, transformers []Transformer) <-chan erro
result: resultChannel,
}

defer q.GaugeQueueSize(ctx)

select {
case q.transformerBatches <- e:
GaugeQueueSize(ctx, len(q.transformerBatches))
return resultChannel
default:
//Channel is full, we don't want to put anything else there.
Expand All @@ -78,12 +78,11 @@ func makeQueueN(size uint) queue {
}
}

func GaugeQueueSize(ctx context.Context, queueSize int) {
func (q *queue) GaugeQueueSize(ctx context.Context) {
if ddMetrics != nil {
err := ddMetrics.Gauge("request_queue_size", float64(queueSize), []string{}, 1)

if err != nil {
logger.FromContext(ctx).Error("Error gauging queue size metric: ", zap.Error(err))
queueSize := len(q.transformerBatches)
if err := ddMetrics.Gauge("request_queue_size", float64(queueSize), []string{}, 1); err != nil {
logger.FromContext(ctx).Warn("Error gauging queue size datadog metric.")
}
}
}
6 changes: 3 additions & 3 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,17 +554,17 @@ func (r *repository) useRemote(callback func(*git.Remote) error) error {
}
}

func (r *repository) drainQueue() []transformerBatch {
func (r *repository) drainQueue(ctx context.Context) []transformerBatch {
if r.config.MaximumCommitsPerPush < 2 {
return nil
}
limit := r.config.MaximumCommitsPerPush - 1
transformerBatches := []transformerBatch{}
defer r.queue.GaugeQueueSize(ctx)
for uint(len(transformerBatches)) < limit {
select {
case f := <-r.queue.transformerBatches:
// Check that the item is not already cancelled
GaugeQueueSize(f.ctx, len(r.queue.transformerBatches))
select {
case <-f.ctx.Done():
f.finish(f.ctx.Err())
Expand Down Expand Up @@ -623,7 +623,7 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, c
}()

// Try to fetch more items from the queue in order to push more things together
transformerBatches = append(transformerBatches, r.drainQueue()...)
transformerBatches = append(transformerBatches, r.drainQueue(ctx)...)

var pushSuccess = true

Expand Down
72 changes: 66 additions & 6 deletions services/cd-service/pkg/repository/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5876,12 +5876,13 @@ func setupRepositoryTestWithPath(t *testing.T) (Repository, string) {
repo, err := New(
testutil.MakeTestContext(),
RepositoryConfig{
URL: remoteDir,
Path: localDir,
CommitterEmail: "[email protected]",
CommitterName: "kuberpult",
WriteCommitData: true,
ArgoCdGenerateFiles: true,
URL: remoteDir,
Path: localDir,
CommitterEmail: "[email protected]",
CommitterName: "kuberpult",
WriteCommitData: true,
MaximumCommitsPerPush: 5,
ArgoCdGenerateFiles: true,
},
)
if err != nil {
Expand Down Expand Up @@ -6390,6 +6391,65 @@ func TestUpdateDatadogMetricsInternal(t *testing.T) {
}
}

func TestDatadogQueueMetric(t *testing.T) {
tcs := []struct {
Name string
changes *TransformerResult
transformers []Transformer
expectedGauges int
}{
{
Name: "Changes are sent as one event",
transformers: []Transformer{
&CreateEnvironment{
Environment: "envA",
Config: config.EnvironmentConfig{Upstream: &config.EnvironmentConfigUpstream{Latest: true}},
},
&CreateApplicationVersion{
Application: "app1",
Manifests: map[string]string{
"envA": "envA-manifest-1",
},
WriteCommitData: false,
},
&CreateApplicationVersion{
Application: "app2",
Manifests: map[string]string{
"envA": "envA-manifest-2",
},
WriteCommitData: false,
},
},
expectedGauges: 2,
},
}
for _, tc := range tcs {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
//t.Parallel() // do not run in parallel because of the global var `ddMetrics`!
ctx := WithTimeNow(testutil.MakeTestContext(), time.Unix(0, 0))
var mockClient = &MockClient{}
var client statsd.ClientInterface = mockClient
ddMetrics = client
repo := setupRepositoryTest(t)

err := repo.Apply(ctx, tc.transformers...)

if err != nil {
t.Fatalf("Expected no error: %v", err)
}

if tc.expectedGauges != len(mockClient.gauges) {
// Don't compare the value of the gauge, only the number of gauges,
// because we cannot be sure at this point what the size of the queue was during measurement
msg := fmt.Sprintf("expected %d gauges but got %d\n",
tc.expectedGauges, len(mockClient.gauges))
t.Fatalf(msg)
}
})
}
}

func TestUpdateDatadogEventsInternal(t *testing.T) {
tcs := []struct {
Name string
Expand Down

0 comments on commit 0da192f

Please sign in to comment.