Skip to content

Commit

Permalink
feat: Optimize helm deploy by using goroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Suleiman Dibirov <[email protected]>
  • Loading branch information
idsulik committed Nov 15, 2024
1 parent df305b3 commit b07b960
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 32 deletions.
7 changes: 7 additions & 0 deletions docs-v2/content/en/schemas/v4beta12.json
Original file line number Diff line number Diff line change
Expand Up @@ -3296,6 +3296,12 @@
},
"LegacyHelmDeploy": {
"properties": {
"concurrency": {
"type": "integer",
"description": "how many packages can be installed concurrently. 0 means \"no-limit\".",
"x-intellij-html-description": "how many packages can be installed concurrently. 0 means &quot;no-limit&quot;.",
"default": "1"
},
"flags": {
"$ref": "#/definitions/HelmDeployFlags",
"description": "additional option flags that are passed on the command line to `helm`.",
Expand All @@ -3316,6 +3322,7 @@
}
},
"preferredOrder": [
"concurrency",
"releases",
"flags",
"hooks"
Expand Down
8 changes: 6 additions & 2 deletions pkg/skaffold/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,15 @@ func IsMixedPlatformCluster(ctx context.Context, kubeContext string) bool {
if err != nil || nodes == nil {
return false
}
set := make(map[string]interface{})
set := make(map[string]struct{})
for _, n := range nodes.Items {
set[fmt.Sprintf("%s/%s", n.Status.NodeInfo.OperatingSystem, n.Status.NodeInfo.Architecture)] = struct{}{}

if len(set) > 1 {
return true
}
}
return len(set) > 1
return false
}

// IsKindCluster checks that the given `kubeContext` is talking to `kind`.
Expand Down
86 changes: 59 additions & 27 deletions pkg/skaffold/deploy/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"path/filepath"
"sort"
"strings"
sync2 "sync"
"time"

"github.com/blang/semver"
"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"
apimachinery "k8s.io/apimachinery/pkg/runtime/schema"

"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/access"
Expand Down Expand Up @@ -260,40 +262,68 @@ func (h *Deployer) Deploy(ctx context.Context, out io.Writer, builds []graph.Art

nsMap := map[string]struct{}{}
manifests := manifest.ManifestList{}
g, ctx := errgroup.WithContext(ctx)

if h.Concurrency == nil || *h.Concurrency == 1 {
g.SetLimit(1)
olog.Entry(ctx).Infof("Installing %d releases sequentially", len(h.Releases))
} else {
g.SetLimit(*h.Concurrency)
olog.Entry(ctx).Infof("Installing %d releases concurrently", len(h.Releases))
}

var mu sync2.Mutex
// Deploy every release
for _, r := range h.Releases {
releaseName, err := util.ExpandEnvTemplateOrFail(r.Name, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand release name %q", r.Name), err)
}
chartVersion, err := util.ExpandEnvTemplateOrFail(r.Version, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand chart version %q", r.Version), err)
}
g.Go(func() error {
releaseName, err := util.ExpandEnvTemplateOrFail(r.Name, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand release name %q", r.Name), err)
}
chartVersion, err := util.ExpandEnvTemplateOrFail(r.Version, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand chart version %q", r.Version), err)
}

repo, err := util.ExpandEnvTemplateOrFail(r.Repo, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand repo %q", r.Repo), err)
}
r.ChartPath, err = util.ExpandEnvTemplateOrFail(r.ChartPath, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand chart path %q", r.ChartPath), err)
}
repo, err := util.ExpandEnvTemplateOrFail(r.Repo, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand repo %q", r.Repo), err)
}
r.ChartPath, err = util.ExpandEnvTemplateOrFail(r.ChartPath, nil)
if err != nil {
return helm.UserErr(fmt.Sprintf("cannot expand chart path %q", r.ChartPath), err)
}

m, results, err := h.deployRelease(ctx, out, releaseName, r, builds, h.bV, chartVersion, repo)
if err != nil {
return helm.UserErr(fmt.Sprintf("deploying %q", releaseName), err)
}
m, results, err := h.deployRelease(ctx, out, releaseName, r, builds, h.bV, chartVersion, repo)
if err != nil {
return helm.UserErr(fmt.Sprintf("deploying %q", releaseName), err)
}

manifests.Append(m)
mu.Lock()
manifests.Append(m)
mu.Unlock()

// collect namespaces
for _, r := range results {
if trimmed := strings.TrimSpace(r.Namespace); trimmed != "" {
nsMap[trimmed] = struct{}{}
// Collect namespaces first
newNamespaces := make(map[string]struct{})
for _, res := range results {
if trimmed := strings.TrimSpace(res.Namespace); trimmed != "" {
newNamespaces[trimmed] = struct{}{}
}
}
}

// Lock only once to update nsMap
mu.Lock()
for ns := range newNamespaces {
nsMap[ns] = struct{}{}
}
mu.Unlock()

return nil
})
}

if err := g.Wait(); err != nil {
return err
}

// Let's make sure that every image tag is set with `--set`.
Expand Down Expand Up @@ -510,7 +540,9 @@ func (h *Deployer) deployRelease(ctx context.Context, out io.Writer, releaseName
}

defer func() {
os.Remove(constants.HelmOverridesFilename)
if err := os.Remove(constants.HelmOverridesFilename); err != nil {
olog.Entry(ctx).Debugf("unable to remove %q: %v", constants.HelmOverridesFilename, err)
}
}()
}

Expand Down
94 changes: 94 additions & 0 deletions pkg/skaffold/deploy/helm/helm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"testing"

"github.com/GoogleContainerTools/skaffold/v2/testutil/concurrency"
"github.com/mitchellh/go-homedir"
"github.com/pkg/errors"
"k8s.io/client-go/tools/clientcmd/api"
Expand Down Expand Up @@ -987,6 +988,99 @@ func TestHelmDeploy(t *testing.T) {
}
}


func TestHelmDeployConcurrently(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "TestHelmDeploy")
if err != nil {
t.Fatalf("tempdir: %v", err)
}

tests := []struct {
description string
commands util.Command
env []string
helm latest.LegacyHelmDeploy
namespace string
configure func(*Deployer)
builds []graph.Artifact
force bool
shouldErr bool
expectedWarnings []string
expectedNamespaces []string
}{
{
description: "helm3.1 deploy success",
commands: concurrency.
CmdRunWithOutput("helm version --client", version31).
AndRun("helm --kube-context kubecontext get all skaffold-helm --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRunEnv("helm --kube-context kubecontext upgrade skaffold-helm examples/test --post-renderer SKAFFOLD-BINARY --set some.key=somevalue -f skaffold-overrides.yaml --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRunWithOutput("helm --kube-context kubecontext get all skaffold-helm --template {{.Release.Manifest}} --kubeconfig kubeconfig", validDeployYaml),
helm: testDeployConfig,
builds: testBuilds,
expectedNamespaces: []string{""},
},
{
description: "first release without tag, second with tag",
commands: concurrency.
CmdRunWithOutput("helm version --client", version31).
AndRun("helm --kube-context kubecontext get all other --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRunEnv("helm --kube-context kubecontext upgrade other examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRun("helm --kube-context kubecontext get all other --template {{.Release.Manifest}} --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext get all skaffold-helm --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRunEnv("helm --kube-context kubecontext upgrade skaffold-helm examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRunWithOutput("helm --kube-context kubecontext get all skaffold-helm --template {{.Release.Manifest}} --kubeconfig kubeconfig", validDeployYaml),
helm: testTwoReleases,
builds: testBuilds,
expectedNamespaces: []string{""},
},
}

concurrencyCount := 3
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&helm.WriteBuildArtifacts, func([]graph.Artifact) (string, func(), error) { return "TMPFILE", func() {}, nil })
t.Override(&client.Client, deployutil.MockK8sClient)
fakeWarner := &warnings.Collect{}
env := test.env
if env == nil {
env = []string{"FOO=FOOBAR"}
}
t.Override(&warnings.Printf, fakeWarner.Warnf)
t.Override(&util.OSEnviron, func() []string { return env })
t.Override(&util.DefaultExecCommand, test.commands)
t.Override(&helm.OSExecutable, func() (string, error) { return "SKAFFOLD-BINARY", nil })
t.Override(&kubectx.CurrentConfig, func() (api.Config, error) {
return api.Config{CurrentContext: ""}, nil
})


test.helm.Concurrency = &concurrencyCount
deployer, err := NewDeployer(context.Background(), &helmConfig{
namespace: test.namespace,
force: test.force,
configFile: "test.yaml",
}, &label.DefaultLabeller{}, &test.helm, nil, "default", nil)
t.RequireNoError(err)

if test.configure != nil {
test.configure(deployer)
}
deployer.pkgTmpDir = tmpDir
// Deploy returns nil unless `helm get all <release>` is set up to return actual release info
err = deployer.Deploy(context.Background(), io.Discard, test.builds, manifest.ManifestListByConfig{})
t.CheckError(test.shouldErr, err)
t.CheckDeepEqual(test.expectedWarnings, fakeWarner.Warnings)
t.CheckErrorAndDeepEqual(test.shouldErr, err, test.expectedNamespaces, *deployer.namespaces)
})
}
}

func TestHelmCleanup(t *testing.T) {
tests := []struct {
description string
Expand Down
12 changes: 12 additions & 0 deletions pkg/skaffold/schema/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func Set(c *latest.SkaffoldConfig) error {
defaultToLocalBuild(c)
setDefaultTagger(c)
setDefaultLogsConfig(c)
setHelmDefaults(c)

for _, a := range c.Build.Artifacts {
setDefaultWorkspace(a)
Expand Down Expand Up @@ -113,6 +114,17 @@ func Set(c *latest.SkaffoldConfig) error {
return nil
}

func setHelmDefaults(c *latest.SkaffoldConfig) {
if c.Deploy.LegacyHelmDeploy == nil {
return
}

if c.Deploy.LegacyHelmDeploy.Concurrency == nil {
defaultConcurrency := 1
c.Deploy.LegacyHelmDeploy.Concurrency = &defaultConcurrency
}
}

// SetDefaultRenderer sets the default manifests to rawYaml.
func SetDefaultRenderer(c *latest.SkaffoldConfig) {
if len(c.Render.Generate.Kpt) > 0 {
Expand Down
4 changes: 4 additions & 0 deletions pkg/skaffold/schema/latest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,10 @@ type KubectlFlags struct {

// LegacyHelmDeploy *beta* uses the `helm` CLI to apply the charts to the cluster.
type LegacyHelmDeploy struct {
// Concurrency is how many packages can be installed concurrently. 0 means "no-limit".
// Defaults to `1`.
Concurrency *int `yaml:"concurrency,omitempty"`

// Releases is a list of Helm releases.
Releases []HelmRelease `yaml:"releases,omitempty"`

Expand Down
6 changes: 3 additions & 3 deletions testutil/cmd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (c *FakeCmd) AndRunEnv(command string, env []string) *FakeCmd {
})
}

func (c *FakeCmd) RunCmdOut(ctx context.Context, cmd *exec.Cmd) ([]byte, error) {
func (c *FakeCmd) RunCmdOut(_ context.Context, cmd *exec.Cmd) ([]byte, error) {
c.timesCalled++
command := strings.Join(cmd.Args, " ")

Expand All @@ -212,7 +212,7 @@ func (c *FakeCmd) RunCmdOut(ctx context.Context, cmd *exec.Cmd) ([]byte, error)
return r.output, r.err
}

func (c *FakeCmd) RunCmdOutOnce(ctx context.Context, cmd *exec.Cmd) ([]byte, error) {
func (c *FakeCmd) RunCmdOutOnce(_ context.Context, cmd *exec.Cmd) ([]byte, error) {
c.timesCalled++
command := strings.Join(cmd.Args, " ")

Expand All @@ -224,7 +224,7 @@ func (c *FakeCmd) RunCmdOutOnce(ctx context.Context, cmd *exec.Cmd) ([]byte, err
return r.output, r.err
}

func (c *FakeCmd) RunCmd(ctx context.Context, cmd *exec.Cmd) error {
func (c *FakeCmd) RunCmd(_ context.Context, cmd *exec.Cmd) error {
c.timesCalled++
command := strings.Join(cmd.Args, " ")

Expand Down
Loading

0 comments on commit b07b960

Please sign in to comment.