diff --git a/pkg/builder/kaniko/kaniko.go b/pkg/builder/kaniko/kaniko.go
index 791a0b1e..721707c2 100644
--- a/pkg/builder/kaniko/kaniko.go
+++ b/pkg/builder/kaniko/kaniko.go
@@ -11,9 +11,9 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 
 	"github.com/celestiaorg/knuu/pkg/builder"
+	"github.com/celestiaorg/knuu/pkg/k8s"
 	"github.com/celestiaorg/knuu/pkg/minio"
 	"github.com/celestiaorg/knuu/pkg/names"
 )
@@ -31,10 +31,9 @@ const (
 )
 
 type Kaniko struct {
-	K8sClientset kubernetes.Interface
-	K8sNamespace string
-	Minio        *minio.Minio // Minio service to store the build context if it's a directory
-	ContentName  string       // Name of the content pushed to Minio
+	K8s         k8s.KubeManager
+	Minio       *minio.Minio // Minio service to store the build context if it's a directory
+	ContentName string       // Name of the content pushed to Minio
 }
 
 var _ builder.Builder = &Kaniko{}
@@ -45,7 +44,7 @@ func (k *Kaniko) Build(ctx context.Context, b *builder.BuilderOptions) (logs str
 		return "", ErrPreparingJob.Wrap(err)
 	}
 
-	cJob, err := k.K8sClientset.BatchV1().Jobs(k.K8sNamespace).Create(ctx, job, metav1.CreateOptions{})
+	cJob, err := k.K8s.Clientset().BatchV1().Jobs(k.K8s.Namespace()).Create(ctx, job, metav1.CreateOptions{})
 	if err != nil {
 		return "", ErrCreatingJob.Wrap(err)
 	}
@@ -77,7 +76,7 @@
 }
 
 func (k *Kaniko) waitForJobCompletion(ctx context.Context, job *batchv1.Job) (*batchv1.Job, error) {
-	watcher, err := k.K8sClientset.BatchV1().Jobs(k.K8sNamespace).Watch(ctx, metav1.ListOptions{
+	watcher, err := k.K8s.Clientset().BatchV1().Jobs(k.K8s.Namespace()).Watch(ctx, metav1.ListOptions{
 		FieldSelector: fmt.Sprintf("metadata.name=%s", job.Name),
 	})
 	if err != nil {
@@ -108,7 +107,7 @@ func (k *Kaniko) waitForJobCompletion(ctx context.Context, job *batchv1.Job) (*b
 }
 
 func (k *Kaniko) firstPodFromJob(ctx context.Context, job *batchv1.Job) (*v1.Pod, error) {
-	podList, err := k.K8sClientset.CoreV1().Pods(k.K8sNamespace).List(ctx, metav1.ListOptions{
+	podList, err := k.K8s.Clientset().CoreV1().Pods(k.K8s.Namespace()).List(ctx, metav1.ListOptions{
 		LabelSelector: fmt.Sprintf("job-name=%s", job.Name),
 	})
 	if err != nil {
@@ -131,7 +130,7 @@ func (k *Kaniko) containerLogs(ctx context.Context, pod *v1.Pod) (string, error)
 		Container: pod.Spec.Containers[0].Name,
 	}
 
-	req := k.K8sClientset.CoreV1().Pods(k.K8sNamespace).GetLogs(pod.Name, &logOptions)
+	req := k.K8s.Clientset().CoreV1().Pods(k.K8s.Namespace()).GetLogs(pod.Name, &logOptions)
 	logs, err := req.DoRaw(ctx)
 	if err != nil {
 		return "", err
@@ -141,7 +140,7 @@ func (k *Kaniko) containerLogs(ctx context.Context, pod *v1.Pod) (string, error)
 }
 
 func (k *Kaniko) cleanup(ctx context.Context, job *batchv1.Job) error {
-	err := k.K8sClientset.BatchV1().Jobs(k.K8sNamespace).
+	err := k.K8s.Clientset().BatchV1().Jobs(k.K8s.Namespace()).
 		Delete(ctx, job.Name, metav1.DeleteOptions{
 			PropagationPolicy: &[]metav1.DeletionPropagation{metav1.DeletePropagationBackground}[0],
 		})
@@ -150,7 +149,7 @@ func (k *Kaniko) cleanup(ctx context.Context, job *batchv1.Job) error {
 	}
 
 	// Delete the associated Pods
-	err = k.K8sClientset.CoreV1().Pods(k.K8sNamespace).
+	err = k.K8s.Clientset().CoreV1().Pods(k.K8s.Namespace()).
 		DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
 			LabelSelector: fmt.Sprintf("job-name=%s", job.Name),
 		})
diff --git a/pkg/builder/kaniko/kaniko_test.go b/pkg/builder/kaniko/kaniko_test.go
index cd14da83..c849441f 100644
--- a/pkg/builder/kaniko/kaniko_test.go
+++ b/pkg/builder/kaniko/kaniko_test.go
@@ -14,20 +14,21 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 
 	"github.com/celestiaorg/knuu/pkg/builder"
+	"github.com/celestiaorg/knuu/pkg/k8s"
 )
 
 const (
-	k8sNamespace = "test-namespace"
+	k8sNamespace    = "test-namespace"
+	testImage       = "test-image"
+	testDestination = "registry.example.com/test-image:latest"
 )
 
 func TestKanikoBuilder(t *testing.T) {
 	k8sCS := fake.NewSimpleClientset()
 	kb := &Kaniko{
-		K8sClientset: k8sCS,
-		K8sNamespace: k8sNamespace,
+		K8s: k8s.NewCustom(k8sCS, k8sCS.Discovery(), nil, k8sNamespace),
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
+	ctx := context.Background()
 
 	t.Run("BuildSuccess", func(t *testing.T) {
 		blCtx := "git://github.com/mojtaba-esk/sample-docker"
@@ -36,9 +37,9 @@
 		require.NoError(t, err, "GetDefaultCacheOptions should succeed")
 
 		buildOptions := &builder.BuilderOptions{
-			ImageName:    "test-image",
+			ImageName:    testImage,
 			BuildContext: blCtx,
-			Destination:  "registry.example.com/test-image:latest",
+			Destination:  testDestination,
 			Args:         []string{"--build-arg=value"},
 			Cache:        cacheOpts,
 		}
@@ -54,7 +55,7 @@
 		}()
 
 		// Simulate the successful completion of the Job after a short delay
-		time.Sleep(2 * time.Second)
+		time.Sleep(500 * time.Millisecond)
 		completeAllJobInFakeClientset(t, k8sCS, k8sNamespace)
 
 		wg.Wait()
@@ -63,40 +64,27 @@
 		assert.NotEmpty(t, logs, "Build logs should not be empty")
 	})
 
-	t.Run("BuildFailure", func(t *testing.T) {
-		buildOptions := &builder.BuilderOptions{
-			ImageName:    "test-image",
-			BuildContext: "invalid-context", // Simulate an invalid context
-			Destination:  "registry.example.com/test-image:latest",
-		}
-
-		logs, err := kb.Build(ctx, buildOptions)
-
-		assert.Error(t, err, "Build should fail")
-		assert.Empty(t, logs, "Build logs should be empty")
-	})
-
 	t.Run("BuildWithContextCancellation", func(t *testing.T) {
 		buildOptions := &builder.BuilderOptions{
-			ImageName:    "test-image",
+			ImageName:    testImage,
 			BuildContext: "git://example.com/repo",
-			Destination:  "registry.example.com/test-image:latest",
+			Destination:  testDestination,
 		}
 
 		// Cancel the context to simulate cancellation during the build
+		ctx, cancel := context.WithCancel(ctx)
 		cancel()
 
 		logs, err := kb.Build(ctx, buildOptions)
 
-		assert.Error(t, err, "Build should fail due to context cancellation")
-		assert.Empty(t, logs, "Build logs should be empty")
+		assert.Error(t, err, "build should fail due to context cancellation")
+		assert.Empty(t, logs, "build logs should be empty")
 	})
 }
 
 func completeAllJobInFakeClientset(t *testing.T, clientset *fake.Clientset, namespace string) {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
+	ctx := context.Background()
 
 	job, err := clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{})
 	assert.NoError(t, err)
@@ -125,8 +113,8 @@ func createPodFromJob(job *batchv1.Job) *v1.Pod {
 		Spec: v1.PodSpec{
 			Containers: []v1.Container{
 				{
-					Name:  "fake-container", // Adjust as needed
-					Image: "fake-image",     // Adjust as needed
+					Name:  "fake-container",
+					Image: "fake-image",
 				},
 			},
 		},
diff --git a/pkg/knuu/knuu.go b/pkg/knuu/knuu.go
index c74d3cc7..74a69cd6 100644
--- a/pkg/knuu/knuu.go
+++ b/pkg/knuu/knuu.go
@@ -124,19 +124,15 @@ func New(ctx context.Context, opts ...Option) (*Knuu, error) {
 	}
 
 	if k.MinioCli == nil {
-		// TODO: minio also needs a little refactor to accept k8s obj instead
 		k.MinioCli = &minio.Minio{
-			Clientset: k.K8sCli.Clientset(),
-			Namespace: k.K8sCli.Namespace(),
+			K8s: k.K8sCli,
 		}
 	}
 
 	if k.ImageBuilder == nil {
-		// TODO: Also here for kaniko
 		k.ImageBuilder = &kaniko.Kaniko{
-			K8sClientset: k.K8sCli.Clientset(),
-			K8sNamespace: k.K8sCli.Namespace(),
-			Minio:        k.MinioCli,
+			K8s:   k.K8sCli,
+			Minio: k.MinioCli,
 		}
 	}
diff --git a/pkg/knuu/knuu_old.go b/pkg/knuu/knuu_old.go
index 84c4e40a..c583139e 100644
--- a/pkg/knuu/knuu_old.go
+++ b/pkg/knuu/knuu_old.go
@@ -82,9 +82,8 @@ func InitializeWithScope(testScope string) error {
 	switch builderType {
 	case "kubernetes":
 		tmpKnuu.ImageBuilder = &kaniko.Kaniko{
-			K8sClientset: tmpKnuu.K8sCli.Clientset(),
-			K8sNamespace: tmpKnuu.K8sCli.Namespace(),
-			Minio:        tmpKnuu.MinioCli,
+			K8s:   tmpKnuu.K8sCli,
+			Minio: tmpKnuu.MinioCli,
 		}
 	case "docker", "":
 		tmpKnuu.ImageBuilder = &docker.Docker{
diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go
index 3f5a829f..bf518b42 100644
--- a/pkg/minio/minio.go
+++ b/pkg/minio/minio.go
@@ -4,9 +4,10 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"net"
 	"time"
 
+	"github.com/celestiaorg/knuu/pkg/k8s"
+
 	miniogo "github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
 	"github.com/sirupsen/logrus"
@@ -16,7 +17,6 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
-	"k8s.io/client-go/kubernetes"
 )
 
 const (
@@ -42,8 +42,7 @@
 )
 
 type Minio struct {
-	Clientset kubernetes.Interface
-	Namespace string
+	K8s k8s.KubeManager
 }
 
 func (m *Minio) DeployMinio(ctx context.Context) error {
@@ -59,7 +58,7 @@
 		return ErrMinioFailedToCreateOrUpdateService.Wrap(err)
 	}
 
-	if err := m.waitForMinioService(ctx); err != nil {
+	if err := m.K8s.WaitForService(ctx, ServiceName); err != nil {
 		return ErrMinioFailedToBeReadyService.Wrap(err)
 	}
@@ -68,13 +67,13 @@
 }
 
 func (m *Minio) createOrUpdateDeployment(ctx context.Context) error {
-	deploymentClient := m.Clientset.AppsV1().Deployments(m.Namespace)
+	deploymentClient := m.K8s.Clientset().AppsV1().Deployments(m.K8s.Namespace())
 
 	// Define the Minio deployment
 	minioDeployment := &appsV1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      DeploymentName,
-			Namespace: m.Namespace,
+			Namespace: m.K8s.Namespace(),
 		},
 		Spec: appsV1.DeploymentSpec{
 			Selector: &metav1.LabelSelector{
@@ -147,7 +146,7 @@
 }
 
 func (m *Minio) IsMinioDeployed(ctx context.Context) (bool, error) {
-	deploymentClient := m.Clientset.AppsV1().Deployments(m.Namespace)
+	deploymentClient := m.K8s.Clientset().AppsV1().Deployments(m.K8s.Namespace())
 
 	_, err := deploymentClient.Get(ctx, DeploymentName, metav1.GetOptions{})
 	if err != nil {
@@ -246,13 +245,13 @@
 }
 
 func (m *Minio) createOrUpdateService(ctx context.Context) error {
-	serviceClient := m.Clientset.CoreV1().Services(m.Namespace)
+	serviceClient := m.K8s.Clientset().CoreV1().Services(m.K8s.Namespace())
 
 	// Define Minio service
 	minioService := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      ServiceName,
-			Namespace: m.Namespace,
+			Namespace: m.K8s.Namespace(),
 		},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{"app": "minio"},
@@ -313,7 +312,7 @@ func (m *Minio) createBucketIfNotExists(ctx context.Context, cli *miniogo.Client
 }
 
 func (m *Minio) getEndpoint(ctx context.Context) (string, error) {
-	minioService, err := m.Clientset.CoreV1().Services(m.Namespace).Get(ctx, ServiceName, metav1.GetOptions{})
+	minioService, err := m.K8s.Clientset().CoreV1().Services(m.K8s.Namespace()).Get(ctx, ServiceName, metav1.GetOptions{})
 	if err != nil {
 		return "", ErrMinioFailedToGetService.Wrap(err)
 	}
@@ -328,7 +327,7 @@
 	if minioService.Spec.Type == v1.ServiceTypeNodePort {
 		// Use the Node IP and NodePort
-		nodes, err := m.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+		nodes, err := m.K8s.Clientset().CoreV1().Nodes().List(ctx, metav1.ListOptions{})
 		if err != nil {
 			return "", ErrMinioFailedToGetNodes.Wrap(err)
 		}
@@ -352,7 +351,7 @@ func (m *Minio) waitForMinio(ctx context.Context) error {
 	for {
-		deployment, err := m.Clientset.AppsV1().Deployments(m.Namespace).Get(ctx, DeploymentName, metav1.GetOptions{})
+		deployment, err := m.K8s.Clientset().AppsV1().Deployments(m.K8s.Namespace()).Get(ctx, DeploymentName, metav1.GetOptions{})
 		if err == nil && deployment.Status.ReadyReplicas > 0 {
 			break
 		}
@@ -368,64 +367,13 @@
 	return nil
 }
 
-func (m *Minio) waitForMinioService(ctx context.Context) error {
-	for {
-		service, err := m.Clientset.CoreV1().Services(m.Namespace).Get(ctx, ServiceName, metav1.GetOptions{})
-		if err != nil {
-			return ErrMinioFailedToGetService.Wrap(err)
-		}
-
-		if service.Spec.Type == v1.ServiceTypeLoadBalancer {
-			if len(service.Status.LoadBalancer.Ingress) == 0 {
-				time.Sleep(waitRetry)
-				continue // Wait until the LoadBalancer IP is available
-			}
-		} else if service.Spec.Type == v1.ServiceTypeNodePort {
-			if service.Spec.Ports[0].NodePort == 0 {
-				return ErrMinioNodePortNotSet
-			}
-		} else if len(service.Spec.ExternalIPs) == 0 {
-			return ErrMinioExternalIPsNotSet
-		}
-
-		// Check if Minio is reachable
-		endpoint, err := m.getEndpoint(ctx)
-		if err != nil {
-			return ErrMinioFailedToGetEndpoint.Wrap(err)
-		}
-
-		if err := checkServiceConnectivity(endpoint); err != nil {
-			time.Sleep(waitRetry) // Retry after some seconds if Minio is not reachable
-			continue
-		}
-
-		break // Minio is reachable, exit the loop
-	}
-
-	select {
-	case <-ctx.Done():
-		return ErrMinioTimeoutWaitingForServiceReady
-	default:
-		return nil
-	}
-}
-
-func checkServiceConnectivity(serviceEndpoint string) error {
-	conn, err := net.DialTimeout("tcp", serviceEndpoint, 2*time.Second)
-	if err != nil {
-		return ErrMinioFailedToConnect.WithParams(serviceEndpoint).Wrap(err)
-	}
-	defer conn.Close()
-	return nil // success
-}
-
 func (m *Minio) createPVC(ctx context.Context, pvcName string, storageSize string, createOptions metav1.CreateOptions) error {
 	storageQt, err := resource.ParseQuantity(storageSize)
 	if err != nil {
 		return ErrMinioFailedToParseStorageSize.Wrap(err)
 	}
 
-	pvcClient := m.Clientset.CoreV1().PersistentVolumeClaims(m.Namespace)
+	pvcClient := m.K8s.Clientset().CoreV1().PersistentVolumeClaims(m.K8s.Namespace())
 
 	// Check if PVC already exists
 	_, err = pvcClient.Get(ctx, pvcName, metav1.GetOptions{})
@@ -435,7 +383,7 @@
 	}
 
 	// Create a simple PersistentVolume if no suitable one is found
-	pvList, err := m.Clientset.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
+	pvList, err := m.K8s.Clientset().CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return ErrMinioFailedToListPersistentVolumes.Wrap(err)
 	}
@@ -451,7 +399,7 @@
 	if existingPV == nil {
 		// Create a simple PV if no existing PV is suitable
-		_, err = m.Clientset.CoreV1().PersistentVolumes().Create(ctx, &v1.PersistentVolume{
+		_, err = m.K8s.Clientset().CoreV1().PersistentVolumes().Create(ctx, &v1.PersistentVolume{
 			ObjectMeta: metav1.ObjectMeta{
 				GenerateName: pvPrefix,
 			},
@@ -477,7 +425,7 @@
 	pvc := &v1.PersistentVolumeClaim{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      pvcName,
-			Namespace: m.Namespace,
+			Namespace: m.K8s.Namespace(),
 		},
 		Spec: v1.PersistentVolumeClaimSpec{
 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
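
Note on the seam this refactor introduces: Kaniko and Minio now share a single k8s.KubeManager instead of each carrying a raw clientset plus namespace string, which is what lets the test inject a fake clientset via k8s.NewCustom(k8sCS, k8sCS.Discovery(), nil, k8sNamespace). The Go sketch below is illustrative only, inferred from the call sites in this diff (Clientset(), Namespace(), WaitForService()); the actual interface in pkg/k8s may declare more methods or slightly different signatures.

	package k8s

	import (
		"context"

		"k8s.io/client-go/kubernetes"
	)

	// KubeManager is the slice of Kubernetes functionality that the Kaniko
	// builder and the Minio client rely on after this refactor. The method
	// set shown here is an inference from this diff, not the full interface.
	type KubeManager interface {
		// Clientset exposes the underlying client-go clientset, so call
		// sites like k.K8s.Clientset().BatchV1() keep working against
		// either a real or a fake clientset.
		Clientset() kubernetes.Interface
		// Namespace reports the namespace that all managed resources are
		// scoped to, replacing the old K8sNamespace/Namespace fields.
		Namespace() string
		// WaitForService blocks until the named service is reachable; the
		// hand-rolled waitForMinioService/checkServiceConnectivity pair
		// removed from pkg/minio is replaced by this call.
		WaitForService(ctx context.Context, name string) error
	}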