Skip to content

Commit

Permalink
Use PodIP for data transformaiton
Browse files Browse the repository at this point in the history
Signed-off-by: kerthcet <[email protected]>
  • Loading branch information
kerthcet committed Nov 15, 2024
1 parent d924085 commit 31e946d
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 103 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ metadata:
name: torrent-sample
spec:
hub:
name: Huggingface
repoID: Qwen/Qwen2.5-0.5B-Instruct
```
Expand All @@ -62,6 +63,7 @@ metadata:
name: torrent-sample
spec:
hub:
name: Huggingface
repoID: Qwen/Qwen2.5-0.5B-Instruct
nodeSelector:
zone: zone-a
Expand All @@ -78,6 +80,7 @@ metadata:
name: torrent-sample
spec:
hub:
name: Huggingface
repoID: Qwen/Qwen2.5-0.5B-Instruct
reclaimPolicy: Delete
```
Expand Down
8 changes: 8 additions & 0 deletions agent/config/base/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,11 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
2 changes: 1 addition & 1 deletion agent/config/manager/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: manta-agent
namespace: manta-system
labels:
# This is required for manta when fetching peer IP.
app: manta-agent
spec:
selector:
Expand All @@ -14,7 +15,6 @@ spec:
labels:
app: manta-agent
spec:
hostNetwork: true
serviceAccountName: manta-agent
initContainers:
- name: init-permissions
Expand Down
9 changes: 9 additions & 0 deletions agent/pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"os"

corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -92,6 +93,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles.
// TODO: should we create a Job to handle this? See discussion: https://github.com/InftyAI/Manta/issues/25
if err := handler.HandleReplication(ctx, r.Client, replication); err != nil {
logger.Error(err, "error to handle replication", "Replication", klog.KObj(replication))
return ctrl.Result{}, err
Expand Down Expand Up @@ -144,6 +146,13 @@ func (r *ReplicationReconciler) updateNodeTracker(ctx context.Context, replicati

// SetupWithManager sets up the controller with the Manager.
func (r *ReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string {
pod := rawObj.(*corev1.Pod)
return []string{pod.Spec.NodeName}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&api.Replication{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
Expand Down
4 changes: 2 additions & 2 deletions agent/pkg/handler/chunk_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func SendChunk(w http.ResponseWriter, r *http.Request) {
}
}

func recvChunk(blobPath, snapshotPath, ipAddr string) error {
url := fmt.Sprintf("http://%s:%s/sync?path=%s", ipAddr, api.HttpPort, blobPath)
func recvChunk(blobPath, snapshotPath, addr string) error {
url := fmt.Sprintf("http://%s:%s/sync?path=%s", addr, api.HttpPort, blobPath)

resp, err := http.Get(url)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -104,15 +105,15 @@ func syncChunk(ctx context.Context, client client.Client, replication *api.Repli
sourceSplits := strings.Split(*replication.Spec.Source.URI, "://")
addresses := strings.Split(sourceSplits[1], "@")
nodeName, blobPath := addresses[0], addresses[1]
nodeIP, err := nodeIP(ctx, client, nodeName)
addr, err := peerAddr(ctx, client, nodeName)
if err != nil {
return err
}

// The destination URI looks like localhost://<path-to-your-file>
destSplits := strings.Split(*replication.Spec.Destination.URI, "://")

if err := recvChunk(blobPath, destSplits[1], nodeIP); err != nil {
if err := recvChunk(blobPath, destSplits[1], addr); err != nil {
logger.Error(err, "failed to sync chunk")
return err
}
Expand Down Expand Up @@ -196,15 +197,22 @@ func parseURI(uri string) (host string, address string) {
return splits[0], splits[1]
}

func nodeIP(ctx context.Context, client client.Client, nodeName string) (string, error) {
node := corev1.Node{}
if err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
func peerAddr(ctx context.Context, c client.Client, nodeName string) (string, error) {
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", nodeName)
listOptions := &client.ListOptions{
FieldSelector: fieldSelector,
// HACK: the label is hacked, we must set it explicitly in the daemonSet.
LabelSelector: labels.SelectorFromSet(map[string]string{"app": "manta-agent"}),
}

pods := corev1.PodList{}
if err := c.List(ctx, &pods, listOptions); err != nil {
return "", err
}
for _, address := range node.Status.Addresses {
if address.Type == "InternalIP" {
return address.Address, nil
}

if len(pods.Items) != 1 {
return "", fmt.Errorf("got more than one pod per node for daemonSet")
}
return "", fmt.Errorf("can't get node internal IP")

return pods.Items[0].Status.PodIP, nil
}
File renamed without changes.
12 changes: 3 additions & 9 deletions agent/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"path/filepath"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,25 +42,20 @@ const (
workspace = cons.DefaultWorkspace
)

var (
logger logr.Logger
)

func BackgroundTasks(ctx context.Context, c client.Client) {
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
logger = ctrl.Log.WithName("Background")

// Sync the disk chunk infos to the nodeTracker.
go syncChunks(ctx, c)
}

func syncChunks(ctx context.Context, c client.Client) {
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
logger := ctrl.Log.WithName("Background tasks")

forFunc := func(ctx context.Context) error {
attempts := 0
for {
attempts += 1
if err := findOrCreateNodeTracker(ctx, c); err != nil {
// fmt.Printf("failed to create nodeTracker: %v, retry.", err)
logger.Error(err, "Failed to create nodeTracker, retry...")

if attempts > 10 {
Expand Down
3 changes: 1 addition & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/inftyai/manta/pkg/dispatcher"
"github.com/inftyai/manta/pkg/dispatcher/framework"
"github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware"
"github.com/inftyai/manta/pkg/dispatcher/plugins/gnumber"
"github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector"
"github.com/inftyai/manta/pkg/webhook"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -133,7 +132,7 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {
<-certsReady
setupLog.Info("certs ready")

dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New})
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New})
if err != nil {
setupLog.Error(err, "unable to create dispatcher")
os.Exit(1)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/nodetracker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -91,8 +90,6 @@ func (r *NodeTrackerReconciler) Create(e event.CreateEvent) bool {
return false
}

logger := log.FromContext(context.Background()).WithValues("NodeTracker", klog.KObj(nodeTracker))
logger.Info("NodeTracker create event")
r.dispatcher.AddNodeTracker(nodeTracker)
return true
}
Expand All @@ -110,7 +107,11 @@ func (r *NodeTrackerReconciler) Update(e event.UpdateEvent) bool {
}

func (r *NodeTrackerReconciler) Delete(e event.DeleteEvent) bool {
obj := e.Object.(*api.NodeTracker)
obj, match := e.Object.(*api.NodeTracker)
if !match {
return false
}

r.dispatcher.DeleteNodeTracker(obj)
return true
}
Expand Down
47 changes: 0 additions & 47 deletions pkg/dispatcher/plugins/gnumber/goroutine_number.go

This file was deleted.

26 changes: 1 addition & 25 deletions test/e2e/suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -35,7 +34,6 @@ import (

api "github.com/inftyai/manta/api/v1alpha1"
"github.com/inftyai/manta/test/util"
"github.com/inftyai/manta/test/util/wrapper"
)

var cfg *rest.Config
Expand Down Expand Up @@ -78,28 +76,6 @@ var _ = AfterSuite(func() {
})

func readyForTesting(client client.Client) {
By("waiting for webhooks to ready")

// To verify that webhooks are ready, let's create a simple Replication.
replication := wrapper.MakeReplication("sample-replication").
NodeName("unknown-node").
ChunkName("chunk1").
SizeBytes(1024).
SourceOfURI("localhost:///workspace/models/modelA").
Obj()

// Once the creation succeeds, that means the webhooks are ready
// and we can begin testing.
Eventually(func() error {
return client.Create(ctx, replication)
}, util.Timeout, util.Interval).Should(Succeed())

// Delete this replication before beginning tests.
Expect(client.Delete(ctx, replication)).To(Succeed())
Eventually(func() error {
return client.Get(ctx, types.NamespacedName{Name: replication.Name}, &api.Replication{})
}).ShouldNot(Succeed())

By("waiting for nodeTrackers to ready")
Eventually(func() error {
nodeTrackers := &api.NodeTrackerList{}
Expand All @@ -110,5 +86,5 @@ func readyForTesting(client client.Client) {
return fmt.Errorf("no nodeTrackers")
}
return nil
}, util.Timeout, util.Interval).Should(Succeed())
}, util.Timeout*3, util.Interval).Should(Succeed())
}
3 changes: 1 addition & 2 deletions test/integration/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/inftyai/manta/pkg/dispatcher"
"github.com/inftyai/manta/pkg/dispatcher/framework"
"github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware"
"github.com/inftyai/manta/pkg/dispatcher/plugins/gnumber"
"github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector"
)

Expand Down Expand Up @@ -106,7 +105,7 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New})
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New})
Expect(err).ToNot(HaveOccurred())

torrentController := controller.NewTorrentReconciler(mgr.GetClient(), mgr.GetScheme(), dispatcher)
Expand Down

0 comments on commit 31e946d

Please sign in to comment.