From ada746cfa22f37ead2edcb8dfe857a3371951736 Mon Sep 17 00:00:00 2001
From: shen yushi
Date: Fri, 29 Dec 2023 11:28:43 +0800
Subject: [PATCH] Add scheduler (#395)

* tmp schedule strategy.
* Add scheduler when fragment task finished.
* Prevent stream operator dead loop.
* Fix bug of not finishing.
* Replace create index source and sink operator.
* Fix bug that source operator of stream fragment is not kQueue.
* Fmt.
* Fix: check if task is already in worker loop in TaskScheduler::ScheduleFragment.
* Feat: add simple cpu affinity.
---
 src/common/blocking_queue.cppm                |  11 +
 src/common/stl.cppm                           |   3 +
 src/executor/fragment/plan_fragment.cppm      |   2 +
 .../operator/physical_create_index_finish.cpp |   9 +-
 src/executor/operator/physical_sink.cpp       |  10 +
 src/executor/operator/physical_source.cpp     |   9 -
 src/executor/operator/physical_source.cppm    |   2 -
 src/executor/operator_state.cpp               |  10 -
 src/executor/operator_state.cppm              |   3 +-
 src/executor/physical_operator_type.cppm      |   2 +-
 src/main/query_context.cpp                    |   5 +-
 src/scheduler/fragment_context.cpp            |  70 +++--
 src/scheduler/fragment_context.cppm           |  52 ++--
 src/scheduler/fragment_task.cpp               |  51 +++-
 src/scheduler/fragment_task.cppm              |  64 ++---
 src/scheduler/task_scheduler.cpp              | 255 +++++++++----------
 src/scheduler/task_scheduler.cppm             |  31 ++-
 src/unit_test/test_helper/sql_runner.cpp      |   5 +-
 18 files changed, 307 insertions(+), 287 deletions(-)

diff --git a/src/common/blocking_queue.cppm b/src/common/blocking_queue.cppm
index 7cfaf0eb68..40f4c3846d 100644
--- a/src/common/blocking_queue.cppm
+++ b/src/common/blocking_queue.cppm
@@ -102,6 +102,17 @@ public:
         full_cv_.notify_one();
     }

+    bool TryDequeueBulk(Vector<T> &output_array) {
+        UniqueLock<Mutex> lock(queue_mutex_);
+        if (queue_.empty()) {
+            return false;
+        }
+        output_array.insert(output_array.end(), queue_.begin(), queue_.end());
+        queue_.clear();
+        full_cv_.notify_one();
+        return true;
+    }
+
     [[nodiscard]] SizeT Size() const {
         LockGuard<Mutex> lock(queue_mutex_);
         return queue_.size();
diff --git a/src/common/stl.cppm b/src/common/stl.cppm
index 616d4c339f..ffa45609a0 100644
--- a/src/common/stl.cppm
+++ b/src/common/stl.cppm
@@ -94,6 +94,9 @@ export {
     template <typename KeyType, typename ValueType, typename Hash = std::hash<KeyType>>
     using HashMap = std::unordered_map<KeyType, ValueType, Hash>;

+    template <typename KeyType, typename ValueType, typename Hash = std::hash<KeyType>>
+    using MultiHashMap = std::unordered_multimap<KeyType, ValueType, Hash>;
+
     template <typename ValueType>
     using HashSet = std::unordered_set<ValueType>;

diff --git a/src/executor/fragment/plan_fragment.cppm b/src/executor/fragment/plan_fragment.cppm
index 374766f9e3..bc584f5330 100644
--- a/src/executor/fragment/plan_fragment.cppm
+++ b/src/executor/fragment/plan_fragment.cppm
@@ -57,6 +57,8 @@ public:

     [[nodiscard]] inline PhysicalSink *GetSinkNode() const { return sink_.get(); }

+    [[nodiscard]] inline PlanFragment *GetParent() const { return parent_; }
+
     inline void AddChild(UniquePtr<PlanFragment> child_fragment) {
         child_fragment->parent_ = this;
         children_.emplace_back(Move(child_fragment));
diff --git a/src/executor/operator/physical_create_index_finish.cpp b/src/executor/operator/physical_create_index_finish.cpp
index 81039e3f0f..4663741a89 100644
--- a/src/executor/operator/physical_create_index_finish.cpp
+++ b/src/executor/operator/physical_create_index_finish.cpp
@@ -45,13 +45,10 @@ bool PhysicalCreateIndexFinish::Execute(QueryContext *query_context, OperatorSta
     auto *txn = query_context->GetTxn();
     auto *create_index_finish_op_state = static_cast<CreateIndexFinishOperatorState *>(operator_state);

-    if (create_index_finish_op_state->input_complete_) {
-        txn->AddWalCmd(MakeShared<WalCmdCreateIndex>(*db_name_, *table_name_, index_def_));
+    txn->AddWalCmd(MakeShared<WalCmdCreateIndex>(*db_name_, *table_name_, index_def_));

-        operator_state->SetComplete();
-        return true;
-    }
-    return false;
+    operator_state->SetComplete();
+    return true;
 }

 } // namespace infinity
\ No newline at end of file
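Note: the scheduler below leans on this pair of queue operations — a blocking bulk dequeue for idle workers and a non-blocking one for busy workers. A minimal standalone sketch of that pattern in plain std:: C++ (the project's Vector/Mutex/CondVar aliases replaced, and the bounded-capacity `full_cv_` handling omitted; names here are illustrative, not the project's API):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <vector>

template <typename T>
class BulkQueue {
public:
    void Enqueue(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(item));
        }
        empty_cv_.notify_one();
    }

    // Blocking: waits until at least one element is available, then drains all.
    void DequeueBulk(std::vector<T> &out) {
        std::unique_lock<std::mutex> lock(mutex_);
        empty_cv_.wait(lock, [this] { return !queue_.empty(); });
        out.insert(out.end(), queue_.begin(), queue_.end());
        queue_.clear();
    }

    // Non-blocking: drains whatever is queued, reports whether anything was taken.
    bool TryDequeueBulk(std::vector<T> &out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        out.insert(out.end(), queue_.begin(), queue_.end());
        queue_.clear();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable empty_cv_;
    std::deque<T> queue_;
};

int main() {
    BulkQueue<int> q;
    std::vector<int> got;
    q.Enqueue(1);
    q.Enqueue(2);
    q.TryDequeueBulk(got);                      // drains {1, 2} without blocking
    bool empty_again = !q.TryDequeueBulk(got);  // false return: nothing queued
    return (got.size() == 2 && empty_again) ? 0 : 1;
}
```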
diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp
index f753d2caad..113af3d041 100644
--- a/src/executor/operator/physical_sink.cpp
+++ b/src/executor/operator/physical_sink.cpp
@@ -338,6 +338,16 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MessageSinkState *message_
             message_sink_state->message_ = Move(insert_output_state->result_msg_);
             break;
         }
+        case PhysicalOperatorType::kCreateIndexPrepare: {
+            auto *create_index_prepare_output_state = static_cast<CreateIndexPrepareOperatorState *>(task_operator_state);
+            message_sink_state->message_ = Move(create_index_prepare_output_state->result_msg_);
+            break;
+        }
+        case PhysicalOperatorType::kCreateIndexDo: {
+            auto *create_index_do_output_state = static_cast<CreateIndexDoOperatorState *>(task_operator_state);
+            message_sink_state->message_ = Move(create_index_do_output_state->result_msg_);
+            break;
+        }
         default: {
             Error<ExecutorException>(Format("{} isn't supported here.", PhysicalOperatorToString(task_operator_state->operator_type_)));
             break;
diff --git a/src/executor/operator/physical_source.cpp b/src/executor/operator/physical_source.cpp
index fe00a42e59..08383a3e9d 100644
--- a/src/executor/operator/physical_source.cpp
+++ b/src/executor/operator/physical_source.cpp
@@ -57,13 +57,4 @@ bool PhysicalSource::Execute(QueryContext *, SourceState *source_state) {
     return true;
 }

-bool PhysicalSource::ReadyToExec(SourceState *source_state) {
-    bool result = true;
-    if (source_state->state_type_ == SourceStateType::kQueue) {
-        QueueSourceState *queue_source_state = static_cast<QueueSourceState *>(source_state);
-        result = queue_source_state->source_queue_.Size() > 0;
-    }
-    return result;
-}
-
 } // namespace infinity
diff --git a/src/executor/operator/physical_source.cppm b/src/executor/operator/physical_source.cppm
index 7b85afc5eb..648ce5b468 100644
--- a/src/executor/operator/physical_source.cppm
+++ b/src/executor/operator/physical_source.cppm
@@ -53,8 +53,6 @@ public:

     bool Execute(QueryContext *query_context, SourceState *source_state);

-    bool ReadyToExec(SourceState *source_state);
-
     inline SharedPtr<Vector<String>> GetOutputNames() const final { return output_names_; }

     inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return output_types_; }
diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp
index 7f8967dabd..f2ee0e3602 100644
--- a/src/executor/operator_state.cpp
+++ b/src/executor/operator_state.cpp
@@ -92,16 +92,6 @@ bool QueueSourceState::GetData() {
             fusion_op_state->input_complete_ = completed;
             break;
         }
-        case PhysicalOperatorType::kCreateIndexDo: {
-            auto *create_index_do_op_state = static_cast<CreateIndexDoOperatorState *>(next_op_state);
-            create_index_do_op_state->input_complete_ = completed;
-            break;
-        }
-        case PhysicalOperatorType::kCreateIndexFinish: {
-            auto *create_index_finish_op_state = static_cast<CreateIndexFinishOperatorState *>(next_op_state);
-            create_index_finish_op_state->input_complete_ = completed;
-            break;
-        }
         case PhysicalOperatorType::kMergeLimit: {
             auto *fragment_data = static_cast<FragmentData *>(fragment_data_base.get());
             MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state;
diff --git a/src/executor/operator_state.cppm b/src/executor/operator_state.cppm
index a5b2515027..bcd90799dc 100644
--- a/src/executor/operator_state.cppm
+++ b/src/executor/operator_state.cppm
@@ -245,14 +245,13 @@ export struct CreateIndexPrepareOperatorState : public OperatorState {

 export struct CreateIndexDoOperatorState : public OperatorState {
     inline explicit CreateIndexDoOperatorState() : OperatorState(PhysicalOperatorType::kCreateIndexDo) {}

-    bool input_complete_ = false;
+    UniquePtr<String> result_msg_{};
     CreateIndexSharedData *create_index_shared_data_;
 };

 export struct CreateIndexFinishOperatorState : public OperatorState {
     inline explicit CreateIndexFinishOperatorState() : OperatorState(PhysicalOperatorType::kCreateIndexFinish) {}

-    bool input_complete_ = false;
     UniquePtr<String> error_message_{};
 };
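Note: with `input_complete_` gone, the create-index operators report through an owned result message that the sink moves out, the same way `kInsert` already does. A reduced std:: illustration of that ownership handoff (hypothetical names, not the project's types):

```cpp
#include <memory>
#include <string>
#include <utility>

enum class OpType { kInsert, kCreateIndexPrepare, kCreateIndexDo };

struct OperatorState {
    OpType type_;
    std::unique_ptr<std::string> result_msg_;
};

struct MessageSinkState {
    std::unique_ptr<std::string> message_;
};

// Mirrors the shape of FillSinkStateFromLastOperatorState: the sink takes
// ownership of the last operator's message; the operator state is left empty.
void FillSink(MessageSinkState &sink, OperatorState &last_op) {
    switch (last_op.type_) {
        case OpType::kInsert:
        case OpType::kCreateIndexPrepare:
        case OpType::kCreateIndexDo:
            sink.message_ = std::move(last_op.result_msg_);
            break;
    }
}

int main() {
    OperatorState op{OpType::kCreateIndexDo, std::make_unique<std::string>("index created")};
    MessageSinkState sink;
    FillSink(sink, op);
    return (sink.message_ && !op.result_msg_) ? 0 : 1;  // message moved, not copied
}
```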
diff --git a/src/executor/physical_operator_type.cppm b/src/executor/physical_operator_type.cppm
index e9be8c31ed..a974a57d1d 100644
--- a/src/executor/physical_operator_type.cppm
+++ b/src/executor/physical_operator_type.cppm
@@ -70,7 +70,6 @@ export enum class PhysicalOperatorType : i8 {
     kInsert,
     kImport,
     kExport,
-    kCreateIndexDo,

     // DDL
     kAlter,
@@ -86,6 +85,7 @@ export enum class PhysicalOperatorType : i8 {
     kDropView,

     kCreateIndexPrepare,
+    kCreateIndexDo,
     kCreateIndexFinish,

     // misc
diff --git a/src/main/query_context.cpp b/src/main/query_context.cpp
index 7b90aafe6f..fc950e7b15 100644
--- a/src/main/query_context.cpp
+++ b/src/main/query_context.cpp
@@ -141,12 +141,11 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
         StopProfile(QueryPhase::kPipelineBuild);

         StartProfile(QueryPhase::kTaskBuild);
-        Vector<FragmentTask *> tasks;
-        FragmentContext::BuildTask(this, nullptr, plan_fragment.get(), tasks);
+        FragmentContext::BuildTask(this, nullptr, plan_fragment.get());
         StopProfile(QueryPhase::kTaskBuild);

         StartProfile(QueryPhase::kExecution);
-        scheduler_->Schedule(this, tasks, plan_fragment.get());
+        scheduler_->Schedule(plan_fragment.get());
         query_result.result_table_ = plan_fragment->GetResult();
         query_result.root_operator_type_ = logical_plan->operator_type();
         StopProfile(QueryPhase::kExecution);
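Note: the call-site change above is the visible half of an ownership change — `BuildTask` no longer returns a flat `Vector<FragmentTask *>`; each fragment keeps its own tasks inside its `FragmentContext`, which is what lets the scheduler later start fragments one at a time. A simplified sketch of that shape (hypothetical, heavily reduced types):

```cpp
#include <memory>
#include <vector>

struct FragmentContext {
    std::vector<int> tasks;  // stand-in for Vector<UniquePtr<FragmentTask>>
};

struct PlanFragment {
    PlanFragment *parent = nullptr;
    std::vector<std::unique_ptr<PlanFragment>> children;
    std::unique_ptr<FragmentContext> context;
};

// Tasks are created bottom-up and stay attached to their fragment; nothing
// is collected into one global task array anymore.
void BuildTask(PlanFragment *fragment) {
    auto ctx = std::make_unique<FragmentContext>();
    // ... create this fragment's tasks into ctx->tasks ...
    for (auto &child : fragment->children) {
        BuildTask(child.get());
    }
    fragment->context = std::move(ctx);
}
```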
diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp
index f0c890723c..6920268e43 100644
--- a/src/scheduler/fragment_context.cpp
+++ b/src/scheduler/fragment_context.cpp
@@ -47,7 +47,7 @@ import physical_merge_knn;
 import merge_knn_data;
 import create_index_data;
 import logger;
-
+import task_scheduler;
 import plan_fragment;

 module fragment_context;
@@ -326,10 +326,7 @@ void CollectTasks(Vector<SharedPtr<String>> &result, PlanFragment *fragment_ptr)
     }
 }

-void FragmentContext::BuildTask(QueryContext *query_context,
-                                FragmentContext *parent_context,
-                                PlanFragment *fragment_ptr,
-                                Vector<FragmentTask *> &task_array) {
+void FragmentContext::BuildTask(QueryContext *query_context, FragmentContext *parent_context, PlanFragment *fragment_ptr) {
     Vector<PhysicalOperator *> &fragment_operators = fragment_ptr->GetOperators();
     i64 operator_count = fragment_operators.size();
     if (operator_count < 1) {
@@ -419,7 +416,7 @@ void FragmentContext::BuildTask(QueryContext *query_context,
     if (fragment_ptr->HasChild()) {
         // current fragment have children
         for (const auto &child_fragment : fragment_ptr->Children()) {
-            FragmentContext::BuildTask(query_context, fragment_context.get(), child_fragment.get(), task_array);
+            FragmentContext::BuildTask(query_context, fragment_context.get(), child_fragment.get());
         }
     }
     switch (fragment_operators[0]->operator_type()) {
@@ -430,7 +427,6 @@ void FragmentContext::BuildTask(QueryContext *query_context,
             if (explain_op->explain_type() == ExplainType::kPipeline) {
                 CollectTasks(result, fragment_ptr->Children()[0].get());
                 explain_op->SetExplainTaskText(MakeShared<Vector<SharedPtr<String>>>(result));
-                task_array.clear();
                 break;
             }
         }
@@ -438,26 +434,41 @@ void FragmentContext::BuildTask(QueryContext *query_context, FragmentContext *pa
             break;
     }

-    for (const auto &task : tasks) {
-        task_array.emplace_back(task.get());
-    }
-
     fragment_ptr->SetContext(Move(fragment_context));
 }

 FragmentContext::FragmentContext(PlanFragment *fragment_ptr, QueryContext *query_context)
-    : fragment_ptr_(fragment_ptr), fragment_type_(fragment_ptr->GetFragmentType()), query_context_(query_context){};
+    : fragment_ptr_(fragment_ptr), query_context_(query_context), fragment_type_(fragment_ptr->GetFragmentType()),
+      fragment_status_(FragmentStatus::kNotStart), unfinished_child_n_(fragment_ptr->Children().size()) {}

-void FragmentContext::FinishTask() {
-    u64 unfinished_task = task_n_.fetch_sub(1);
-    auto sink_op = GetSinkOperator();
+void FragmentContext::TryFinishFragment() {
+    if (!TryFinishFragmentInner()) {
+        LOG_TRACE(Format("{} tasks in fragment: {} are not completed", unfinished_task_n_.load(), fragment_ptr_->FragmentID()));
+        return;
+    }
+    LOG_TRACE(Format("All tasks in fragment: {} are completed", fragment_ptr_->FragmentID()));
+    fragment_status_ = FragmentStatus::kFinish;

-    if (unfinished_task == 1 && sink_op->sink_type() == SinkType::kResult) {
-        LOG_TRACE(Format("All tasks in fragment: {} are completed", fragment_ptr_->FragmentID()));
+    auto *sink_op = GetSinkOperator();
+    if (sink_op->sink_type() == SinkType::kResult) {
         Complete();
-    } else {
-        LOG_TRACE(Format("Not all tasks in fragment: {} are completed", fragment_ptr_->FragmentID()));
+        return;
+    }
+
+    // Try to schedule the parent fragment.
+    auto *parent_plan_fragment = fragment_ptr_->GetParent();
+    if (parent_plan_fragment == nullptr) {
+        return;
     }
+    auto *parent_fragment_ctx = parent_plan_fragment->GetContext();
+
+    if (!parent_fragment_ctx->TryStartFragment() && parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) {
+        return;
+    }
+    // All child fragments are finished.
+    auto *scheduler = query_context_->scheduler();
+    scheduler->ScheduleFragment(parent_plan_fragment);
+    return;
 }

 Vector<PhysicalOperator *> &FragmentContext::GetOperators() { return fragment_ptr_->GetOperators(); }
@@ -561,8 +572,7 @@ void FragmentContext::MakeSourceState(i64 parallel_count) {
         case PhysicalOperatorType::kMergeTop:
         case PhysicalOperatorType::kMergeSort:
         case PhysicalOperatorType::kMergeKnn:
-        case PhysicalOperatorType::kFusion:
-        case PhysicalOperatorType::kCreateIndexFinish: {
+        case PhysicalOperatorType::kFusion: {
             if (fragment_type_ != FragmentType::kSerialMaterialize) {
                 Error<SchedulerException>(
                     Format("{} should be serial materialized fragment", PhysicalOperatorToString(first_operator->operator_type())));
@@ -581,7 +591,7 @@ void FragmentContext::MakeSourceState(i64 parallel_count) {
                     Format("{} should in parallel materialized fragment", PhysicalOperatorToString(first_operator->operator_type())));
             }
             for (auto &task : tasks_) {
-                task->source_state_ = MakeUnique<EmptySourceState>();
+                task->source_state_ = MakeUnique<QueueSourceState>();
             }
             break;
         }
@@ -634,6 +644,7 @@ void FragmentContext::MakeSourceState(i64 parallel_count) {
         case PhysicalOperatorType::kCreateTable:
         case PhysicalOperatorType::kCreateIndex:
         case PhysicalOperatorType::kCreateIndexPrepare:
+        case PhysicalOperatorType::kCreateIndexFinish:
         case PhysicalOperatorType::kCreateCollection:
         case PhysicalOperatorType::kCreateDatabase:
         case PhysicalOperatorType::kCreateView:
@@ -754,9 +765,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) {
             break;
         }
         case PhysicalOperatorType::kSort:
-        case PhysicalOperatorType::kKnnScan:
-        case PhysicalOperatorType::kCreateIndexPrepare:
-        case PhysicalOperatorType::kCreateIndexDo: {
+        case PhysicalOperatorType::kKnnScan: {
             if (fragment_type_ != FragmentType::kParallelMaterialize && fragment_type_ != FragmentType::kSerialMaterialize) {
                 Error<SchedulerException>(
                     Format("{} should in parallel/serial materialized fragment", PhysicalOperatorToString(first_operator->operator_type())));
@@ -835,6 +844,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) {
             }
             break;
         }
+        case PhysicalOperatorType::kCreateIndexPrepare:
         case PhysicalOperatorType::kInsert:
         case PhysicalOperatorType::kImport:
         case PhysicalOperatorType::kExport: {
@@ -850,6 +860,16 @@ void FragmentContext::MakeSinkState(i64 parallel_count) {
             tasks_[0]->sink_state_ = MakeUnique<MessageSinkState>();
             break;
         }
+        case PhysicalOperatorType::kCreateIndexDo: {
+            if (fragment_type_ != FragmentType::kParallelMaterialize) {
+                Error<SchedulerException>(
+                    Format("{} should in parallel materialized fragment", PhysicalOperatorToString(last_operator->operator_type())));
+            }
+            for (auto &task : tasks_) {
+                task->sink_state_ = MakeUnique<MessageSinkState>();
+            }
+            break;
+        }
         case PhysicalOperatorType::kCommand:
         case PhysicalOperatorType::kCreateTable:
         case PhysicalOperatorType::kCreateIndex:
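Note: the completion protocol introduced here is two atomic count-downs — one per fragment for its tasks, one per parent for its children. A self-contained sketch of that mechanism (simplified: the real code also lets `kParallelStream` parents start before all children finish, and routes root results through `Complete()`):

```cpp
#include <atomic>
#include <cstdio>

struct Fragment {
    Fragment *parent = nullptr;
    std::atomic<unsigned> unfinished_tasks{0};
    std::atomic<unsigned> unfinished_children{0};
};

void ScheduleFragment(Fragment *) { std::puts("parent handed to the scheduler"); }

// Called once per finished task; mirrors TryFinishFragment/TryStartFragment.
void OnTaskFinished(Fragment *fragment) {
    // fetch_sub returns the PREVIOUS value, so exactly one caller sees 1.
    if (fragment->unfinished_tasks.fetch_sub(1) != 1) {
        return;  // sibling tasks are still running
    }
    Fragment *parent = fragment->parent;
    if (parent == nullptr) {
        std::puts("root fragment done: result is ready");
        return;
    }
    if (parent->unfinished_children.fetch_sub(1) == 1) {
        ScheduleFragment(parent);  // last child finished: parent may start
    }
}

int main() {
    Fragment parent;
    Fragment child;
    child.parent = &parent;
    child.unfinished_tasks = 2;
    parent.unfinished_children = 1;
    OnTaskFinished(&child);  // first task: nothing happens yet
    OnTaskFinished(&child);  // second task: child done, parent scheduled
}
```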
diff --git a/src/scheduler/fragment_context.cppm b/src/scheduler/fragment_context.cppm
index 1ff604e58c..1f2978a8f8 100644
--- a/src/scheduler/fragment_context.cppm
+++ b/src/scheduler/fragment_context.cppm
@@ -32,14 +32,13 @@ namespace infinity {

 class PlanFragment;

-//class KnnScanSharedData;
-
-// enum class FragmentStatus {
-//     kNotStart,
-//     k
-//     kStart,
-//     kFinish,
-// };
+enum class FragmentStatus {
+    kNotStart,
+    kStart,
+    kFinish,
+    kInvalid,
+};

 export enum class FragmentType {
     kInvalid,
     kSerialMaterialize,
@@ -52,14 +51,14 @@ class PlanFragment;
 export class FragmentContext {
 public:
     static void
-    BuildTask(QueryContext *query_context, FragmentContext *parent_context, PlanFragment *fragment_ptr, Vector<FragmentTask *> &tasks);
+    BuildTask(QueryContext *query_context, FragmentContext *parent_context, PlanFragment *fragment_ptr);

 public:
     explicit FragmentContext(PlanFragment *fragment_ptr, QueryContext *query_context);

     virtual ~FragmentContext() = default;

-    inline void IncreaseTask() { task_n_.fetch_add(1); }
+    inline void IncreaseTask() { unfinished_task_n_.fetch_add(1); }

     inline void FlushProfiler(TaskProfiler &profiler) {
         if(!query_context_->is_enable_profiling()) {
@@ -68,7 +67,7 @@ public:
         query_context_->FlushProfiler(Move(profiler));
     }

-    void FinishTask();
+    void TryFinishFragment();

     Vector<PhysicalOperator *> &GetOperators();
@@ -85,14 +84,14 @@ public:

     inline SharedPtr<DataTable> GetResult() {
         UniqueLock<Mutex> lk(locker_);
-        cv_.wait(lk, [&] { return completed_; });
+        cv_.wait(lk, [&] { return fragment_status_ == FragmentStatus::kFinish; });

         return GetResultInternal();
     }

     inline void Complete() {
         UniqueLock<Mutex> lk(locker_);
-        completed_ = true;
+        fragment_status_ = FragmentStatus::kFinish;
         cv_.notify_one();
     }

@@ -103,6 +102,19 @@ public:
     [[nodiscard]] inline FragmentType ContextType() const { return fragment_type_; }

 private:
+    bool TryStartFragment() {
+        if (fragment_status_ != FragmentStatus::kNotStart) {
+            return false;
+        }
+        u64 unfinished_child = unfinished_child_n_.fetch_sub(1);
+        return unfinished_child == 1;
+    }
+
+    bool TryFinishFragmentInner() {
+        u64 unfinished_task = unfinished_task_n_.fetch_sub(1);
+        return unfinished_task == 1;
+    }
+
     void MakeSourceState(i64 parallel_count);

     void MakeSinkState(i64 parallel_count);
@@ -111,22 +123,22 @@ protected:
     virtual SharedPtr<DataTable> GetResultInternal() = 0;

 protected:
-    atomic_u64 task_n_{0};
-
     Mutex locker_{};
     CondVar cv_{};

     PlanFragment *fragment_ptr_{};
-    // HashMap<u64, UniquePtr<FragmentTask>> tasks_;
+
+    QueryContext *query_context_{};
+
     Vector<UniquePtr<FragmentTask>> tasks_{};

-    bool finish_building_{false};
-    bool completed_{false};
-    i64 finished_task_count_{};
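Note: `GetResult()`/`Complete()` above form a standard condition-variable handshake, now keyed on `FragmentStatus` instead of a bare bool. A minimal runnable version of that handshake, assuming nothing beyond the std:: library:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

enum class FragmentStatus { kNotStart, kStart, kFinish, kInvalid };

class ResultGate {
public:
    void WaitFinished() {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return status_ == FragmentStatus::kFinish; });
    }

    void Complete() {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            status_ = FragmentStatus::kFinish;
        }
        cv_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    FragmentStatus status_{FragmentStatus::kNotStart};
};

int main() {
    ResultGate gate;
    std::thread worker([&] { gate.Complete(); });  // the last task finishing
    gate.WaitFinished();                           // GetResult() blocks here
    worker.join();
}
```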
     Vector<SharedPtr<DataBlock>> data_array_{};

     FragmentType fragment_type_{FragmentType::kInvalid};
-    QueryContext *query_context_{};
+    FragmentStatus fragment_status_{FragmentStatus::kInvalid};
+
+    atomic_u64 unfinished_task_n_{0};
+    atomic_u64 unfinished_child_n_{0};
 };

 export class SerialMaterializedFragmentCtx final : public FragmentContext {
diff --git a/src/scheduler/fragment_task.cpp b/src/scheduler/fragment_task.cpp
index a7e544d2bd..5b37623b04 100644
--- a/src/scheduler/fragment_task.cpp
+++ b/src/scheduler/fragment_task.cpp
@@ -31,6 +31,7 @@ import physical_operator_type;
 import query_context;
 import base_table_ref;
 import defer_op;
+import fragment_context;

 module fragment_task;

@@ -89,13 +90,13 @@ void FragmentTask::OnExecute(i64) {

         if (err_msg.get() != nullptr) {
             sink_state_->error_message_ = Move(err_msg);
-            this->set_status(FragmentTaskStatus::kError);
+            status_ = FragmentTaskStatus::kError;
         }
     }

-    if(source_complete && source_state_->error_message_.get() != nullptr) {
+    if (source_complete && source_state_->error_message_.get() != nullptr) {
         sink_state_->error_message_ = Move(source_state_->error_message_);
-        this->set_status(FragmentTaskStatus::kError);
+        status_ = FragmentTaskStatus::kError;
     }

     if (execute_success or sink_state_->error_message_.get() != nullptr) {
@@ -104,27 +105,49 @@ void FragmentTask::OnExecute(i64) {
     }
 }

-bool FragmentTask::Ready() const {
-    FragmentContext *fragment_context = (FragmentContext *)fragment_context_;
-    PhysicalSource *source_op = fragment_context->GetSourceOperator();
-    return source_op->ReadyToExec(source_state_.get());
+u64 FragmentTask::FragmentId() const {
+    auto *fragment_context = static_cast<FragmentContext *>(fragment_context_);
+    return fragment_context->fragment_ptr()->FragmentID();
 }

-bool FragmentTask::IsComplete() const { return sink_state_->prev_op_state_->Complete() or status() == FragmentTaskStatus::kError; }
+// Finished **OR** Error
+bool FragmentTask::IsComplete() const { return sink_state_->prev_op_state_->Complete() || status_ == FragmentTaskStatus::kError; }
+
+// Stream fragment source has no data
+bool FragmentTask::QuitFromWorkerLoop() {
+    auto fragment_context = static_cast<FragmentContext *>(fragment_context_);
+    if (fragment_context->ContextType() != FragmentType::kParallelStream) {
+        return false;
+    }
+    if (source_state_->state_type_ != SourceStateType::kQueue) {
+        // fragment's source is not from queue
+        return false;
+    }
+    auto *queue_state = static_cast<QueueSourceState *>(source_state_.get());
+
+    UniqueLock<Mutex> lock(mutex_);
+    if (status_ == FragmentTaskStatus::kRunning && queue_state->source_queue_.Empty()) {
+        status_ = FragmentTaskStatus::kPending;
+        return true;
+    }
+    return false;
+}

 TaskBinding FragmentTask::TaskBinding() const {
-    FragmentContext *fragment_context = (FragmentContext *)fragment_context_;
     struct TaskBinding binding {};

     binding.task_id_ = task_id_;
-    binding.fragment_id_ = fragment_context->fragment_ptr()->FragmentID();
+    binding.fragment_id_ = FragmentId();
     return binding;
 }

-void FragmentTask::TryCompleteFragment() {
+void FragmentTask::CompleteTask() {
+    if (status_ == FragmentTaskStatus::kRunning) {
+        status_ = FragmentTaskStatus::kFinished;
+    }
     FragmentContext *fragment_context = (FragmentContext *)fragment_context_;
-    LOG_TRACE(Format("Task: {} of Fragment: {} is completed", task_id_, fragment_context->fragment_ptr()->FragmentID()));
-    fragment_context->FinishTask();
+    LOG_TRACE(Format("Task: {} of Fragment: {} is completed", task_id_, FragmentId()));
+    fragment_context->TryFinishFragment();
 }

 String FragmentTask::PhysOpsToString() {
@@ -136,4 +159,6 @@ String FragmentTask::PhysOpsToString() {
     return ss.str();
 }

+FragmentContext *FragmentTask::fragment_context() const { return reinterpret_cast<FragmentContext *>(fragment_context_); }
+
 } // namespace infinity
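Note: `TryIntoWorkerLoop` and `QuitFromWorkerLoop` are the two guarded transitions of the task's status machine — kPending -> kRunning when a task is handed to a worker, kRunning -> kPending when a stream task's input queue runs dry. The mutex makes the two mutually exclusive, so a task cannot be re-scheduled and parked at the same time. Reduced to a toy (illustrative names only):

```cpp
#include <mutex>

enum class TaskStatus { kPending, kRunning, kFinished, kError };

class TaskGate {
public:
    bool TryIntoWorkerLoop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (status_ == TaskStatus::kPending) {
            status_ = TaskStatus::kRunning;
            return true;
        }
        return false;
    }

    bool QuitFromWorkerLoop(bool queue_empty) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (status_ == TaskStatus::kRunning && queue_empty) {
            status_ = TaskStatus::kPending;  // parked until new data arrives
            return true;
        }
        return false;
    }

private:
    std::mutex mutex_;
    TaskStatus status_{TaskStatus::kPending};
};

int main() {
    TaskGate gate;
    bool scheduled = gate.TryIntoWorkerLoop();    // kPending -> kRunning
    bool rescheduled = gate.TryIntoWorkerLoop();  // false: already running
    bool parked = gate.QuitFromWorkerLoop(true);  // kRunning -> kPending
    return (scheduled && !rescheduled && parked) ? 0 : 1;
}
```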
diff --git a/src/scheduler/fragment_task.cppm b/src/scheduler/fragment_task.cppm
index 5661e8dd15..740c755b5f 100644
--- a/src/scheduler/fragment_task.cppm
+++ b/src/scheduler/fragment_task.cppm
@@ -22,30 +22,12 @@ export module fragment_task;

 namespace infinity {

-// Task type:
-// DDL task
-// DML task
-// Query task
-// Read data from queue or file system
-
-enum class FragmentSourceType {
-    kNone,
-    kScan,
-    kQueue,
-};
-
-enum class FragmentSinkType {
-    kGlobalMaterialize,
-    kLocalMaterialize,
-    kStream,
-};
+class FragmentContext;

-enum class FragmentTaskStatus {
+export enum class FragmentTaskStatus {
+    kPending,
     kRunning,
-    kCancelled,
     kFinished,
-    kReady,
-    kPending,
     kError,
 };

@@ -58,18 +40,8 @@ public:
         Init();
     }

-    inline void SetTerminator() { is_terminator_ = true; }
-
     [[nodiscard]] inline bool IsTerminator() const { return is_terminator_; }

-    // Set source
-    // Scan source
-    //    void
-    //    AddSourceSegment(const SegmentEntry* segment_entry_ptr);
-    //
-    //    // Input queue
-    //    void
-    //    AddQueue(const BatchBlockingQueue);
-
     void Init();

     void OnExecute(i64 worker_id);
@@ -78,29 +50,34 @@ public:

     [[nodiscard]] inline i64 LastWorkerID() const { return last_worker_id_; }

-    [[nodiscard]] inline i64 TaskID() const { return task_id_; }
+    u64 FragmentId() const;

-    [[nodiscard]] bool Ready() const;
+    [[nodiscard]] inline i64 TaskID() const { return task_id_; }

     [[nodiscard]] bool IsComplete() const;

+    bool TryIntoWorkerLoop() {
+        UniqueLock<Mutex> lock(mutex_);
+        if (status_ == FragmentTaskStatus::kPending) {
+            status_ = FragmentTaskStatus::kRunning;
+            return true;
+        }
+        return false;
+    }
+
+    bool QuitFromWorkerLoop();
+
     [[nodiscard]] TaskBinding TaskBinding() const;

-    void TryCompleteFragment();
+    void CompleteTask();

     String PhysOpsToString();

-    inline void set_status(FragmentTaskStatus new_status) {
-        status_ = new_status;
-    }
+    inline void set_status(FragmentTaskStatus new_status) { status_ = new_status; }

-    [[nodiscard]] inline FragmentTaskStatus status() const {
-        return status_;
-    }
+    FragmentContext *fragment_context() const;

 public:
-    FragmentTaskStatus status_{FragmentTaskStatus::kReady};
-
     UniquePtr<SourceState> source_state_{};

     Vector<UniquePtr<OperatorState>> operator_states_{};

     UniquePtr<SinkState> sink_state_{};

 private:
+    Mutex mutex_{};
+    FragmentTaskStatus status_{FragmentTaskStatus::kPending};
+
     void *fragment_context_{};
     bool is_terminator_{false};
     i64 last_worker_id_{-1};
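Note: `is_terminator_` and the `FragmentTask(true)` constructor support the shutdown convention used by `TaskScheduler::UnInit()` below — a sentinel task is pushed to every worker queue, and a worker leaves its loop when it sees one. A runnable miniature of that pattern (std:: types, illustrative names):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

struct Task {
    bool is_terminator = false;
};

struct Worker {
    std::mutex mu;
    std::condition_variable cv;
    std::deque<Task *> queue;
    std::thread thread;
};

void WorkerLoop(Worker *w) {
    for (;;) {
        Task *task = nullptr;
        {
            std::unique_lock<std::mutex> lk(w->mu);
            w->cv.wait(lk, [w] { return !w->queue.empty(); });
            task = w->queue.front();
            w->queue.pop_front();
        }
        if (task->is_terminator) {
            return;  // drop out of the loop; thread becomes joinable
        }
        // ... execute the task ...
    }
}

int main() {
    Worker w;
    w.thread = std::thread(WorkerLoop, &w);
    Task terminate{true};  // one shared sentinel, like MakeUnique<FragmentTask>(true)
    {
        std::lock_guard<std::mutex> lk(w.mu);
        w.queue.push_back(&terminate);
    }
    w.cv.notify_one();
    w.thread.join();
}
```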
diff --git a/src/scheduler/task_scheduler.cpp b/src/scheduler/task_scheduler.cpp
index 47b6edf2b1..52fb513ec6 100644
--- a/src/scheduler/task_scheduler.cpp
+++ b/src/scheduler/task_scheduler.cpp
@@ -59,179 +59,164 @@ void TaskScheduler::Init(const Config *config_ptr) {
         Error<SchedulerException>("No cpu is used in scheduler");
     }

-    // Start coordinator
-    ready_queue_ = MakeUnique<FragmentTaskBlockQueue>();
-    coordinator_ = MakeUnique<Thread>(&TaskScheduler::CoordinatorLoop, this, ready_queue_.get(), 0);
-
-    ThreadUtil::pin(*coordinator_, 0);
-
     initialized_ = true;
 }

 void TaskScheduler::UnInit() {
     initialized_ = false;
     UniquePtr<FragmentTask> terminate_task = MakeUnique<FragmentTask>(true);
-    ready_queue_->Enqueue(terminate_task.get());
-    coordinator_->join();
+
     for (const auto &worker : worker_array_) {
         worker.queue_->Enqueue(terminate_task.get());
         worker.thread_->join();
     }
 }

-void TaskScheduler::Schedule(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
+Vector<PlanFragment *> TaskScheduler::GetStartFragments(PlanFragment *plan_fragment) {
+    Vector<PlanFragment *> leaf_fragments;
+    StdFunction<void(PlanFragment *)> TraversePlanFragmentTree = [&](PlanFragment *root) {
+        if (root->Children().empty()) {
+            leaf_fragments.emplace_back(root);
+            return;
+        }
+        for (auto &child : root->Children()) {
+            TraversePlanFragmentTree(child.get());
+        }
+    };
+    // Traverse the tree to get all leaf fragments
+    TraversePlanFragmentTree(plan_fragment);
+
+    return leaf_fragments;
+}
+
+void TaskScheduler::Schedule(PlanFragment *plan_fragment) {
     if (!initialized_) {
         Error<SchedulerException>("Scheduler isn't initialized");
     }
-    // Vector<SharedPtr<PlanFragment>>& children = plan_fragment->Children();
-    // if(!children.empty()) {
-    //     SchedulerError("Only support one fragment query")
-    // }
-    // 1. Recursive traverse the fragment tree
-    // 2. Check the fragment
-    //    if the first op is SCAN op, then get all block entry and create the source type is kScan.
-    //    if the first op isn't SCAN op, fragment task source type is kQueue and a task_result_queue need to be created.
-    //    According to the fragment output type to set the correct fragment task sink type.
-    //    Set the queue of parent fragment task.
-    if (plan_fragment->GetOperators().empty()) {
-        Error<SchedulerException>("Empty fragment");
-    }
-    auto *last_operator = plan_fragment->GetOperators()[0];
-    switch (last_operator->operator_type()) {
-        case PhysicalOperatorType::kCreateIndexFinish: {
-            ScheduleRoundRobin(query_context, tasks, plan_fragment);
-            break;
-        }
-        default: {
-            ScheduleOneWorkerIfPossible(query_context, tasks, plan_fragment);
-            break;
+    Vector<PlanFragment *> fragments = GetStartFragments(plan_fragment);
+    last_cpu_id_ = 0; // FIXME
+    u64 &worker_id = last_cpu_id_;
+    for (auto *plan_fragment : fragments) {
+        auto &tasks = plan_fragment->GetContext()->Tasks();
+        for (auto &task : tasks) {
+            ScheduleTask(task.get(), worker_id);
+            ++worker_id;
+            worker_id %= worker_count_;
         }
     }
 }

-void TaskScheduler::ScheduleOneWorkerPerQuery(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
-    LOG_TRACE(Format("Schedule {} tasks of query id: {} into scheduler with OneWorkerPerQuery policy", tasks.size(), query_context->query_id()));
-    u64 worker_id = ProposedWorkerID(query_context->GetTxn()->TxnID());
-    for (const auto &fragment_task : tasks) {
-        ScheduleTask(fragment_task, worker_id);
+void TaskScheduler::ScheduleFragment(PlanFragment *plan_fragment) {
+    Vector<FragmentTask *> task_ptrs;
+    auto &tasks = plan_fragment->GetContext()->Tasks();
+    for (auto &task : tasks) {
+        if (task->TryIntoWorkerLoop()) {
+            task_ptrs.emplace_back(task.get());
+        }
     }
-}
-
-void TaskScheduler::ScheduleOneWorkerIfPossible(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
-    // Schedule worker 0 if possible
-    u64 scheduled_worker = u64_max;
-    u64 min_load_worker{0};
-    u64 min_work_load{u64_max};
-    for(u64 proposed_worker = 0; proposed_worker < worker_count_; ++ proposed_worker) {
-        u64 current_work_load = worker_workloads_[proposed_worker];
-        if(current_work_load < 1) {
-            scheduled_worker = proposed_worker;
-            break;
+    last_cpu_id_ = 0; // FIXME
+    u64 &worker_id = last_cpu_id_;
+    for (auto *task_ptr : task_ptrs) {
+        if (task_ptr->LastWorkerID() == -1) {
+            ScheduleTask(task_ptr, worker_id);
+            ++worker_id;
+            worker_id %= worker_count_;
         } else {
-            if(current_work_load < min_work_load) {
-                min_load_worker = proposed_worker;
-                min_work_load = current_work_load;
-            }
+            ScheduleTask(task_ptr, task_ptr->LastWorkerID());
         }
     }
-
-    if(scheduled_worker == u64_max) {
-        scheduled_worker = min_load_worker;
-    }
-
-    worker_workloads_[scheduled_worker] += tasks.size();
-    LOG_TRACE(Format("Schedule {} tasks of query id: {} into worker: {} with ScheduleOneWorkerIfPossible policy",
-                     tasks.size(),
-                     query_context->query_id(),
-                     scheduled_worker));
-    for (const auto &fragment_task : tasks) {
-        ScheduleTask(fragment_task, scheduled_worker);
-    }
 }

-void TaskScheduler::ScheduleRoundRobin(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
-    LOG_TRACE(Format("Schedule {} tasks of query id: {} into scheduler with RR policy", tasks.size(), query_context->query_id()));
-    u64 worker_id = 0;
-    for (const auto &fragment_task : tasks) {
-        ScheduleTask(fragment_task, worker_id);
-        worker_id++;
-        worker_id %= worker_count_;
-    }
+void TaskScheduler::ScheduleTask(FragmentTask *task, u64 worker_id) {
+    task->set_status(FragmentTaskStatus::kRunning);
+    ++worker_workloads_[worker_id];
+    worker_array_[worker_id].queue_->Enqueue(task);
 }
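Note: `ScheduleFragment` applies a simple dispatch rule — a task that has run before goes back to the same worker (keeping operator state and cache lines warm), while a fresh task is placed round-robin. That rule in isolation, as a runnable sketch with hypothetical names:

```cpp
#include <cstdint>

struct Task {
    int64_t last_worker_id = -1;  // -1: never scheduled before
};

uint64_t PickWorker(const Task &task, uint64_t &next_worker, uint64_t worker_count) {
    if (task.last_worker_id >= 0) {
        return static_cast<uint64_t>(task.last_worker_id);  // sticky placement
    }
    uint64_t chosen = next_worker;
    next_worker = (next_worker + 1) % worker_count;  // round-robin for new tasks
    return chosen;
}

int main() {
    uint64_t next = 0;
    Task fresh;       // never ran: placed round-robin
    Task sticky{2};   // ran on worker 2 before: goes back there
    uint64_t w1 = PickWorker(fresh, next, 4);   // 0
    uint64_t w2 = PickWorker(sticky, next, 4);  // 2
    return (w1 == 0 && w2 == 2) ? 0 : 1;
}
```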
-void TaskScheduler::ToReadyQueue(FragmentTask *task) {
-    if (!initialized_) {
-        Error<SchedulerException>("Scheduler isn't initialized");
-    }
-}
-
-void TaskScheduler::CoordinatorLoop(FragmentTaskBlockQueue *ready_queue, i64 cpu_id) {
-    FragmentTask *fragment_task{nullptr};
-    bool running{true};
-    u64 current_cpu_id{0};
-    HashSet<u64> fragment_task_ptr;
-    while (running) {
-        ready_queue->Dequeue(fragment_task);
-        if (auto iter = fragment_task_ptr.find(u64(fragment_task)); iter == fragment_task_ptr.end()) {
-            fragment_task_ptr.emplace(u64(fragment_task));
+void TaskScheduler::WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id) {
+    List<FragmentTask *> task_lists;
+    auto iter = task_lists.end();
+    while (true) {
+        Vector<FragmentTask *> dequeue_output;
+        if (task_lists.empty()) {
+            task_queue->DequeueBulk(dequeue_output);
+        } else {
+            task_queue->TryDequeueBulk(dequeue_output);
         }
-
+        if (!dequeue_output.empty()) {
+            task_lists.insert(task_lists.end(), dequeue_output.begin(), dequeue_output.end());
+        }
+        if (iter == task_lists.end()) {
+            iter = task_lists.begin();
+        }
+        auto &fragment_task = *iter;
         if (fragment_task->IsTerminator()) {
-            running = false;
-            continue;
+            break;
         }

-        if (!fragment_task->Ready()) {
-            ready_queue->Enqueue(fragment_task);
-            continue;
-        }
+        fragment_task->OnExecute(worker_id);
+        fragment_task->SetLastWorkID(worker_id);

-        if (fragment_task->LastWorkerID() == -1) {
-            // Select an available worker to dispatch
-            u64 to_use_cpu_id = current_cpu_id;
-            ++current_cpu_id;
-            to_use_cpu_id %= worker_count_;
-            worker_array_[to_use_cpu_id].queue_->Enqueue(fragment_task);
+        if (fragment_task->IsComplete()) {
+            --worker_workloads_[worker_id];
+            fragment_task->CompleteTask();
+            iter = task_lists.erase(iter);
+        } else if (fragment_task->QuitFromWorkerLoop()) {
+            --worker_workloads_[worker_id];
+            iter = task_lists.erase(iter);
         } else {
-            // Dispatch to the same worker
-            worker_array_[fragment_task->LastWorkerID()].queue_->Enqueue(fragment_task);
+            ++iter;
         }
     }
 }

-void TaskScheduler::WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id) {
-    FragmentTask *fragment_task{nullptr};
-    Vector<FragmentTask *> task_list;
-    task_list.reserve(DEFAULT_BLOCKING_QUEUE_SIZE);
-    bool running{true};
-    while (running) {
-        task_queue->DequeueBulk(task_list);
-        SizeT list_size = task_list.size();
-        for (SizeT idx = 0; idx < list_size; ++idx) {
-            fragment_task = task_list[idx];
-
-            if (fragment_task->IsTerminator()) {
-                running = false;
-                break;
-            }
-
-            if (!fragment_task->Ready()) {
-                ready_queue_->Enqueue(fragment_task);
-                continue;
-            }
-
-            fragment_task->OnExecute(worker_id);
-            fragment_task->SetLastWorkID(worker_id);
-            if (!fragment_task->IsComplete()) {
-                ready_queue_->Enqueue(fragment_task);
-            } else {
-                --worker_workloads_[worker_id];
-                fragment_task->TryCompleteFragment();
-            }
-        }
-        task_list.clear();
-    }
-}
+// void TaskScheduler::ScheduleOneWorkerPerQuery(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
+//     LOG_TRACE(Format("Schedule {} tasks of query id: {} into scheduler with OneWorkerPerQuery policy", tasks.size(), query_context->query_id()));
+//     u64 worker_id = ProposedWorkerID(query_context->GetTxn()->TxnID());
+//     for (const auto &fragment_task : tasks) {
+//         ScheduleTask(fragment_task, worker_id);
+//     }
+// }
+
+// void TaskScheduler::ScheduleOneWorkerIfPossible(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
+//     // Schedule worker 0 if possible
+//     u64 scheduled_worker = u64_max;
+//     u64 min_load_worker{0};
+//     u64 min_work_load{u64_max};
+//     for (u64 proposed_worker = 0; proposed_worker < worker_count_; ++proposed_worker) {
+//         u64 current_work_load = worker_workloads_[proposed_worker];
+//         if (current_work_load < 1) {
+//             scheduled_worker = proposed_worker;
+//             break;
+//         } else {
+//             if (current_work_load < min_work_load) {
+//                 min_load_worker = proposed_worker;
+//                 min_work_load = current_work_load;
+//             }
+//         }
+//     }

+//     if (scheduled_worker == u64_max) {
+//         scheduled_worker = min_load_worker;
+//     }

+//     worker_workloads_[scheduled_worker] += tasks.size();
+//     LOG_TRACE(Format("Schedule {} tasks of query id: {} into worker: {} with ScheduleOneWorkerIfPossible policy",
+//                      tasks.size(),
+//                      query_context->query_id(),
+//                      scheduled_worker));
+//     for (const auto &fragment_task : tasks) {
+//         ScheduleTask(fragment_task, scheduled_worker);
+//     }
+// }

+// void TaskScheduler::ScheduleRoundRobin(const Vector<FragmentTask *> &tasks) {
+//     for (const auto &fragment_task : tasks) {
+//         ScheduleTask(fragment_task, last_cpu_id_++);
+//         last_cpu_id_ %= worker_count_;
+//     }
+// }

 } // namespace infinity
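Note: the new `WorkerLoop` replaces the central coordinator with per-worker round-robin lists: a worker blocks on its queue only when its list is empty, otherwise it opportunistically drains new arrivals and keeps cycling. A runnable skeleton of that control flow (std:: types; the park-on-empty-stream-source branch from `QuitFromWorkerLoop` is folded into task completion here):

```cpp
#include <condition_variable>
#include <deque>
#include <list>
#include <mutex>
#include <vector>

struct Task {
    bool terminator = false;
    int steps_left = 1;                              // toy stand-in for pipeline progress
    bool RunOneStep() { return --steps_left <= 0; }  // true when complete
};

class TaskQueue {  // reduced version of the BulkQueue sketched earlier
public:
    void Enqueue(Task *t) {
        { std::lock_guard<std::mutex> lk(mu_); q_.push_back(t); }
        cv_.notify_one();
    }
    void DequeueBulk(std::vector<Task *> &out) {  // blocking
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        out.assign(q_.begin(), q_.end());
        q_.clear();
    }
    bool TryDequeueBulk(std::vector<Task *> &out) {  // non-blocking
        std::lock_guard<std::mutex> lk(mu_);
        if (q_.empty()) return false;
        out.insert(out.end(), q_.begin(), q_.end());
        q_.clear();
        return true;
    }
private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<Task *> q_;
};

void WorkerLoop(TaskQueue *queue) {
    std::list<Task *> local;  // private round-robin list
    auto it = local.end();
    for (;;) {
        std::vector<Task *> incoming;
        if (local.empty()) {
            queue->DequeueBulk(incoming);     // idle: block for work
        } else {
            queue->TryDequeueBulk(incoming);  // busy: opportunistic drain
        }
        local.insert(local.end(), incoming.begin(), incoming.end());
        if (it == local.end()) {
            it = local.begin();  // wrap the round-robin cursor
        }
        Task *task = *it;
        if (task->terminator) {
            return;
        }
        if (task->RunOneStep()) {
            it = local.erase(it);  // finished: drop from the local list
        } else {
            ++it;  // keep it; move on to the next task
        }
    }
}

int main() {
    TaskQueue queue;
    Task a, b, terminate;
    terminate.terminator = true;
    queue.Enqueue(&a);
    queue.Enqueue(&b);
    queue.Enqueue(&terminate);
    WorkerLoop(&queue);  // a and b complete, then the terminator stops the loop
}
```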
diff --git a/src/scheduler/task_scheduler.cppm b/src/scheduler/task_scheduler.cppm
index e4781551d6..57c281b1f4 100644
--- a/src/scheduler/task_scheduler.cppm
+++ b/src/scheduler/task_scheduler.cppm
@@ -46,36 +46,35 @@ public:

     void UnInit();

-    void Schedule(QueryContext* query_context, const Vector<FragmentTask *> &tasks, PlanFragment* plan_fragment);
+    // Schedule start fragments
+    void Schedule(PlanFragment *plan_fragment_root);

-private:
-    void ScheduleOneWorkerPerQuery(QueryContext* query_context, const Vector<FragmentTask *> &tasks, PlanFragment* plan_fragment);
-    void ScheduleOneWorkerIfPossible(QueryContext* query_context, const Vector<FragmentTask *> &tasks, PlanFragment* plan_fragment);
-    void ScheduleRoundRobin(QueryContext* query_context, const Vector<FragmentTask *> &tasks, PlanFragment* plan_fragment);
+    // `plan_fragment` can be scheduled because all of its dependencies are met.
+    void ScheduleFragment(PlanFragment *plan_fragment);

-    inline void ScheduleTask(FragmentTask *task, u64 worker_id) {
-        worker_array_[worker_id].queue_->Enqueue(task);
-    }
+private:
+    Vector<PlanFragment *> GetStartFragments(PlanFragment *plan_fragment);

-    inline u64 ProposedWorkerID(u64 object_id) const {
-        return (object_id) % worker_count_;
-    }
+    // void ScheduleOneWorkerPerQuery(QueryContext* query_context, const Vector<FragmentTask *> &tasks, PlanFragment* plan_fragment);
+    // void ScheduleOneWorkerIfPossible(QueryContext* query_context, const Vector<FragmentTask *> &tasks, PlanFragment* plan_fragment);
+    // void ScheduleRoundRobin(const Vector<FragmentTask *> &tasks);

-    void ToReadyQueue(FragmentTask *task);
+    void ScheduleTask(FragmentTask *task, u64 worker_id);

-    void CoordinatorLoop(FragmentTaskBlockQueue *ready_queue, i64 cpu_id);
+    // inline u64 ProposedWorkerID(u64 object_id) const {
+    //     return (object_id) % worker_count_;
+    // }

     void WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id);

 private:
+    u64 last_cpu_id_{0};
+
     bool initialized_{false};

     Vector<Worker> worker_array_{};
     Deque<Atomic<u64>> worker_workloads_{};
-    UniquePtr<FragmentTaskBlockQueue> ready_queue_{};
-    UniquePtr<Thread> coordinator_{};
-
     u64 worker_count_{0};
 };
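Note: the commit message mentions "simple cpu affinity" and the removed coordinator code called `ThreadUtil::pin(...)`. One plausible Linux-only implementation behind a call like that is shown below, as an assumption — the project's actual helper may differ, and other platforms need different APIs:

```cpp
#include <pthread.h>
#include <sched.h>
#include <thread>

// Pin `thread` to the given CPU. Returns true on success (Linux only).
bool PinThread(std::thread &thread, unsigned cpu_id) {
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    CPU_SET(cpu_id, &cpu_set);
    return pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), &cpu_set) == 0;
}

int main() {
    std::thread t([] {});
    bool ok = PinThread(t, 0);  // pin the worker to CPU 0
    t.join();
    return ok ? 0 : 1;
}
```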
diff --git a/src/unit_test/test_helper/sql_runner.cpp b/src/unit_test/test_helper/sql_runner.cpp
index 72ebecce20..a14e54f853 100644
--- a/src/unit_test/test_helper/sql_runner.cpp
+++ b/src/unit_test/test_helper/sql_runner.cpp
@@ -99,11 +99,10 @@ SharedPtr<DataTable> SQLRunner::Run(const String &sql_text, bool print) {
     // Fragment Builder, only for test now. plan fragment is same as pipeline.
     auto plan_fragment = query_context_ptr->fragment_builder()->BuildFragment(physical_plan.get());

-    Vector<FragmentTask *> tasks;
-    FragmentContext::BuildTask(query_context_ptr.get(), nullptr, plan_fragment.get(), tasks);
+    FragmentContext::BuildTask(query_context_ptr.get(), nullptr, plan_fragment.get());

     // Schedule the query tasks
-    query_context_ptr->scheduler()->Schedule(query_context_ptr.get(), tasks, plan_fragment.get());
+    query_context_ptr->scheduler()->Schedule(plan_fragment.get());

     // Initialize query result
     QueryResult query_result;
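Note: taken together, the post-patch driving sequence reduces to three calls, as `sql_runner.cpp` and `query_context.cpp` now use it (project API, abbreviated — not a standalone program):

```cpp
// Build tasks onto the fragment tree; they stay owned by each FragmentContext.
FragmentContext::BuildTask(query_context_ptr.get(), nullptr, plan_fragment.get());

// Start only the leaf fragments; parents are scheduled by TryFinishFragment()
// as their children complete.
query_context_ptr->scheduler()->Schedule(plan_fragment.get());

// Block until the root fragment reaches FragmentStatus::kFinish.
SharedPtr<DataTable> result = plan_fragment->GetResult();
```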