Skip to content

Commit

Permalink
Merge pull request #4 from logzio/development
Browse files Browse the repository at this point in the history
generate valid response for firehose (RequestId,Timestamp,ErrorMessage)
  • Loading branch information
yotamloe authored May 16, 2022
2 parents 40aa6fc + c328e56 commit 5f2c7d2
Showing 1 changed file with 60 additions and 15 deletions.
75 changes: 60 additions & 15 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,45 @@ const (
maxStr = "_max"
)

// Lambda response
type response struct {
message string `json:"message"`
type firehoseResponse struct {
RequestId string `json:"requestId"`
Timestamp int64 `json:"timestamp"`
ErrorMessage string `json:"errorMessage"`
}

func generateValidFirehoseResponse(statusCode int, requestId string, errorMessage string, err error) events.APIGatewayProxyResponse {
if errorMessage != "" {
data := firehoseResponse{
RequestId: requestId,
Timestamp: time.Now().Unix(),
ErrorMessage: fmt.Sprintf("%s %s", errorMessage, err),
}
jsonData, _ := json.Marshal(data)
return events.APIGatewayProxyResponse{
Body: string(jsonData),
StatusCode: statusCode,
Headers: map[string]string{
"content-type": "application/json",
},
IsBase64Encoded: false,
MultiValueHeaders: map[string][]string{},
}
} else {
data := firehoseResponse{
RequestId: requestId,
Timestamp: time.Now().Unix(),
}
jsonData, _ := json.Marshal(data)
return events.APIGatewayProxyResponse{
Body: string(jsonData),
StatusCode: statusCode,
Headers: map[string]string{
"content-type": "application/json",
},
IsBase64Encoded: false,
MultiValueHeaders: map[string][]string{},
}
}
}

// Takes a base64 encoded string and returns decoded string
Expand Down Expand Up @@ -157,7 +193,9 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
}
if ca != nil {
parameterMap := ca["commonAttributes"].(map[string]interface{})
ListenerUrl = parameterMap["CUSTOM_LISTENER"].(string)
if parameterMap != nil {
ListenerUrl = parameterMap["CUSTOM_LISTENER"].(string)
}
}
if ListenerUrl == "" {
ListenerUrl = getListenerUrl()
Expand All @@ -168,6 +206,16 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
if LogzioToken == "" {
LogzioToken = request.Headers["x-amz-firehose-access-key"]
}
// get requestId to match firehose response requirements
requestId := request.Headers["X-Amz-Firehose-Request-Id"]
if requestId == "" {
requestId = request.Headers["x-amz-firehose-request-id"]
}

if LogzioToken == "" {
log.Printf("Cant find access key in 'X-Amz-Firehose-Access-Key' or 'x-amz-firehose-access-key' headers")
return generateValidFirehoseResponse(400, requestId, "Cant find access key in 'X-Amz-Firehose-Access-Key' or 'x-amz-firehose-access-key' headers", nil), nil
}

// Initializing prometheus remote write exporter
cfg := &prometheusremotewriteexporter.Config{
Expand All @@ -193,19 +241,19 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
metricsExporter, err := prometheusremotewriteexporter.NewPRWExporter(cfg, buildInfo)
if err != nil {
log.Printf("Error while creating metrics exporter: %s", err)
return events.APIGatewayProxyResponse{}, err
return generateValidFirehoseResponse(500, requestId, "Error while creating metrics exporter:", err), err
}
err = metricsExporter.Start(ctx, componenttest.NewNopHost())
if err != nil {
log.Printf("Error while starting metrics exporter: %s", err)
return events.APIGatewayProxyResponse{}, err
return generateValidFirehoseResponse(500, requestId, "Error while starting metrics exporter:", err), err
}
log.Println("Starting to parse request body")
var body map[string]interface{}
err = json.Unmarshal([]byte(request.Body), &body)
if err != nil {
log.Printf("Error while unmarshalling request body: %s", err)
return events.APIGatewayProxyResponse{}, err
return generateValidFirehoseResponse(500, requestId, "Error while unmarshalling request body:", err), err
}
/*
api request body example structure:
Expand Down Expand Up @@ -233,13 +281,13 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
err = protoBuffer.DecodeMessage(ExportMetricsServiceRequest)
if err != nil {
log.Printf("Error decoding data: %s", err)
return events.APIGatewayProxyResponse{}, err
return generateValidFirehoseResponse(500, requestId, "Error decoding data:", err), err
}
// Converting otlp proto message to proto bytes
protoBytes, err := proto.Marshal(ExportMetricsServiceRequest)
if err != nil {
log.Printf("Error while converting otlp proto message to proto bytes: %s", err)
return events.APIGatewayProxyResponse{}, err
return generateValidFirehoseResponse(500, requestId, "Error while converting otlp proto message to proto bytes:", err), err
}
// Converting otlp proto bytes to pdata.metrics
metrics, err := pdata.MetricsFromOtlpProtoBytes(protoBytes)
Expand Down Expand Up @@ -273,11 +321,8 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
err = metricsExporter.Shutdown(ctx)
if err != nil {
log.Printf("Error while shutting down exporter: %s", err)
return events.APIGatewayProxyResponse{}, err
}
resp := &response{
message: "Done",
return generateValidFirehoseResponse(500, requestId, "Error while shutting down exporter:", err), err
}
resBody, err := json.Marshal(resp)
return events.APIGatewayProxyResponse{Body: string(resBody), StatusCode: 200}, nil

return generateValidFirehoseResponse(200, requestId, "", nil), nil
}

0 comments on commit 5f2c7d2

Please sign in to comment.