Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple errors #40

Merged
merged 7 commits into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ install:
- go get golang.org/x/tools/cmd/cover github.com/mattn/goveralls ./...

script:
- go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
- go test -v -covermode=atomic -coverprofile=coverage.out ./...
- $HOME/gopath/bin/goveralls -coverprofile=coverage.out -service=travis-ci -repotoken $COVERALLS_TOKEN
87 changes: 51 additions & 36 deletions execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,20 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string]
result := map[string]interface{}{}

// a channel to receive query results
resultCh := make(chan queryExecutionResult, 10)
// defer close(resultCh)
resultCh := make(chan *queryExecutionResult, 10)
defer close(resultCh)

// a wait group so we know when we're done with all of the steps
stepWg := &sync.WaitGroup{}

// and a channel for errors
errMutex := &sync.Mutex{}
errCh := make(chan error, 10)
// defer close(errCh)
defer close(errCh)

// a channel to close the goroutine
closeCh := make(chan bool)
// defer close(closeCh)
defer close(closeCh)

// a lock for reading and writing to the result
resultLock := &sync.Mutex{}
Expand All @@ -61,13 +62,18 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string]
go executeStep(plan, step, []string{}, resultLock, variables, resultCh, errCh, stepWg)
}

// the list of errors we have encountered while executing the plan
errs := graphql.ErrorList{}

// start a goroutine to add results to the list
go func() {
ConsumptionLoop:
for {
select {
// we have a new result
case payload := <-resultCh:
if payload == nil {
continue
}
log.Debug("Inserting result into ", payload.InsertionPoint)
log.Debug("Result: ", payload.Result)

Expand All @@ -76,50 +82,47 @@ func (executor *ParallelExecutor) Execute(plan *QueryPlan, variables map[string]
err := executorInsertObject(result, resultLock, payload.InsertionPoint, payload.Result)
if err != nil {
errCh <- err
continue ConsumptionLoop
continue
}

log.Debug("Done. ", result)
// one of the queries is done
stepWg.Done()

case err := <-errCh:
if err != nil {
fmt.Println("Error! ", err)
errMutex.Lock()
// if the error was a list
if errList, ok := err.(graphql.ErrorList); ok {
errs = append(errs, errList...)
} else {
errs = append(errs, err)
}
errMutex.Unlock()
stepWg.Done()
}
// we're done
case <-closeCh:
return
}
}
}()

// there are 2 possible options:
// - either the wait group finishes
// - we get a messsage over the error chan
// when the wait group is finished
stepWg.Wait()

// in order to wait for either, let's spawn a go routine
// that waits until all of the steps are built and notifies us when its done
doneCh := make(chan bool)
// defer close(doneCh)
// if we encountered any errors
errMutex.Lock()
nErrs := len(errs)
defer errMutex.Unlock()

go func() {
// when the wait group is finished
stepWg.Wait()
// push a value over the channel
doneCh <- true
}()

// wait for either the error channel or done channel
select {
// there was an error
case err := <-errCh:
log.Warn(fmt.Sprintf("Ran into execution error: %s", err.Error()))
closeCh <- true
// bubble the error up
return nil, err
// we are done
case <-doneCh:
closeCh <- true
// we're done here
return result, nil
if nErrs > 0 {
return result, errs
}

// we didn't encounter any errors
return result, nil
}

// TODO: ugh... so... many... variables...
Expand All @@ -129,7 +132,7 @@ func executeStep(
insertionPoint []string,
resultLock *sync.Mutex,
queryVariables map[string]interface{},
resultCh chan queryExecutionResult,
resultCh chan *queryExecutionResult,
errCh chan error,
stepWg *sync.WaitGroup,
) {
Expand Down Expand Up @@ -175,7 +178,7 @@ func executeStep(

// if there is no queryer
if step.Queryer == nil {
errCh <- errors.New("could not find queryer for step")
errCh <- errors.New(" could not find queryer for step")
return
}
// execute the query
Expand All @@ -185,7 +188,9 @@ func executeStep(
QueryDocument: step.QueryDocument,
Variables: variables,
}, &queryResult)
fmt.Println("RESULT ->", queryResult, err)
if err != nil {
fmt.Println("ERROR", err.Error())
log.Warn("Network Error: ", err)
errCh <- err
return
Expand Down Expand Up @@ -241,7 +246,7 @@ func executeStep(

log.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult)
// send the result to be stitched in with our accumulator
resultCh <- queryExecutionResult{
resultCh <- &queryExecutionResult{
InsertionPoint: insertionPoint,
Result: queryResult,
}
Expand Down Expand Up @@ -614,3 +619,13 @@ type ExecutorFn struct {
func (e *ExecutorFn) Execute(plan *QueryPlan, variables map[string]interface{}) (map[string]interface{}, error) {
return e.Fn(plan, variables)
}

// ErrExecutor always returnes the internal error.
type ErrExecutor struct {
Error error
}

// Execute returns the internet error
func (e *ErrExecutor) Execute(plan *QueryPlan, variables map[string]interface{}) (map[string]interface{}, error) {
return nil, e.Error
}
67 changes: 66 additions & 1 deletion execute_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gateway

import (
"errors"
"sync"
"testing"

Expand Down Expand Up @@ -465,6 +466,71 @@ func TestExecutor_insertIntoLists(t *testing.T) {
}, result)
}

func TestExecutor_multipleErrors(t *testing.T) {
// an executor should return a list of every error that it encounters while executing the plan

// build a query plan that the executor will follow
_, err := (&ParallelExecutor{}).Execute(&QueryPlan{
RootStep: &QueryPlanStep{
Then: []*QueryPlanStep{
{
// this is equivalent to
// query { values }
ParentType: "Query",
SelectionSet: ast.SelectionSet{
&ast.Field{
Name: "values",
Definition: &ast.FieldDefinition{
Type: ast.ListType(ast.NamedType("String", &ast.Position{}), &ast.Position{}),
},
},
},
// return a known value we can test against
Queryer: &graphql.QueryerFunc{
func(input *graphql.QueryInput) (interface{}, error) {
return map[string]interface{}{"data": map[string]interface{}{}}, errors.New("message")
},
},
},
{
// this is equivalent to
// query { values }
ParentType: "Query",
SelectionSet: ast.SelectionSet{
&ast.Field{
Name: "values",
Definition: &ast.FieldDefinition{
Type: ast.ListType(ast.NamedType("String", &ast.Position{}), &ast.Position{}),
},
},
},
// return a known value we can test against
Queryer: &graphql.QueryerFunc{
func(input *graphql.QueryInput) (interface{}, error) {
return map[string]interface{}{"data": map[string]interface{}{}}, graphql.ErrorList{errors.New("message"), errors.New("message")}
},
},
},
},
},
}, map[string]interface{}{})
if err == nil {
t.Errorf("Did not encounter error executing plan")
return
}

// since 3 errors were thrown we need to make sure we actually recieved an error list
list, ok := err.(graphql.ErrorList)
if !ok {
t.Error("Error was not an error list")
return
}

if !assert.Len(t, list, 3, "Error list did not have 3 items") {
return
}
}

func TestExecutor_threadsVariables(t *testing.T) {
// the variables we'll be threading through
fullVariables := map[string]interface{}{
Expand Down Expand Up @@ -981,7 +1047,6 @@ func TestFindInsertionPoint_stitchIntoObject(t *testing.T) {
}

assert.Equal(t, finalInsertionPoint, generatedPoint)

}

func TestFindInsertionPoint_handlesNullObjects(t *testing.T) {
Expand Down
48 changes: 44 additions & 4 deletions graphql/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"reflect"
Expand Down Expand Up @@ -122,14 +121,16 @@ func (q *NetworkQueryer) Query(input *QueryInput, receiver interface{}) error {

// if there is an error
if _, ok := result["errors"]; ok {
// a list of errors from the response
errList := ErrorList{}

// build up a list of errors
errs, ok := result["errors"].([]interface{})
if !ok {
return errors.New("errors was not a list")
}

// a list of error messages
messages := []string{}
for _, err := range errs {
obj, ok := err.(map[string]interface{})
if !ok {
Expand All @@ -141,10 +142,10 @@ func (q *NetworkQueryer) Query(input *QueryInput, receiver interface{}) error {
return errors.New("error message was not a string")
}

messages = append(messages, message)
errList = append(errList, NewError("", message))
}

return fmt.Errorf(strings.Join(messages, " "))
return errList
}

// assign the result under the data key to the receiver
Expand Down Expand Up @@ -172,3 +173,42 @@ func NewNetworkQueryer(url string) *NetworkQueryer {
Client: &http.Client{},
}
}

// ErrorExtensions define fields that extend the standard graphql error shape
type ErrorExtensions struct {
Code string `json:"code"`
}

// Error represents a graphql error
type Error struct {
Extensions ErrorExtensions `json:"extensions"`
Message string `json:"message"`
}

func (e *Error) Error() string {
return e.Message
}

// NewError returns a graphql error with the given code and message
func NewError(code string, message string) *Error {
return &Error{
Message: message,
Extensions: ErrorExtensions{
Code: code,
},
}
}

// ErrorList represents a list of errors
type ErrorList []error

// Error returns a string representation of each error
func (list ErrorList) Error() string {
acc := []string{}

for _, error := range list {
acc = append(acc, error.Error())
}

return strings.Join(acc, ". ")
}
Loading