From c328e56b9b8c3faab5c05bc66ee608510eb01191 Mon Sep 17 00:00:00 2001 From: yotamloe Date: Mon, 16 May 2022 16:37:26 +0300 Subject: [PATCH] generate valid response for firehose (RequestId,Timestamp,ErrorMessage) --- handler/handler.go | 75 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 60 insertions(+), 15 deletions(-) diff --git a/handler/handler.go b/handler/handler.go index 7456326..62bf0b5 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -29,9 +29,45 @@ const ( maxStr = "_max" ) -// Lambda response -type response struct { - message string `json:"message"` +type firehoseResponse struct { + RequestId string `json:"requestId"` + Timestamp int64 `json:"timestamp"` + ErrorMessage string `json:"errorMessage"` +} + +func generateValidFirehoseResponse(statusCode int, requestId string, errorMessage string, err error) events.APIGatewayProxyResponse { + if errorMessage != "" { + data := firehoseResponse{ + RequestId: requestId, + Timestamp: time.Now().Unix(), + ErrorMessage: fmt.Sprintf("%s %s", errorMessage, err), + } + jsonData, _ := json.Marshal(data) + return events.APIGatewayProxyResponse{ + Body: string(jsonData), + StatusCode: statusCode, + Headers: map[string]string{ + "content-type": "application/json", + }, + IsBase64Encoded: false, + MultiValueHeaders: map[string][]string{}, + } + } else { + data := firehoseResponse{ + RequestId: requestId, + Timestamp: time.Now().Unix(), + } + jsonData, _ := json.Marshal(data) + return events.APIGatewayProxyResponse{ + Body: string(jsonData), + StatusCode: statusCode, + Headers: map[string]string{ + "content-type": "application/json", + }, + IsBase64Encoded: false, + MultiValueHeaders: map[string][]string{}, + } + } } // Takes a base64 encoded string and returns decoded string @@ -157,7 +193,9 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) ( } if ca != nil { parameterMap := ca["commonAttributes"].(map[string]interface{}) - ListenerUrl = parameterMap["CUSTOM_LISTENER"].(string) + if parameterMap != nil { + ListenerUrl = parameterMap["CUSTOM_LISTENER"].(string) + } } if ListenerUrl == "" { ListenerUrl = getListenerUrl() @@ -168,6 +206,16 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) ( if LogzioToken == "" { LogzioToken = request.Headers["x-amz-firehose-access-key"] } + // get requestId to match firehose response requirements + requestId := request.Headers["X-Amz-Firehose-Request-Id"] + if requestId == "" { + requestId = request.Headers["x-amz-firehose-request-id"] + } + + if LogzioToken == "" { + log.Printf("Cant find access key in 'X-Amz-Firehose-Access-Key' or 'x-amz-firehose-access-key' headers") + return generateValidFirehoseResponse(400, requestId, "Cant find access key in 'X-Amz-Firehose-Access-Key' or 'x-amz-firehose-access-key' headers", nil), nil + } // Initializing prometheus remote write exporter cfg := &prometheusremotewriteexporter.Config{ @@ -193,19 +241,19 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) ( metricsExporter, err := prometheusremotewriteexporter.NewPRWExporter(cfg, buildInfo) if err != nil { log.Printf("Error while creating metrics exporter: %s", err) - return events.APIGatewayProxyResponse{}, err + return generateValidFirehoseResponse(500, requestId, "Error while creating metrics exporter:", err), err } err = metricsExporter.Start(ctx, componenttest.NewNopHost()) if err != nil { log.Printf("Error while starting metrics exporter: %s", err) - return events.APIGatewayProxyResponse{}, err + return generateValidFirehoseResponse(500, requestId, "Error while starting metrics exporter:", err), err } log.Println("Starting to parse request body") var body map[string]interface{} err = json.Unmarshal([]byte(request.Body), &body) if err != nil { log.Printf("Error while unmarshalling request body: %s", err) - return events.APIGatewayProxyResponse{}, err + return generateValidFirehoseResponse(500, requestId, "Error while unmarshalling request body:", err), err } /* api request body example structure: @@ -233,13 +281,13 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) ( err = protoBuffer.DecodeMessage(ExportMetricsServiceRequest) if err != nil { log.Printf("Error decoding data: %s", err) - return events.APIGatewayProxyResponse{}, err + return generateValidFirehoseResponse(500, requestId, "Error decoding data:", err), err } // Converting otlp proto message to proto bytes protoBytes, err := proto.Marshal(ExportMetricsServiceRequest) if err != nil { log.Printf("Error while converting otlp proto message to proto bytes: %s", err) - return events.APIGatewayProxyResponse{}, err + return generateValidFirehoseResponse(500, requestId, "Error while converting otlp proto message to proto bytes:", err), err } // Converting otlp proto bytes to pdata.metrics metrics, err := pdata.MetricsFromOtlpProtoBytes(protoBytes) @@ -273,11 +321,8 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) ( err = metricsExporter.Shutdown(ctx) if err != nil { log.Printf("Error while shutting down exporter: %s", err) - return events.APIGatewayProxyResponse{}, err - } - resp := &response{ - message: "Done", + return generateValidFirehoseResponse(500, requestId, "Error while shutting down exporter:", err), err } - resBody, err := json.Marshal(resp) - return events.APIGatewayProxyResponse{Body: string(resBody), StatusCode: 200}, nil + + return generateValidFirehoseResponse(200, requestId, "", nil), nil }