Fix: check if task is already in worker loop in TaskScheduler::ScheduleFragment.
small-turtle-1 committed Dec 29, 2023
1 parent dacf253 commit d864ea3
Showing 3 changed files with 19 additions and 33 deletions.
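Background for the fix, as reconstructed from the diff below: TaskScheduler::ScheduleFragment used to enqueue every task of a fragment unconditionally, while the TryIntoWorkerLoop() check lived only in FragmentContext::TryFinishFragment, so a task that was already inside a worker loop could be handed to a worker a second time. This commit moves the check into ScheduleFragment. A minimal standalone sketch of such a guard, assuming an atomic flag; the actual FragmentTask implementation in the repository may differ:

// Hypothetical guard, not the repository's FragmentTask: TryIntoWorkerLoop()
// succeeds at most once while the task is inside a worker loop, so two
// schedulers cannot both enqueue the same task.
#include <atomic>

class FragmentTaskGuardSketch {
public:
    // Wins the transition "not in a worker loop" -> "in a worker loop".
    // Only the caller that sees true may enqueue the task.
    bool TryIntoWorkerLoop() {
        bool expected = false;
        return in_worker_loop_.compare_exchange_strong(expected, true);
    }

    // A worker would call this once it has finished running the task.
    void QuitFromWorkerLoop() { in_worker_loop_.store(false); }

private:
    std::atomic<bool> in_worker_loop_{false};
};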
src/scheduler/fragment_context.cpp (19 changes: 5 additions & 14 deletions)
@@ -461,23 +461,14 @@ void FragmentContext::TryFinishFragment() {
        return;
    }
    auto *parent_fragment_ctx = parent_plan_fragment->GetContext();
-    auto *scheduler = query_context_->scheduler();
-    if (parent_fragment_ctx->TryStartFragment()) {
-        // All child fragment are finished.
-        scheduler->ScheduleFragment(parent_plan_fragment);
-        return;
-    }

-    if (parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) {
+    if (!parent_fragment_ctx->TryStartFragment() && parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) {
        return;
    }
-    Vector<FragmentTask *> task_ptrs;
-    for (auto &task : parent_fragment_ctx->Tasks()) {
-        if (task->TryIntoWorkerLoop()) {
-            task_ptrs.emplace_back(task.get());
-        }
-    }
-    scheduler->ScheduleTasks(task_ptrs);
+    // All child fragment are finished.
+    auto *scheduler = query_context_->scheduler();
+    scheduler->ScheduleFragment(parent_plan_fragment);
    return;
}

Vector<PhysicalOperator *> &FragmentContext::GetOperators() { return fragment_ptr_->GetOperators(); }
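For readability, the tail of FragmentContext::TryFinishFragment() after this commit, reassembled from the added lines above: the parent fragment is rescheduled only once all of its child fragments have finished (or it is a kParallelStream fragment), and per-task scheduling now happens inside TaskScheduler::ScheduleFragment().

    auto *parent_fragment_ctx = parent_plan_fragment->GetContext();

    if (!parent_fragment_ctx->TryStartFragment() && parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) {
        return;
    }
    // All child fragment are finished.
    auto *scheduler = query_context_->scheduler();
    scheduler->ScheduleFragment(parent_plan_fragment);
    return;
}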
src/scheduler/task_scheduler.cpp (25 changes: 10 additions & 15 deletions)
@@ -95,7 +95,8 @@ void TaskScheduler::Schedule(PlanFragment *plan_fragment) {
    }

    Vector<PlanFragment *> fragments = GetStartFragments(plan_fragment);
-    u64 worker_id = 0;
+    last_cpu_id_ = 0; // FIXME
+    u64 &worker_id = last_cpu_id_;
    for (auto *plan_fragment : fragments) {
        auto &tasks = plan_fragment->GetContext()->Tasks();
        for (auto &task : tasks) {
@@ -110,27 +111,19 @@ void TaskScheduler::ScheduleFragment(PlanFragment *plan_fragment) {
    Vector<FragmentTask *> task_ptrs;
    auto &tasks = plan_fragment->GetContext()->Tasks();
    for (auto &task : tasks) {
-        task_ptrs.emplace_back(task.get());
+        if (task->TryIntoWorkerLoop()) {
+            task_ptrs.emplace_back(task.get());
+        }
    }
-    u64 worker_id = 0;
+
+    u64 &worker_id = last_cpu_id_;
    for (auto *task_ptr : task_ptrs) {
        ScheduleTask(task_ptr, worker_id);
        ++worker_id;
        worker_id %= worker_count_;
    }
}

-void TaskScheduler::ScheduleTasks(Vector<FragmentTask *> fragment_tasks) {
-    // u64 worker_id = 0;
-    for (auto *task_ptr : fragment_tasks) {
-        i64 last_worker_id = task_ptr->LastWorkerID();
-        if (last_worker_id == -1) {
-            Error<SchedulerException>("Task is not scheduled before");
-        }
-        ScheduleTask(task_ptr, last_worker_id);
-    }
-}
-
void TaskScheduler::ScheduleTask(FragmentTask *task, u64 worker_id) {
    task->set_status(FragmentTaskStatus::kRunning);
    ++worker_workloads_[worker_id];
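Both Schedule() and ScheduleFragment() now advance the shared last_cpu_id_ cursor instead of a local counter reset to 0. A standalone sketch (not the repository's code) of why a cursor that persists across calls matters: resetting it per fragment would pile small fragments onto the first workers, while a persistent cursor keeps spreading tasks round-robin.

// Standalone illustration of a persistent round-robin cursor.
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    const std::uint64_t worker_count = 4;
    std::vector<int> load(worker_count, 0);
    std::uint64_t last_cpu_id = 0; // persists across "ScheduleFragment" calls

    // Three fragments with two tasks each, scheduled one fragment at a time.
    for (int fragment = 0; fragment < 3; ++fragment) {
        for (int task = 0; task < 2; ++task) {
            ++load[last_cpu_id];
            last_cpu_id = (last_cpu_id + 1) % worker_count;
        }
    }

    for (std::uint64_t w = 0; w < worker_count; ++w) {
        std::cout << "worker " << w << ": " << load[w] << " tasks\n";
    }
    // With a persistent cursor each worker ends up with 1-2 tasks; resetting
    // the cursor per fragment would give workers 0 and 1 all six tasks.
}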
@@ -148,7 +141,9 @@ void TaskScheduler::WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id
        } else {
            task_queue->TryDequeueBulk(dequeue_output);
        }
-        task_lists.insert(task_lists.end(), dequeue_output.begin(), dequeue_output.end());
+        if (!dequeue_output.empty()) {
+            task_lists.insert(task_lists.end(), dequeue_output.begin(), dequeue_output.end());
+        }
        if (iter == task_lists.end()) {
            iter = task_lists.begin();
        }
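Likewise, TaskScheduler::ScheduleFragment() after this commit, reassembled from the diff above: only tasks that are not already in a worker loop are enqueued, and the round-robin cursor continues from last_cpu_id_ instead of restarting at worker 0 on every call.

void TaskScheduler::ScheduleFragment(PlanFragment *plan_fragment) {
    Vector<FragmentTask *> task_ptrs;
    auto &tasks = plan_fragment->GetContext()->Tasks();
    for (auto &task : tasks) {
        if (task->TryIntoWorkerLoop()) {
            task_ptrs.emplace_back(task.get());
        }
    }

    u64 &worker_id = last_cpu_id_;
    for (auto *task_ptr : task_ptrs) {
        ScheduleTask(task_ptr, worker_id);
        ++worker_id;
        worker_id %= worker_count_;
    }
}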
src/scheduler/task_scheduler.cppm (8 changes: 4 additions & 4 deletions)
@@ -46,11 +46,11 @@ public:

    void UnInit();

-    void Schedule(PlanFragment * plan_fragment);
+    // Schedule start fragments
+    void Schedule(PlanFragment * plan_fragment_root);

-    void ScheduleFragment(PlanFragment * plan_fragment);
-
-    void ScheduleTasks(Vector<FragmentTask *> fragment_tasks);
+    // `plan_fragment` can be scheduled because all of its dependencies are met.
+    void ScheduleFragment(PlanFragment *plan_fragment);

private:
    Vector<PlanFragment *> GetStartFragments(PlanFragment* plan_fragment);
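A self-contained usage sketch of the two remaining public entry points; TaskScheduler and PlanFragment are stubbed here purely so the example compiles on its own, and the call pattern mirrors how FragmentContext::TryFinishFragment drives the scheduler in this commit:

// Stubs standing in for the repository's classes; only the call pattern matters.
struct PlanFragment {};

struct TaskScheduler {
    void Schedule(PlanFragment *plan_fragment_root) { /* schedule start fragments */ }
    void ScheduleFragment(PlanFragment *plan_fragment) { /* all dependencies are met */ }
};

int main() {
    TaskScheduler scheduler;
    PlanFragment root;

    // At query start: enqueue the leaf ("start") fragments of the plan tree once.
    scheduler.Schedule(&root);

    // Later, whenever a fragment's children have all finished (see
    // FragmentContext::TryFinishFragment in the first file of this commit),
    // hand the now-runnable parent fragment back to the scheduler.
    scheduler.ScheduleFragment(&root);
}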