diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index 113af3d041..d379ee0620 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -370,13 +370,13 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(FragmentContext *fragment_ return; } SizeT output_data_block_count = task_operator_state->data_block_array_.size(); - if (output_data_block_count == 0) { - for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { - next_fragment_queue->Enqueue(MakeShared(queue_sink_state->fragment_id_)); - } - return; - // Error("No output from last operator."); - } + // if (output_data_block_count == 0) { + // for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { + // next_fragment_queue->Enqueue(MakeShared(queue_sink_state->fragment_id_)); + // } + // return; + // // Error("No output from last operator."); + // } for (SizeT idx = 0; idx < output_data_block_count; ++idx) { auto fragment_data = MakeShared(queue_sink_state->fragment_id_, Move(task_operator_state->data_block_array_[idx]), diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp index f2ee0e3602..7d9b2b3697 100644 --- a/src/executor/operator_state.cpp +++ b/src/executor/operator_state.cpp @@ -16,6 +16,8 @@ module; import data_block; import stl; +import third_party; +import logger; import physical_operator_type; import fragment_data; import infinity_exception; @@ -95,6 +97,7 @@ bool QueueSourceState::GetData() { case PhysicalOperatorType::kMergeLimit: { auto *fragment_data = static_cast(fragment_data_base.get()); MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state; + LOG_TRACE(Format("Get data, data_count: {}, row_count: {}", fragment_data->data_count_, fragment_data->data_block_->row_count())); limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_)); if (!limit_op_state->input_complete_) { limit_op_state->input_complete_ = completed; diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index 6920268e43..e3cdc047af 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -443,32 +443,41 @@ FragmentContext::FragmentContext(PlanFragment *fragment_ptr, QueryContext *query void FragmentContext::TryFinishFragment() { if (!TryFinishFragmentInner()) { - LOG_TRACE(Format("{} tasks in fragment are not completed: {} are not completed", unfinished_task_n_.load(), fragment_ptr_->FragmentID())); - return; - } - LOG_TRACE(Format("All tasks in fragment: {} are completed", fragment_ptr_->FragmentID())); - fragment_status_ = FragmentStatus::kFinish; + LOG_TRACE(Format("{} tasks in fragment {} are not completed", unfinished_task_n_.load(), fragment_ptr_->FragmentID())); + if (fragment_type_ != FragmentType::kParallelStream) { + return; + } + auto *parent_plan_fragment = fragment_ptr_->GetParent(); + if (parent_plan_fragment == nullptr) { + return; + } - auto *sink_op = GetSinkOperator(); - if (sink_op->sink_type() == SinkType::kResult) { - Complete(); - return; - } + auto *scheduler = query_context_->scheduler(); + LOG_TRACE(Format("Schedule fragment: {} before fragment {} has finished.", parent_plan_fragment->FragmentID(), fragment_ptr_->FragmentID())); + scheduler->ScheduleFragment(parent_plan_fragment); + } else { + LOG_TRACE(Format("All tasks in fragment: {} are completed", fragment_ptr_->FragmentID())); + fragment_status_ = FragmentStatus::kFinish; + auto *sink_op = GetSinkOperator(); + if (sink_op->sink_type() == SinkType::kResult) { + Complete(); + return; + } + auto *parent_plan_fragment = fragment_ptr_->GetParent(); + if (parent_plan_fragment == nullptr) { + return; + } - // Try to schedule parent fragment - auto *parent_plan_fragment = fragment_ptr_->GetParent(); - if (parent_plan_fragment == nullptr) { - return; - } - auto *parent_fragment_ctx = parent_plan_fragment->GetContext(); + auto *parent_fragment_ctx = parent_plan_fragment->GetContext(); + if (!parent_fragment_ctx->TryStartFragment()) { + return; + } + // All child fragment are finished. - if (!parent_fragment_ctx->TryStartFragment() && parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) { - return; + auto *scheduler = query_context_->scheduler(); + LOG_TRACE(Format("Schedule fragment: {} because fragment {} has finished.", parent_plan_fragment->FragmentID(), fragment_ptr_->FragmentID())); + scheduler->ScheduleFragment(parent_plan_fragment); } - // All child fragment are finished. - auto *scheduler = query_context_->scheduler(); - scheduler->ScheduleFragment(parent_plan_fragment); - return; } Vector &FragmentContext::GetOperators() { return fragment_ptr_->GetOperators(); } diff --git a/src/scheduler/fragment_context.cppm b/src/scheduler/fragment_context.cppm index 1f2978a8f8..dde116d449 100644 --- a/src/scheduler/fragment_context.cppm +++ b/src/scheduler/fragment_context.cppm @@ -46,6 +46,19 @@ export enum class FragmentType { kParallelStream, }; +export String FragmentType2String(FragmentType type) { + switch (type) { + case FragmentType::kInvalid: + return String("Invalid"); + case FragmentType::kSerialMaterialize: + return String("SerialMaterialize"); + case FragmentType::kParallelMaterialize: + return String("ParallelMaterialize"); + case FragmentType::kParallelStream: + return String("ParallelStream"); + } +} + class PlanFragment; export class FragmentContext { diff --git a/src/scheduler/fragment_task.cpp b/src/scheduler/fragment_task.cpp index 5b37623b04..d43a738b13 100644 --- a/src/scheduler/fragment_task.cpp +++ b/src/scheduler/fragment_task.cpp @@ -15,6 +15,7 @@ module; #include +#include import fragment_context; import profiler; @@ -44,6 +45,7 @@ void FragmentTask::Init() { } void FragmentTask::OnExecute(i64) { + LOG_TRACE(Format("Task: {} of Fragment: {} is running", task_id_, FragmentId())); // infinity::BaseProfiler prof; // prof.Begin(); FragmentContext *fragment_context = (FragmentContext *)fragment_context_; @@ -111,7 +113,10 @@ u64 FragmentTask::FragmentId() const { } // Finished **OR** Error -bool FragmentTask::IsComplete() const { return sink_state_->prev_op_state_->Complete() || status_ == FragmentTaskStatus::kError; } +bool FragmentTask::IsComplete() { + UniqueLock lock(mutex_); + return sink_state_->prev_op_state_->Complete() || status_ == FragmentTaskStatus::kError; +} // Stream fragment source has no data bool FragmentTask::QuitFromWorkerLoop() { @@ -127,6 +132,7 @@ bool FragmentTask::QuitFromWorkerLoop() { UniqueLock lock(mutex_); if (status_ == FragmentTaskStatus::kRunning && queue_state->source_queue_.Empty()) { + LOG_TRACE(Format("Task: {} of Fragment: {} is quit from worker loop", task_id_, FragmentId())); status_ = FragmentTaskStatus::kPending; return true; } diff --git a/src/scheduler/fragment_task.cppm b/src/scheduler/fragment_task.cppm index 740c755b5f..008845b928 100644 --- a/src/scheduler/fragment_task.cppm +++ b/src/scheduler/fragment_task.cppm @@ -31,6 +31,19 @@ export enum class FragmentTaskStatus { kError, }; +export String FragmentTaskStatus2String(FragmentTaskStatus status) { + switch (status) { + case FragmentTaskStatus::kPending: + return String("Pending"); + case FragmentTaskStatus::kRunning: + return String("Running"); + case FragmentTaskStatus::kFinished: + return String("Finished"); + case FragmentTaskStatus::kError: + return String("Error"); + } +} + export class FragmentTask { public: explicit FragmentTask(bool terminator = true) : is_terminator_(terminator) {} @@ -54,7 +67,7 @@ public: [[nodiscard]] inline i64 TaskID() const { return task_id_; } - [[nodiscard]] bool IsComplete() const; + [[nodiscard]] bool IsComplete(); bool TryIntoWorkerLoop() { UniqueLock lock(mutex_); @@ -73,7 +86,12 @@ public: String PhysOpsToString(); - inline void set_status(FragmentTaskStatus new_status) { status_ = new_status; } + inline void set_status(FragmentTaskStatus new_status) { + UniqueLock lock(mutex_); + status_ = new_status; + } + + [[nodiscard]] inline FragmentTaskStatus status() const { return status_; } FragmentContext *fragment_context() const; diff --git a/src/scheduler/task_scheduler.cpp b/src/scheduler/task_scheduler.cpp index 52fb513ec6..c518b4c96e 100644 --- a/src/scheduler/task_scheduler.cpp +++ b/src/scheduler/task_scheduler.cpp @@ -16,7 +16,6 @@ module; #include #include -#include import stl; import config; @@ -31,6 +30,7 @@ import plan_fragment; import fragment_context; import default_values; import physical_operator_type; +import physical_operator; module task_scheduler; @@ -172,6 +172,25 @@ void TaskScheduler::WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id } } +void TaskScheduler::DumpPlanFragment(PlanFragment *root) { + StdFunction TraverseFragmentTree = [&](PlanFragment *fragment) { + LOG_TRACE(Format("Fragment id: {}, type: {}", fragment->FragmentID(), FragmentType2String(fragment->GetFragmentType()))); + auto *fragment_ctx = fragment->GetContext(); + for (auto &task : fragment_ctx->Tasks()) { + LOG_TRACE(Format("Task id: {}, status: {}", task->TaskID(), FragmentTaskStatus2String(task->status()))); + } + for (auto iter = fragment_ctx->GetOperators().begin(); iter != fragment_ctx->GetOperators().end(); ++iter) { + LOG_TRACE(Format("Operator type: {}", PhysicalOperatorToString((*iter)->operator_type()))); + } + for (auto &child : fragment->Children()) { + TraverseFragmentTree(child.get()); + } + }; + TraverseFragmentTree(root); + LOG_TRACE(""); +} + + // void TaskScheduler::ScheduleOneWorkerPerQuery(QueryContext *query_context, const Vector &tasks, PlanFragment *plan_fragment) { // LOG_TRACE(Format("Schedule {} tasks of query id: {} into scheduler with OneWorkerPerQuery policy", tasks.size(), query_context->query_id())); // u64 worker_id = ProposedWorkerID(query_context->GetTxn()->TxnID()); diff --git a/src/scheduler/task_scheduler.cppm b/src/scheduler/task_scheduler.cppm index 57c281b1f4..08a6b3c4de 100644 --- a/src/scheduler/task_scheduler.cppm +++ b/src/scheduler/task_scheduler.cppm @@ -52,6 +52,8 @@ public: // `plan_fragment` can be scheduled because all of its dependencies are met. void ScheduleFragment(PlanFragment *plan_fragment); + void DumpPlanFragment(PlanFragment *plan_fragment); + private: Vector GetStartFragments(PlanFragment* plan_fragment);