Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lock when set fragment task status. #401

Merged
merged 4 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/executor/operator/physical_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,13 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(FragmentContext *fragment_
return;
}
SizeT output_data_block_count = task_operator_state->data_block_array_.size();
if (output_data_block_count == 0) {
for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) {
next_fragment_queue->Enqueue(MakeShared<FragmentNone>(queue_sink_state->fragment_id_));
}
return;
// Error<ExecutorException>("No output from last operator.");
}
// if (output_data_block_count == 0) {
// for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) {
// next_fragment_queue->Enqueue(MakeShared<FragmentNone>(queue_sink_state->fragment_id_));
// }
// return;
// // Error<ExecutorException>("No output from last operator.");
// }
for (SizeT idx = 0; idx < output_data_block_count; ++idx) {
auto fragment_data = MakeShared<FragmentData>(queue_sink_state->fragment_id_,
Move(task_operator_state->data_block_array_[idx]),
Expand Down
3 changes: 3 additions & 0 deletions src/executor/operator_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ module;

import data_block;
import stl;
import third_party;
import logger;
import physical_operator_type;
import fragment_data;
import infinity_exception;
Expand Down Expand Up @@ -95,6 +97,7 @@ bool QueueSourceState::GetData() {
case PhysicalOperatorType::kMergeLimit: {
auto *fragment_data = static_cast<FragmentData *>(fragment_data_base.get());
MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state;
LOG_TRACE(Format("Get data, data_count: {}, row_count: {}", fragment_data->data_count_, fragment_data->data_block_->row_count()));
limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_));
if (!limit_op_state->input_complete_) {
limit_op_state->input_complete_ = completed;
Expand Down
53 changes: 31 additions & 22 deletions src/scheduler/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,32 +443,41 @@ FragmentContext::FragmentContext(PlanFragment *fragment_ptr, QueryContext *query

void FragmentContext::TryFinishFragment() {
if (!TryFinishFragmentInner()) {
LOG_TRACE(Format("{} tasks in fragment are not completed: {} are not completed", unfinished_task_n_.load(), fragment_ptr_->FragmentID()));
return;
}
LOG_TRACE(Format("All tasks in fragment: {} are completed", fragment_ptr_->FragmentID()));
fragment_status_ = FragmentStatus::kFinish;
LOG_TRACE(Format("{} tasks in fragment {} are not completed", unfinished_task_n_.load(), fragment_ptr_->FragmentID()));
if (fragment_type_ != FragmentType::kParallelStream) {
return;
}
auto *parent_plan_fragment = fragment_ptr_->GetParent();
if (parent_plan_fragment == nullptr) {
return;
}

auto *sink_op = GetSinkOperator();
if (sink_op->sink_type() == SinkType::kResult) {
Complete();
return;
}
auto *scheduler = query_context_->scheduler();
LOG_TRACE(Format("Schedule fragment: {} before fragment {} has finished.", parent_plan_fragment->FragmentID(), fragment_ptr_->FragmentID()));
scheduler->ScheduleFragment(parent_plan_fragment);
} else {
LOG_TRACE(Format("All tasks in fragment: {} are completed", fragment_ptr_->FragmentID()));
fragment_status_ = FragmentStatus::kFinish;
auto *sink_op = GetSinkOperator();
if (sink_op->sink_type() == SinkType::kResult) {
Complete();
return;
}
auto *parent_plan_fragment = fragment_ptr_->GetParent();
if (parent_plan_fragment == nullptr) {
return;
}

// Try to schedule parent fragment
auto *parent_plan_fragment = fragment_ptr_->GetParent();
if (parent_plan_fragment == nullptr) {
return;
}
auto *parent_fragment_ctx = parent_plan_fragment->GetContext();
auto *parent_fragment_ctx = parent_plan_fragment->GetContext();
if (!parent_fragment_ctx->TryStartFragment()) {
return;
}
// All child fragment are finished.

if (!parent_fragment_ctx->TryStartFragment() && parent_fragment_ctx->fragment_type_ != FragmentType::kParallelStream) {
return;
auto *scheduler = query_context_->scheduler();
LOG_TRACE(Format("Schedule fragment: {} because fragment {} has finished.", parent_plan_fragment->FragmentID(), fragment_ptr_->FragmentID()));
scheduler->ScheduleFragment(parent_plan_fragment);
}
// All child fragment are finished.
auto *scheduler = query_context_->scheduler();
scheduler->ScheduleFragment(parent_plan_fragment);
return;
}

Vector<PhysicalOperator *> &FragmentContext::GetOperators() { return fragment_ptr_->GetOperators(); }
Expand Down
13 changes: 13 additions & 0 deletions src/scheduler/fragment_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ export enum class FragmentType {
kParallelStream,
};

// Convert a FragmentType enum value to a human-readable name for log/trace output.
// Returns "Unknown" for any value outside the declared enumerators.
export String FragmentType2String(FragmentType type) {
    switch (type) {
        case FragmentType::kInvalid:
            return String("Invalid");
        case FragmentType::kSerialMaterialize:
            return String("SerialMaterialize");
        case FragmentType::kParallelMaterialize:
            return String("ParallelMaterialize");
        case FragmentType::kParallelStream:
            return String("ParallelStream");
    }
    // Defensive fallback: a scoped enum may still carry an out-of-range value
    // (e.g. after a bad cast). Without this, control would flow off the end of a
    // value-returning function, which is undefined behavior (-Wreturn-type).
    return String("Unknown");
}

class PlanFragment;

export class FragmentContext {
Expand Down
8 changes: 7 additions & 1 deletion src/scheduler/fragment_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
module;

#include <sstream>
#include <thread>

import fragment_context;
import profiler;
Expand Down Expand Up @@ -44,6 +45,7 @@ void FragmentTask::Init() {
}

void FragmentTask::OnExecute(i64) {
LOG_TRACE(Format("Task: {} of Fragment: {} is running", task_id_, FragmentId()));
// infinity::BaseProfiler prof;
// prof.Begin();
FragmentContext *fragment_context = (FragmentContext *)fragment_context_;
Expand Down Expand Up @@ -111,7 +113,10 @@ u64 FragmentTask::FragmentId() const {
}

// Finished **OR** Error
bool FragmentTask::IsComplete() const { return sink_state_->prev_op_state_->Complete() || status_ == FragmentTaskStatus::kError; }
bool FragmentTask::IsComplete() {
UniqueLock<Mutex> lock(mutex_);
return sink_state_->prev_op_state_->Complete() || status_ == FragmentTaskStatus::kError;
}

// Stream fragment source has no data
bool FragmentTask::QuitFromWorkerLoop() {
Expand All @@ -127,6 +132,7 @@ bool FragmentTask::QuitFromWorkerLoop() {

UniqueLock<Mutex> lock(mutex_);
if (status_ == FragmentTaskStatus::kRunning && queue_state->source_queue_.Empty()) {
LOG_TRACE(Format("Task: {} of Fragment: {} is quit from worker loop", task_id_, FragmentId()));
status_ = FragmentTaskStatus::kPending;
return true;
}
Expand Down
22 changes: 20 additions & 2 deletions src/scheduler/fragment_task.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ export enum class FragmentTaskStatus {
kError,
};

// Convert a FragmentTaskStatus enum value to a human-readable name for log/trace output.
// Returns "Unknown" for any value outside the declared enumerators.
export String FragmentTaskStatus2String(FragmentTaskStatus status) {
    switch (status) {
        case FragmentTaskStatus::kPending:
            return String("Pending");
        case FragmentTaskStatus::kRunning:
            return String("Running");
        case FragmentTaskStatus::kFinished:
            return String("Finished");
        case FragmentTaskStatus::kError:
            return String("Error");
    }
    // Defensive fallback: guards against an out-of-range value and prevents
    // flowing off the end of a value-returning function (undefined behavior).
    return String("Unknown");
}

export class FragmentTask {
public:
explicit FragmentTask(bool terminator = true) : is_terminator_(terminator) {}
Expand All @@ -54,7 +67,7 @@ public:

[[nodiscard]] inline i64 TaskID() const { return task_id_; }

[[nodiscard]] bool IsComplete() const;
[[nodiscard]] bool IsComplete();

bool TryIntoWorkerLoop() {
UniqueLock<Mutex> lock(mutex_);
Expand All @@ -73,7 +86,12 @@ public:

String PhysOpsToString();

inline void set_status(FragmentTaskStatus new_status) { status_ = new_status; }
// Transition this task to `new_status` while holding the task mutex, so the
// write is serialized with other lock-holding accessors (e.g. IsComplete()).
inline void set_status(FragmentTaskStatus new_status) {
    UniqueLock<Mutex> guard(mutex_);
    status_ = new_status;
}

[[nodiscard]] inline FragmentTaskStatus status() const { return status_; }

FragmentContext *fragment_context() const;

Expand Down
21 changes: 20 additions & 1 deletion src/scheduler/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ module;

#include <list>
#include <sched.h>
#include <vector>

import stl;
import config;
Expand All @@ -31,6 +30,7 @@ import plan_fragment;
import fragment_context;
import default_values;
import physical_operator_type;
import physical_operator;

module task_scheduler;

Expand Down Expand Up @@ -172,6 +172,25 @@ void TaskScheduler::WorkerLoop(FragmentTaskBlockQueue *task_queue, i64 worker_id
}
}

// Log (at TRACE level) the whole plan-fragment tree rooted at `root`:
// for every fragment its id/type, each task's id/status, and each
// physical operator's type. Purely diagnostic; no state is modified.
void TaskScheduler::DumpPlanFragment(PlanFragment *root) {
    // Recursive lambda: depth-first, pre-order dump of the fragment tree.
    StdFunction<void(PlanFragment *)> dump_node = [&](PlanFragment *node) {
        LOG_TRACE(Format("Fragment id: {}, type: {}", node->FragmentID(), FragmentType2String(node->GetFragmentType())));
        auto *node_ctx = node->GetContext();
        for (auto &task : node_ctx->Tasks()) {
            LOG_TRACE(Format("Task id: {}, status: {}", task->TaskID(), FragmentTaskStatus2String(task->status())));
        }
        for (auto *op : node_ctx->GetOperators()) {
            LOG_TRACE(Format("Operator type: {}", PhysicalOperatorToString(op->operator_type())));
        }
        for (auto &child : node->Children()) {
            dump_node(child.get());
        }
    };
    dump_node(root);
    // Blank trace line to visually separate consecutive dumps in the log.
    LOG_TRACE("");
}


// void TaskScheduler::ScheduleOneWorkerPerQuery(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
// LOG_TRACE(Format("Schedule {} tasks of query id: {} into scheduler with OneWorkerPerQuery policy", tasks.size(), query_context->query_id()));
// u64 worker_id = ProposedWorkerID(query_context->GetTxn()->TxnID());
Expand Down
2 changes: 2 additions & 0 deletions src/scheduler/task_scheduler.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public:
// `plan_fragment` can be scheduled because all of its dependencies are met.
void ScheduleFragment(PlanFragment *plan_fragment);

void DumpPlanFragment(PlanFragment *plan_fragment);

private:
Vector<PlanFragment *> GetStartFragments(PlanFragment* plan_fragment);

Expand Down