diff --git a/handler/handler.go b/handler/handler.go index 9f8a055..5718cb5 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -44,8 +44,8 @@ func initLogger(ctx context.Context, request events.APIGatewayProxyRequest, toke if lambdaContext, ok := lambdacontext.FromContext(ctx); ok { awsRequestId = lambdaContext.AwsRequestID } - if awsAccount := strings.Split(request.Headers["X-Amz-Firehose-Source-Arn"], ":"); len(awsAccount) > 4 { - account = awsAccount[4] + if arnStr := strings.Split(request.Headers["X-Amz-Firehose-Source-Arn"], ":"); len(arnStr) > 4 { + account = arnStr[4] } if len(token) >= 5 { logzioIdentifier = token[len(token)-5:] @@ -125,7 +125,7 @@ func extractHeaders(request events.APIGatewayProxyRequest) (string, string, stri } var commonAttributesMap map[string]interface{} if err := json.Unmarshal([]byte(commonAttributes), &commonAttributesMap); err != nil { - return "", "", "" + return requestId, logzioToken, "" } envID := commonAttributesMap["commonAttributes"].(map[string]interface{})["p8s_logzio_name"].(string) fmt.Println("Common attributes: ", commonAttributesMap) @@ -133,6 +133,9 @@ func extractHeaders(request events.APIGatewayProxyRequest) (string, string, stri } func createPrometheusRemoteWriteExporter(log *zap.Logger, LogzioToken, envId string) (exporter.Metrics, error) { + if envId == "" { + envId = "logzio-otlp-metrics-stream" + } cfg := &prometheusremotewriteexporter.Config{ ExternalLabels: map[string]string{"p8s_logzio_name": envId}, ClientConfig: confighttp.ClientConfig{ @@ -140,6 +143,12 @@ func createPrometheusRemoteWriteExporter(log *zap.Logger, LogzioToken, envId str Timeout: 5 * time.Second, Headers: map[string]configopaque.String{"Authorization": configopaque.String(fmt.Sprintf("Bearer %s", LogzioToken))}, }, + TargetInfo: &prometheusremotewriteexporter.TargetInfo{ + Enabled: false, + }, + CreatedMetric: &prometheusremotewriteexporter.CreatedMetric{ + Enabled: false, + }, ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true}, RemoteWriteQueue: prometheusremotewriteexporter.RemoteWriteQueue{ Enabled: true, @@ -166,19 +175,22 @@ func createPrometheusRemoteWriteExporter(log *zap.Logger, LogzioToken, envId str } func convertResourceAttributes(resourceAttributes pcommon.Map) { + newAttributes := pcommon.NewMap() resourceAttributes.Range(func(k string, v pcommon.Value) bool { - resourceAttributes.PutStr(strings.ToLower(k), strings.ToLower(v.AsString())) + lowerKey := strings.ToLower(k) + newAttributes.PutStr(lowerKey, v.AsString()) + if lowerKey == cloudAccountIdAtt { + newAttributes.PutStr("account", v.AsString()) + } else if lowerKey == cloudRegionAtt { + newAttributes.PutStr("region", v.AsString()) + } return true }) - if accountId, ok := resourceAttributes.Get(cloudAccountIdAtt); ok { - resourceAttributes.PutStr("account", accountId.AsString()) - resourceAttributes.Remove(cloudAccountIdAtt) - } - if region, ok := resourceAttributes.Get(cloudRegionAtt); ok { - resourceAttributes.PutStr("region", region.AsString()) - resourceAttributes.Remove(cloudRegionAtt) - } - resourceAttributes.Remove("aws.exporter.arn") + newAttributes.Remove("cloud.account.id") + newAttributes.Remove("cloud.region") + newAttributes.Remove("aws.exporter.arn") + resourceAttributes.Clear() + newAttributes.CopyTo(resourceAttributes) } func convertAttributes(attributes pcommon.Map) { @@ -196,7 +208,7 @@ func convertAttributes(attributes pcommon.Map) { }) } -func createMinMaxMetrics(metricName string, dp pmetric.SummaryDataPoint) (pmetric.Metric, pmetric.Metric) { +func createMinMaxMetrics(metricName string, dp pmetric.SummaryDataPoint, resourceAttributes pcommon.Map) (pmetric.Metric, pmetric.Metric) { minMetric := pmetric.NewMetric() minMetric.SetName(metricName + minStr) maxMetric := pmetric.NewMetric() @@ -205,6 +217,12 @@ func createMinMaxMetrics(metricName string, dp pmetric.SummaryDataPoint) (pmetri maxDp := maxMetric.SetEmptyGauge().DataPoints().AppendEmpty() minDp.SetTimestamp(dp.StartTimestamp()) maxDp.SetTimestamp(dp.StartTimestamp()) + // Copy resource attributes to min and max metrics + resourceAttributes.Range(func(k string, v pcommon.Value) bool { + minDp.Attributes().PutStr(k, v.AsString()) + return true + }) + // Copy datapoint attributes to min and max metrics dp.Attributes().Range(func(k string, v pcommon.Value) bool { minDp.Attributes().PutStr(k, v.AsString()) maxDp.Attributes().PutStr(k, v.AsString()) @@ -246,7 +264,7 @@ func processRecord(protoBuffer *proto.Buffer, log *zap.Logger) (pmetric.Metrics, for l := 0; l < sm.Summary().DataPoints().Len(); l++ { dp := sm.Summary().DataPoints().At(l) convertAttributes(dp.Attributes()) - minMetric, maxMetric := createMinMaxMetrics(sm.Name(), dp) + minMetric, maxMetric := createMinMaxMetrics(sm.Name(), dp, resourceMetrics.Resource().Attributes()) minMetric.CopyTo(minMaxMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty()) maxMetric.CopyTo(minMaxMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty()) dp.QuantileValues().RemoveIf(func(qv pmetric.SummaryDataPointValueAtQuantile) bool {