Skip to content

Commit

Permalink
Merge pull request #111 from sasaki77/invalid-value-handling
Browse files Browse the repository at this point in the history
BUG: fix invalid response handling
  • Loading branch information
sasaki77 authored Nov 7, 2022
2 parents 88148a2 + 336403c commit a4e737e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 8 deletions.
8 changes: 5 additions & 3 deletions pkg/archiverappliance/aaclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -42,8 +43,8 @@ func (client AAclient) ExecuteSingleQuery(target string, qm models.ArchiverQuery
// target: This is the PV to be queried for. As the "query" argument may be a regular expression, the specific PV desired must be specified
queryUrl := buildQueryUrl(target, client.baseURL, qm)
queryResponse, _ := archiverSingleQuery(queryUrl)
parsedResponse, _ := archiverSingleQueryParser(queryResponse)
return parsedResponse, nil
parsedResponse, err := archiverSingleQueryParser(queryResponse)
return parsedResponse, err
}

func buildQueryUrl(target string, baseURL string, qm models.ArchiverQueryModel) string {
Expand Down Expand Up @@ -138,7 +139,8 @@ func archiverSingleQueryParser(jsonAsBytes []byte) (models.SingleData, error) {
jsonErr := json.Unmarshal(jsonAsBytes, &response)
if jsonErr != nil {
log.DefaultLogger.Warn("Conversion of incoming data to JSON has failed", "Error", jsonErr)
return sD, jsonErr
err := fmt.Errorf("response parse error. the response might have invalid data, e.g. infinity or null: %w", jsonErr)
return sD, err
}

if len(response) < 1 {
Expand Down
35 changes: 35 additions & 0 deletions pkg/archiverappliance/aaclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,41 @@ func TestArchiverSingleQueryParserEmpty(t *testing.T) {
}
}

func TestArchiverSingleQueryParserInvalidData(t *testing.T) {
var dataNames = []struct {
name string
fileName string
}{
{
name: "infinity data",
fileName: "../test_data/invalid_value_response.JSON",
},
}

type testData struct {
input []byte
name string
}

var tests []testData
for _, entry := range dataNames {
fileData, err := ioutil.ReadFile(entry.fileName)
if err != nil {
t.Fatalf("Failed to load test data: %v", err)
}
tests = append(tests, testData{input: fileData, name: entry.name})
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
_, err := archiverSingleQueryParser(testCase.input)
if err == nil {
t.Fatalf("An unexpected error has occurred")
}
})
}
}

func TestArchiverRegexQueryParser(t *testing.T) {
var tests = []struct {
input []byte
Expand Down
18 changes: 13 additions & 5 deletions pkg/archiverappliance/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,19 @@ queryCollector:
return response
}

type queryResponse struct {
response models.SingleData
err error
}

func singleQuery(ctx context.Context, qm models.ArchiverQueryModel, client client) backend.DataResponse {
response := backend.DataResponse{}

targetPvList := makeTargetPVList(client, qm.Target, qm.Regex, qm.MaxNumPVs)

// execute the individual queries
responseData := make([]*models.SingleData, 0, len(targetPvList))
responsePipe := make(chan models.SingleData)
responsePipe := make(chan queryResponse)

// Create timeout. If any request routines take longer than timeoutDurationSeconds to execute, they will be dropped.
timeoutDurationSeconds := 30 // units are seconds
Expand All @@ -72,9 +77,9 @@ func singleQuery(ctx context.Context, qm models.ArchiverQueryModel, client clien

// create goroutines for individual requests
for _, targetPv := range targetPvList {
go func(targetPv string, pipe chan models.SingleData) {
parsedResponse, _ := client.ExecuteSingleQuery(targetPv, qm)
pipe <- parsedResponse
go func(targetPv string, pipe chan queryResponse) {
parsedResponse, err := client.ExecuteSingleQuery(targetPv, qm)
pipe <- queryResponse{response: parsedResponse, err: err}
}(targetPv, responsePipe)
}

Expand All @@ -83,7 +88,10 @@ responseCollector:
for range targetPvList {
select {
case response := <-responsePipe:
responseData = append(responseData, &response)
if response.err != nil {
return backend.DataResponse{Error: response.err}
}
responseData = append(responseData, &response.response)
case <-timeoutPipe:
log.DefaultLogger.Warn("Timeout limit for query has been reached")
break responseCollector
Expand Down
54 changes: 54 additions & 0 deletions pkg/archiverappliance/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package archiverappliance
import (
"context"
"encoding/json"
"errors"
"testing"
"time"

Expand All @@ -27,6 +28,10 @@ func (f fakeClient) FetchRegexTargetPVs(regex string, limit int) ([]string, erro
}

func (f fakeClient) ExecuteSingleQuery(target string, qm models.ArchiverQueryModel) (models.SingleData, error) {
if target == "invalid" {
return models.SingleData{}, errors.New("test error")
}

var values []float64
if target == "PV:NAME1" {
values = []float64{0, 1, 2}
Expand Down Expand Up @@ -265,6 +270,55 @@ func TestQuery(t *testing.T) {
}
}

func TestQueryWithInvalidResponse(t *testing.T) {
TIME_FORMAT := "2006-01-02T15:04:05.000-07:00"
var tests = []struct {
name string
ctx context.Context
req *backend.QueryDataRequest
}{
{
name: "invalid response",
req: &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
Interval: testhelper.MultiReturnHelperParseDuration(time.ParseDuration("0s")),
JSON: json.RawMessage(`{
"alias": "",
"aliasPattern": "",
"constant":6.5,
"functions":[],
"hide":false ,
"operator": "",
"refId":"A" ,
"regex":false ,
"target":"invalid" ,
"functions":[
]
}`),
MaxDataPoints: 1000,
QueryType: "",
RefID: "A",
TimeRange: backend.TimeRange{
From: testhelper.MultiReturnHelperParse(time.Parse(TIME_FORMAT, "2021-01-27T14:30:41.678-08:00")),
To: testhelper.MultiReturnHelperParse(time.Parse(TIME_FORMAT, "2021-01-28T14:30:41.678-08:00")),
},
},
},
},
},
}
f := fakeClient{}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
result := Query(testCase.ctx, f, testCase.req)
if result.Responses["A"].Error == nil {
t.Errorf("An unexpected error has occurred")
}
})
}
}

func TestArchiverSingleQuery(t *testing.T) {
t.Skipf("Test not implemented")
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/test_data/invalid_value_response.JSON
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[
{
"meta": {
"name": "PFRVA:NEG:2NDSYS:PLASM:RESIS",
"waveform": false,
"EGU": "ohm",
"PREC": "0"
},
"data": [
{
"millis": 1667786399401,
"val": -Infinity
},
{
"millis": 1667786401400,
"val": null
},
{
"millis": 1667786408400,
"val": -Infinity
},
{
"millis": 1667786409400,
"val": null
}
]
}
]

0 comments on commit a4e737e

Please sign in to comment.