Skip to content

Commit

Permalink
Merge branch 'k8s-extensions'
Browse files Browse the repository at this point in the history
  • Loading branch information
pmm-sumo committed Mar 5, 2020
2 parents 27c4fde + e6c4273 commit 86a6105
Showing 1 changed file with 63 additions and 4 deletions.
67 changes: 63 additions & 4 deletions processor/k8sprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/open-telemetry/opentelemetry-collector/client"
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
Expand All @@ -31,6 +32,7 @@ import (

const (
sourceFormatJaeger string = "jaeger"
sourceFormatZipkin string = "zipkin"
ipLabelName string = "ip"
)

Expand Down Expand Up @@ -108,6 +110,21 @@ func (kp *kubernetesprocessor) ConsumeTraceData(ctx context.Context, td consumer
}
}

// If this was passed using Zipkin format, the information used for tagging might be present
// in each span attribute and should be tagged not on a resource, but rather span level
if podIP == "" && td.SourceFormat == sourceFormatZipkin {
for _, span := range td.Spans {
_ = kp.consumeZipkinSpan(ctx, span)
}
} else {
_ = kp.consumeTraceBatch(podIP, ctx, &td)
}

// TODO: should add to spans that have a resource not the same as the batch?
return kp.nextConsumer.ConsumeTraceData(ctx, td)
}

func (kp *kubernetesprocessor) consumeTraceBatch(podIP string, ctx context.Context, td *consumerdata.TraceData) error {
// Check if the receiver detected client IP.
if podIP == "" {
if c, ok := client.FromContext(ctx); ok {
Expand All @@ -128,12 +145,12 @@ func (kp *kubernetesprocessor) ConsumeTraceData(ctx context.Context, td consumer
// Don't invoke any k8s client functionality in passthrough mode.
// Just tag the IP and forward the batch.
if kp.passthroughMode {
return kp.nextConsumer.ConsumeTraceData(ctx, td)
return nil
}

attrs := kp.getAttributesForPodIP(podIP)
if len(attrs) == 0 {
return kp.nextConsumer.ConsumeTraceData(ctx, td)
return nil
}

if td.Resource == nil {
Expand All @@ -147,8 +164,50 @@ func (kp *kubernetesprocessor) ConsumeTraceData(ctx context.Context, td consumer
td.Resource.Labels[k] = v
}

// TODO: should add to spans that have a resource not the same as the batch?
return kp.nextConsumer.ConsumeTraceData(ctx, td)
return nil
}

func (kp *kubernetesprocessor) consumeZipkinSpan(ctx context.Context, span *tracepb.Span) error {
podIP := ""
if span.Attributes != nil && span.Attributes.AttributeMap != nil {
value := span.Attributes.AttributeMap[ipLabelName]
if value != nil {
podIP = value.GetStringValue().Value
}
}

// Check if the receiver detected client IP.
if podIP == "" {
if c, ok := client.FromContext(ctx); ok {
podIP = c.IP
if span.Attributes != nil && span.Attributes.AttributeMap != nil {
span.Attributes.AttributeMap[ipLabelName] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{
Value: podIP}}}
}
}
}

// Don't invoke any k8s client functionality in passthrough mode.
// Just tag the IP and forward the batch.
if kp.passthroughMode {
return nil
}

attrs := kp.getAttributesForPodIP(podIP)
if len(attrs) == 0 {
return nil
}

for k, v := range attrs {
span.Attributes.AttributeMap[k] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{
Value: v}}}
}

return nil
}

func (kp *kubernetesprocessor) getAttributesForPodIP(ip string) map[string]string {
Expand Down

0 comments on commit 86a6105

Please sign in to comment.