Skip to content

Commit

Permalink
Perf optimizations / fixes (#26)
Browse files Browse the repository at this point in the history
Adds a simple performance test manifest and implements various fixes /
optimizations found while experimenting with the test.

---------

Co-authored-by: Jordan Olshevski <[email protected]>
  • Loading branch information
jveski and Jordan Olshevski authored Jan 2, 2024
1 parent 5cba364 commit aacab3e
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 33 deletions.
9 changes: 8 additions & 1 deletion cmd/eno-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-logr/zapr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/Azure/eno/internal/controllers/synthesis"
Expand All @@ -28,14 +29,20 @@ func run() error {
var (
rolloutCooldown time.Duration
synthesisTimeout time.Duration
debugLogging bool
synconf = &synthesis.Config{}
)
flag.DurationVar(&synconf.Timeout, "synthesis-pod-timeout", time.Minute, "Maximum lifespan of synthesizer pods")
flag.DurationVar(&synthesisTimeout, "synthesis-timeout", time.Second*30, "Timeout when executing synthesizer binaries")
flag.DurationVar(&rolloutCooldown, "rollout-cooldown", time.Second*30, "Minimum period of time between each ensuing composition update after a synthesizer is updated")
flag.BoolVar(&debugLogging, "debug", true, "Enable debug logging")
flag.Parse()

zl, err := zap.NewProduction()
zapCfg := zap.NewProductionConfig()
if debugLogging {
zapCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)
}
zl, err := zapCfg.Build()
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion cmd/eno-reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-logr/zapr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/Azure/eno/internal/controllers/reconciliation"
Expand All @@ -28,17 +29,23 @@ func run() error {
rediscoverWhenNotFound bool
writeBatchInterval time.Duration
discoveryMaxRPS float32
debugLogging bool

mgrOpts = &manager.Options{
Rest: ctrl.GetConfigOrDie(),
}
)
flag.BoolVar(&rediscoverWhenNotFound, "rediscover-when-not-found", true, "Invalidate discovery cache when any type is not found in the openapi spec. Set this to false on <= k8s 1.14")
flag.DurationVar(&writeBatchInterval, "write-batch-interval", time.Second*5, "The max throughput of composition status updates")
flag.BoolVar(&debugLogging, "debug", true, "Enable debug logging")
mgrOpts.Bind(flag.CommandLine)
flag.Parse()

zl, err := zap.NewProduction()
zapCfg := zap.NewProductionConfig()
if debugLogging {
zapCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)
}
zl, err := zapCfg.Build()
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions dev/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ spec:
containers:
- name: eno-reconciler
image: $REGISTRY/eno-reconciler:$TAG
env:
- name: PPROF_ADDR
value: ":8888"
36 changes: 36 additions & 0 deletions examples/loadtest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: eno.azure.io/v1
kind: Synthesizer
metadata:
name: load-test-synth
spec:
image: docker.io/ubuntu:latest
command:
- /bin/bash
- -c
- |
n=600
echo -n '['
for i in $(seq $n); do
echo -n "{
\"apiVersion\": \"v1\",
\"kind\": \"ConfigMap\",
\"metadata\": { \"name\": \"test-cm-${i}\", \"namespace\": \"default\", \"annotations\": { \"eno.azure.io/reconcile-interval\": \"10s\" } },
\"data\": { \"foo\": \"bar\" }
}"
if [[ $i == $n ]]; then
echo "]"
else
echo ","
fi
done
---

apiVersion: eno.azure.io/v1
kind: Composition
metadata:
name: load-test
spec:
synthesizer:
name: load-test-synth
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chzyer/readline v1.5.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
Expand All @@ -35,8 +36,10 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ=
github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -49,11 +53,15 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 h1:dHLYa5D8/Ta0aLR2XcPsrkpAgGeFs6thhMcQK0oQ0n8=
github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab h1:BA4a7pe6ZTd9F8kXETBoijjFJ/ntaa//1wiH9BZu4zU=
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down Expand Up @@ -147,6 +155,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
Expand Down
65 changes: 52 additions & 13 deletions internal/controllers/reconciliation/discoverycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"

"github.com/go-logr/logr"
openapi_v2 "github.com/google/gnostic-models/openapiv2"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
Expand All @@ -17,10 +19,11 @@ import (
// discoveryCache is useful to prevent excessive QPS to the discovery APIs while
// still allowing dynamic refresh of the openapi spec on cache misses.
type discoveryCache struct {
mut sync.Mutex
client discovery.DiscoveryInterface
fillWhenNotFound bool
current openapi.Resources
mut sync.Mutex
client discovery.DiscoveryInterface
fillWhenNotFound bool
currentResources openapi.Resources
currentSupportedTypes map[schema.GroupVersionKind]struct{}
}

func newDicoveryCache(rc *rest.Config, qps float32, fillWhenNotFound bool) (*discoveryCache, error) {
Expand All @@ -33,7 +36,7 @@ func newDicoveryCache(rc *rest.Config, qps float32, fillWhenNotFound bool) (*dis
}
disc.UseLegacyDiscovery = true // don't bother with aggregated APIs since they may be unavailable

d := &discoveryCache{client: disc, fillWhenNotFound: fillWhenNotFound}
d := &discoveryCache{client: disc, fillWhenNotFound: fillWhenNotFound, currentSupportedTypes: map[schema.GroupVersionKind]struct{}{}}
return d, nil
}

Expand All @@ -45,16 +48,16 @@ func (d *discoveryCache) Get(ctx context.Context, gvk schema.GroupVersionKind) (
// Older versions of Kubernetes don't include CRDs in the openapi spec, so on those versions we cannot invalidate the cache if a resource is not found.
// However, on newer versions we expect every resource to exist in the spec so retries are safe and often necessary.
for i := 0; i < 2; i++ {
if d.current == nil {
if d.currentResources == nil {
logger.V(1).Info("filling discovery cache")
if err := d.fillUnlocked(ctx); err != nil {
return nil, err
}
}

model := d.current.LookupResource(gvk)
model := d.currentResources.LookupResource(gvk)
if model == nil && d.fillWhenNotFound {
d.current = nil // invalidate cache - retrieve fresh schema on next attempt
d.currentResources = nil // invalidate cache - retrieve fresh schema on next attempt
continue
}
return d.checkSupportUnlocked(ctx, gvk, model)
Expand All @@ -71,7 +74,8 @@ func (d *discoveryCache) fillUnlocked(ctx context.Context) error {
if err != nil {
return err
}
d.current = resources
d.currentResources = resources
d.currentSupportedTypes = buildSupportedTypesMap(doc)
return nil
}

Expand All @@ -82,11 +86,46 @@ func (d *discoveryCache) checkSupportUnlocked(ctx context.Context, gvk schema.Gr
return nil, nil
}

for _, c := range d.current.GetConsumes(gvk, "PATCH") {
if c == string(types.StrategicMergePatchType) {
return model, nil
}
if _, ok := d.currentSupportedTypes[gvk]; ok {
return model, nil
}

return nil, nil // doesn't support strategic merge
}

func buildSupportedTypesMap(doc *openapi_v2.Document) map[schema.GroupVersionKind]struct{} {
// This is copied and adapted from the kubectl openapi package
// Originally it walked the entire tree for every lookup, we have optimized it down to a single map lookup.
m := make(map[schema.GroupVersionKind]struct{})
for _, path := range doc.GetPaths().GetPath() {
for _, ex := range path.GetValue().GetPatch().GetVendorExtension() {
if ex.GetValue().GetYaml() == "" ||
ex.GetName() != "x-kubernetes-group-version-kind" {
continue
}

var value map[string]string
err := yaml.Unmarshal([]byte(ex.GetValue().GetYaml()), &value)
if err != nil {
continue
}

gvk := schema.GroupVersionKind{
Group: value["group"],
Version: value["version"],
Kind: value["kind"],
}
var supported bool
for _, c := range path.GetValue().GetPatch().GetConsumes() {
if c == string(types.StrategicMergePatchType) {
supported = true
break
}
}
if supported {
m[gvk] = struct{}{}
}
}
}
return m
}
78 changes: 78 additions & 0 deletions internal/controllers/reconciliation/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,81 @@ func getPhase(obj client.Object) string {
}
return anno["test-phase"]
}

func TestReconcileInterval(t *testing.T) {
scheme := runtime.NewScheme()
corev1.SchemeBuilder.AddToScheme(scheme)
testv1.SchemeBuilder.AddToScheme(scheme)

ctx := testutil.NewContext(t)
mgr := testutil.NewManager(t)
upstream := mgr.GetClient()
downstream := mgr.DownstreamClient

// Register supporting controllers
rm, err := reconstitution.New(mgr.Manager, time.Millisecond)
require.NoError(t, err)
require.NoError(t, synthesis.NewRolloutController(mgr.Manager, time.Millisecond))
require.NoError(t, synthesis.NewStatusController(mgr.Manager))
require.NoError(t, synthesis.NewPodLifecycleController(mgr.Manager, &synthesis.Config{
Timeout: time.Second * 5,
}))
require.NoError(t, synthesis.NewExecController(mgr.Manager, time.Second, &testutil.ExecConn{Hook: func(s *apiv1.Synthesizer) []client.Object {
obj := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-obj",
Namespace: "default",
Annotations: map[string]string{
"eno.azure.io/reconcile-interval": "250ms",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Name: "first",
Port: 1234,
Protocol: corev1.ProtocolTCP,
}},
},
}

gvks, _, err := scheme.ObjectKinds(obj)
require.NoError(t, err)
obj.GetObjectKind().SetGroupVersionKind(gvks[0])
return []client.Object{obj}
}}))

// Test subject
err = New(rm, mgr.DownstreamRestConfig, 5, testutil.AtLeastVersion(t, 15))
require.NoError(t, err)
mgr.Start(t)

// Any syn/comp will do since we faked out the synthesizer pod
syn := &apiv1.Synthesizer{}
syn.Name = "test-syn"
syn.Spec.Image = "create"
require.NoError(t, upstream.Create(ctx, syn))

comp := &apiv1.Composition{}
comp.Name = "test-comp"
comp.Namespace = "default"
comp.Spec.Synthesizer.Name = syn.Name
require.NoError(t, upstream.Create(ctx, comp))

// Wait for service to be created
obj := &corev1.Service{}
testutil.Eventually(t, func() bool {
obj.SetName("test-obj")
obj.SetNamespace("default")
err = downstream.Get(context.Background(), client.ObjectKeyFromObject(obj), obj)
return err == nil
})

// Update the service from outside of Eno
obj.Spec.Ports[0].Port = 2345

// The service should eventually converge with the desired state
testutil.Eventually(t, func() bool {
err = downstream.Get(context.Background(), client.ObjectKeyFromObject(obj), obj)
return err == nil && obj.Spec.Ports[0].Port == 1234
})
}
3 changes: 2 additions & 1 deletion internal/controllers/synthesis/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,14 @@ func buildResourceSlices(comp *apiv1.Composition, previous []*apiv1.ResourceSlic
refs := map[resourceRef]struct{}{}
manifests := []apiv1.Manifest{}
for i, output := range outputs {
reconcileInterval := consumeReconcileIntervalAnnotation(output)
js, err := output.MarshalJSON()
if err != nil {
return nil, reconcile.TerminalError(fmt.Errorf("encoding output %d: %w", i, err))
}
manifests = append(manifests, apiv1.Manifest{
Manifest: string(js),
ReconcileInterval: consumeReconcileIntervalAnnotation(output),
ReconcileInterval: reconcileInterval,
})
refs[newResourceRef(output)] = struct{}{}
}
Expand Down
Loading

0 comments on commit aacab3e

Please sign in to comment.