From b85ffbd25ce3d42329351de17d98585b254f93b4 Mon Sep 17 00:00:00 2001 From: storezhang Date: Thu, 5 Dec 2024 21:09:15 +0800 Subject: [PATCH] =?UTF-8?q?refactor(=E8=AE=A1=E5=88=92):=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E9=9D=9E=E5=9B=BA=E5=AE=9A=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/internal/core/processor.go | 15 ++++----------- internal/kernel/tasker.go | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/internal/internal/core/processor.go b/internal/internal/core/processor.go index 0f1b96c..85d1f7a 100644 --- a/internal/internal/core/processor.go +++ b/internal/internal/core/processor.go @@ -29,17 +29,10 @@ func NewProcessor(tasker kernel.Tasker, params *param.Agent) *Processor { func (p *Processor) Process(selector kernel.Selector) { for { - tasks := p.tasker.Pop(p.params.Retries) - if 0 == len(tasks) { // 让出时间切片 - time.Sleep(0) - continue - } - - for _, _task := range tasks { - if _, exists := p.progresses.Load(_task.Id()); exists { - continue - } - + _task := p.tasker.Pop(p.params.Retries) + if _, exists := p.progresses.Load(_task.Id()); exists { + time.Sleep(0) // 让出时间片 + } else { go func() { _ = p.process(_task, selector) // 错误已经处理,纯接收 }() diff --git a/internal/kernel/tasker.go b/internal/kernel/tasker.go index beb02b7..08ffd9b 100644 --- a/internal/kernel/tasker.go +++ b/internal/kernel/tasker.go @@ -22,7 +22,7 @@ type Tasker interface { Update(id uint64, status Status, runtime time.Time) error // Pop 取出任务并执行 - Pop(retries uint32) []Task + Pop(retries uint32) Task // Archive 存档 Archive(task Task) error