Fix tests
Signed-off-by: kerthcet <[email protected]>
kerthcet committed Nov 9, 2024
1 parent 6cb257f commit f34403a
Showing 4 changed files with 22 additions and 20 deletions.
agent/deploy/daemonset.yaml (2 changes: 1 addition & 1 deletion)

@@ -26,7 +26,7 @@ spec:
       containers:
       - name: agent
         # image: inftyai/manta-agent:v0.0.1
-        image: inftyai/test:manta-agent-110810
+        image: inftyai/test:manta-agent-110811
         ports:
         - containerPort: 9090
         resources:
agent/pkg/handler/chunk_handler.go (12 changes: 9 additions & 3 deletions)

@@ -43,7 +43,9 @@ func SendChunk(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, "File not found", http.StatusNotFound)
 		return
 	}
-	defer file.Close()
+	defer func() {
+		_ = file.Close()
+	}()
 
 	buffer := make([]byte, buffSize)
 	for {
@@ -76,14 +78,18 @@ func recvChunk(blobPath, snapshotPath, peerName string) error {
 	if err != nil {
 		return err
 	}
-	defer resp.Body.Close()
+	defer func() {
+		_ = resp.Body.Close()
+	}()
 
 	// Use the same path for different peers.
 	file, err := os.Create(blobPath)
 	if err != nil {
 		return err
 	}
-	defer file.Close()
+	defer func() {
+		_ = file.Close()
+	}()
 
 	_, err = io.Copy(file, resp.Body)
 	if err != nil {
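
The chunk_handler.go changes above all switch from a bare defer x.Close() to deferring a small closure that discards the Close error, which keeps errcheck-style linters quiet without changing runtime behavior. A minimal self-contained sketch of the same pattern, using a hypothetical saveStream helper rather than the real SendChunk/recvChunk handlers:

// Sketch only: the deferred-close pattern adopted in chunk_handler.go.
// saveStream is a hypothetical helper, not part of the manta codebase.
package main

import (
	"io"
	"os"
	"strings"
)

func saveStream(path string, src io.Reader) error {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		// Deliberately discard the Close error; the copy error below is the
		// one that gets propagated.
		_ = file.Close()
	}()

	_, err = io.Copy(file, src)
	return err
}

func main() {
	_ = saveStream("/tmp/example-chunk", strings.NewReader("chunk data"))
}

Note that this silences the unchecked-error warning rather than surfacing the Close failure; for freshly written files, checking an explicit file.Close() before returning is a stricter alternative.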
pkg/dispatcher/dispatcher.go (24 changes: 10 additions & 14 deletions)

@@ -47,12 +47,12 @@ type Dispatcher struct {
 }
 
 func NewDispatcher(plugins []framework.RegisterFunc) (*Dispatcher, error) {
-
 	dispatcher := &Dispatcher{
 		cache: cache.NewCache(),
 	}
-
-	dispatcher.RegisterPlugins(plugins)
+	if err := dispatcher.RegisterPlugins(plugins); err != nil {
+		return nil, err
+	}
 	return dispatcher, nil
 }
 
@@ -67,7 +67,7 @@ func (d *Dispatcher) snapshot() *cache.Cache {
 // or we have to introduce file lock when downloading chunks.
 func (d *Dispatcher) PrepareReplications(ctx context.Context, torrent *api.Torrent, nodeTrackers []api.NodeTracker) (replications []*api.Replication, torrentStatusChanged bool, firstTime bool, err error) {
 	if torrent.Status.Repo == nil {
-		return nil, false, false, nil
+		return nil, false, false, fmt.Errorf("repo is nil")
 	}
 
 	logger := log.FromContext(ctx)
@@ -95,11 +95,7 @@ func (d *Dispatcher) PrepareReplications(ctx context.Context, torrent *api.Torre
 		}
 
 		if d.cache.ChunkExist(chunk.Name) {
-			newReplications, err := d.schedulingSyncChunk(ctx, torrent, chunk, nodeTrackers, cache)
-			if err != nil {
-				logger.Error(err, "failed to dispatch chunk for syncing", "chunk", chunk.Name)
-				return nil, false, false, err
-			}
+			newReplications := d.schedulingSyncChunk(ctx, torrent, chunk, nodeTrackers, cache)
 			replications = append(replications, newReplications...)
 		} else {
 			newReplications, err := d.schedulingDownloadChunk(ctx, torrent, chunk, nodeTrackers, cache)
@@ -187,7 +183,7 @@ func (d *Dispatcher) schedulingDownloadChunk(ctx context.Context, torrent *api.T
 	return
 }
 
-func (d *Dispatcher) schedulingSyncChunk(ctx context.Context, torrent *api.Torrent, chunk framework.ChunkInfo, nodeTrackers []api.NodeTracker, cache *cache.Cache) (replications []*api.Replication, err error) {
+func (d *Dispatcher) schedulingSyncChunk(ctx context.Context, torrent *api.Torrent, chunk framework.ChunkInfo, nodeTrackers []api.NodeTracker, cache *cache.Cache) (replications []*api.Replication) {
 	logger := log.FromContext(ctx).WithValues("chunk", chunk.Name)
 	logger.Info("start to schedule sync chunk")
 
@@ -218,14 +214,14 @@ func (d *Dispatcher) schedulingSyncChunk(ctx context.Context, torrent *api.Torre
 				continue
 			}
 
-			totalCandidates = append(totalCandidates, framework.ScoreCandidate{SourceNodeName: &nodeName, CandidateNodeName: candidate.Node.Name, Score: candidate.Score})
+			totalCandidates = append(totalCandidates, framework.ScoreCandidate{SourceNodeName: nodeName, CandidateNodeName: candidate.Node.Name, Score: candidate.Score})
 		}
 	}
 
 	// We have enough replicated nodes.
 	if replicas <= 0 {
 		logger.V(1).Info("Have enough replicas, no need to sync anymore")
-		return
+		return nil
 	}
 
 	if len(totalCandidates) > int(replicas) {
@@ -237,7 +233,7 @@ func (d *Dispatcher) schedulingSyncChunk(ctx context.Context, torrent *api.Torre
 	}
 
 	for _, candidate := range totalCandidates {
-		replica := buildSyncReplication(torrent, chunk, *candidate.SourceNodeName, candidate.CandidateNodeName)
+		replica := buildSyncReplication(torrent, chunk, candidate.SourceNodeName, candidate.CandidateNodeName)
 		replications = append(replications, replica)
 
 		// Make sure the snapshot cache is always updated.
@@ -246,7 +242,7 @@ func (d *Dispatcher) schedulingSyncChunk(ctx context.Context, torrent *api.Torre
 		}, candidate.CandidateNodeName)
 	}
 
-	return
+	return replications
 }
 
 func (d *Dispatcher) UpdateNodeTracker(old *api.NodeTracker, new *api.NodeTracker) {
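
Two behavioral changes stand out in dispatcher.go: NewDispatcher now returns the error from RegisterPlugins instead of ignoring it, and PrepareReplications fails fast with fmt.Errorf("repo is nil") rather than silently returning nothing. A minimal sketch of the constructor pattern, with simplified stand-ins for the real Dispatcher and RegisterFunc types (the plugin in main is purely illustrative):

// Sketch only: simplified stand-ins for manta's Dispatcher and plugin types.
package main

import (
	"fmt"
	"log"
)

// RegisterFunc stands in for framework.RegisterFunc.
type RegisterFunc func() (string, error)

type Dispatcher struct {
	plugins []string
}

// RegisterPlugins fails on the first plugin that cannot be registered,
// mirroring the error that NewDispatcher now checks.
func (d *Dispatcher) RegisterPlugins(fns []RegisterFunc) error {
	for _, fn := range fns {
		name, err := fn()
		if err != nil {
			return fmt.Errorf("register plugin: %w", err)
		}
		d.plugins = append(d.plugins, name)
	}
	return nil
}

// NewDispatcher propagates registration failures to the caller.
func NewDispatcher(fns []RegisterFunc) (*Dispatcher, error) {
	d := &Dispatcher{}
	if err := d.RegisterPlugins(fns); err != nil {
		return nil, err
	}
	return d, nil
}

func main() {
	ok := func() (string, error) { return "exampleScorer", nil }
	if _, err := NewDispatcher([]RegisterFunc{ok}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("dispatcher ready")
}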
pkg/dispatcher/framework/framework.go (4 changes: 2 additions & 2 deletions)

@@ -70,8 +70,8 @@ type Candidate struct {
 // ScoreCandidate will be used after Score extension point for picking best effort nodes.
 type ScoreCandidate struct {
 	// SourceNodeName represents the the source node name in syncing tasks.
-	// It's nil once in downloading tasks.
-	SourceNodeName *string
+	// It's empty once in downloading tasks.
+	SourceNodeName string
 	// CandidateNodeName represents the target node name.
 	CandidateNodeName string
 	// Score for candidate node.
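
Replacing the *string with a plain string means the empty string now marks "no source node" (download tasks), and callers such as buildSyncReplication no longer dereference a possibly nil pointer. A small sketch of how the value-typed field can be consumed; the Score type and node names are assumptions, not taken from the repository:

// Sketch only: mirrors the updated ScoreCandidate shape; the float32 Score
// and the node names in main are illustrative assumptions.
package main

import "fmt"

type ScoreCandidate struct {
	// SourceNodeName is empty for download tasks and set for sync tasks.
	SourceNodeName    string
	CandidateNodeName string
	Score             float32
}

func describe(c ScoreCandidate) string {
	if c.SourceNodeName == "" {
		return fmt.Sprintf("download to %s (score %.0f)", c.CandidateNodeName, c.Score)
	}
	return fmt.Sprintf("sync %s -> %s (score %.0f)", c.SourceNodeName, c.CandidateNodeName, c.Score)
}

func main() {
	fmt.Println(describe(ScoreCandidate{CandidateNodeName: "node-b", Score: 80}))
	fmt.Println(describe(ScoreCandidate{SourceNodeName: "node-a", CandidateNodeName: "node-b", Score: 90}))
}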
