From d5bde6c05c9aaa0f6fdc5d51cbc1fee3789cded4 Mon Sep 17 00:00:00 2001 From: Alec Aivazis Date: Mon, 21 Jan 2019 01:37:33 -0800 Subject: [PATCH 1/7] added type for list of errors --- execute.go | 10 +++++++ execute_test.go | 64 +++++++++++++++++++++++++++++++++++++++- graphql/requests.go | 50 +++++++++++++++++++++++++++---- graphql/requests_test.go | 56 +++++++++++++++++++++++++++++++++++ 4 files changed, 174 insertions(+), 6 deletions(-) diff --git a/execute.go b/execute.go index bfeffb0..cbbc542 100644 --- a/execute.go +++ b/execute.go @@ -614,3 +614,13 @@ type ExecutorFn struct { func (e *ExecutorFn) Execute(plan *QueryPlan, variables map[string]interface{}) (map[string]interface{}, error) { return e.Fn(plan, variables) } + +// ErrExecutor always returnes the internal error. +type ErrExecutor struct { + Error error +} + +// Execute returns the internet error +func (e *ErrExecutor) Execute(plan *QueryPlan, variables map[string]interface{}) (map[string]interface{}, error) { + return nil, e.Error +} diff --git a/execute_test.go b/execute_test.go index 03b6ef0..202d522 100644 --- a/execute_test.go +++ b/execute_test.go @@ -1,6 +1,7 @@ package gateway import ( + "errors" "sync" "testing" @@ -465,6 +466,68 @@ func TestExecutor_insertIntoLists(t *testing.T) { }, result) } +func TestExecutor_multipleErrors(t *testing.T) { + // an executor should return a list of every error that it encounters while executing the plan + + // build a query plan that the executor will follow + result, err := (&ParallelExecutor{}).Execute(&QueryPlan{ + RootStep: &QueryPlanStep{ + Then: []*QueryPlanStep{ + { + // this is equivalent to + // query { values } + ParentType: "Query", + SelectionSet: ast.SelectionSet{ + &ast.Field{ + Name: "values", + Definition: &ast.FieldDefinition{ + Type: ast.ListType(ast.NamedType("String", &ast.Position{}), &ast.Position{}), + }, + }, + }, + // return a known value we can test against + Queryer: &graphql.QueryerFunc{ + func(input *graphql.QueryInput) (interface{}, error) { + return map[string]interface{}{"data": map[string]interface{}{}}, errors.New("message") + }, + }, + }, + { + // this is equivalent to + // query { values } + ParentType: "Query", + SelectionSet: ast.SelectionSet{ + &ast.Field{ + Name: "values", + Definition: &ast.FieldDefinition{ + Type: ast.ListType(ast.NamedType("String", &ast.Position{}), &ast.Position{}), + }, + }, + }, + // return a known value we can test against + Queryer: &graphql.QueryerFunc{ + func(input *graphql.QueryInput) (interface{}, error) { + return map[string]interface{}{"data": map[string]interface{}{}}, graphql.ErrorList{errors.New("message"), errors.New("message")} + }, + }, + }, + }, + }, + }, map[string]interface{}{}) + if err != nil { + t.Errorf("Encountered error executing plan: %v", err.Error()) + } + + // get the result back + values, ok := result["values"] + if !ok { + t.Errorf("Did not get any values back from the execution") + } + + // make sure we got the right values back + assert.Equal(t, []string{"hello", "world"}, values) +} + func TestExecutor_threadsVariables(t *testing.T) { // the variables we'll be threading through fullVariables := map[string]interface{}{ @@ -981,7 +1044,6 @@ func TestFindInsertionPoint_stitchIntoObject(t *testing.T) { } assert.Equal(t, finalInsertionPoint, generatedPoint) - } func TestFindInsertionPoint_handlesNullObjects(t *testing.T) { diff --git a/graphql/requests.go b/graphql/requests.go index b2673fc..64cd96b 100644 --- a/graphql/requests.go +++ b/graphql/requests.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "errors" - "fmt" "io/ioutil" "net/http" "reflect" @@ -65,7 +64,7 @@ func (q *QueryerFunc) Query(input *QueryInput, receiver interface{}) error { reflect.ValueOf(receiver).Elem().Set(reflect.ValueOf(response)) // no errors - return nil + return err } // NetworkQueryer sends the query to a url and returns the response @@ -122,6 +121,9 @@ func (q *NetworkQueryer) Query(input *QueryInput, receiver interface{}) error { // if there is an error if _, ok := result["errors"]; ok { + // a list of errors from the response + errList := ErrorList{} + // build up a list of errors errs, ok := result["errors"].([]interface{}) if !ok { @@ -129,7 +131,6 @@ func (q *NetworkQueryer) Query(input *QueryInput, receiver interface{}) error { } // a list of error messages - messages := []string{} for _, err := range errs { obj, ok := err.(map[string]interface{}) if !ok { @@ -141,10 +142,10 @@ func (q *NetworkQueryer) Query(input *QueryInput, receiver interface{}) error { return errors.New("error message was not a string") } - messages = append(messages, message) + errList = append(errList, NewError("", message)) } - return fmt.Errorf(strings.Join(messages, " ")) + return errList } // assign the result under the data key to the receiver @@ -172,3 +173,42 @@ func NewNetworkQueryer(url string) *NetworkQueryer { Client: &http.Client{}, } } + +// ErrorExtensions define fields that extend the standard graphql error shape +type ErrorExtensions struct { + Code string `json:"code"` +} + +// Error represents a graphql error +type Error struct { + Extensions ErrorExtensions `json:"extensions"` + Message string `json:"message"` +} + +func (e *Error) Error() string { + return e.Message +} + +// NewError returns a graphql error with the given code and message +func NewError(code string, message string) *Error { + return &Error{ + Message: message, + Extensions: ErrorExtensions{ + Code: code, + }, + } +} + +// ErrorList represents a list of errors +type ErrorList []error + +// Error returns a string representation of each error +func (list ErrorList) Error() string { + acc := []string{} + + for _, error := range list { + acc = append(acc, error.Error()) + } + + return strings.Join(acc, ". ") +} diff --git a/graphql/requests_test.go b/graphql/requests_test.go index dc3b3e8..f869bde 100644 --- a/graphql/requests_test.go +++ b/graphql/requests_test.go @@ -232,6 +232,62 @@ func TestNewNetworkQueryer(t *testing.T) { assert.Equal(t, "foo", NewNetworkQueryer("foo").URL) } +func TestSerializeError(t *testing.T) { + // marshal the 2 kinds of errors + errWithCode, _ := json.Marshal(NewError("ERROR_CODE", "foo")) + expected, _ := json.Marshal(map[string]interface{}{ + "extensions": map[string]interface{}{ + "code": "ERROR_CODE", + }, + "message": "foo", + }) + + assert.Equal(t, string(expected), string(errWithCode)) +} + +func TestNetworkQueryer_errorList(t *testing.T) { + // create a http client that responds with a known body and verifies the incoming query + client := &http.Client{ + Transport: roundTripFunc(func(req *http.Request) *http.Response { + return &http.Response{ + StatusCode: 200, + // Send response to be tested + Body: ioutil.NopCloser(bytes.NewBuffer([]byte(`{ + "data": null, + "errors": [ + {"message":"hello"} + ] + }`))), + // Must be set to non-nil value or it panics + Header: make(http.Header), + } + }), + } + + // the corresponding query document + query := ` + { + hello(world: "hello") { + world + } + } + ` + + queryer := &NetworkQueryer{ + URL: "hello", + Client: client, + } + + // get the error of the query + err := queryer.Query(&QueryInput{Query: query}, &map[string]interface{}{}) + + _, ok := err.(ErrorList) + if !ok { + t.Errorf("response of queryer was not an error list: %v", err.Error()) + return + } +} + func TestQueryerFunc_success(t *testing.T) { expected := map[string]interface{}{"hello": "world"} From 032dddb6e41ed5d6b12176ac09714f5141e6bc22 Mon Sep 17 00:00:00 2001 From: Alec Aivazis Date: Mon, 21 Jan 2019 01:55:52 -0800 Subject: [PATCH 2/7] wait for all steps to finish before terminating excution --- execute.go | 52 +++++++++++++++++++------------------------------ execute_test.go | 19 ++++++++++-------- http.go | 33 ++++++++++++++++++------------- 3 files changed, 51 insertions(+), 53 deletions(-) diff --git a/execute.go b/execute.go index cbbc542..f4f94be 100644 --- a/execute.go +++ b/execute.go @@ -41,11 +41,11 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] // and a channel for errors errCh := make(chan error, 10) - // defer close(errCh) + defer close(errCh) // a channel to close the goroutine closeCh := make(chan bool) - // defer close(closeCh) + defer close(closeCh) // a lock for reading and writing to the result resultLock := &sync.Mutex{} @@ -61,9 +61,11 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] go executeStep(plan, step, []string{}, resultLock, variables, resultCh, errCh, stepWg) } + // the list of errors we have encountered while executing the plan + errs := graphql.ErrorList{} + // start a goroutine to add results to the list go func() { - ConsumptionLoop: for { select { // we have a new result @@ -76,7 +78,6 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] err := executorInsertObject(result, resultLock, payload.InsertionPoint, payload.Result) if err != nil { errCh <- err - continue ConsumptionLoop } log.Debug("Done. ", result) @@ -90,36 +91,16 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] } }() - // there are 2 possible options: - // - either the wait group finishes - // - we get a messsage over the error chan - - // in order to wait for either, let's spawn a go routine - // that waits until all of the steps are built and notifies us when its done - doneCh := make(chan bool) - // defer close(doneCh) + // when the wait group is finished + stepWg.Wait() - go func() { - // when the wait group is finished - stepWg.Wait() - // push a value over the channel - doneCh <- true - }() - - // wait for either the error channel or done channel - select { - // there was an error - case err := <-errCh: - log.Warn(fmt.Sprintf("Ran into execution error: %s", err.Error())) - closeCh <- true - // bubble the error up - return nil, err - // we are done - case <-doneCh: - closeCh <- true - // we're done here - return result, nil + // if we encountered any errors + if len(errs) > 0 { + return result, errs } + + // we didn't encounter any errors + return result, nil } // TODO: ugh... so... many... variables... @@ -160,12 +141,14 @@ func executeStep( pointData, err := executorGetPointData(head) if err != nil { errCh <- err + stepWg.Done() return } // if we dont have an id if pointData.ID == "" { errCh <- fmt.Errorf("Could not find id in path") + stepWg.Done() return } @@ -176,6 +159,7 @@ func executeStep( // if there is no queryer if step.Queryer == nil { errCh <- errors.New("could not find queryer for step") + stepWg.Done() return } // execute the query @@ -188,6 +172,7 @@ func executeStep( if err != nil { log.Warn("Network Error: ", err) errCh <- err + stepWg.Done() return } @@ -204,12 +189,14 @@ func executeStep( extractedResult, err := executorExtractValue(queryResult, resultLock, []string{"node"}) if err != nil { errCh <- err + stepWg.Done() return } resultObj, ok := extractedResult.(map[string]interface{}) if !ok { errCh <- fmt.Errorf("Query result of node query was not an object: %v", queryResult) + stepWg.Done() return } @@ -227,6 +214,7 @@ func executeStep( insertPoints, err := executorFindInsertionPoints(resultLock, dependent.InsertionPoint, step.SelectionSet, queryResult, [][]string{insertionPoint}) if err != nil { errCh <- err + stepWg.Done() return } diff --git a/execute_test.go b/execute_test.go index 202d522..aa72b09 100644 --- a/execute_test.go +++ b/execute_test.go @@ -470,7 +470,7 @@ func TestExecutor_multipleErrors(t *testing.T) { // an executor should return a list of every error that it encounters while executing the plan // build a query plan that the executor will follow - result, err := (&ParallelExecutor{}).Execute(&QueryPlan{ + _, err := (&ParallelExecutor{}).Execute(&QueryPlan{ RootStep: &QueryPlanStep{ Then: []*QueryPlanStep{ { @@ -514,18 +514,21 @@ func TestExecutor_multipleErrors(t *testing.T) { }, }, }, map[string]interface{}{}) - if err != nil { - t.Errorf("Encountered error executing plan: %v", err.Error()) + if err == nil { + t.Errorf("Did not encounter error executing plan") + return } - // get the result back - values, ok := result["values"] + // since 3 errors were thrown we need to make sure we actually recieved an error list + list, ok := err.(graphql.ErrorList) if !ok { - t.Errorf("Did not get any values back from the execution") + t.Error("Error was not an error list") + return } - // make sure we got the right values back - assert.Equal(t, []string{"hello", "world"}, values) + if !assert.Len(t, list, 3, "Error list did not have 3 items") { + return + } } func TestExecutor_threadsVariables(t *testing.T) { diff --git a/http.go b/http.go index 5c8f166..22a09d0 100644 --- a/http.go +++ b/http.go @@ -6,6 +6,8 @@ import ( "fmt" "io/ioutil" "net/http" + + "github.com/alecaivazis/graphql-gateway/graphql" ) // QueryPOSTBody is the incoming payload when sending POST requests to the gateway @@ -15,22 +17,27 @@ type QueryPOSTBody struct { OperationName string `json:"operationName"` } -func writeErrors(errs []error, w http.ResponseWriter) { +func writeErrors(err error, w http.ResponseWriter) { // the final list of formatted errors - finalList := []map[string]interface{}{} - - for _, err := range errs { - finalList = append(finalList, map[string]interface{}{ - "message": err.Error(), - }) + var errList graphql.ErrorList + + // if the err is itself an error list + if list, ok := err.(graphql.ErrorList); ok { + errList = list + } else { + errList = graphql.ErrorList{ + &graphql.Error{ + Message: err.Error(), + }, + } } response, err := json.Marshal(map[string]interface{}{ - "errors": finalList, + "errors": errList, }) if err != nil { w.WriteHeader(http.StatusInternalServerError) - writeErrors([]error{err}, w) + writeErrors(err, w) return } @@ -92,21 +99,21 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) { // if there was an error retrieving the payload if payloadErr != nil { w.WriteHeader(http.StatusUnprocessableEntity) - writeErrors([]error{payloadErr}, w) + writeErrors(payloadErr, w) return } // if we dont have a query if payload.Query == "" { w.WriteHeader(http.StatusUnprocessableEntity) - writeErrors([]error{errors.New("could not find a query in request payload")}, w) + writeErrors(errors.New("could not find a query in request payload"), w) return } // fire the query result, err := g.Execute(payload.Query, payload.Variables) if err != nil { - writeErrors([]error{err}, w) + writeErrors(err, w) return } @@ -115,7 +122,7 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) { }) if err != nil { w.WriteHeader(http.StatusInternalServerError) - writeErrors([]error{err}, w) + writeErrors(err, w) return } From 2121d63d85c1fdd4b95cd915c043d6757be42223 Mon Sep 17 00:00:00 2001 From: Alec Aivazis Date: Mon, 21 Jan 2019 02:16:31 -0800 Subject: [PATCH 3/7] fixed wg count with errors --- execute.go | 33 +++++++++++++++++++++++---------- logging.go | 2 +- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/execute.go b/execute.go index f4f94be..ebbf50c 100644 --- a/execute.go +++ b/execute.go @@ -34,12 +34,12 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] // a channel to receive query results resultCh := make(chan queryExecutionResult, 10) - // defer close(resultCh) // a wait group so we know when we're done with all of the steps stepWg := &sync.WaitGroup{} // and a channel for errors + errMutex := &sync.Mutex{} errCh := make(chan error, 10) defer close(errCh) @@ -78,12 +78,26 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] err := executorInsertObject(result, resultLock, payload.InsertionPoint, payload.Result) if err != nil { errCh <- err + return } log.Debug("Done. ", result) // one of the queries is done stepWg.Done() + case err := <-errCh: + if err != nil { + fmt.Println("Error! ", err) + errMutex.Lock() + // if the error was a list + if errList, ok := err.(graphql.ErrorList); ok { + errs = append(errs, errList...) + } else { + errs = append(errs, err) + } + errMutex.Unlock() + stepWg.Done() + } // we're done case <-closeCh: return @@ -95,7 +109,11 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] stepWg.Wait() // if we encountered any errors - if len(errs) > 0 { + errMutex.Lock() + nErrs := len(errs) + defer errMutex.Unlock() + + if nErrs > 0 { return result, errs } @@ -141,14 +159,12 @@ func executeStep( pointData, err := executorGetPointData(head) if err != nil { errCh <- err - stepWg.Done() return } // if we dont have an id if pointData.ID == "" { errCh <- fmt.Errorf("Could not find id in path") - stepWg.Done() return } @@ -158,8 +174,7 @@ func executeStep( // if there is no queryer if step.Queryer == nil { - errCh <- errors.New("could not find queryer for step") - stepWg.Done() + errCh <- errors.New(" could not find queryer for step") return } // execute the query @@ -169,10 +184,11 @@ func executeStep( QueryDocument: step.QueryDocument, Variables: variables, }, &queryResult) + fmt.Println("RESULT ->", queryResult, err) if err != nil { + fmt.Println("ERROR", err.Error()) log.Warn("Network Error: ", err) errCh <- err - stepWg.Done() return } @@ -189,14 +205,12 @@ func executeStep( extractedResult, err := executorExtractValue(queryResult, resultLock, []string{"node"}) if err != nil { errCh <- err - stepWg.Done() return } resultObj, ok := extractedResult.(map[string]interface{}) if !ok { errCh <- fmt.Errorf("Query result of node query was not an object: %v", queryResult) - stepWg.Done() return } @@ -214,7 +228,6 @@ func executeStep( insertPoints, err := executorFindInsertionPoints(resultLock, dependent.InsertionPoint, step.SelectionSet, queryResult, [][]string{insertionPoint}) if err != nil { errCh <- err - stepWg.Done() return } diff --git a/logging.go b/logging.go index 95f4678..3eb25e0 100644 --- a/logging.go +++ b/logging.go @@ -136,7 +136,7 @@ func newLogEntry() *logrus.Entry { entry := logrus.New() // only log the warning severity or above. - entry.SetLevel(logrus.WarnLevel) + entry.SetLevel(logrus.DebugLevel) // configure the formatter entry.SetFormatter(&logrus.TextFormatter{ From 400307a79f13e88cbccdb4903038fb8c34c62e19 Mon Sep 17 00:00:00 2001 From: Alec Aivazis Date: Mon, 21 Jan 2019 02:20:02 -0800 Subject: [PATCH 4/7] result ch is closed too --- execute.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/execute.go b/execute.go index ebbf50c..ebe70bd 100644 --- a/execute.go +++ b/execute.go @@ -33,7 +33,8 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] result := map[string]interface{}{} // a channel to receive query results - resultCh := make(chan queryExecutionResult, 10) + resultCh := make(chan *queryExecutionResult, 10) + defer close(resultCh) // a wait group so we know when we're done with all of the steps stepWg := &sync.WaitGroup{} @@ -70,6 +71,9 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] select { // we have a new result case payload := <-resultCh: + if payload == nil { + continue + } log.Debug("Inserting result into ", payload.InsertionPoint) log.Debug("Result: ", payload.Result) @@ -128,7 +132,7 @@ func executeStep( insertionPoint []string, resultLock *sync.Mutex, queryVariables map[string]interface{}, - resultCh chan queryExecutionResult, + resultCh chan *queryExecutionResult, errCh chan error, stepWg *sync.WaitGroup, ) { @@ -242,7 +246,7 @@ func executeStep( log.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult) // send the result to be stitched in with our accumulator - resultCh <- queryExecutionResult{ + resultCh <- &queryExecutionResult{ InsertionPoint: insertionPoint, Result: queryResult, } From 2dcda2d6fd812ce9bebab690b8e1ec5f83716686 Mon Sep 17 00:00:00 2001 From: Alec Aivazis Date: Mon, 21 Jan 2019 02:20:55 -0800 Subject: [PATCH 5/7] continue not return --- execute.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/execute.go b/execute.go index ebe70bd..7e4570a 100644 --- a/execute.go +++ b/execute.go @@ -82,7 +82,7 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string] err := executorInsertObject(result, resultLock, payload.InsertionPoint, payload.Result) if err != nil { errCh <- err - return + continue } log.Debug("Done. ", result) From dcdfcffa5a542dc013c20b0af5c46068f698a3f5 Mon Sep 17 00:00:00 2001 From: Alec Aivazis Date: Mon, 21 Jan 2019 02:21:49 -0800 Subject: [PATCH 6/7] fixed reference error --- graphql/requests.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphql/requests.go b/graphql/requests.go index 64cd96b..1e15804 100644 --- a/graphql/requests.go +++ b/graphql/requests.go @@ -64,7 +64,7 @@ func (q *QueryerFunc) Query(input *QueryInput, receiver interface{}) error { reflect.ValueOf(receiver).Elem().Set(reflect.ValueOf(response)) // no errors - return err + return nil } // NetworkQueryer sends the query to a url and returns the response From e2b66183d1af2d9d34ae62548a2008cbe9a86e64 Mon Sep 17 00:00:00 2001 From: Alec Aivazis Date: Mon, 21 Jan 2019 02:27:28 -0800 Subject: [PATCH 7/7] removed race checker from travis --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 73ab29a..9729a5d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,5 +10,5 @@ install: - go get golang.org/x/tools/cmd/cover github.com/mattn/goveralls ./... script: - - go test -v -race -covermode=atomic -coverprofile=coverage.out ./... + - go test -v -covermode=atomic -coverprofile=coverage.out ./... - $HOME/gopath/bin/goveralls -coverprofile=coverage.out -service=travis-ci -repotoken $COVERALLS_TOKEN