diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index 16a578249f..6920268e43 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -461,23 +461,14 @@ void FragmentContext::TryFinishFragment() { return; } auto *parent_fragment_ctx = parent_plan_fragment->GetContext(); - auto *scheduler = query_context_->scheduler(); - if (parent_fragment_ctx->TryStartFragment()) { - // All child fragment are finished. - scheduler->ScheduleFragment(parent_plan_fragment); - return; - } - if (parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) { + if (!parent_fragment_ctx->TryStartFragment() && parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) { return; } - Vector task_ptrs; - for (auto &task : parent_fragment_ctx->Tasks()) { - if (task->TryIntoWorkerLoop()) { - task_ptrs.emplace_back(task.get()); - } - } - scheduler->ScheduleTasks(task_ptrs); + // All child fragment are finished. + auto *scheduler = query_context_->scheduler(); + scheduler->ScheduleFragment(parent_plan_fragment); + return; } Vector &FragmentContext::GetOperators() { return fragment_ptr_->GetOperators(); } diff --git a/src/scheduler/task_scheduler.cpp b/src/scheduler/task_scheduler.cpp index f1e2d40395..61d403bacc 100644 --- a/src/scheduler/task_scheduler.cpp +++ b/src/scheduler/task_scheduler.cpp @@ -95,7 +95,8 @@ void TaskScheduler::Schedule(PlanFragment *plan_fragment) { } Vector fragments = GetStartFragments(plan_fragment); - u64 worker_id = 0; + last_cpu_id_ = 0; // FIXME + u64 &worker_id = last_cpu_id_; for (auto *plan_fragment : fragments) { auto &tasks = plan_fragment->GetContext()->Tasks(); for (auto &task : tasks) { @@ -110,9 +111,12 @@ void TaskScheduler::ScheduleFragment(PlanFragment *plan_fragment) { Vector task_ptrs; auto &tasks = plan_fragment->GetContext()->Tasks(); for (auto &task : tasks) { - task_ptrs.emplace_back(task.get()); + if (task->TryIntoWorkerLoop()) { + task_ptrs.emplace_back(task.get()); + } } - u64 worker_id = 0; + + u64 &worker_id = last_cpu_id_; for (auto *task_ptr : task_ptrs) { ScheduleTask(task_ptr, worker_id); ++worker_id; @@ -120,17 +124,6 @@ void TaskScheduler::ScheduleFragment(PlanFragment *plan_fragment) { } } -void TaskScheduler::ScheduleTasks(Vector fragment_tasks) { - // u64 worker_id = 0; - for (auto *task_ptr : fragment_tasks) { - i64 last_worker_id = task_ptr->LastWorkerID(); - if (last_worker_id == -1) { - Error("Task is not scheduled before"); - } - ScheduleTask(task_ptr, last_worker_id); - } -} - void TaskScheduler::ScheduleTask(FragmentTask *task, u64 worker_id) { task->set_status(FragmentTaskStatus::kRunning); ++worker_workloads_[worker_id]; @@ -148,7 +141,9 @@ void TaskScheduler::WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id } else { task_queue->TryDequeueBulk(dequeue_output); } - task_lists.insert(task_lists.end(), dequeue_output.begin(), dequeue_output.end()); + if (!dequeue_output.empty()) { + task_lists.insert(task_lists.end(), dequeue_output.begin(), dequeue_output.end()); + } if (iter == task_lists.end()) { iter = task_lists.begin(); } diff --git a/src/scheduler/task_scheduler.cppm b/src/scheduler/task_scheduler.cppm index 6733d92caf..57c281b1f4 100644 --- a/src/scheduler/task_scheduler.cppm +++ b/src/scheduler/task_scheduler.cppm @@ -46,11 +46,11 @@ public: void UnInit(); - void Schedule(PlanFragment * plan_fragment); + // Schedule start fragments + void Schedule(PlanFragment * plan_fragment_root); - void ScheduleFragment(PlanFragment * plan_fragment); - - void ScheduleTasks(Vector fragment_tasks); + // `plan_fragment` can be scheduled because all of its dependencies are met. + void ScheduleFragment(PlanFragment *plan_fragment); private: Vector GetStartFragments(PlanFragment* plan_fragment);