Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add persistent volumes #33

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions internal/cmd/local/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Client interface {
// IngressUpdate updates an existing ingress in the given namespace
IngressUpdate(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error

// NamespaceCreate creates a namespace
NamespaceCreate(ctx context.Context, namespace string) error
// NamespaceExists returns true if the namespace exists, false otherwise
NamespaceExists(ctx context.Context, namespace string) bool
// NamespaceDelete deletes the existing namespace
Expand All @@ -39,6 +41,9 @@ type Client interface {
EventsWatch(ctx context.Context, namespace string) (watch.Interface, error)

LogsGet(ctx context.Context, namespace string, name string) (string, error)

// TODO remove
TestClientSet() *kubernetes.Clientset
}

var _ Client = (*DefaultK8sClient)(nil)
Expand All @@ -48,6 +53,10 @@ type DefaultK8sClient struct {
ClientSet *kubernetes.Clientset
}

func (d *DefaultK8sClient) TestClientSet() *kubernetes.Clientset {
return d.ClientSet
}

func (d *DefaultK8sClient) IngressCreate(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error {
_, err := d.ClientSet.NetworkingV1().Ingresses(namespace).Create(ctx, ingress, metav1.CreateOptions{})
return err
Expand All @@ -67,6 +76,11 @@ func (d *DefaultK8sClient) IngressUpdate(ctx context.Context, namespace string,
return err
}

func (d *DefaultK8sClient) NamespaceCreate(ctx context.Context, namespace string) error {
_, err := d.ClientSet.CoreV1().Namespaces().Create(ctx, &coreV1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
return err
}

func (d *DefaultK8sClient) NamespaceExists(ctx context.Context, namespace string) bool {
_, err := d.ClientSet.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if err == nil {
Expand Down
54 changes: 54 additions & 0 deletions internal/cmd/local/k8s/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package k8s

import (
"fmt"
"os"
"path/filepath"
"sigs.k8s.io/kind/pkg/cluster"
"time"
)
Expand Down Expand Up @@ -30,6 +32,11 @@ type kindCluster struct {

const k8sVersion = "v1.29.1"

var mountPath = func() string {
home, _ := os.UserHomeDir()
return filepath.Join(home, ".airbyte", "abctl", "data")
}

func (k *kindCluster) Create(port int) error {
// see https://kind.sigs.k8s.io/docs/user/ingress/#create-cluster
rawCfg := fmt.Sprintf(`kind: Cluster
Expand All @@ -42,10 +49,14 @@ nodes:
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraMounts:
- hostPath: %s
containerPath: /var/local-path-provisioner
extraPortMappings:
- containerPort: 80
hostPort: %d
protocol: TCP`,
mountPath(),
port)

opts := []cluster.CreateOption{
Expand Down Expand Up @@ -80,3 +91,46 @@ func (k *kindCluster) Exists() bool {

return false
}

//func pvc(name string) *corev1.PersistentVolumeClaim {
// size, _ := resource.ParseQuantity("500Mi")
//
// return &corev1.PersistentVolumeClaim{
// ObjectMeta: metav1.ObjectMeta{Name: name},
// Spec: corev1.PersistentVolumeClaimSpec{
// AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
// Resources: corev1.VolumeResourceRequirements{
// Requests: corev1.ResourceList{corev1.ResourceStorage: size},
// },
// VolumeName: "",
// StorageClassName: nil,
// VolumeMode: nil,
// DataSource: nil,
// DataSourceRef: nil,
// VolumeAttributesClassName: nil,
// },
// Status: corev1.PersistentVolumeClaimStatus{},
// }
//}

//func pv(name string) *corev1.PersistentVolume {
// size, _ := resource.ParseQuantity("500Mi")
//
// return &corev1.PersistentVolume{
// ObjectMeta: metav1.ObjectMeta{Name: name},
// Spec: corev1.PersistentVolumeSpec{
// Capacity: corev1.ResourceList{corev1.ResourceStorage: size},
// PersistentVolumeSource: corev1.PersistentVolumeSource{},
// AccessModes: []corev1.PersistentVolumeAccessMode{
// corev1.ReadWriteOnce,
// },
// ClaimRef: nil,
// PersistentVolumeReclaimPolicy: "",
// StorageClassName: "",
// MountOptions: nil,
// VolumeMode: nil,
// NodeAffinity: nil,
// VolumeAttributesClassName: nil,
// },
// }
//}
176 changes: 107 additions & 69 deletions internal/cmd/local/local/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ import (
"context"
"errors"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/k8s"
"github.com/airbytehq/abctl/internal/cmd/local/localerr"
"github.com/airbytehq/abctl/internal/telemetry"
Expand All @@ -18,19 +27,13 @@ import (
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/repo"
"helm.sh/helm/v3/pkg/storage/driver"
corev1 "k8s.io/api/core/v1"
v1events "k8s.io/api/events/v1"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -220,10 +223,77 @@ func New(provider k8s.Provider, opts ...Option) (*Command, error) {
return c, nil
}

func pv(namespace, name string) *corev1.PersistentVolume {
size, _ := resource.ParseQuantity("500Mi")
hostPathType := corev1.HostPathDirectoryOrCreate

return &corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
Spec: corev1.PersistentVolumeSpec{
Capacity: corev1.ResourceList{corev1.ResourceStorage: size},
PersistentVolumeSource: corev1.PersistentVolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: path.Join("/var/local-path-provisioner", name),
Type: &hostPathType,
},
},
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
PersistentVolumeReclaimPolicy: "Retain",
StorageClassName: "standard",
},
}
}

func pvc(name string, volumeName string) *corev1.PersistentVolumeClaim {
size, _ := resource.ParseQuantity("500Mi")
storageClass := "standard"

return &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceStorage: size}},
VolumeName: volumeName,
StorageClassName: &storageClass,
},
Status: corev1.PersistentVolumeClaimStatus{},
}
}

// Install handles the installation of Airbyte
func (c *Command) Install(ctx context.Context, user, pass string) error {
go c.watchEvents(ctx)

if !c.k8s.NamespaceExists(ctx, airbyteNamespace) {
c.spinner.UpdateText(fmt.Sprintf("Creating namespace '%s'", airbyteNamespace))
if err := c.k8s.NamespaceCreate(ctx, airbyteNamespace); err != nil {
pterm.Error.Printfln("Could not create namespace '%s'", airbyteNamespace)
return fmt.Errorf("could not create airbyte namespace: %w", err)
}
} else {
pterm.Info.Printfln("Namespace '%s' already exists", airbyteNamespace)
}

// Create minio PV and PVC
if _, err := c.k8s.TestClientSet().CoreV1().PersistentVolumes().Create(ctx, pv(airbyteNamespace, "airbyte-minio-pv"), metav1.CreateOptions{}); err != nil {
pterm.Error.Printfln("Failed to create persistent volume: %s", err.Error())
}

if _, err := c.k8s.TestClientSet().CoreV1().PersistentVolumeClaims(airbyteNamespace).Create(ctx, pvc("airbyte-minio-pv-claim-airbyte-minio-0", "airbyte-minio-pv"), metav1.CreateOptions{}); err != nil {
pterm.Error.Printfln("Failed to create persistent volume claim: %s", err.Error())
}

// Create database PV and PVC
if _, err := c.k8s.TestClientSet().CoreV1().PersistentVolumes().Create(ctx, pv(airbyteNamespace, "airbyte-volume-db"), metav1.CreateOptions{}); err != nil {
pterm.Error.Printfln("Failed to create persistent volume: %s", err.Error())
}

if _, err := c.k8s.TestClientSet().CoreV1().PersistentVolumeClaims(airbyteNamespace).Create(ctx, pvc("airbyte-volume-db-airbyte-db-0", "airbyte-volume-db"), metav1.CreateOptions{}); err != nil {
pterm.Error.Printfln("Failed to create persistent volume claim: %s", err.Error())
}

var telUser string
// only override the empty telUser if the tel.User returns a non-nil (uuid.Nil) value.
if c.tel.User() != uuid.Nil {
Expand Down Expand Up @@ -279,6 +349,19 @@ func (c *Command) Install(ctx context.Context, user, pass string) error {
return fmt.Errorf("could not create or update basic-auth secret: %w", err)
}

if err := c.handleIngress(ctx); err != nil {
return err
}

c.spinner.UpdateText("Verifying ingress")
if err := c.openBrowser(ctx, fmt.Sprintf("http://localhost:%d", c.portHTTP)); err != nil {
return err
}

return nil
}

func (c *Command) handleIngress(ctx context.Context) error {
c.spinner.UpdateText("Checking for existing Ingress")

if c.k8s.IngressExists(ctx, airbyteNamespace, airbyteIngress) {
Expand All @@ -288,20 +371,15 @@ func (c *Command) Install(ctx context.Context, user, pass string) error {
return fmt.Errorf("could not update existing ingress: %w", err)
}
pterm.Success.Println("Updated existing Ingress")
} else {
pterm.Info.Println("No existing Ingress found, will create one")
if err := c.k8s.IngressCreate(ctx, airbyteNamespace, ingress()); err != nil {
pterm.Error.Println("Unable to create ingress")
return fmt.Errorf("could not create ingress: %w", err)
}
pterm.Success.Println("Ingress created")
return nil
}

c.spinner.UpdateText("Verifying ingress")
if err := c.openBrowser(ctx, fmt.Sprintf("http://localhost:%d", c.portHTTP)); err != nil {
return err
pterm.Info.Println("No existing Ingress found, creating one")
if err := c.k8s.IngressCreate(ctx, airbyteNamespace, ingress()); err != nil {
pterm.Error.Println("Unable to create ingress")
return fmt.Errorf("could not create ingress: %w", err)
}

pterm.Success.Println("Ingress created")
return nil
}

Expand Down Expand Up @@ -333,10 +411,10 @@ func (c *Command) watchEvents(ctx context.Context) {
}

// now is used to filter out kubernetes events that happened in the past.
// Kubernetes wants to use the ResourceVersion on the event watch request itself, but that approach
// Kubernetes wants us to use the ResourceVersion on the event watch request itself, but that approach
// is more complicated as it requires determining which ResourceVersion to initially provide.
var now = func() *v1.Time {
t := v1.Now()
var now = func() *metav1.Time {
t := metav1.Now()
return &t
}()

Expand Down Expand Up @@ -400,7 +478,12 @@ func (c *Command) handleBasicAuthSecret(ctx context.Context, user, pass string)
}

// Uninstall handles the uninstallation of Airbyte.
func (c *Command) Uninstall(ctx context.Context) error {
func (c *Command) Uninstall(ctx context.Context, persist bool) error {
// if not removing persisted data, then this is a no-op
if !persist {
return nil
}

{
c.spinner.UpdateText(fmt.Sprintf("Verifying %s Helm Chart installation status", airbyteChartName))

Expand Down Expand Up @@ -638,51 +721,6 @@ func (c *Command) openBrowser(ctx context.Context, url string) error {
return nil
}

// ingress creates an ingress type for defining the webapp ingress rules.
func ingress() *networkingv1.Ingress {
var pathType = networkingv1.PathType("Prefix")
var ingressClassName = "nginx"

return &networkingv1.Ingress{
TypeMeta: v1.TypeMeta{},
ObjectMeta: v1.ObjectMeta{
Name: airbyteIngress,
Namespace: airbyteNamespace,
Annotations: map[string]string{
"nginx.ingress.kubernetes.io/auth-type": "basic",
"nginx.ingress.kubernetes.io/auth-secret": "basic-auth",
"nginx.ingress.kubernetes.io/auth-realm": "Authentication Required - Airbyte (abctl)",
},
},
Spec: networkingv1.IngressSpec{
IngressClassName: &ingressClassName,
Rules: []networkingv1.IngressRule{
{
Host: "localhost",
IngressRuleValue: networkingv1.IngressRuleValue{
HTTP: &networkingv1.HTTPIngressRuleValue{
Paths: []networkingv1.HTTPIngressPath{
{
Path: "/",
PathType: &pathType,
Backend: networkingv1.IngressBackend{
Service: &networkingv1.IngressServiceBackend{
Name: fmt.Sprintf("%s-airbyte-webapp-svc", airbyteChartRelease),
Port: networkingv1.ServiceBackendPort{
Name: "http",
},
},
},
},
},
},
},
},
},
},
}
}

// defaultK8s returns the default k8s client
func defaultK8s(kubecfg, kubectx string) (k8s.Client, error) {
k8sCfg, err := k8sClientConfig(kubecfg, kubectx)
Expand Down
Loading
Loading