Skip to content

Commit

Permalink
feat: pre-pull images and sideload them into the cluster (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
abuchanan-airbyte authored Nov 5, 2024
1 parent 91789e6 commit 985bb74
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 52 deletions.
58 changes: 21 additions & 37 deletions internal/cmd/images/manifest_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"strings"

"github.com/airbytehq/abctl/internal/cmd/local/helm"
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
"github.com/airbytehq/abctl/internal/common"
"github.com/airbytehq/abctl/internal/trace"
helmlib "github.com/mittwald/go-helm-client"
"helm.sh/helm/v3/pkg/repo"

"github.com/airbytehq/abctl/internal/common"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -27,16 +26,11 @@ type ManifestCmd struct {
Values string `type:"existingfile" help:"An Airbyte helm chart values file to configure helm."`
}

func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
func (c *ManifestCmd) Run(ctx context.Context) error {
ctx, span := trace.NewSpan(ctx, "images manifest")
defer span.End()

client, err := helm.New(provider.Kubeconfig, provider.Context, common.AirbyteNamespace)
if err != nil {
return err
}

images, err := c.findAirbyteImages(ctx, client)
images, err := c.findAirbyteImages(ctx)
if err != nil {
return err
}
Expand All @@ -48,7 +42,7 @@ func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
return nil
}

func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) ([]string, error) {
func (c *ManifestCmd) findAirbyteImages(ctx context.Context) ([]string, error) {
valuesYaml, err := helm.BuildAirbyteValues(ctx, helm.ValuesOpts{
ValuesFile: c.Values,
})
Expand All @@ -57,11 +51,20 @@ func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client)
}

airbyteChartLoc := helm.LocateLatestAirbyteChart(c.ChartVersion, c.Chart)
return findImagesFromChart(client, valuesYaml, airbyteChartLoc, c.ChartVersion)
return FindImagesFromChart(valuesYaml, airbyteChartLoc, c.ChartVersion)
}

func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion string) ([]string, error) {
err := client.AddOrUpdateChartRepo(repo.Entry{
func FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string, error) {

// sharing a helm client with the install code causes some weird issues,
// and templating the chart doesn't need details about the k8s provider,
// we create a throwaway helm client here.
client, err := helmlib.New(helm.ClientOptions(common.AirbyteNamespace))
if err != nil {
return nil, err
}

err = client.AddOrUpdateChartRepo(repo.Entry{
Name: common.AirbyteRepoName,
URL: common.AirbyteRepoURL,
})
Expand All @@ -88,7 +91,7 @@ func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion
// It returns a unique, sorted list of images found.
func findAllImages(chartYaml string) []string {
objs := decodeK8sResources(chartYaml)
imageSet := set[string]{}
imageSet := common.Set[string]{}

for _, obj := range objs {

Expand All @@ -98,7 +101,7 @@ func findAllImages(chartYaml string) []string {
if strings.HasSuffix(z.Name, "airbyte-env") {
for k, v := range z.Data {
if strings.HasSuffix(k, "_IMAGE") {
imageSet.add(v)
imageSet.Add(v)
}
}
}
Expand All @@ -116,15 +119,15 @@ func findAllImages(chartYaml string) []string {
}

for _, c := range podSpec.InitContainers {
imageSet.add(c.Image)
imageSet.Add(c.Image)
}
for _, c := range podSpec.Containers {
imageSet.add(c.Image)
imageSet.Add(c.Image)
}
}

var out []string
for _, k := range imageSet.items() {
for _, k := range imageSet.Items() {
if k != "" {
out = append(out, k)
}
Expand All @@ -149,22 +152,3 @@ func decodeK8sResources(renderedYaml string) []runtime.Object {
}
return out
}

type set[T comparable] struct {
vals map[T]struct{}
}

func (s *set[T]) add(v T) {
if s.vals == nil {
s.vals = map[T]struct{}{}
}
s.vals[v] = struct{}{}
}

func (s *set[T]) items() []T {
out := make([]T, len(s.vals))
for k := range s.vals {
out = append(out, k)
}
return out
}
9 changes: 3 additions & 6 deletions internal/cmd/images/manifest_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ func getHelmTestClient(t *testing.T) helm.Client {
}

func TestManifestCmd(t *testing.T) {
client := getHelmTestClient(t)
cmd := ManifestCmd{
ChartVersion: "1.1.0",
}
actual, err := cmd.findAirbyteImages(context.Background(), client)
actual, err := cmd.findAirbyteImages(context.Background())
if err != nil {
t.Fatal(err)
}
Expand All @@ -48,12 +47,11 @@ func TestManifestCmd(t *testing.T) {
}

func TestManifestCmd_Enterprise(t *testing.T) {
client := getHelmTestClient(t)
cmd := ManifestCmd{
ChartVersion: "1.1.0",
Values: "testdata/enterprise.values.yaml",
}
actual, err := cmd.findAirbyteImages(context.Background(), client)
actual, err := cmd.findAirbyteImages(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -81,13 +79,12 @@ func TestManifestCmd_Enterprise(t *testing.T) {
}

func TestManifestCmd_Nightly(t *testing.T) {
client := getHelmTestClient(t)
cmd := ManifestCmd{
// This version includes chart fixes that expose images more consistently and completely.
ChartVersion: "1.1.0-nightly-1728428783-9025e1a46e",
Values: "testdata/enterprise.values.yaml",
}
actual, err := cmd.findAirbyteImages(context.Background(), client)
actual, err := cmd.findAirbyteImages(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/local/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Client interface {

ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error)
ImagePull(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error)
ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error)

ServerVersion(ctx context.Context) (types.Version, error)
VolumeInspect(ctx context.Context, volumeID string) (volume.Volume, error)
Expand Down
5 changes: 5 additions & 0 deletions internal/cmd/local/docker/dockertest/dockertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type MockClient struct {
FnContainerExecStart func(ctx context.Context, execID string, config container.ExecStartOptions) error
FnImageList func(ctx context.Context, options image.ListOptions) ([]image.Summary, error)
FnImagePull func(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error)
FnImageSave func(ctx context.Context, imageIDs []string) (io.ReadCloser, error)
FnServerVersion func(ctx context.Context) (types.Version, error)
FnVolumeInspect func(ctx context.Context, volumeID string) (volume.Volume, error)
FnInfo func(ctx context.Context) (system.Info, error)
Expand Down Expand Up @@ -82,6 +83,10 @@ func (m MockClient) ImagePull(ctx context.Context, refStr string, options image.
return m.FnImagePull(ctx, refStr, options)
}

func (m MockClient) ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error) {
return m.ImageSave(ctx, imageIDs)
}

func (m MockClient) ServerVersion(ctx context.Context) (types.Version, error) {
return m.FnServerVersion(ctx)
}
Expand Down
22 changes: 13 additions & 9 deletions internal/cmd/local/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ type Client interface {
TemplateChart(spec *helmclient.ChartSpec, options *helmclient.HelmTemplateOptions) ([]byte, error)
}

func ClientOptions(namespace string) *helmclient.Options {
logger := helmLogger{}
return &helmclient.Options{
Namespace: namespace,
Output: logger,
DebugLog: logger.Debug,
Debug: true,
RepositoryCache: paths.HelmRepoCache,
RepositoryConfig: paths.HelmRepoConfig,
}
}

// New returns the default helm client
func New(kubecfg, kubectx, namespace string) (Client, error) {
k8sCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
Expand All @@ -38,16 +50,8 @@ func New(kubecfg, kubectx, namespace string) (Client, error) {
return nil, fmt.Errorf("%w: unable to create rest config: %w", localerr.ErrKubernetes, err)
}

logger := helmLogger{}
helm, err := helmclient.NewClientFromRestConf(&helmclient.RestConfClientOptions{
Options: &helmclient.Options{
Namespace: namespace,
Output: logger,
DebugLog: logger.Debug,
Debug: true,
RepositoryCache: paths.HelmRepoCache,
RepositoryConfig: paths.HelmRepoConfig,
},
Options: ClientOptions(namespace),
RestConfig: restCfg,
})
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions internal/cmd/local/k8s/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/docker"
"github.com/airbytehq/abctl/internal/cmd/local/k8s/kind"
"github.com/airbytehq/abctl/internal/cmd/local/paths"
"github.com/airbytehq/abctl/internal/trace"
Expand All @@ -30,6 +31,7 @@ type Cluster interface {
Delete(ctx context.Context) error
// Exists returns true if the cluster exists, false otherwise.
Exists(ctx context.Context) bool
LoadImages(ctx context.Context, dockerClient docker.Client, images []string)
}

// interface sanity check
Expand Down Expand Up @@ -110,6 +112,23 @@ func (k *kindCluster) Exists(ctx context.Context) bool {
return false
}

// LoadImages pulls images from Docker Hub, and loads them into the kind cluster.
// This is a best-effort optimization, which is why it doesn't return an error.
// It's possible that only some images will be loaded.
func (k *kindCluster) LoadImages(ctx context.Context, dockerClient docker.Client, images []string) {
// Get a list of Kind nodes.
nodes, err := k.p.ListNodes(k.clusterName)
if err != nil {
pterm.Debug.Printfln("failed to load images: %s", err)
return
}

err = loadImages(ctx, dockerClient, nodes, images)
if err != nil {
pterm.Debug.Printfln("failed to load images: %s", err)
}
}

func formatKindErr(err error) error {
var kindErr *kindExec.RunError
if errors.As(err, &kindErr) {
Expand Down
Loading

0 comments on commit 985bb74

Please sign in to comment.