Skip to content

Commit

Permalink
More perf optimizations (#29)
Browse files Browse the repository at this point in the history
Contains essentially two optimizations:

- Release the openapi doc pointer after getting what we need from it
(it's very large, has several copies of things etc.)
- Expose the client QPS/burst and increase the defaults
- Add jitter to the resource reconciliation loop

---------

Co-authored-by: Jordan Olshevski <[email protected]>
  • Loading branch information
jveski and Jordan Olshevski authored Jan 3, 2024
1 parent c5b99b4 commit cf1adc9
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 83 deletions.
2 changes: 2 additions & 0 deletions cmd/eno-reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/Azure/eno/internal/reconstitution"
)

// TODO: Support two rest clients: upstream/downstream

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
Expand Down
2 changes: 1 addition & 1 deletion dev/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function build() {
}

# Build!
for f in cmd/*; do
for f in docker/*; do
build $f &
done
wait
Expand Down
2 changes: 2 additions & 0 deletions dev/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ spec:
containers:
- name: eno-reconciler
image: $REGISTRY/eno-reconciler:$TAG
args:
- --qps=1000
env:
- name: PPROF_ADDR
value: ":8888"
2 changes: 1 addition & 1 deletion docker/eno-controller/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ADD go.sum .
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 go build ./cmd/eno-controller
RUN CGO_ENABLED=0 go build -ldflags="-s -w" ./cmd/eno-controller

FROM scratch
COPY --from=builder /app/eno-controller /eno-controller
Expand Down
2 changes: 1 addition & 1 deletion docker/eno-reconciler/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ADD go.sum .
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 go build ./cmd/eno-reconciler
RUN CGO_ENABLED=0 go build -ldflags="-s -w" ./cmd/eno-reconciler

FROM scratch
COPY --from=builder /app/eno-reconciler /eno-reconciler
Expand Down
2 changes: 1 addition & 1 deletion examples/loadtest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ spec:
- /bin/bash
- -c
- |
n=25
n=23
echo -n '['
for i in $(seq $n); do
echo -n "{
Expand Down
3 changes: 2 additions & 1 deletion internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -122,7 +123,7 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
})

if resource != nil && resource.Manifest.ReconcileInterval != nil {
return ctrl.Result{RequeueAfter: resource.Manifest.ReconcileInterval.Duration}, nil
return ctrl.Result{RequeueAfter: wait.Jitter(resource.Manifest.ReconcileInterval.Duration, 0.1)}, nil
}
return ctrl.Result{}, nil
}
Expand Down
85 changes: 16 additions & 69 deletions internal/controllers/reconciliation/discoverycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,20 @@ import (
"sync"

"github.com/go-logr/logr"
openapi_v2 "github.com/google/gnostic-models/openapiv2"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kube-openapi/pkg/util/proto"
"k8s.io/kubectl/pkg/util/openapi"
)

// discoveryCache is useful to prevent excessive QPS to the discovery APIs while
// still allowing dynamic refresh of the openapi spec on cache misses.
type discoveryCache struct {
mut sync.Mutex
client discovery.DiscoveryInterface
fillWhenNotFound bool
currentResources openapi.Resources
currentSupportedTypes map[schema.GroupVersionKind]struct{}
mut sync.Mutex
client discovery.DiscoveryInterface
fillWhenNotFound bool
current map[schema.GroupVersionKind]proto.Schema
}

func newDicoveryCache(rc *rest.Config, qps float32, fillWhenNotFound bool) (*discoveryCache, error) {
Expand All @@ -36,7 +31,7 @@ func newDicoveryCache(rc *rest.Config, qps float32, fillWhenNotFound bool) (*dis
}
disc.UseLegacyDiscovery = true // don't bother with aggregated APIs since they may be unavailable

d := &discoveryCache{client: disc, fillWhenNotFound: fillWhenNotFound, currentSupportedTypes: map[schema.GroupVersionKind]struct{}{}}
d := &discoveryCache{client: disc, fillWhenNotFound: fillWhenNotFound}
return d, nil
}

Expand All @@ -48,19 +43,24 @@ func (d *discoveryCache) Get(ctx context.Context, gvk schema.GroupVersionKind) (
// Older versions of Kubernetes don't include CRDs in the openapi spec, so on those versions we cannot invalidate the cache if a resource is not found.
// However, on newer versions we expect every resource to exist in the spec so retries are safe and often necessary.
for i := 0; i < 2; i++ {
if d.currentResources == nil {
if d.current == nil {
logger.V(1).Info("filling discovery cache")
if err := d.fillUnlocked(ctx); err != nil {
return nil, err
}
}

model := d.currentResources.LookupResource(gvk)
if model == nil && d.fillWhenNotFound {
d.currentResources = nil // invalidate cache - retrieve fresh schema on next attempt
model, ok := d.current[gvk]
if !ok && d.fillWhenNotFound {
d.current = nil // invalidate cache - retrieve fresh schema on next attempt
continue
}
return d.checkSupportUnlocked(ctx, gvk, model)
if ok && model == nil {
logger.V(1).Info("type does not support strategic merge")
} else if model == nil {
logger.V(1).Info("type not found in openapi schema")
}
return model, nil
}
return nil, nil
}
Expand All @@ -70,62 +70,9 @@ func (d *discoveryCache) fillUnlocked(ctx context.Context) error {
if err != nil {
return err
}
resources, err := openapi.NewOpenAPIData(doc)
d.current, err = buildCurrentSchemaMap(doc)
if err != nil {
return err
}
d.currentResources = resources
d.currentSupportedTypes = buildSupportedTypesMap(doc)
return nil
}

func (d *discoveryCache) checkSupportUnlocked(ctx context.Context, gvk schema.GroupVersionKind, model proto.Schema) (proto.Schema, error) {
logger := logr.FromContextOrDiscard(ctx)
if model == nil {
logger.V(1).Info("type not found in openapi schema")
return nil, nil
}

if _, ok := d.currentSupportedTypes[gvk]; ok {
return model, nil
}

return nil, nil // doesn't support strategic merge
}

func buildSupportedTypesMap(doc *openapi_v2.Document) map[schema.GroupVersionKind]struct{} {
// This is copied and adapted from the kubectl openapi package
// Originally it walked the entire tree for every lookup, we have optimized it down to a single map lookup.
m := make(map[schema.GroupVersionKind]struct{})
for _, path := range doc.GetPaths().GetPath() {
for _, ex := range path.GetValue().GetPatch().GetVendorExtension() {
if ex.GetValue().GetYaml() == "" ||
ex.GetName() != "x-kubernetes-group-version-kind" {
continue
}

var value map[string]string
err := yaml.Unmarshal([]byte(ex.GetValue().GetYaml()), &value)
if err != nil {
continue
}

gvk := schema.GroupVersionKind{
Group: value["group"],
Version: value["version"],
Kind: value["kind"],
}
var supported bool
for _, c := range path.GetValue().GetPatch().GetConsumes() {
if c == string(types.StrategicMergePatchType) {
supported = true
break
}
}
if supported {
m[gvk] = struct{}{}
}
}
}
return m
}
106 changes: 106 additions & 0 deletions internal/controllers/reconciliation/kubectl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package reconciliation

// Everything in this file was adapted from kubectl's openapi library.
// It essentially implements the same behavior with various performance optimizations.

import (
openapi_v2 "github.com/google/gnostic-models/openapiv2"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kube-openapi/pkg/util/proto"
)

func buildCurrentSchemaMap(doc *openapi_v2.Document) (map[schema.GroupVersionKind]proto.Schema, error) {
models, err := proto.NewOpenAPIData(doc)
if err != nil {
return nil, err
}

allSupported := map[schema.GroupVersionKind]struct{}{}
for _, path := range doc.GetPaths().GetPath() {
for _, ex := range path.GetValue().GetPatch().GetVendorExtension() {
if ex.GetValue().GetYaml() == "" ||
ex.GetName() != "x-kubernetes-group-version-kind" {
continue
}

var value map[string]string
err := yaml.Unmarshal([]byte(ex.GetValue().GetYaml()), &value)
if err != nil {
continue
}

gvk := schema.GroupVersionKind{
Group: value["group"],
Version: value["version"],
Kind: value["kind"],
}
for _, c := range path.GetValue().GetPatch().GetConsumes() {
if c == string(types.StrategicMergePatchType) {
allSupported[gvk] = struct{}{}
break
}
}
}
}

m := map[schema.GroupVersionKind]proto.Schema{}
for _, modelName := range models.ListModels() {
model := models.LookupModel(modelName)
gvkList := parseGroupVersionKind(model)
for _, gvk := range gvkList {
if len(gvk.Kind) > 0 {
if _, ok := allSupported[gvk]; ok {
m[gvk] = model
} else {
m[gvk] = nil // unsupported == map key with nil model
}
}
}
}

return m, nil
}

func parseGroupVersionKind(s proto.Schema) []schema.GroupVersionKind {
extensions := s.GetExtensions()
gvkListResult := []schema.GroupVersionKind{}

gvkExtension, ok := extensions["x-kubernetes-group-version-kind"]
if !ok {
return []schema.GroupVersionKind{}
}

gvkList, ok := gvkExtension.([]interface{})
if !ok {
return []schema.GroupVersionKind{}
}

for _, gvk := range gvkList {
gvkMap, ok := gvk.(map[interface{}]interface{})
if !ok {
continue
}
group, ok := gvkMap["group"].(string)
if !ok {
continue
}
version, ok := gvkMap["version"].(string)
if !ok {
continue
}
kind, ok := gvkMap["kind"].(string)
if !ok {
continue
}

gvkListResult = append(gvkListResult, schema.GroupVersionKind{
Group: group,
Version: version,
Kind: kind,
})
}

return gvkListResult
}
4 changes: 2 additions & 2 deletions internal/controllers/synthesis/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// maxSliceJsonBytes is the max sum of a resource slice's manifests. It's set to 1mb, which leaves 512mb of space for the resource's status, encoding overhead, etc.
const maxSliceJsonBytes = 1024 * 1024
// maxSliceJsonBytes is the max sum of a resource slice's manifests. It's set to 1mb, which leaves 512kb of space for the resource's status, encoding overhead, etc.
const maxSliceJsonBytes = 1024 * 768

type execController struct {
client client.Client
Expand Down
26 changes: 20 additions & 6 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -52,16 +53,34 @@ type Options struct {
Namespace string
HealthProbeAddr string
MetricsAddr string
qps float64 // flags don't support float32, bind to this value and copy over to Rest.QPS during initialization
}

func (o *Options) Bind(set *flag.FlagSet) {
flag.StringVar(&o.Namespace, "namespace", "", "Only reconcile resources in a particular namespace")
set.StringVar(&o.Namespace, "namespace", "", "Only reconcile resources in a particular namespace")
set.StringVar(&o.HealthProbeAddr, "health-probe-addr", ":8081", "Address to serve health probes on")
set.StringVar(&o.MetricsAddr, "metrics-addr", ":8080", "Address to serve Prometheus metrics on")
set.IntVar(&o.Rest.Burst, "burst", 50, "apiserver client rate limiter burst configuration")
set.Float64Var(&o.qps, "qps", 20, "Max requests per second to apiserver")
}

func New(logger logr.Logger, opts *Options) (ctrl.Manager, error) {
opts.Rest.QPS = float32(opts.qps)

scheme := runtime.NewScheme()
err := apiv1.SchemeBuilder.AddToScheme(scheme)
if err != nil {
return nil, err
}
err = corev1.SchemeBuilder.AddToScheme(scheme)
if err != nil {
return nil, err
}

mgrOpts := manager.Options{
Logger: logger,
HealthProbeBindAddress: opts.HealthProbeAddr,
Scheme: scheme,
Metrics: server.Options{
BindAddress: opts.MetricsAddr,
},
Expand Down Expand Up @@ -95,11 +114,6 @@ func New(logger logr.Logger, opts *Options) (ctrl.Manager, error) {
return nil, err
}

err = apiv1.SchemeBuilder.AddToScheme(mgr.GetScheme())
if err != nil {
return nil, err
}

err = mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, IdxPodsByComposition, func(o client.Object) []string {
pod := o.(*corev1.Pod)
owner := metav1.GetControllerOf(pod)
Expand Down
2 changes: 1 addition & 1 deletion internal/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func NewManager(t *testing.T) *Manager {
m.DownstreamRestConfig, err = downstreamEnv.Start()
require.NoError(t, err)

m.DownstreamClient, err = client.New(m.DownstreamRestConfig, client.Options{})
m.DownstreamClient, err = client.New(m.DownstreamRestConfig, client.Options{Scheme: mgr.GetScheme()})
require.NoError(t, err)

// Log apiserver version
Expand Down

0 comments on commit cf1adc9

Please sign in to comment.