Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include data alongside errors for a service's partial success #213

Merged
merged 6 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/gateway/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ go 1.16

require (
github.com/nautilus/gateway v0.3.17
github.com/nautilus/graphql v0.0.25
github.com/nautilus/graphql v0.0.26
github.com/spf13/cobra v0.0.5
)
2 changes: 2 additions & 0 deletions cmd/gateway/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/nautilus/graphql v0.0.23 h1:uiQV2SxOfl63xNr4E5Qk9IxecD2llM/gVqKbM2lEH
github.com/nautilus/graphql v0.0.23/go.mod h1:eoqPxo/+IdRSfLQKZ2V0al2vYeoMJKO72XpTsUYOmFI=
github.com/nautilus/graphql v0.0.25 h1:N+wwpxnlob3nQK/+HTUPeGfSR0ayehhNqeJq1cdcqSU=
github.com/nautilus/graphql v0.0.25/go.mod h1:ex8FB+RiAJoWGlL9iUcCWvetFOICVpvYP2jWi0cxp9E=
github.com/nautilus/graphql v0.0.26 h1:Mfv0Eazj+xKWlSSIcXgJN1zFWhbSQkPsz4IWSk0G5EQ=
github.com/nautilus/graphql v0.0.26/go.mod h1:ex8FB+RiAJoWGlL9iUcCWvetFOICVpvYP2jWi0cxp9E=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
Expand Down
115 changes: 57 additions & 58 deletions execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type ParallelExecutor struct{}
type queryExecutionResult struct {
InsertionPoint []string
Result map[string]interface{}
StripNode bool
Err error
}

// execution is broken up into two phases:
Expand Down Expand Up @@ -83,7 +83,7 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
// the root step could have multiple steps that have to happen
for _, step := range ctx.Plan.RootStep.Then {
stepWg.Add(1)
go executeStep(ctx, ctx.Plan, step, []string{}, resultLock, ctx.Variables, resultCh, errCh, stepWg)
go executeStep(ctx, ctx.Plan, step, []string{}, resultLock, ctx.Variables, resultCh, stepWg)
}

// the list of errors we have encountered while executing the plan
Expand All @@ -95,24 +95,23 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
select {
// we have a new result
case payload := <-resultCh:
if payload == nil {
continue
}
ctx.logger.Debug("Inserting result into ", payload.InsertionPoint)
ctx.logger.Debug("Result: ", payload.Result)

// we have to grab the value in the result and write it to the appropriate spot in the
// acumulator.
err := executorInsertObject(ctx, result, resultLock, payload.InsertionPoint, payload.Result)
if err != nil {
errCh <- err
continue
insertErr := executorInsertObject(ctx, result, resultLock, payload.InsertionPoint, payload.Result)

switch {
case payload.Err != nil: // response errors are the highest priority to return
errCh <- payload.Err
case insertErr != nil:
errCh <- insertErr
default:
ctx.logger.Debug("Done. ", result)
// one of the queries is done
stepWg.Done()
}

ctx.logger.Debug("Done. ", result)
// one of the queries is done
stepWg.Done()

case err := <-errCh:
if err != nil {
errMutex.Lock()
Expand Down Expand Up @@ -158,9 +157,41 @@ func executeStep(
resultLock *sync.Mutex,
queryVariables map[string]interface{},
resultCh chan *queryExecutionResult,
errCh chan error,
stepWg *sync.WaitGroup,
) {
queryResult, dependentSteps, queryErr := executeOneStep(ctx, plan, step, insertionPoint, resultLock, queryVariables)
// before publishing the current result, tell the wait-group about the dependent steps to wait for
stepWg.Add(len(dependentSteps))
ctx.logger.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult)
// send the result to be stitched in with our accumulator
resultCh <- &queryExecutionResult{
InsertionPoint: insertionPoint,
Result: queryResult,
Err: queryErr,
}
// We need to collect all the dependent steps and execute them after emitting the parent result in this function.
// This avoids a race condition, where the result of a dependent request is published to the
// result channel even before the result created in this iteration.
// Execute dependent steps after the main step has been published.
for _, sr := range dependentSteps {
ctx.logger.Info("Spawn ", sr.insertionPoint)
go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, stepWg)
}
}

type dependentStepArgs struct {
step *QueryPlanStep
insertionPoint []string
}

func executeOneStep(
ctx *ExecutionContext,
plan *QueryPlan,
step *QueryPlanStep,
insertionPoint []string,
resultLock *sync.Mutex,
queryVariables map[string]interface{},
) (map[string]interface{}, []dependentStepArgs, error) {
ctx.logger.Debug("Executing step to be inserted in ", step.ParentType, ". Insertion point: ", insertionPoint)

ctx.logger.Debug(step.SelectionSet)
Expand All @@ -186,14 +217,12 @@ func executeStep(
// get the data of the point
pointData, err := executorGetPointData(head)
if err != nil {
errCh <- err
return
return nil, nil, err
}

// if we dont have an id
if pointData.ID == "" {
errCh <- fmt.Errorf("Could not find id in path")
return
return nil, nil, fmt.Errorf("Could not find id in path")
}

// save the id as a variable to the query
Expand All @@ -202,8 +231,7 @@ func executeStep(

// if there is no queryer
if step.Queryer == nil {
errCh <- errors.New(" could not find queryer for step")
return
return nil, nil, errors.New(" could not find queryer for step")
}

// the query we will use
Expand Down Expand Up @@ -233,8 +261,7 @@ func executeStep(
}, &queryResult)
if err != nil {
ctx.logger.Warn("Network Error: ", err)
errCh <- err
return
return queryResult, nil, err
}

// NOTE: this insertion point could point to a list of values. If it did, we have to have
Expand All @@ -249,36 +276,19 @@ func executeStep(
// get the result from the response that we have to stitch there
extractedResult, err := executorExtractValue(ctx, queryResult, resultLock, []string{"node"})
if err != nil {
errCh <- err
return
return nil, nil, err
}

resultObj, ok := extractedResult.(map[string]interface{})
if !ok {
errCh <- fmt.Errorf("Query result of node query was not an object: %v", queryResult)
return
return nil, nil, fmt.Errorf("Query result of node query was not an object: %v", queryResult)
}

queryResult = resultObj
}

// we need to collect all the dependent steps and execute them at last in this function
// to avoid a race condition, where the result of a dependent request is published to the
// result channel even before the result created in this iteration
type stepArgs struct {
step *QueryPlanStep
insertionPoint []string
}
var dependentSteps []stepArgs
// defer the execution of the dependent steps after the main step has been published
defer func() {
for _, sr := range dependentSteps {
ctx.logger.Info("Spawn ", sr.insertionPoint)
go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, errCh, stepWg)
}
}()

// if there are next steps
var dependentSteps []dependentStepArgs
if len(step.Then) > 0 {
ctx.logger.Debug("Kicking off child queries")
// we need to find the ids of the objects we are inserting into and then kick of the worker with the right
Expand All @@ -288,30 +298,19 @@ func executeStep(
copy(copiedInsertionPoint, insertionPoint)
insertPoints, err := executorFindInsertionPoints(ctx, resultLock, dependent.InsertionPoint, step.SelectionSet, queryResult, [][]string{copiedInsertionPoint}, step.FragmentDefinitions)
if err != nil {
// reset dependent steps - result would be discarded anyways
dependentSteps = nil
errCh <- err
return
return nil, nil, err
}

// this dependent needs to fire for every object that the insertion point references
for _, insertionPoint := range insertPoints {
dependentSteps = append(dependentSteps, stepArgs{
dependentSteps = append(dependentSteps, dependentStepArgs{
step: dependent,
insertionPoint: insertionPoint,
})
}
}
}

// before publishing the current result, tell the wait-group about the dependent steps to wait for
stepWg.Add(len(dependentSteps))
ctx.logger.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult)
// send the result to be stitched in with our accumulator
resultCh <- &queryExecutionResult{
InsertionPoint: insertionPoint,
Result: queryResult,
}
return queryResult, dependentSteps, nil
}

func max(a, b int) int {
Expand Down Expand Up @@ -386,7 +385,7 @@ func executorFindInsertionPoints(ctx *ExecutionContext, resultLock *sync.Mutex,
// make sure we are looking at the top of the selection set next time
selectionSetRoot = foundSelection.SelectionSet

var value = resultChunk
value := resultChunk

// the bit of result chunk with the appropriate key should be a list
rootValue, ok := value[point]
Expand Down
48 changes: 48 additions & 0 deletions gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,3 +753,51 @@ type User {
}
`, resp.Body.String())
}

func TestDataAndErrorsBothReturnFromOneServicePartialSuccess(t *testing.T) {
t.Parallel()
schema, err := graphql.LoadSchema(`
type Query {
foo: String
bar: String
}
`)
require.NoError(t, err)
queryerFactory := QueryerFactory(func(ctx *PlanningContext, url string) graphql.Queryer {
return graphql.QueryerFunc(func(input *graphql.QueryInput) (interface{}, error) {
return map[string]interface{}{
"foo": "foo",
"bar": nil,
}, graphql.ErrorList{
&graphql.Error{
Message: "bar is broken",
Path: []interface{}{"bar"},
},
}
})
})
gateway, err := New([]*graphql.RemoteSchema{
{Schema: schema, URL: "boo"},
}, WithQueryerFactory(&queryerFactory))
require.NoError(t, err)

req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(`{"query": "query { foo bar }"}`))
resp := httptest.NewRecorder()
gateway.GraphQLHandler(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)
assert.JSONEq(t, `
{
"data": {
"foo": "foo",
"bar": null
},
"errors": [
{
"message": "bar is broken",
"path": ["bar"],
"extensions": null
}
]
}
`, resp.Body.String())
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/99designs/gqlgen v0.17.15
github.com/mitchellh/mapstructure v1.4.1
github.com/nautilus/graphql v0.0.25
github.com/nautilus/graphql v0.0.26
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/vektah/gqlparser/v2 v2.5.16
Expand Down
Loading
Loading