diff --git a/README.md b/README.md index aed7a3e..4f579a5 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,7 @@ metadata: name: torrent-sample spec: hub: + name: Huggingface repoID: Qwen/Qwen2.5-0.5B-Instruct ``` @@ -62,6 +63,7 @@ metadata: name: torrent-sample spec: hub: + name: Huggingface repoID: Qwen/Qwen2.5-0.5B-Instruct nodeSelector: zone: zone-a @@ -78,6 +80,7 @@ metadata: name: torrent-sample spec: hub: + name: Huggingface repoID: Qwen/Qwen2.5-0.5B-Instruct reclaimPolicy: Delete ``` diff --git a/agent/config/base/clusterrole.yaml b/agent/config/base/clusterrole.yaml index 5e81492..407e780 100644 --- a/agent/config/base/clusterrole.yaml +++ b/agent/config/base/clusterrole.yaml @@ -47,3 +47,11 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch diff --git a/agent/config/manager/daemonset.yaml b/agent/config/manager/daemonset.yaml index d1c34ec..489006e 100644 --- a/agent/config/manager/daemonset.yaml +++ b/agent/config/manager/daemonset.yaml @@ -4,6 +4,7 @@ metadata: name: manta-agent namespace: manta-system labels: + # This is required for manta when fetching peer IP. app: manta-agent spec: selector: @@ -14,7 +15,6 @@ spec: labels: app: manta-agent spec: - hostNetwork: true serviceAccountName: manta-agent initContainers: - name: init-permissions diff --git a/agent/pkg/controller/replication_controller.go b/agent/pkg/controller/replication_controller.go index 4373e0d..bfaa1de 100644 --- a/agent/pkg/controller/replication_controller.go +++ b/agent/pkg/controller/replication_controller.go @@ -20,6 +20,7 @@ import ( "context" "os" + corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -92,6 +93,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles. + // TODO: should we create a Job to handle this? See discussion: https://github.com/InftyAI/Manta/issues/25 if err := handler.HandleReplication(ctx, r.Client, replication); err != nil { logger.Error(err, "error to handle replication", "Replication", klog.KObj(replication)) return ctrl.Result{}, err @@ -144,6 +146,13 @@ func (r *ReplicationReconciler) updateNodeTracker(ctx context.Context, replicati // SetupWithManager sets up the controller with the Manager. func (r *ReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { + if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string { + pod := rawObj.(*corev1.Pod) + return []string{pod.Spec.NodeName} + }); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&api.Replication{}). WithOptions(controller.Options{MaxConcurrentReconciles: 5}). diff --git a/agent/pkg/handler/chunk_handler.go b/agent/pkg/handler/chunk_handler.go index 12256c5..544d9d1 100644 --- a/agent/pkg/handler/chunk_handler.go +++ b/agent/pkg/handler/chunk_handler.go @@ -72,8 +72,8 @@ func SendChunk(w http.ResponseWriter, r *http.Request) { } } -func recvChunk(blobPath, snapshotPath, ipAddr string) error { - url := fmt.Sprintf("http://%s:%s/sync?path=%s", ipAddr, api.HttpPort, blobPath) +func recvChunk(blobPath, snapshotPath, addr string) error { + url := fmt.Sprintf("http://%s:%s/sync?path=%s", addr, api.HttpPort, blobPath) resp, err := http.Get(url) if err != nil { diff --git a/agent/pkg/handler/replication_handler.go b/agent/pkg/handler/handler.go similarity index 88% rename from agent/pkg/handler/replication_handler.go rename to agent/pkg/handler/handler.go index e09b703..40396d3 100644 --- a/agent/pkg/handler/replication_handler.go +++ b/agent/pkg/handler/handler.go @@ -24,7 +24,8 @@ import ( "strings" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -104,7 +105,7 @@ func syncChunk(ctx context.Context, client client.Client, replication *api.Repli sourceSplits := strings.Split(*replication.Spec.Source.URI, "://") addresses := strings.Split(sourceSplits[1], "@") nodeName, blobPath := addresses[0], addresses[1] - nodeIP, err := nodeIP(ctx, client, nodeName) + addr, err := peerAddr(ctx, client, nodeName) if err != nil { return err } @@ -112,7 +113,7 @@ func syncChunk(ctx context.Context, client client.Client, replication *api.Repli // The destination URI looks like localhost:// destSplits := strings.Split(*replication.Spec.Destination.URI, "://") - if err := recvChunk(blobPath, destSplits[1], nodeIP); err != nil { + if err := recvChunk(blobPath, destSplits[1], addr); err != nil { logger.Error(err, "failed to sync chunk") return err } @@ -196,15 +197,22 @@ func parseURI(uri string) (host string, address string) { return splits[0], splits[1] } -func nodeIP(ctx context.Context, client client.Client, nodeName string) (string, error) { - node := corev1.Node{} - if err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil { +func peerAddr(ctx context.Context, c client.Client, nodeName string) (string, error) { + fieldSelector := fields.OneTermEqualSelector("spec.nodeName", nodeName) + listOptions := &client.ListOptions{ + FieldSelector: fieldSelector, + // HACK: the label is hacked, we must set it explicitly in the daemonSet. + LabelSelector: labels.SelectorFromSet(map[string]string{"app": "manta-agent"}), + } + + pods := corev1.PodList{} + if err := c.List(ctx, &pods, listOptions); err != nil { return "", err } - for _, address := range node.Status.Addresses { - if address.Type == "InternalIP" { - return address.Address, nil - } + + if len(pods.Items) != 1 { + return "", fmt.Errorf("got more than one pod per node for daemonSet") } - return "", fmt.Errorf("can't get node internal IP") + + return pods.Items[0].Status.PodIP, nil } diff --git a/agent/pkg/handler/replication_handler_test.go b/agent/pkg/handler/handler_test.go similarity index 100% rename from agent/pkg/handler/replication_handler_test.go rename to agent/pkg/handler/handler_test.go diff --git a/agent/pkg/task/task.go b/agent/pkg/task/task.go index 2be9393..29afd7d 100644 --- a/agent/pkg/task/task.go +++ b/agent/pkg/task/task.go @@ -23,7 +23,6 @@ import ( "path/filepath" "time" - "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,25 +42,20 @@ const ( workspace = cons.DefaultWorkspace ) -var ( - logger logr.Logger -) - func BackgroundTasks(ctx context.Context, c client.Client) { - ctrl.SetLogger(zap.New(zap.UseDevMode(true))) - logger = ctrl.Log.WithName("Background") - // Sync the disk chunk infos to the nodeTracker. go syncChunks(ctx, c) } func syncChunks(ctx context.Context, c client.Client) { + ctrl.SetLogger(zap.New(zap.UseDevMode(true))) + logger := ctrl.Log.WithName("Background tasks") + forFunc := func(ctx context.Context) error { attempts := 0 for { attempts += 1 if err := findOrCreateNodeTracker(ctx, c); err != nil { - // fmt.Printf("failed to create nodeTracker: %v, retry.", err) logger.Error(err, "Failed to create nodeTracker, retry...") if attempts > 10 { diff --git a/cmd/main.go b/cmd/main.go index 4b3afd8..17dcf98 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -39,7 +39,6 @@ import ( "github.com/inftyai/manta/pkg/dispatcher" "github.com/inftyai/manta/pkg/dispatcher/framework" "github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware" - "github.com/inftyai/manta/pkg/dispatcher/plugins/gnumber" "github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector" "github.com/inftyai/manta/pkg/webhook" //+kubebuilder:scaffold:imports @@ -133,7 +132,7 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) { <-certsReady setupLog.Info("certs ready") - dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New}) + dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New}) if err != nil { setupLog.Error(err, "unable to create dispatcher") os.Exit(1) diff --git a/pkg/controller/nodetracker_controller.go b/pkg/controller/nodetracker_controller.go index 2c6df20..7b566a5 100644 --- a/pkg/controller/nodetracker_controller.go +++ b/pkg/controller/nodetracker_controller.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -91,8 +90,6 @@ func (r *NodeTrackerReconciler) Create(e event.CreateEvent) bool { return false } - logger := log.FromContext(context.Background()).WithValues("NodeTracker", klog.KObj(nodeTracker)) - logger.Info("NodeTracker create event") r.dispatcher.AddNodeTracker(nodeTracker) return true } @@ -110,7 +107,11 @@ func (r *NodeTrackerReconciler) Update(e event.UpdateEvent) bool { } func (r *NodeTrackerReconciler) Delete(e event.DeleteEvent) bool { - obj := e.Object.(*api.NodeTracker) + obj, match := e.Object.(*api.NodeTracker) + if !match { + return false + } + r.dispatcher.DeleteNodeTracker(obj) return true } diff --git a/pkg/dispatcher/plugins/gnumber/goroutine_number.go b/pkg/dispatcher/plugins/gnumber/goroutine_number.go deleted file mode 100644 index 00218a5..0000000 --- a/pkg/dispatcher/plugins/gnumber/goroutine_number.go +++ /dev/null @@ -1,47 +0,0 @@ -/* -Copyright 2024. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gnumber - -import ( - "context" - "runtime" - - api "github.com/inftyai/manta/api/v1alpha1" - "github.com/inftyai/manta/pkg/dispatcher/cache" - "github.com/inftyai/manta/pkg/dispatcher/framework" -) - -var _ framework.ScorePlugin = &GNumber{} - -const ( - defaultGoroutineLimit = 1000 -) - -type GNumber struct{} - -func New() (framework.Plugin, error) { - return &GNumber{}, nil -} - -func (g *GNumber) Name() string { - return "GNumber" -} - -func (g *GNumber) Score(_ context.Context, _ framework.ChunkInfo, _ *framework.NodeInfo, _ api.NodeTracker, _ *cache.Cache) float32 { - number := runtime.NumGoroutine() - return (1 - float32(number)/float32(defaultGoroutineLimit)) * 100 -} diff --git a/test/e2e/suit_test.go b/test/e2e/suit_test.go index 00ff7b8..f5a782b 100644 --- a/test/e2e/suit_test.go +++ b/test/e2e/suit_test.go @@ -26,7 +26,6 @@ import ( admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -35,7 +34,6 @@ import ( api "github.com/inftyai/manta/api/v1alpha1" "github.com/inftyai/manta/test/util" - "github.com/inftyai/manta/test/util/wrapper" ) var cfg *rest.Config @@ -78,28 +76,6 @@ var _ = AfterSuite(func() { }) func readyForTesting(client client.Client) { - By("waiting for webhooks to ready") - - // To verify that webhooks are ready, let's create a simple Replication. - replication := wrapper.MakeReplication("sample-replication"). - NodeName("unknown-node"). - ChunkName("chunk1"). - SizeBytes(1024). - SourceOfURI("localhost:///workspace/models/modelA"). - Obj() - - // Once the creation succeeds, that means the webhooks are ready - // and we can begin testing. - Eventually(func() error { - return client.Create(ctx, replication) - }, util.Timeout, util.Interval).Should(Succeed()) - - // Delete this replication before beginning tests. - Expect(client.Delete(ctx, replication)).To(Succeed()) - Eventually(func() error { - return client.Get(ctx, types.NamespacedName{Name: replication.Name}, &api.Replication{}) - }).ShouldNot(Succeed()) - By("waiting for nodeTrackers to ready") Eventually(func() error { nodeTrackers := &api.NodeTrackerList{} @@ -110,5 +86,5 @@ func readyForTesting(client client.Client) { return fmt.Errorf("no nodeTrackers") } return nil - }, util.Timeout, util.Interval).Should(Succeed()) + }, util.Timeout*3, util.Interval).Should(Succeed()) } diff --git a/test/integration/controller/suite_test.go b/test/integration/controller/suite_test.go index ab0529c..5f863c7 100644 --- a/test/integration/controller/suite_test.go +++ b/test/integration/controller/suite_test.go @@ -42,7 +42,6 @@ import ( "github.com/inftyai/manta/pkg/dispatcher" "github.com/inftyai/manta/pkg/dispatcher/framework" "github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware" - "github.com/inftyai/manta/pkg/dispatcher/plugins/gnumber" "github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector" ) @@ -106,7 +105,7 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) - dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New}) + dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New}) Expect(err).ToNot(HaveOccurred()) torrentController := controller.NewTorrentReconciler(mgr.GetClient(), mgr.GetScheme(), dispatcher)