Skip to content

Commit

Permalink
Add discovery cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jveski committed Nov 20, 2023
1 parent 859af5b commit a53f310
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 23 deletions.
28 changes: 8 additions & 20 deletions internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/util/openapi"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -23,26 +21,16 @@ type Controller struct {
resourceClient reconstitution.Client

upstreamClient client.Client
openapi openapi.Resources
discovery *discoveryCache
}

func New(mgr *reconstitution.Manager, upstream *rest.Config) error {
upstreamClient, err := client.New(upstream, client.Options{})
func New(mgr *reconstitution.Manager, downstream *rest.Config) error {
upstreamClient, err := client.New(downstream, client.Options{})
if err != nil {
return err
}

disc, err := discovery.NewDiscoveryClientForConfig(upstream)
if err != nil {
return err
}
disc.UseLegacyDiscovery = true // don't bother with aggregated APIs since they may be unavailable

doc, err := disc.OpenAPISchema()
if err != nil {
return err
}
resources, err := openapi.NewOpenAPIData(doc)
disc, err := newDicoveryCache(downstream, 1) // TODO: Expose
if err != nil {
return err
}
Expand All @@ -51,7 +39,7 @@ func New(mgr *reconstitution.Manager, upstream *rest.Config) error {
client: mgr.Manager.GetClient(),
resourceClient: mgr.GetClient(),
upstreamClient: upstreamClient,
openapi: resources,
discovery: disc,
})
}

Expand Down Expand Up @@ -172,9 +160,9 @@ func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitut
return nil, fmt.Errorf("building json representation of desired state: %w", err)
}

model := c.openapi.LookupResource(resource.Object.GroupVersionKind())
if model == nil {
return nil, fmt.Errorf("resource is not known") // TODO: Refresh cache?
model, err := c.discovery.Get(ctx, resource.Object.GroupVersionKind())
if err != nil {
return nil, fmt.Errorf("getting merge metadata: %w", err)
}

patchmeta := strategicpatch.NewPatchMetaFromOpenAPI(model)
Expand Down
64 changes: 61 additions & 3 deletions internal/controllers/reconciliation/discoverycache.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,76 @@
package reconciliation

import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kube-openapi/pkg/util/proto"
"k8s.io/kubectl/pkg/util/openapi"
)

// TODO

// discoveryCache is useful to prevent excessive QPS to the discovery APIs while
// still allowing dynamic refresh of the openapi spec on cache misses.
type discoveryCache struct {
mut sync.Mutex
client discovery.DiscoveryInterface
current openapi.Resources
}

func (d *discoveryCache) Get() openapi.Resources {
func newDicoveryCache(rc *rest.Config, qps float32) (*discoveryCache, error) {
conf := rest.CopyConfig(rc)
conf.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, 1)

disc, err := discovery.NewDiscoveryClientForConfig(rc)
if err != nil {
return nil, err
}
disc.UseLegacyDiscovery = true // don't bother with aggregated APIs since they may be unavailable

d := &discoveryCache{client: disc}
return d, nil
}

func (d *discoveryCache) Get(ctx context.Context, gvk schema.GroupVersionKind) (proto.Schema, error) {
d.mut.Lock()
defer d.mut.Unlock()

if d.current == nil {
if err := d.fillUnlocked(ctx); err != nil {
return nil, err
}
}

model := d.current.LookupResource(gvk)
if model == nil {
if err := d.fillUnlocked(ctx); err != nil {
return nil, err
}
model = d.current.LookupResource(gvk)
if model == nil {
return nil, fmt.Errorf("resource was not found in openapi spec")
}
}

return model, nil
}

func (d *discoveryCache) fillUnlocked(ctx context.Context) error {
logr.FromContextOrDiscard(ctx).V(1).Info("filling discovery cache")

doc, err := d.client.OpenAPISchema()
if err != nil {
return err
}
resources, err := openapi.NewOpenAPIData(doc)
if err != nil {
return err
}
d.current = resources
return nil
}
1 change: 1 addition & 0 deletions internal/controllers/reconciliation/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestControllerPodBasics(t *testing.T) {
slice.GenerateName = "test-"
slice.Namespace = "default"
slice.Spec.CompositionGeneration = c.Generation
// TODO: This should use multiple containers
switch s.Spec.Image {
case "create":
slice.Spec.Resources = []apiv1.Manifest{{
Expand Down

0 comments on commit a53f310

Please sign in to comment.