Skip to content

Commit

Permalink
Improve resource attributes handling
Browse files Browse the repository at this point in the history
  • Loading branch information
yotamloe committed Oct 13, 2024
1 parent a252851 commit f70fc71
Showing 1 changed file with 33 additions and 15 deletions.
48 changes: 33 additions & 15 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func initLogger(ctx context.Context, request events.APIGatewayProxyRequest, toke
if lambdaContext, ok := lambdacontext.FromContext(ctx); ok {
awsRequestId = lambdaContext.AwsRequestID
}
if awsAccount := strings.Split(request.Headers["X-Amz-Firehose-Source-Arn"], ":"); len(awsAccount) > 4 {
account = awsAccount[4]
if arnStr := strings.Split(request.Headers["X-Amz-Firehose-Source-Arn"], ":"); len(arnStr) > 4 {
account = arnStr[4]
}
if len(token) >= 5 {
logzioIdentifier = token[len(token)-5:]
Expand Down Expand Up @@ -125,21 +125,30 @@ func extractHeaders(request events.APIGatewayProxyRequest) (string, string, stri
}
var commonAttributesMap map[string]interface{}
if err := json.Unmarshal([]byte(commonAttributes), &commonAttributesMap); err != nil {
return "", "", ""
return requestId, logzioToken, ""
}
envID := commonAttributesMap["commonAttributes"].(map[string]interface{})["p8s_logzio_name"].(string)
fmt.Println("Common attributes: ", commonAttributesMap)
return requestId, logzioToken, envID
}

func createPrometheusRemoteWriteExporter(log *zap.Logger, LogzioToken, envId string) (exporter.Metrics, error) {
if envId == "" {
envId = "logzio-otlp-metrics-stream"
}
cfg := &prometheusremotewriteexporter.Config{
ExternalLabels: map[string]string{"p8s_logzio_name": envId},
ClientConfig: confighttp.ClientConfig{
Endpoint: getListenerUrl(log),
Timeout: 5 * time.Second,
Headers: map[string]configopaque.String{"Authorization": configopaque.String(fmt.Sprintf("Bearer %s", LogzioToken))},
},
TargetInfo: &prometheusremotewriteexporter.TargetInfo{
Enabled: false,
},
CreatedMetric: &prometheusremotewriteexporter.CreatedMetric{
Enabled: false,
},
ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
RemoteWriteQueue: prometheusremotewriteexporter.RemoteWriteQueue{
Enabled: true,
Expand All @@ -166,19 +175,22 @@ func createPrometheusRemoteWriteExporter(log *zap.Logger, LogzioToken, envId str
}

func convertResourceAttributes(resourceAttributes pcommon.Map) {
newAttributes := pcommon.NewMap()
resourceAttributes.Range(func(k string, v pcommon.Value) bool {
resourceAttributes.PutStr(strings.ToLower(k), strings.ToLower(v.AsString()))
lowerKey := strings.ToLower(k)
newAttributes.PutStr(lowerKey, v.AsString())
if lowerKey == cloudAccountIdAtt {
newAttributes.PutStr("account", v.AsString())
} else if lowerKey == cloudRegionAtt {
newAttributes.PutStr("region", v.AsString())
}
return true
})
if accountId, ok := resourceAttributes.Get(cloudAccountIdAtt); ok {
resourceAttributes.PutStr("account", accountId.AsString())
resourceAttributes.Remove(cloudAccountIdAtt)
}
if region, ok := resourceAttributes.Get(cloudRegionAtt); ok {
resourceAttributes.PutStr("region", region.AsString())
resourceAttributes.Remove(cloudRegionAtt)
}
resourceAttributes.Remove("aws.exporter.arn")
newAttributes.Remove("cloud.account.id")
newAttributes.Remove("cloud.region")
newAttributes.Remove("aws.exporter.arn")
resourceAttributes.Clear()
newAttributes.CopyTo(resourceAttributes)
}

func convertAttributes(attributes pcommon.Map) {
Expand All @@ -196,7 +208,7 @@ func convertAttributes(attributes pcommon.Map) {
})
}

func createMinMaxMetrics(metricName string, dp pmetric.SummaryDataPoint) (pmetric.Metric, pmetric.Metric) {
func createMinMaxMetrics(metricName string, dp pmetric.SummaryDataPoint, resourceAttributes pcommon.Map) (pmetric.Metric, pmetric.Metric) {
minMetric := pmetric.NewMetric()
minMetric.SetName(metricName + minStr)
maxMetric := pmetric.NewMetric()
Expand All @@ -205,6 +217,12 @@ func createMinMaxMetrics(metricName string, dp pmetric.SummaryDataPoint) (pmetri
maxDp := maxMetric.SetEmptyGauge().DataPoints().AppendEmpty()
minDp.SetTimestamp(dp.StartTimestamp())
maxDp.SetTimestamp(dp.StartTimestamp())
// Copy resource attributes to min and max metrics
resourceAttributes.Range(func(k string, v pcommon.Value) bool {
minDp.Attributes().PutStr(k, v.AsString())
return true
})
// Copy datapoint attributes to min and max metrics
dp.Attributes().Range(func(k string, v pcommon.Value) bool {
minDp.Attributes().PutStr(k, v.AsString())
maxDp.Attributes().PutStr(k, v.AsString())
Expand Down Expand Up @@ -246,7 +264,7 @@ func processRecord(protoBuffer *proto.Buffer, log *zap.Logger) (pmetric.Metrics,
for l := 0; l < sm.Summary().DataPoints().Len(); l++ {
dp := sm.Summary().DataPoints().At(l)
convertAttributes(dp.Attributes())
minMetric, maxMetric := createMinMaxMetrics(sm.Name(), dp)
minMetric, maxMetric := createMinMaxMetrics(sm.Name(), dp, resourceMetrics.Resource().Attributes())
minMetric.CopyTo(minMaxMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty())
maxMetric.CopyTo(minMaxMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty())
dp.QuantileValues().RemoveIf(func(qv pmetric.SummaryDataPointValueAtQuantile) bool {
Expand Down

0 comments on commit f70fc71

Please sign in to comment.