Skip to content

Commit

Permalink
otelcolclient cancels gRPC context when closed
Browse files Browse the repository at this point in the history
We don't expect this to actually be invoked currently, although we'd like
it to be.

The forwarder agent does not currently cancel contexts on shutdown as it
is not setup to run shutdown code when a signal is received.

Signed-off-by: Carson Long <[email protected]>
  • Loading branch information
acrmp committed Aug 3, 2023
1 parent a1e57ea commit 6ca518b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
23 changes: 17 additions & 6 deletions src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (
)

type Client struct {
// The client API for the OTel Collector metrics service.
// The client API for the OTel Collector metrics service
msc colmetricspb.MetricsServiceClient
// Context passed to gRPC
ctx context.Context
// Cancel func invoked on shutdown
cancel func()
// The logger to use for errors
l *log.Logger
}
Expand All @@ -32,7 +36,13 @@ func New(addr string, tlsConfig *tls.Config, l *log.Logger) (*Client, error) {
return nil, err
}

return &Client{msc: colmetricspb.NewMetricsServiceClient(cc), l: l}, nil
ctx, cancel := context.WithCancel(context.Background())
return &Client{
msc: colmetricspb.NewMetricsServiceClient(cc),
ctx: ctx,
cancel: cancel,
l: l,
}, nil
}

// Write translates an envelope to OTLP and forwards it to the connected OTel
Expand All @@ -55,15 +65,16 @@ func (c *Client) Write(e *loggregator_v2.Envelope) error {
return err
}

// Close flushes any buffers and closes any connections.
// Close cancels the underlying context.
func (c *Client) Close() error {
c.cancel()
return nil
}

// writeCounter translates a loggregator v2 Counter to OTLP and forwards it.
func (c *Client) writeCounter(e *loggregator_v2.Envelope) error {
atts := attributes(e)
resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{
resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
Expand Down Expand Up @@ -123,7 +134,7 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) error {
})
}

resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{
resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
Expand All @@ -143,7 +154,7 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) error {
// writeTimer translates a loggregator v2 Timer to OTLP and forwards it.
func (c *Client) writeTimer(e *loggregator_v2.Envelope) error {
atts := attributes(e)
resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{
resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
Expand Down
19 changes: 17 additions & 2 deletions src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ var _ = Describe("Client", func() {
response: &colmetricspb.ExportMetricsServiceResponse{},
responseErr: nil,
}
ctx, cancel := context.WithCancel(context.Background())
c = Client{
msc: spyMSC,
l: log.New(GinkgoWriter, "", 0),
msc: spyMSC,
ctx: ctx,
cancel: cancel,
l: log.New(GinkgoWriter, "", 0),
}
})

Expand Down Expand Up @@ -566,15 +569,27 @@ var _ = Describe("Client", func() {
})
})
})

Describe("Close", func() {
It("cancels the context", func() {
envelope := &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}
Expect(c.Write(envelope)).ToNot(HaveOccurred())

Expect(c.Close()).ToNot(HaveOccurred())
Eventually(spyMSC.ctx.Done()).Should(BeClosed())
})
})
})

type spyMetricsServiceClient struct {
requests chan *colmetricspb.ExportMetricsServiceRequest
response *colmetricspb.ExportMetricsServiceResponse
responseErr error
ctx context.Context
}

func (c *spyMetricsServiceClient) Export(ctx context.Context, in *colmetricspb.ExportMetricsServiceRequest, opts ...grpc.CallOption) (*colmetricspb.ExportMetricsServiceResponse, error) {
c.requests <- in
c.ctx = ctx
return c.response, c.responseErr
}

0 comments on commit 6ca518b

Please sign in to comment.