diff --git a/services/rollout-service/pkg/service/broadcast.go b/services/rollout-service/pkg/service/broadcast.go index 49f88357f..461f8fcec 100644 --- a/services/rollout-service/pkg/service/broadcast.go +++ b/services/rollout-service/pkg/service/broadcast.go @@ -19,6 +19,7 @@ package service import ( "context" "errors" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "sync" "time" @@ -176,15 +177,21 @@ func (b *Broadcast) DisconnectAll() { b.listener = make(map[chan *BroadcastEvent]struct{}) } -func (b *Broadcast) StreamStatus(req *api.StreamStatusRequest, svc api.RolloutService_StreamStatusServer) error { +func (b *Broadcast) StreamStatus(_ *api.StreamStatusRequest, svc api.RolloutService_StreamStatusServer) error { + ctx := svc.Context() + span, ctx := tracer.StartSpanFromContext(ctx, "StreamStatusConnect") + defer span.Finish() resp, ch, unsubscribe := b.Start() defer unsubscribe() + span.SetTag("numberOfEvents", len(resp)) for _, r := range resp { err := svc.Send(streamStatus(r)) if err != nil { return err } } + span.Finish() + for { select { case r := <-ch: @@ -196,8 +203,8 @@ func (b *Broadcast) StreamStatus(req *api.StreamStatusRequest, svc api.RolloutSe if err != nil { return err } - case <-svc.Context().Done(): - err := svc.Context().Err() + case <-ctx.Done(): + err := ctx.Err() if errors.Is(err, context.Canceled) { return nil }