diff --git a/internal/controllers/reconciliation/controller.go b/internal/controllers/reconciliation/controller.go index b15859c5..a3f4601a 100644 --- a/internal/controllers/reconciliation/controller.go +++ b/internal/controllers/reconciliation/controller.go @@ -7,9 +7,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/client-go/discovery" "k8s.io/client-go/rest" - "k8s.io/kubectl/pkg/util/openapi" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -23,26 +21,16 @@ type Controller struct { resourceClient reconstitution.Client upstreamClient client.Client - openapi openapi.Resources + discovery *discoveryCache } -func New(mgr *reconstitution.Manager, upstream *rest.Config) error { - upstreamClient, err := client.New(upstream, client.Options{}) +func New(mgr *reconstitution.Manager, downstream *rest.Config) error { + upstreamClient, err := client.New(downstream, client.Options{}) if err != nil { return err } - disc, err := discovery.NewDiscoveryClientForConfig(upstream) - if err != nil { - return err - } - disc.UseLegacyDiscovery = true // don't bother with aggregated APIs since they may be unavailable - - doc, err := disc.OpenAPISchema() - if err != nil { - return err - } - resources, err := openapi.NewOpenAPIData(doc) + disc, err := newDicoveryCache(downstream, 1) // TODO: Expose if err != nil { return err } @@ -51,7 +39,7 @@ func New(mgr *reconstitution.Manager, upstream *rest.Config) error { client: mgr.Manager.GetClient(), resourceClient: mgr.GetClient(), upstreamClient: upstreamClient, - openapi: resources, + discovery: disc, }) } @@ -172,9 +160,9 @@ func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitut return nil, fmt.Errorf("building json representation of desired state: %w", err) } - model := c.openapi.LookupResource(resource.Object.GroupVersionKind()) - if model == nil { - return nil, fmt.Errorf("resource is not known") // TODO: Refresh cache? + model, err := c.discovery.Get(ctx, resource.Object.GroupVersionKind()) + if err != nil { + return nil, fmt.Errorf("getting merge metadata: %w", err) } patchmeta := strategicpatch.NewPatchMetaFromOpenAPI(model) diff --git a/internal/controllers/reconciliation/discoverycache.go b/internal/controllers/reconciliation/discoverycache.go index bcb8fa29..2076a70a 100644 --- a/internal/controllers/reconciliation/discoverycache.go +++ b/internal/controllers/reconciliation/discoverycache.go @@ -1,18 +1,76 @@ package reconciliation import ( + "context" + "fmt" "sync" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/kube-openapi/pkg/util/proto" "k8s.io/kubectl/pkg/util/openapi" ) -// TODO - +// discoveryCache is useful to prevent excessive QPS to the discovery APIs while +// still allowing dynamic refresh of the openapi spec on cache misses. type discoveryCache struct { mut sync.Mutex + client discovery.DiscoveryInterface current openapi.Resources } -func (d *discoveryCache) Get() openapi.Resources { +func newDicoveryCache(rc *rest.Config, qps float32) (*discoveryCache, error) { + conf := rest.CopyConfig(rc) + conf.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, 1) + + disc, err := discovery.NewDiscoveryClientForConfig(rc) + if err != nil { + return nil, err + } + disc.UseLegacyDiscovery = true // don't bother with aggregated APIs since they may be unavailable + + d := &discoveryCache{client: disc} + return d, nil +} + +func (d *discoveryCache) Get(ctx context.Context, gvk schema.GroupVersionKind) (proto.Schema, error) { + d.mut.Lock() + defer d.mut.Unlock() + + if d.current == nil { + if err := d.fillUnlocked(ctx); err != nil { + return nil, err + } + } + + model := d.current.LookupResource(gvk) + if model == nil { + if err := d.fillUnlocked(ctx); err != nil { + return nil, err + } + model = d.current.LookupResource(gvk) + if model == nil { + return nil, fmt.Errorf("resource was not found in openapi spec") + } + } + + return model, nil +} + +func (d *discoveryCache) fillUnlocked(ctx context.Context) error { + logr.FromContextOrDiscard(ctx).V(1).Info("filling discovery cache") + + doc, err := d.client.OpenAPISchema() + if err != nil { + return err + } + resources, err := openapi.NewOpenAPIData(doc) + if err != nil { + return err + } + d.current = resources return nil } diff --git a/internal/controllers/reconciliation/integration_test.go b/internal/controllers/reconciliation/integration_test.go index 58786389..4dd4770e 100644 --- a/internal/controllers/reconciliation/integration_test.go +++ b/internal/controllers/reconciliation/integration_test.go @@ -37,6 +37,7 @@ func TestControllerPodBasics(t *testing.T) { slice.GenerateName = "test-" slice.Namespace = "default" slice.Spec.CompositionGeneration = c.Generation + // TODO: This should use multiple containers switch s.Spec.Image { case "create": slice.Spec.Resources = []apiv1.Manifest{{