From 66c6e174a36ef03bfd2580c95bd9bc8d9fae2c11 Mon Sep 17 00:00:00 2001 From: Jordan Olshevski Date: Mon, 11 Dec 2023 21:36:59 +0000 Subject: [PATCH] Initial commit of semi-working reconiliation controller --- .github/workflows/k8scompat.yaml | 54 ++++ go.mod | 2 +- go.sum | 10 - .../controllers/reconciliation/controller.go | 191 +++++++++++ .../crd/enotest.azure.io_testresources.yaml | 50 +++ .../enotest.azure.io_testresources-old.yaml | 45 +++ .../reconciliation/fixtures/v1/types.go | 49 +++ .../fixtures/v1/zz_generated.deepcopy.go | 125 ++++++++ .../reconciliation/integration_test.go | 297 ++++++++++++++++++ internal/controllers/synthesis/rollout.go | 2 +- internal/testutil/testutil.go | 210 ++++++++++--- 11 files changed, 978 insertions(+), 57 deletions(-) create mode 100644 .github/workflows/k8scompat.yaml create mode 100644 internal/controllers/reconciliation/controller.go create mode 100644 internal/controllers/reconciliation/fixtures/v1/config/crd/enotest.azure.io_testresources.yaml create mode 100644 internal/controllers/reconciliation/fixtures/v1/config/enotest.azure.io_testresources-old.yaml create mode 100644 internal/controllers/reconciliation/fixtures/v1/types.go create mode 100644 internal/controllers/reconciliation/fixtures/v1/zz_generated.deepcopy.go create mode 100644 internal/controllers/reconciliation/integration_test.go diff --git a/.github/workflows/k8scompat.yaml b/.github/workflows/k8scompat.yaml new file mode 100644 index 00000000..76718e4a --- /dev/null +++ b/.github/workflows/k8scompat.yaml @@ -0,0 +1,54 @@ +name: Kubernetes Version Compatibility Tests +on: + push: + workflow_dispatch: + schedule: + - cron: 0 0 * * * + +env: + setupEnvtestCmd: "go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest" + +jobs: + buildMatrix: + name: Generate Matrix + runs-on: ubuntu-latest + outputs: + matrix: ${{ steps.build.outputs.matrix }} + steps: + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Generate test matrix using setup-envtest + id: build + run: | + echo "matrix=$($setupEnvtestCmd -p env list | awk '/)/ {print $2}' | awk -F'.' '{print $2}' | jq -c --slurp 'map(tostring) | unique')" >> $GITHUB_OUTPUT + + test: + name: Kubernetes 1.${{ matrix.downstreamApiserverMinorVersion }} + needs: buildMatrix + runs-on: ubuntu-latest + env: + upstreamApiserverVersion: 1.28.x + strategy: + fail-fast: false + matrix: + downstreamApiserverMinorVersion: ${{ fromJson(needs.buildMatrix.outputs.matrix) }} + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Download kubebuilder assets + run: | + echo "UPSTREAM_KUBEBUILDER_ASSETS=$($setupEnvtestCmd use -p path ${{ env.upstreamApiserverVersion }})" >> $GITHUB_ENV + echo "DOWNSTREAM_KUBEBUILDER_ASSETS=$($setupEnvtestCmd use -p path 1.${{ matrix.downstreamApiserverMinorVersion }}.x)" >> $GITHUB_ENV + + - name: Run tests + run: go test -v ./internal/controllers/reconciliation + env: + DOWNSTREAM_VERSION_MINOR: "${{ matrix.downstreamApiserverMinorVersion }}" diff --git a/go.mod b/go.mod index 61f505f4..2409fca3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/Azure/eno -go 1.21 +go 1.20 require ( github.com/go-logr/logr v1.2.4 diff --git a/go.sum b/go.sum index 98f09b20..d769c469 100644 --- a/go.sum +++ b/go.sum @@ -18,7 +18,6 @@ github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/zapr v1.2.4 h1:QHVo+6stLbfJmYGkQ7uGHUCu5hnAFAj6mDe6Ea0SeOo= -github.com/go-logr/zapr v1.2.4/go.mod h1:FyHWQIzQORZ0QVE1BtVHv3cKtNLuXsbNLtpuhNapBOA= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonpointer v0.20.0 h1:ESKJdU9ASRfaPNOPRx12IUyA1vn3R9GiE3KYD14BXdQ= github.com/go-openapi/jsonpointer v0.20.0/go.mod h1:6PGzBjjIIumbLYysB73Klnms1mwnU4G3YHOECG3CedA= @@ -28,7 +27,6 @@ github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogBU= github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -47,7 +45,6 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= @@ -60,7 +57,6 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -77,9 +73,7 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= -github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= -github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -93,7 +87,6 @@ github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGy github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -109,9 +102,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= -go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -164,7 +155,6 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/controllers/reconciliation/controller.go b/internal/controllers/reconciliation/controller.go new file mode 100644 index 00000000..0f0c945f --- /dev/null +++ b/internal/controllers/reconciliation/controller.go @@ -0,0 +1,191 @@ +package reconciliation + +import ( + "context" + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/jsonmergepatch" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/reconstitution" + "github.com/go-logr/logr" +) + +// TODO: Minimal retries for validation error + +type Controller struct { + client client.Client + resourceClient reconstitution.Client + + upstreamClient client.Client + discovery *discoveryCache +} + +func New(mgr *reconstitution.Manager, downstream *rest.Config, discoveryRPS float32, rediscoverWhenNotFound bool) error { + upstreamClient, err := client.New(downstream, client.Options{ + Scheme: runtime.NewScheme(), // empty scheme since we shouldn't rely on compile-time types + }) + if err != nil { + return err + } + + disc, err := newDicoveryCache(downstream, discoveryRPS, rediscoverWhenNotFound) + if err != nil { + return err + } + + return mgr.Add(&Controller{ + client: mgr.Manager.GetClient(), + resourceClient: mgr.GetClient(), + upstreamClient: upstreamClient, + discovery: disc, + }) +} + +func (c *Controller) Name() string { return "reconciliationController" } + +func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request) (ctrl.Result, error) { + logger := logr.FromContextOrDiscard(ctx) + comp := &apiv1.Composition{} + err := c.client.Get(ctx, req.Composition, comp) + if err != nil { + return ctrl.Result{}, fmt.Errorf("getting composition: %w", err) + } + + if comp.Status.CurrentState == nil { + // we don't log here because it would be too noisy + return ctrl.Result{}, nil + } + + // Find the current and (optionally) previous desired states in the cache + currentGen := comp.Status.CurrentState.ObservedCompositionGeneration + resource, _ := c.resourceClient.Get(ctx, &req.ResourceRef, currentGen) + + var prev *reconstitution.Resource + if comp.Status.PreviousState != nil { + prev, _ = c.resourceClient.Get(ctx, &req.ResourceRef, comp.Status.PreviousState.ObservedCompositionGeneration) + } else { + logger.V(1).Info("no previous state given") + } + + // TODO: This probably isn't a good solution. Maybe include in queue msg? + var apiVersion string + if resource != nil { + apiVersion = resource.Object.GetAPIVersion() + } else if prev != nil { + apiVersion = prev.Object.GetAPIVersion() + } + + // Fetch the current resource + current := &unstructured.Unstructured{} + current.SetName(req.Name) + current.SetNamespace(req.Namespace) + current.SetKind(req.Kind) + current.SetAPIVersion(apiVersion) + err = c.upstreamClient.Get(ctx, client.ObjectKeyFromObject(current), current) + if client.IgnoreNotFound(err) != nil { + return ctrl.Result{}, fmt.Errorf("getting current state: %w", err) + } + + // Do the reconciliation + if err := c.reconcileResource(ctx, prev, resource, current); err != nil { + return ctrl.Result{}, err + } + logger.V(1).Info("sync'd resource") + + c.resourceClient.PatchStatusAsync(ctx, &req.Manifest, func(rs *apiv1.ResourceState) bool { + if rs.Reconciled { + return false // already in sync + } + rs.Reconciled = true + return true + }) + + if resource != nil { + return ctrl.Result{RequeueAfter: resource.ReconcileInterval}, nil + } + return ctrl.Result{}, nil +} + +func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reconstitution.Resource, current *unstructured.Unstructured) error { + logger := logr.FromContextOrDiscard(ctx) + + // Delete + if resource == nil && prev != nil { + if current.GetResourceVersion() == "" || current.GetDeletionTimestamp() != nil { + return nil // already deleted + } + + logger.V(0).Info("deleting resource") + err := c.upstreamClient.Delete(ctx, prev.Object) + if err != nil { + return fmt.Errorf("deleting resource: %w", err) + } + return nil + } + + // Always create the resource when it doesn't exist + if current.GetResourceVersion() == "" { + err := c.upstreamClient.Create(ctx, resource.Object) + if err != nil { + return fmt.Errorf("creating resource: %w", err) + } + logger.V(0).Info("created resource") + return nil + } + + // Compute a merge patch + patch, patchType, err := c.buildPatch(ctx, prev, resource, current) + if err != nil { + return fmt.Errorf("building patch: %w", err) + } + if string(patch) == "{}" { + logger.V(1).Info("skipping empty patch") + return nil + } + err = c.upstreamClient.Patch(ctx, current, client.RawPatch(patchType, patch)) + if err != nil { + return fmt.Errorf("applying patch: %w", err) + } + logger.V(0).Info("patched resource", "patchType", string(patchType), "resourceVersion", current.GetResourceVersion()) + + return nil +} + +func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitution.Resource, current *unstructured.Unstructured) ([]byte, types.PatchType, error) { + // We need to remove the creation timestamp since the other versions of the resource we're merging against won't have it. + // It's safe to mutate in this case because resource has already been copied by the cache. + current.SetCreationTimestamp(metav1.NewTime(time.Time{})) + + var prevManifest []byte + if prev != nil { + prevManifest = []byte(prev.Manifest) + } + + currentJS, err := current.MarshalJSON() + if err != nil { + return nil, "", fmt.Errorf("building json representation of desired state: %w", err) + } + + model, err := c.discovery.Get(ctx, resource.Object.GroupVersionKind()) + if err != nil { + return nil, "", fmt.Errorf("getting merge metadata: %w", err) + } + if model == nil { + patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(prevManifest, []byte(resource.Manifest), currentJS) + return patch, types.MergePatchType, err + } + + patchmeta := strategicpatch.NewPatchMetaFromOpenAPI(model) + patch, err := strategicpatch.CreateThreeWayMergePatch(prevManifest, []byte(resource.Manifest), currentJS, patchmeta, true) + return patch, types.StrategicMergePatchType, err +} diff --git a/internal/controllers/reconciliation/fixtures/v1/config/crd/enotest.azure.io_testresources.yaml b/internal/controllers/reconciliation/fixtures/v1/config/crd/enotest.azure.io_testresources.yaml new file mode 100644 index 00000000..fb0b883c --- /dev/null +++ b/internal/controllers/reconciliation/fixtures/v1/config/crd/enotest.azure.io_testresources.yaml @@ -0,0 +1,50 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.3 + creationTimestamp: null + name: testresources.enotest.azure.io +spec: + group: enotest.azure.io + names: + kind: TestResource + listKind: TestResourceList + plural: testresources + singular: testresource + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + properties: + values: + items: + properties: + int: + type: integer + type: object + type: array + type: object + status: + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/internal/controllers/reconciliation/fixtures/v1/config/enotest.azure.io_testresources-old.yaml b/internal/controllers/reconciliation/fixtures/v1/config/enotest.azure.io_testresources-old.yaml new file mode 100644 index 00000000..31f494ff --- /dev/null +++ b/internal/controllers/reconciliation/fixtures/v1/config/enotest.azure.io_testresources-old.yaml @@ -0,0 +1,45 @@ +# This is a copy of the generated enotest.azure.io_testresources.yaml to provide backwards compatibility +# with old versions of k8s that don't have apiextensions.k8s.io/v1. It needs to be updated manually when +# regenerating its source. +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: testresources.enotest.azure.io +spec: + group: enotest.azure.io + version: v1 + names: + kind: TestResource + listKind: TestResourceList + plural: testresources + singular: testresource + scope: Namespaced + validation: + openAPIV3Schema: + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + properties: + values: + items: + properties: + int: + type: integer + type: object + type: array + type: object + status: + type: object + type: object diff --git a/internal/controllers/reconciliation/fixtures/v1/types.go b/internal/controllers/reconciliation/fixtures/v1/types.go new file mode 100644 index 00000000..78d695fb --- /dev/null +++ b/internal/controllers/reconciliation/fixtures/v1/types.go @@ -0,0 +1,49 @@ +// +kubebuilder:object:generate=true +// +groupName=enotest.azure.io +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +// When re-generating also update any *-old.yaml files (see their comments for details) +//go:generate controller-gen object crd rbac:roleName=resourceprovider paths=./... + +var ( + SchemeGroupVersion = schema.GroupVersion{Group: "enotest.azure.io", Version: "v1"} + SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} +) + +func init() { + SchemeBuilder.Register(&TestResourceList{}, &TestResource{}) +} + +// +kubebuilder:object:root=true +type TestResourceList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []TestResource `json:"items"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +type TestResource struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec TestResourceSpec `json:"spec,omitempty"` + Status TestResourceStatus `json:"status,omitempty"` +} + +type TestResourceSpec struct { + Values []*TestValue `json:"values,omitempty"` +} + +type TestValue struct { + Int int `json:"int,omitempty"` +} + +type TestResourceStatus struct { +} diff --git a/internal/controllers/reconciliation/fixtures/v1/zz_generated.deepcopy.go b/internal/controllers/reconciliation/fixtures/v1/zz_generated.deepcopy.go new file mode 100644 index 00000000..62308d7c --- /dev/null +++ b/internal/controllers/reconciliation/fixtures/v1/zz_generated.deepcopy.go @@ -0,0 +1,125 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by controller-gen. DO NOT EDIT. + +package v1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TestResource) DeepCopyInto(out *TestResource) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TestResource. +func (in *TestResource) DeepCopy() *TestResource { + if in == nil { + return nil + } + out := new(TestResource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TestResource) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TestResourceList) DeepCopyInto(out *TestResourceList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]TestResource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TestResourceList. +func (in *TestResourceList) DeepCopy() *TestResourceList { + if in == nil { + return nil + } + out := new(TestResourceList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TestResourceList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TestResourceSpec) DeepCopyInto(out *TestResourceSpec) { + *out = *in + if in.Values != nil { + in, out := &in.Values, &out.Values + *out = make([]*TestValue, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(TestValue) + **out = **in + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TestResourceSpec. +func (in *TestResourceSpec) DeepCopy() *TestResourceSpec { + if in == nil { + return nil + } + out := new(TestResourceSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TestResourceStatus) DeepCopyInto(out *TestResourceStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TestResourceStatus. +func (in *TestResourceStatus) DeepCopy() *TestResourceStatus { + if in == nil { + return nil + } + out := new(TestResourceStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TestValue) DeepCopyInto(out *TestValue) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TestValue. +func (in *TestValue) DeepCopy() *TestValue { + if in == nil { + return nil + } + out := new(TestValue) + in.DeepCopyInto(out) + return out +} diff --git a/internal/controllers/reconciliation/integration_test.go b/internal/controllers/reconciliation/integration_test.go new file mode 100644 index 00000000..f82b0a80 --- /dev/null +++ b/internal/controllers/reconciliation/integration_test.go @@ -0,0 +1,297 @@ +package reconciliation + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1 "github.com/Azure/eno/api/v1" + testv1 "github.com/Azure/eno/internal/controllers/reconciliation/fixtures/v1" + "github.com/Azure/eno/internal/controllers/synthesis" + "github.com/Azure/eno/internal/reconstitution" + "github.com/Azure/eno/internal/testutil" +) + +// TODO: Cover no-op update, assert on exact k8s api requests + +// TODO: Why are we sending strategic patches for CRs? Why does it work? + +// TODO: Test what happens if the resource already exists but we have no previous record of it + +// TODO: Assert on status + +// TODO: Test renaming + +type crudTestCase struct { + Name string + Empty, Initial, Updated client.Object + AssertCreated, AssertUpdated func(t *testing.T, obj client.Object) + ApplyExternalUpdate func(t *testing.T, obj client.Object) client.Object +} + +var crudTests = []crudTestCase{ + { + // TODO: This test has a rare race condition I think + Name: "strategic-merge", // will fail if non-strategic merge is used + Empty: &corev1.Service{}, + Initial: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-obj", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Name: "first", + Port: 1234, + Protocol: corev1.ProtocolTCP, + }}, + }, + }, + AssertCreated: func(t *testing.T, obj client.Object) { + svc := obj.(*corev1.Service).Spec + assert.Equal(t, []corev1.ServicePort{{ + Name: "first", + Port: 1234, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(1234), + }}, svc.Ports) + }, + ApplyExternalUpdate: func(t *testing.T, obj client.Object) client.Object { + svc := obj.(*corev1.Service).DeepCopy() + svc.Spec.Ports = []corev1.ServicePort{{ + Name: "second", + Port: 2345, + Protocol: corev1.ProtocolTCP, + }} + return svc + }, + Updated: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-obj", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Name: "third", + Port: 3456, + Protocol: corev1.ProtocolTCP, + }}, + }, + }, + AssertUpdated: func(t *testing.T, obj client.Object) { + svc := obj.(*corev1.Service).Spec + assert.Equal(t, []corev1.ServicePort{ + { + Name: "third", + Port: 3456, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(3456), + }, + { + Name: "second", + Port: 2345, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(2345), + }, + }, svc.Ports) + }, + }, + { + Name: "cr-basics", + Empty: &testv1.TestResource{}, + Initial: &testv1.TestResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cr", + Namespace: "default", + }, + Spec: testv1.TestResourceSpec{ + Values: []*testv1.TestValue{{Int: 1}, {Int: 2}}, + }, + }, + AssertCreated: func(t *testing.T, obj client.Object) { + tr := obj.(*testv1.TestResource) + assert.Equal(t, []*testv1.TestValue{{Int: 1}, {Int: 2}}, tr.Spec.Values) + }, + Updated: &testv1.TestResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cr", + Namespace: "default", + }, + Spec: testv1.TestResourceSpec{ + Values: []*testv1.TestValue{{Int: 2}}, + }, + }, + AssertUpdated: func(t *testing.T, obj client.Object) { + tr := obj.(*testv1.TestResource) + assert.Equal(t, []*testv1.TestValue{{Int: 2}}, tr.Spec.Values) + }, + }, +} + +func TestCRUD(t *testing.T) { + scheme := runtime.NewScheme() + corev1.SchemeBuilder.AddToScheme(scheme) + testv1.SchemeBuilder.AddToScheme(scheme) + + for _, test := range crudTests { + test := test + t.Run(test.Name, func(t *testing.T) { + ctx := testutil.NewContext(t) + mgr := testutil.NewManager(t) + upstream := mgr.GetClient() + downstream := mgr.DownstreamClient + + // Register supporting controllers + rm, err := reconstitution.New(mgr.Manager, time.Millisecond) + require.NoError(t, err) + require.NoError(t, synthesis.NewRolloutController(mgr.Manager, time.Millisecond)) + require.NoError(t, synthesis.NewStatusController(mgr.Manager)) + require.NoError(t, synthesis.NewPodLifecycleController(mgr.Manager, &synthesis.Config{ + WrapperImage: "test-wrapper", + MaxRestarts: 2, + Timeout: time.Second * 5, + })) + + // Simulate synthesis of our test composition into the resources specified by the test case + testutil.NewPodController(t, mgr.Manager, newSliceBuilder(t, scheme, &test)) + + // Test subject + // Only enable rediscoverWhenNotFound on k8s versions that can support it. + err = New(rm, mgr.DownstreamRestConfig, 5, testutil.AtLeastVersion(t, 15)) + require.NoError(t, err) + mgr.Start(t) + + // Any syn/comp will do since we faked out the synthesizer pod + syn := &apiv1.Synthesizer{} + syn.Name = "test-syn" + syn.Spec.Image = "create" + require.NoError(t, upstream.Create(ctx, syn)) + + comp := &apiv1.Composition{} + comp.Name = "test-comp" + comp.Namespace = "default" + comp.Spec.Synthesizer.Name = syn.Name + require.NoError(t, upstream.Create(ctx, comp)) + + var lastResourceVersion string + t.Run("creation", func(t *testing.T) { + var obj client.Object + testutil.Eventually(t, func() bool { + obj, err = test.Get(downstream) + return err == nil + }) + test.AssertCreated(t, obj) + lastResourceVersion = obj.GetResourceVersion() + }) + + if test.ApplyExternalUpdate != nil { + t.Run("external update", func(t *testing.T) { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + obj, err := test.Get(downstream) + require.NoError(t, err) + + updatedObj := test.ApplyExternalUpdate(t, obj) + if err := downstream.Update(ctx, updatedObj); err != nil { + return err + } + + lastResourceVersion = updatedObj.GetResourceVersion() + t.Logf("external update version %s", lastResourceVersion) + return nil + }) + require.NoError(t, err) + + // wait for this write to hit the informer cache + testutil.Eventually(t, func() bool { + obj, err := test.Get(downstream) + if err != nil || obj.GetResourceVersion() != lastResourceVersion { + return false + } + lastResourceVersion = obj.GetResourceVersion() + return true + }) + }) + } + + t.Run("update", func(t *testing.T) { + setSynImage(t, upstream, syn, "update") + + var obj client.Object + testutil.Eventually(t, func() bool { + obj, err = test.Get(downstream) + return err == nil && obj.GetResourceVersion() != lastResourceVersion + }) + test.AssertUpdated(t, obj) + }) + + // TODO + // t.Run("delete", func(t *testing.T) { + // setSynImage(t, upstream, syn, "delete") + + // testutil.Eventually(t, func() bool { + // _, err = test.Get(downstream) + // return errors.IsNotFound(err) + // }) + // }) + }) + } +} + +func (c *crudTestCase) Get(downstream client.Client) (client.Object, error) { + obj := c.Empty.DeepCopyObject().(client.Object) + obj.SetName(c.Initial.GetName()) + obj.SetNamespace(c.Initial.GetNamespace()) + return obj, downstream.Get(context.Background(), client.ObjectKeyFromObject(obj), obj) +} + +func setSynImage(t *testing.T, upstream client.Client, syn *apiv1.Synthesizer, image string) { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := upstream.Get(context.Background(), client.ObjectKeyFromObject(syn), syn); err != nil { + return err + } + syn.Spec.Image = image + return upstream.Update(context.Background(), syn) + }) + require.NoError(t, err) +} + +func newSliceBuilder(t *testing.T, scheme *runtime.Scheme, test *crudTestCase) func(c *apiv1.Composition, s *apiv1.Synthesizer) []*apiv1.ResourceSlice { + return func(c *apiv1.Composition, s *apiv1.Synthesizer) []*apiv1.ResourceSlice { + slice := &apiv1.ResourceSlice{} + slice.GenerateName = "test-" + slice.Namespace = "default" + slice.Spec.CompositionGeneration = c.Generation + + var obj client.Object + switch s.Spec.Image { + case "create": + obj = test.Initial.DeepCopyObject().(client.Object) + case "update": + obj = test.Updated.DeepCopyObject().(client.Object) + case "delete": + return []*apiv1.ResourceSlice{slice} + default: + t.Fatalf("unknown pseudo-image: %s", s.Spec.Image) + } + + gvks, _, err := scheme.ObjectKinds(obj) + require.NoError(t, err) + obj.GetObjectKind().SetGroupVersionKind(gvks[0]) + + js, err := json.Marshal(obj) + require.NoError(t, err) + + slice.Spec.Resources = []apiv1.Manifest{{Manifest: string(js)}} + return []*apiv1.ResourceSlice{slice} + } +} diff --git a/internal/controllers/synthesis/rollout.go b/internal/controllers/synthesis/rollout.go index ccff6d77..bf662bc5 100644 --- a/internal/controllers/synthesis/rollout.go +++ b/internal/controllers/synthesis/rollout.go @@ -98,7 +98,7 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, fmt.Errorf("updating synthesizer's current generation: %w", err) } logger.Info("rollout is complete - updated synthesizer's current generation") - return ctrl.Result{}, nil + return ctrl.Result{}, nil // TODO: Consider leaving this loop open in case new compositions fell through the cracks earlier } return ctrl.Result{}, nil diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 3b5e7c6a..3073a822 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -3,8 +3,10 @@ package testutil import ( "context" "fmt" + "os" "path/filepath" goruntime "runtime" + "strconv" "testing" "time" @@ -12,7 +14,12 @@ import ( "github.com/go-logr/logr/testr" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -22,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" apiv1 "github.com/Azure/eno/api/v1" + testv1 "github.com/Azure/eno/internal/controllers/reconciliation/fixtures/v1" "github.com/Azure/eno/internal/manager" ) @@ -53,13 +61,25 @@ func NewContext(t *testing.T) context.Context { return logr.NewContext(ctx, testr.NewWithOptions(t, testr.Options{Verbosity: 2})) } +// NewManager starts one or two envtest environments depending on the env. +// This should work seamlessly when run locally assuming binaries have been fetched with setup-envtest. +// In CI the second environment is used to compatibility test against a matrix of k8s versions. +// This compatibility testing is tightly coupled to the github action and not expected to work locally. func NewManager(t *testing.T) *Manager { _, b, _, _ := goruntime.Caller(0) root := filepath.Join(filepath.Dir(b), "..", "..") + testCrdDir := filepath.Join(root, "internal", "controllers", "reconciliation", "fixtures", "v1", "config", "crd") env := &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join(root, "api", "v1", "config", "crd")}, + CRDDirectoryPaths: []string{ + filepath.Join(root, "api", "v1", "config", "crd"), + testCrdDir, + }, ErrorIfCRDPathMissing: true, + + // We can't use KUBEBUILDER_ASSETS when also setting DOWNSTREAM_KUBEBUILDER_ASSETS + // because the envvar overrides BinaryAssetsDirectory + BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"), } t.Cleanup(func() { err := env.Stop() @@ -67,7 +87,6 @@ func NewManager(t *testing.T) *Manager { panic(err) } }) - cfg, err := env.Start() require.NoError(t, err) @@ -77,14 +96,82 @@ func NewManager(t *testing.T) *Manager { MetricsAddr: "127.0.0.1:0", }) require.NoError(t, err) + require.NoError(t, testv1.SchemeBuilder.AddToScheme(mgr.GetScheme())) // test-specific CRDs + + m := &Manager{ + Manager: mgr, + RestConfig: cfg, + DownstreamRestConfig: cfg, // possible override below + DownstreamClient: mgr.GetClient(), + } + + dir := os.Getenv("DOWNSTREAM_KUBEBUILDER_ASSETS") + if dir == "" { + return m // only one env needed + } + version, _ := strconv.Atoi(os.Getenv("DOWNSTREAM_VERSION_MINOR")) + + downstreamEnv := &envtest.Environment{ + BinaryAssetsDirectory: dir, + ErrorIfCRDPathMissing: true, + } + + // Only newer clusters can use envtest to install CRDs + if version >= 21 { + t.Logf("managing downstream cluster CRD with envtest because version >= 21") + downstreamEnv.CRDDirectoryPaths = append(downstreamEnv.CRDDirectoryPaths, testCrdDir) + } + + // k8s <1.13 will not start if these flags are set + if version < 13 { + conf := downstreamEnv.ControlPlane.GetAPIServer().Configure() + conf.Disable("service-account-signing-key-file") + conf.Disable("service-account-issuer") + } + + t.Cleanup(func() { + err := downstreamEnv.Stop() + if err != nil { + panic(err) + } + }) + m.DownstreamRestConfig, err = downstreamEnv.Start() + require.NoError(t, err) + + m.DownstreamClient, err = client.New(m.DownstreamRestConfig, client.Options{}) + require.NoError(t, err) + + // Log apiserver version + disc, err := discovery.NewDiscoveryClientForConfig(m.DownstreamRestConfig) + if err == nil { + version, err := disc.ServerVersion() + if err == nil { + t.Logf("downstream control plane version: %s", version.String()) + } + } + + // We install old (v1beta1) CRDs ourselves because envtest assumes v1 + if version < 21 { + t.Logf("managing downstream cluster CRD ourselves (not with envtest) because version < 21") + raw, err := os.ReadFile(filepath.Join(root, "internal", "controllers", "reconciliation", "fixtures", "v1", "config", "enotest.azure.io_testresources-old.yaml")) + require.NoError(t, err) + + res := &unstructured.Unstructured{} + require.NoError(t, yaml.Unmarshal(raw, res)) - return &Manager{ - Manager: mgr, + cli, err := client.New(m.DownstreamRestConfig, client.Options{}) + require.NoError(t, err) + require.NoError(t, cli.Create(context.Background(), res)) } + + return m } type Manager struct { ctrl.Manager + RestConfig *rest.Config + DownstreamRestConfig *rest.Config // may or may not == RestConfig + DownstreamClient client.Client // may or may not == Manager.GetClient() } func (m *Manager) Start(t *testing.T) { @@ -101,7 +188,7 @@ func Eventually(t testing.TB, fn func() bool) { t.Helper() start := time.Now() for { - if time.Since(start) > time.Second*2 { + if time.Since(start) > time.Second*5 { t.Fatalf("timeout while waiting for condition") return } @@ -123,8 +210,10 @@ func NewPodController(t testing.TB, mgr ctrl.Manager, fn func(*apiv1.Composition if err != nil { return reconcile.Result{}, err } + + // The state is populated async with pod creation so it may not exist at this point if comp.Status.CurrentState == nil { - return reconcile.Result{}, nil // wait for controller to write initial status + return reconcile.Result{}, nil } syn := &apiv1.Synthesizer{} @@ -134,21 +223,6 @@ func NewPodController(t testing.TB, mgr ctrl.Manager, fn func(*apiv1.Composition return reconcile.Result{}, err } - var slices []*apiv1.ResourceSlice - if fn != nil { - slices = fn(comp, syn) - for _, slice := range slices { - cp := slice.DeepCopy() - cp.Spec.CompositionGeneration = comp.Generation - if err := controllerutil.SetControllerReference(comp, cp, cli.Scheme()); err != nil { - return reconcile.Result{}, err - } - if err := cli.Create(ctx, cp); err != nil { - return reconcile.Result{}, err - } - } - } - pods := &corev1.PodList{} err = cli.List(ctx, pods, client.MatchingFields{ manager.IdxPodsByComposition: comp.Name, @@ -156,38 +230,74 @@ func NewPodController(t testing.TB, mgr ctrl.Manager, fn func(*apiv1.Composition if err != nil { return reconcile.Result{}, err } - if len(pods.Items) == 0 { - return reconcile.Result{}, nil // no pods yet - } - // Add resource slice count - the wrapper will do this in the real world - pod := pods.Items[0] - if comp.Status.CurrentState.ResourceSliceCount == nil { - count := int64(len(slices)) - comp.Status.CurrentState.ResourceSliceCount = &count - err = cli.Status().Update(ctx, comp) - if err != nil { + for _, pod := range pods.Items { + pod := pod + + // The real pod controller will ignore outdated (probably deleting) pods + compGen, _ := strconv.ParseInt(pod.Annotations["eno.azure.io/composition-generation"], 10, 0) + synGen, _ := strconv.ParseInt(pod.Annotations["eno.azure.io/synthesizer-generation"], 10, 0) + if synGen < syn.Generation || compGen < comp.Generation { + t.Logf("skipping pod %s because it's out of date (%d < %d || %d < %d)", pod.Name, synGen, syn.Generation, compGen, comp.Generation) + continue + } + + // nil func == 0 slices + var slices []*apiv1.ResourceSlice + if fn != nil { + slices = fn(comp, syn) + } + + // Write all of the resource slices, update the resource slice count accordingly + // TODO: We need a controller to remove failed/outdated resource slice writes + if comp.Status.CurrentState.ResourceSliceCount == nil { + for _, slice := range slices { + cp := slice.DeepCopy() + cp.Spec.CompositionGeneration = comp.Generation + if err := controllerutil.SetControllerReference(comp, cp, cli.Scheme()); err != nil { + return reconcile.Result{}, err + } + if err := cli.Create(ctx, cp); err != nil { + return reconcile.Result{}, err // TODO: we can't recover from this + } + t.Logf("created resource slice: %s", cp.Name) + } + + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + err := cli.Get(ctx, r.NamespacedName, comp) + if err != nil { + return err + } + count := int64(len(slices)) + comp.Status.CurrentState.ResourceSliceCount = &count + err = cli.Status().Update(ctx, comp) + if err != nil { + return err + } + t.Logf("updated resource slice count for %s (image %s)", pod.Name, pod.Spec.Containers[0].Image) + return nil + }) return reconcile.Result{}, err } - t.Logf("updated resource slice count for %s", pod.Name) - return reconcile.Result{}, nil } // Mark the pod as terminated to signal that synthesis is complete - if len(pod.Status.ContainerStatuses) == 0 { - pod.Status.ContainerStatuses = []corev1.ContainerStatus{{ - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 0, + for _, pod := range pods.Items { + if len(pod.Status.ContainerStatuses) == 0 { + pod.Status.ContainerStatuses = []corev1.ContainerStatus{{ + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 0, + }, }, - }, - }} - err = cli.Status().Update(ctx, &pod) - if err != nil { - return reconcile.Result{}, err + }} + err = cli.Status().Update(ctx, &pod) + if err != nil { + return reconcile.Result{}, err + } + t.Logf("updated container status for %s", pod.Name) + return reconcile.Result{}, nil } - t.Logf("updated container status for %s", pod.Name) - return reconcile.Result{}, nil } return reconcile.Result{}, nil @@ -199,3 +309,13 @@ func NewPodController(t testing.TB, mgr ctrl.Manager, fn func(*apiv1.Composition Build(podCtrl) require.NoError(t, err) } + +func AtLeastVersion(t *testing.T, minor int) bool { + versionStr := os.Getenv("DOWNSTREAM_VERSION_MINOR") + if versionStr == "" { + return true // fail open for local dev + } + + version, _ := strconv.Atoi(versionStr) + return version >= minor +}