Skip to content

Commit

Permalink
Merge pull request #24 from kerthcet/feat/agent
Browse files Browse the repository at this point in the history
Release v0.0.2
  • Loading branch information
InftyAI-Agent authored Nov 11, 2024
2 parents 0cf38c8 + a6fac3c commit 7f05ed5
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 79 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<p align="center">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://raw.githubusercontent.com/inftyai/manta/main/docs/assets/logo.png">
<img alt="llmaz" src="https://raw.githubusercontent.com/inftyai/manta/main/docs/assets/logo.png" width="35%">
<img alt="manta" src="https://raw.githubusercontent.com/inftyai/manta/main/docs/assets/logo.png" width="35%">
</picture>
</p>

Expand Down
8 changes: 8 additions & 0 deletions agent/deploy/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ kind: ClusterRole
metadata:
name: manta-agent-role
rules:
- apiGroups:
- "manta.io"
resources:
- torrents
verbs:
- get
- list
- watch
- apiGroups:
- "manta.io"
resources:
Expand Down
7 changes: 4 additions & 3 deletions agent/deploy/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ spec:
mountPath: /workspace/models
containers:
- name: agent
# image: inftyai/manta-agent:v0.0.1
image: inftyai/test:manta-agent-111001
image: inftyai/manta-agent:v0.0.2
ports:
- containerPort: 9090
resources:
limits:
memory: 200Mi
cpu: 1
memory: 2Gi
requests:
cpu: 100m
memory: 200Mi
env:
- name: NODE_NAME
valueFrom:
Expand Down
26 changes: 18 additions & 8 deletions agent/pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// torrentName shouldn't be empty here, but let's ignore this case, it will not
// harm the happy path.
if torrentName, ok := replication.Labels[api.TorrentNameLabelKey]; ok {
torrent := api.Torrent{}
if err := r.Get(ctx, types.NamespacedName{Name: torrentName}, &torrent); err != nil {
// Once torrent not found, ignore the replication then.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}

// Filter out unrelated events.
if replication.Spec.NodeName != NODE_NAME ||
replicationReady(replication) ||
Expand All @@ -73,7 +83,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)

logger.Info("Reconcile replication", "Replication", klog.KObj(replication))

conditionType := api.DownloadConditionType
conditionType := api.ReplicateConditionType
if replication.Spec.Destination == nil {
conditionType = api.ReclaimingConditionType
}
Expand All @@ -82,7 +92,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles.
if err := handler.HandleReplication(ctx, replication); err != nil {
if err := handler.HandleReplication(ctx, r.Client, replication); err != nil {
logger.Error(err, "error to handle replication", "Replication", klog.KObj(replication))
return ctrl.Result{}, err
} else {
Expand Down Expand Up @@ -136,19 +146,19 @@ func (r *ReplicationReconciler) updateNodeTracker(ctx context.Context, replicati
func (r *ReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&api.Replication{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
Complete(r)
}

func setReplicationCondition(replication *api.Replication, conditionType string) (changed bool) {
if conditionType == api.DownloadConditionType {
if conditionType == api.ReplicateConditionType {
condition := metav1.Condition{
Type: conditionType,
Status: metav1.ConditionTrue,
Reason: "Downloading",
Message: "Downloading chunks",
Reason: "Replicating",
Message: "Replicating chunks",
}
replication.Status.Phase = ptr.To[string](api.DownloadConditionType)
replication.Status.Phase = ptr.To[string](api.ReplicateConditionType)
return apimeta.SetStatusCondition(&replication.Status.Conditions, condition)
}

Expand All @@ -157,7 +167,7 @@ func setReplicationCondition(replication *api.Replication, conditionType string)
Type: conditionType,
Status: metav1.ConditionTrue,
Reason: "Ready",
Message: "Download chunks successfully",
Message: "Chunks replicated successfully",
}
replication.Status.Phase = ptr.To[string](api.ReadyConditionType)
return apimeta.SetStatusCondition(&replication.Status.Conditions, condition)
Expand Down
11 changes: 8 additions & 3 deletions agent/pkg/handler/chunk_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"io"
"net/http"
"os"
"path/filepath"

"github.com/inftyai/manta/api"
)

const (
buffSize = 4 * 1024 * 1024 // 4MB buffer
buffSize = 10 * 1024 * 1024 // 10MB buffer
)

// SendChunk will send the chunk content via http request.
Expand Down Expand Up @@ -71,8 +72,8 @@ func SendChunk(w http.ResponseWriter, r *http.Request) {
}
}

func recvChunk(blobPath, snapshotPath, peerName string) error {
url := fmt.Sprintf("http://%s:%s/sync?path=%s", peerName, api.HttpPort, blobPath)
func recvChunk(blobPath, snapshotPath, ipAddr string) error {
url := fmt.Sprintf("http://%s:%s/sync?path=%s", ipAddr, api.HttpPort, blobPath)

resp, err := http.Get(url)
if err != nil {
Expand All @@ -82,6 +83,10 @@ func recvChunk(blobPath, snapshotPath, peerName string) error {
_ = resp.Body.Close()
}()

if err := os.MkdirAll(filepath.Dir(blobPath), os.ModePerm); err != nil {
return err
}

// Use the same path for different peers.
file, err := os.Create(blobPath)
if err != nil {
Expand Down
37 changes: 32 additions & 5 deletions agent/pkg/handler/replication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ import (
"path/filepath"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

api "github.com/inftyai/manta/api/v1alpha1"
)

// This only happens when replication not ready.
func HandleReplication(ctx context.Context, replication *api.Replication) error {
func HandleReplication(ctx context.Context, client client.Client, replication *api.Replication) error {
// If destination is nil, the address must not be localhost.
if replication.Spec.Destination == nil {
return deleteChunk(ctx, replication)
Expand All @@ -44,7 +47,7 @@ func HandleReplication(ctx context.Context, replication *api.Replication) error
host, _ := parseURI(*replication.Spec.Source.URI)
// TODO: handel other uris.
if host != api.URI_LOCALHOST {
return syncChunk(ctx, replication)
return syncChunk(ctx, client, replication)
}
// TODO: handle uri with object store.
}
Expand Down Expand Up @@ -92,7 +95,7 @@ func downloadChunk(ctx context.Context, replication *api.Replication) error {
return nil
}

func syncChunk(ctx context.Context, replication *api.Replication) error {
func syncChunk(ctx context.Context, client client.Client, replication *api.Replication) error {
logger := log.FromContext(ctx)

logger.Info("start to sync chunks", "Replication", klog.KObj(replication))
Expand All @@ -101,10 +104,15 @@ func syncChunk(ctx context.Context, replication *api.Replication) error {
sourceSplits := strings.Split(*replication.Spec.Source.URI, "://")
addresses := strings.Split(sourceSplits[1], "@")
nodeName, blobPath := addresses[0], addresses[1]
nodeIP, err := nodeIP(ctx, client, nodeName)
if err != nil {
return err
}

// The destination URI looks like localhost://<path-to-your-file>
destSplits := strings.Split(*replication.Spec.Destination.URI, "://")

if err := recvChunk(blobPath, destSplits[1], nodeName); err != nil {
if err := recvChunk(blobPath, destSplits[1], nodeIP); err != nil {
logger.Error(err, "failed to sync chunk")
return err
}
Expand All @@ -124,9 +132,15 @@ func deleteChunk(ctx context.Context, replication *api.Replication) error {
}

// local(real) file looks like: /workspace/models/Qwen--Qwen2-0.5B-Instruct-GGUF/blobs/8b08b8632419bd6d7369362945b5976c7f47b1c1--0001
// target file locates at /workspace/models/Qwen--Qwen2-0.5B-Instruct-GGUF/snapshots/main/qwen2-0_5b-instruct-q5_k_m.gguf
// target file looks like /workspace/models/Qwen--Qwen2-0.5B-Instruct-GGUF/snapshots/main/qwen2-0_5b-instruct-q5_k_m.gguf
// the symlink of target file looks like ../../blobs/8b08b8632419bd6d7369362945b5976c7f47b1c1--0001
func createSymlink(localPath, targetPath string) error {
// This could happen like force delete a Torrent but downloading is still on the way,
// then the blob file is deleted, in this situation, we should not create the symlink.
if _, err := os.Stat(localPath); err != nil {
return err
}

dir := filepath.Dir(targetPath)
err := os.MkdirAll(dir, 0755)
if err != nil {
Expand Down Expand Up @@ -181,3 +195,16 @@ func parseURI(uri string) (host string, address string) {
splits := strings.Split(uri, "://")
return splits[0], splits[1]
}

func nodeIP(ctx context.Context, client client.Client, nodeName string) (string, error) {
node := corev1.Node{}
if err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
return "", err
}
for _, address := range node.Status.Addresses {
if address.Type == "InternalIP" {
return address.Address, nil
}
}
return "", fmt.Errorf("can't get node internal IP")
}
4 changes: 2 additions & 2 deletions agent/pkg/handler/replication_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestHandleReplication(t *testing.T) {
SourceOfHub("Huggingface", "Qwen/Qwen2.5-72B-Instruct", "main", "LICENSE").
DestinationOfURI("localhost://../../../tmp/replication/models/Qwen--Qwen2.5-72B-Instruct/blobs/LICENSE-chunk").
Obj()
if err := HandleReplication(ctx, toCreateReplication); err != nil {
if err := HandleReplication(ctx, nil, toCreateReplication); err != nil {
t.Errorf("failed to handle Replication: %v", err)
}

Expand All @@ -58,7 +58,7 @@ func TestHandleReplication(t *testing.T) {
toDeleteReplication := wrapper.MakeReplication("replication").
SourceOfURI("localhost://../../../tmp/replication/models/Qwen--Qwen2.5-72B-Instruct/snapshots/main/LICENSE").
Obj()
if err := HandleReplication(ctx, toDeleteReplication); err != nil {
if err := HandleReplication(ctx, nil, toDeleteReplication); err != nil {
t.Errorf("failed to handle Replication: %v", err)
}

Expand Down
6 changes: 3 additions & 3 deletions api/v1alpha1/torrent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ type RepoStatus struct {
const (
// PendingConditionType represents the Torrent is Pending.
PendingConditionType = "Pending"
// DownloadConditionType represents the Torrent is under downloading.
DownloadConditionType = "Downloading"
// ReadyConditionType represents the Torrent is downloaded successfully.
// ReplicateConditionType represents the Torrent is under replicating, downloading or syncing.
ReplicateConditionType = "Replicating"
// ReadyConditionType represents the Torrent is replicated successfully.
ReadyConditionType = "Ready"
// ReclaimingConditionType represents the Torrent is removing chunks.
ReclaimingConditionType = "Reclaiming"
Expand Down
4 changes: 2 additions & 2 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: inftyai/manta
newTag: "111001"
newName: inftyai/test
newTag: manta-111113
2 changes: 1 addition & 1 deletion config/samples/_v1alpha1_torrent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ spec:
hub:
repoID: Qwen/Qwen2.5-0.5B
# With one file.
# modelID: Qwen/Qwen2-0.5B-Instruct-GGUF
# repoID: Qwen/Qwen2-0.5B-Instruct-GGUF
# filename: qwen2-0_5b-instruct-q5_k_m.gguf
4 changes: 2 additions & 2 deletions docs/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
### Install

```cmd
VERSION=v0.0.1
VERSION=v0.0.2
kubectl apply --server-side -f https://github.com/inftyai/manta/releases/download/$VERSION/manifests.yaml
```

### Uninstall

```cmd
VERSION=v0.0.1
VERSION=v0.0.2
kubectl delete -f https://github.com/inftyai/manta/releases/download/$VERSION/manifests.yaml
```

Expand Down
31 changes: 17 additions & 14 deletions pkg/controller/torrent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

logger.Info("reconcile Torrent")

// Only delete chunks when torrent ready, or will lead to unexpected behaviors.
// We may change this in the future.
// TODO: delete torrent at anytime.
if torrentReady(torrent) && torrentDeleting(torrent) {
logger.Info("start to handle torrent deletion")

Expand Down Expand Up @@ -223,11 +222,12 @@ func (r *TorrentReconciler) handleDispatcher(ctx context.Context, torrent *api.T
condition := metav1.Condition{
Type: api.ReadyConditionType,
Status: metav1.ConditionTrue,
Reason: "Replicated",
Message: "All chunks are replicated",
Reason: "Ready",
Message: "All chunks are replicated already",
}
if setTorrentConditionTo(torrent, condition) {
return false, r.Status().Update(ctx, torrent)
}
statusChanged = statusChanged || setTorrentConditionTo(torrent, condition)
return statusChanged, nil
}

for _, rep := range replications {
Expand Down Expand Up @@ -309,7 +309,6 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) (
return setTorrentConditionTo(torrent, condition)
}

// TODO: once we support delete torrent in unready state, we should change this.
if torrentReady(torrent) && torrentDeleting(torrent) {
condition := metav1.Condition{
Type: api.ReclaimingConditionType,
Expand All @@ -320,22 +319,26 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) (
return setTorrentConditionTo(torrent, condition)
}

if apimeta.IsStatusConditionTrue(torrent.Status.Conditions, api.DownloadConditionType) && replicationsReady(replications) {
if torrentReady(torrent) {
return false
}

if apimeta.IsStatusConditionTrue(torrent.Status.Conditions, api.ReplicateConditionType) && replicationsReady(replications) {
condition := metav1.Condition{
Type: api.ReadyConditionType,
Status: metav1.ConditionTrue,
Reason: "Ready",
Message: "Download chunks successfully",
Message: "Chunks replicated successfully",
}
return setTorrentConditionTo(torrent, condition)
}

if torrentDownloading(replications) {
condition := metav1.Condition{
Type: api.DownloadConditionType,
Type: api.ReplicateConditionType,
Status: metav1.ConditionTrue,
Reason: "Downloading",
Message: "Downloading chunks",
Reason: "Replicating",
Message: "Replicating chunks",
}
return setTorrentConditionTo(torrent, condition)
}
Expand All @@ -350,8 +353,8 @@ func setTorrentConditionTo(torrent *api.Torrent, condition metav1.Condition) (ch

func torrentDownloading(replications []api.Replication) bool {
for _, replication := range replications {
// If one replication is in downloading, then yes.
if apimeta.IsStatusConditionTrue(replication.Status.Conditions, api.DownloadConditionType) {
// If one replication is in replicating, then yes.
if apimeta.IsStatusConditionTrue(replication.Status.Conditions, api.ReplicateConditionType) {
return true
}
}
Expand Down
Loading

0 comments on commit 7f05ed5

Please sign in to comment.