Skip to content

Commit

Permalink
Refactor older terminal errors to use controller-runtime helper
Browse files Browse the repository at this point in the history
  • Loading branch information
Jordan Olshevski committed Dec 29, 2023
1 parent d60dea8 commit 031c649
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 47 deletions.
46 changes: 21 additions & 25 deletions internal/controllers/synthesis/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// maxSliceJsonBytes is the max sum of a resource slice's manifests. It's set to 1mb, which leaves 512mb of space for the resource's status, encoding overhead, etc.
Expand Down Expand Up @@ -81,11 +82,7 @@ func (c *execController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, nil // old pod - don't bother synthesizing. The lifecycle controller will delete it
}

refs, safeToRetry, err := c.synthesize(ctx, syn, comp, pod)
if !safeToRetry {
logger.V(1).Info("dropped item without successfully reconciling")
return ctrl.Result{}, nil
}
refs, err := c.synthesize(ctx, syn, comp, pod)
if err != nil {
return ctrl.Result{}, fmt.Errorf("executing synthesizer: %w", err)
}
Expand All @@ -98,15 +95,12 @@ func (c *execController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, nil
}

func (c *execController) synthesize(ctx context.Context, syn *apiv1.Synthesizer, comp *apiv1.Composition, pod *corev1.Pod) ([]*apiv1.ResourceSliceRef, bool /* safe to retry */, error) {
func (c *execController) synthesize(ctx context.Context, syn *apiv1.Synthesizer, comp *apiv1.Composition, pod *corev1.Pod) ([]*apiv1.ResourceSliceRef, error) {
logger := logr.FromContextOrDiscard(ctx)

inputsJson, safeToRetry, err := c.buildInputsJson(ctx, comp)
if !safeToRetry {
return nil, false, nil
}
inputsJson, err := c.buildInputsJson(ctx, comp)
if err != nil {
return nil, true, fmt.Errorf("building inputs: %w", err)
return nil, fmt.Errorf("building inputs: %w", err)
}

synctx, done := context.WithTimeout(ctx, c.timeout)
Expand All @@ -115,14 +109,14 @@ func (c *execController) synthesize(ctx context.Context, syn *apiv1.Synthesizer,
start := time.Now()
stdout, err := c.conn.Synthesize(synctx, syn, pod, inputsJson)
if err != nil {
return nil, true, err
return nil, err
}
logger.V(1).Info("synthesizing is done", "latency", time.Since(start).Milliseconds())

return c.writeOutputToSlices(ctx, comp, stdout)
}

func (c *execController) buildInputsJson(ctx context.Context, comp *apiv1.Composition) ([]byte, bool /* safe to retry */, error) {
func (c *execController) buildInputsJson(ctx context.Context, comp *apiv1.Composition) ([]byte, error) {
logger := logr.FromContextOrDiscard(ctx)

var inputs []*unstructured.Unstructured
Expand All @@ -140,33 +134,35 @@ func (c *execController) buildInputsJson(ctx context.Context, comp *apiv1.Compos
start := time.Now()
err := c.client.Get(ctx, client.ObjectKeyFromObject(input), input)
if err != nil {
err = fmt.Errorf("getting resource %s/%s: %w", input.GetKind(), input.GetName(), err)
if errors.IsNotFound(err) {
return nil, false, err
}
return nil, true, err
// Ideally we could stop retrying eventually here in cases where the resource doesn't exist,
// but it isn't safe to _never_ retry (informers across types aren't ordered), and controller-runtime
// doesn't expose the retry count.
return nil, fmt.Errorf("getting resource %s/%s: %w", input.GetKind(), input.GetName(), err)
}

logger.V(1).Info("retrieved input resource", "resourceName", input.GetName(), "resourceNamespace", input.GetNamespace(), "resourceKind", input.GetKind(), "latency", time.Since(start).Milliseconds())
inputs = append(inputs, input)
}

js, err := json.Marshal(&inputs)
return js, true, err
if err != nil {
return nil, reconcile.TerminalError(err)
}
return js, nil
}

func (c *execController) writeOutputToSlices(ctx context.Context, comp *apiv1.Composition, stdout io.Reader) ([]*apiv1.ResourceSliceRef, bool /* safe to retry */, error) {
func (c *execController) writeOutputToSlices(ctx context.Context, comp *apiv1.Composition, stdout io.Reader) ([]*apiv1.ResourceSliceRef, error) {
logger := logr.FromContextOrDiscard(ctx)

outputs := []*unstructured.Unstructured{}
err := json.NewDecoder(stdout).Decode(&outputs)
if err != nil {
return nil, false, fmt.Errorf("parsing outputs: %w", err)
return nil, reconcile.TerminalError(fmt.Errorf("parsing outputs: %w", err))
}

slices, err := buildResourceSlices(comp, outputs, maxSliceJsonBytes)
if err != nil {
return nil, false, err
return nil, err
}

sliceRefs := make([]*apiv1.ResourceSliceRef, len(slices))
Expand All @@ -175,14 +171,14 @@ func (c *execController) writeOutputToSlices(ctx context.Context, comp *apiv1.Co

err = c.writeResourceSlice(ctx, slice)
if err != nil {
return nil, true, fmt.Errorf("creating resource slice %d: %w", i, err)
return nil, fmt.Errorf("creating resource slice %d: %w", i, err)
}

logger.V(1).Info("wrote resource slice", "resourceSliceName", slice.Name, "latency", time.Since(start).Milliseconds())
sliceRefs[i] = &apiv1.ResourceSliceRef{Name: slice.Name}
}

return sliceRefs, true, nil
return sliceRefs, nil
}

func buildResourceSlices(comp *apiv1.Composition, outputs []*unstructured.Unstructured, maxJsonBytes int) ([]*apiv1.ResourceSlice, error) {
Expand All @@ -195,7 +191,7 @@ func buildResourceSlices(comp *apiv1.Composition, outputs []*unstructured.Unstru
for i, output := range outputs {
js, err := output.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("encoding output %d: %w", i, err)
return nil, reconcile.TerminalError(fmt.Errorf("encoding output %d: %w", i, err))
}
if slice == nil || sliceBytes >= maxJsonBytes {
sliceBytes = 0
Expand Down
23 changes: 1 addition & 22 deletions internal/controllers/synthesis/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
Expand Down Expand Up @@ -43,27 +42,7 @@ func TestBuildInputsJson(t *testing.T) {
}},
}

js, safeToRetry, err := e.buildInputsJson(testutil.NewContext(t), comp)
js, err := e.buildInputsJson(testutil.NewContext(t), comp)
require.NoError(t, err)
assert.True(t, safeToRetry)
assert.Equal(t, "[{\"apiVersion\":\"v1\",\"kind\":\"ConfigMap\",\"metadata\":{\"creationTimestamp\":null,\"name\":\"test-cm\",\"namespace\":\"test-namespace\",\"resourceVersion\":\"999\"}}]", string(js))
}

func TestBuildInputsJsonNotFound(t *testing.T) {
client := fake.NewFakeClient()
e := &execController{client: client}

comp := &apiv1.Composition{}
comp.Spec.Inputs = []apiv1.InputRef{
{Resource: &apiv1.ResourceInputRef{
APIVersion: "v1",
Kind: "ConfigMap",
Name: "does-not-exist",
}},
}

js, safeToRetry, err := e.buildInputsJson(testutil.NewContext(t), comp)
require.True(t, errors.IsNotFound(err))
assert.False(t, safeToRetry)
assert.Empty(t, js)
}

0 comments on commit 031c649

Please sign in to comment.