Skip to content

Commit

Permalink
Merge pull request #6 from logzio/development
Browse files Browse the repository at this point in the history
Improve error handling + tests
  • Loading branch information
yotamloe authored May 30, 2022
2 parents 0c7ecd3 + 8e607d5 commit 748c6b4
Show file tree
Hide file tree
Showing 10 changed files with 418 additions and 13 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/auto-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: Test Handler
on:
pull_request:
branches: [ master, main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15
- name: Test
run: |
cd handler
go test -v -covermode=atomic -coverprofile=coverage.out
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# metric-stream-lambda
Lambda function that receives OTLP (0.7.0) data from AWS metric stream and exports the data to logz.io using prometheus remote write


### How to create function.zip
```
make function
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ go 1.17

require (
github.com/aws/aws-lambda-go v1.13.3
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.3
github.com/prometheus/prometheus v1.8.2-0.20210621150501-ff58416a0b02
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.29.0
go.opentelemetry.io/otel/metric v0.28.0
go.opentelemetry.io/proto/otlp v0.15.0
Expand All @@ -14,9 +18,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/jaegertracing/jaeger v1.23.0 // indirect
github.com/knadh/koanf v1.1.0 // indirect
Expand All @@ -25,10 +27,8 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.29.0 // indirect
github.com/prometheus/prometheus v1.8.2-0.20210621150501-ff58416a0b02 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/stretchr/testify v1.7.1 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.6.0 // indirect
Expand All @@ -42,5 +42,5 @@ require (
google.golang.org/grpc v1.42.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gopkg.in/yaml.v3 v3.0.0 // indirect
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1654,8 +1654,9 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
22 changes: 21 additions & 1 deletion handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ const (
maxStr = "_max"
)

type ErrorCollector []error

func (c *ErrorCollector) Collect(e error) { *c = append(*c, e) }

func (c *ErrorCollector) Length() int {
return len(*c)
}

func (c *ErrorCollector) Error() error {
err := "Collected errors:\n"
for i, e := range *c {
err += fmt.Sprintf("\tError %d: %s\n", i, e.Error())
}
return errors.New(err)
}

type firehoseResponse struct {
RequestId string `json:"requestId"`
Timestamp int64 `json:"timestamp"`
Expand Down Expand Up @@ -185,6 +201,7 @@ func summaryValuesToMetrics(metricsToSendSlice pdata.InstrumentationLibraryMetri
func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
metricCount := 0
dataPointCount := 0
shippingErrors := new(ErrorCollector)
ListenerUrl := ""
// extract parameters
var ca map[string]interface{}
Expand Down Expand Up @@ -313,6 +330,7 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
err = metricsExporter.PushMetrics(ctx, metricsToSend)
if err != nil {
log.Printf("Error while sending metrics: %s", err)
shippingErrors.Collect(err)
} else {
numberOfMetrics, numberOfDataPoints := metricsToSend.MetricAndDataPointCount()
metricCount += numberOfMetrics
Expand All @@ -326,6 +344,8 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
log.Printf("Error while shutting down exporter: %s", err)
return generateValidFirehoseResponse(500, requestId, "Error while shutting down exporter:", err), err
}

if shippingErrors.Length() > 0 {
return generateValidFirehoseResponse(500, requestId, "Error while sending metrics:", shippingErrors.Error()), shippingErrors.Error()
}
return generateValidFirehoseResponse(200, requestId, "", nil), nil
}
96 changes: 90 additions & 6 deletions handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,50 @@ import (
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
pdata "go.opentelemetry.io/collector/consumer/pdata"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"os"
"testing"
)

func TestHandleRequest(t *testing.T) {
var metricCount = 0
handleFunc := func(w http.ResponseWriter, r *http.Request, code int) {
// The following is a handler function that reads the sent httpRequest, unmarshal, and checks if the WriteRequest
// preserves the TimeSeries data correctly
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
require.NotNil(t, body)
// Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
assert.Equal(t, "opentelemetry/0.7", r.Header.Get("User-Agent"))
writeReq := &prompb.WriteRequest{}
var unzipped []byte
dest, err := snappy.Decode(unzipped, body)
require.NoError(t, err)
ok := proto.Unmarshal(dest, writeReq)
require.NoError(t, ok)
require.NotNil(t, writeReq.GetTimeseries())
metricCount += len(writeReq.Timeseries)
w.WriteHeader(code)
}

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if handleFunc != nil {
handleFunc(w, r, http.StatusOK)
}
}))
defer server.Close()
jsonFile, err := os.Open("../testdata/customUrlEvent.json")
if err != nil {
fmt.Println(err)
Expand All @@ -22,8 +58,38 @@ func TestHandleRequest(t *testing.T) {
byteValue, _ := ioutil.ReadAll(jsonFile)
request := events.APIGatewayProxyRequest{}
json.Unmarshal(byteValue, &request)
result, _ := HandleRequest(ctx, request)
log.Println(result)
request.Headers["x-amz-firehose-common-attributes"] = fmt.Sprintf("{\"commonAttributes\":{\"LOGZIO_TOKEN\":\"token\",\"CUSTOM_LISTENER\":\"%s\"}}", server.URL)
result, err := HandleRequest(ctx, request)
assert.NoError(t, err)
assert.Equal(t, result.StatusCode, http.StatusOK)
assert.Equal(t, 3688, metricCount)

}

func TestHandleRequestErrors(t *testing.T) {
type getListenerUrlTest struct {
file string
expected int
}
var getListenerUrlTests = []getListenerUrlTest{
{"noValidToken", 500},
{"noToken", 400},
{"malformedBody", 500},
}
for _, test := range getListenerUrlTests {
jsonFile, err := os.Open(fmt.Sprintf("../testdata/%s.json", test.file))
if err != nil {
fmt.Println(err)
}
ctx := context.Background()
byteValue, _ := ioutil.ReadAll(jsonFile)
request := events.APIGatewayProxyRequest{}
json.Unmarshal(byteValue, &request)
result, err := HandleRequest(ctx, request)
assert.Equal(t, test.expected, result.StatusCode)
err = jsonFile.Close()
assert.NoError(t, err)
}
}

func TestCreateMetricFromAttributes(t *testing.T) {
Expand Down Expand Up @@ -53,6 +119,26 @@ func TestCreateMetricFromAttributes(t *testing.T) {
}
}

func TestGetListenerUrl(t *testing.T) {
type getListenerUrlTest struct {
region string
expected string
}
var getListenerUrlTests = []getListenerUrlTest{
{"us-east-1", "https://listener.logz.io:8053"},
{"ca-central-1", "https://listener-ca.logz.io:8053"},
{"eu-central-1", "https://listener-eu.logz.io:8053"},
{"eu-west-2", "https://listener-uk.logz.io:8053"},
{"", "https://listener.logz.io:8053"},
{"not-valid", "https://listener.logz.io:8053"},
}
for _, test := range getListenerUrlTests {
os.Setenv("AWS_REGION", test.region)
output := getListenerUrl()
require.Equal(t, output, test.expected)
}
}

func TestSummaryValuesToMetrics(t *testing.T) {
testMetric := pdata.NewMetric()
testMetric.SetDataType(pdata.MetricDataTypeSummary)
Expand All @@ -70,13 +156,11 @@ func TestSummaryValuesToMetrics(t *testing.T) {
testQuantile99 := testQuantiles.AppendEmpty()
testQuantile99.SetValue(6)
testQuantile99.SetQuantile(0.99)

testResourceAttributes := pdata.NewAttributeMap()
testResourceAttributes.Insert("k", pdata.NewAttributeValueInt(1))

testMetricsToSend := pdata.NewMetrics()
testAggregatedInstrumentationLibraryMetrics := testMetricsToSend.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics()

summaryValuesToMetrics(testAggregatedInstrumentationLibraryMetrics, testMetric, testResourceAttributes)
assert.Equal(t, 5, testAggregatedInstrumentationLibraryMetrics.Len())

}
1 change: 1 addition & 0 deletions testdata/customUrlEvent.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"X-Forwarded-For": "5.102.247.60",
"X-Forwarded-Port": "443",
"X-Forwarded-Proto": "https",
"X-Amz-Firehose-Access-Key": "token",
"accept": "*/*",
"accept-encoding": "gzip, deflate, br",
"cache-control": "no-cache",
Expand Down
94 changes: 94 additions & 0 deletions testdata/malformedBody.json

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions testdata/noToken.json

Large diffs are not rendered by default.

94 changes: 94 additions & 0 deletions testdata/noValidToken.json

Large diffs are not rendered by default.

0 comments on commit 748c6b4

Please sign in to comment.