Skip to content

Commit

Permalink
Add tmp schedule strategy.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Dec 27, 2023
1 parent e235f36 commit d07eb01
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
13 changes: 0 additions & 13 deletions src/executor/operator/physical_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,6 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate
}
break;
}
case PhysicalOperatorType::kKnnScan: {
throw ExecutorException("KnnScan shouldn't be here");
KnnScanOperatorState *knn_output_state = static_cast<KnnScanOperatorState *>(task_op_state);
if (knn_output_state->data_block_array_.empty()) {
Error<ExecutorException>("Empty knn scan output");
}

for (auto &data_block : knn_output_state->data_block_array_) {
materialize_sink_state->data_block_array_.emplace_back(Move(data_block));
}
knn_output_state->data_block_array_.clear();
break;
}
case PhysicalOperatorType::kAggregate: {
AggregateOperatorState *agg_output_state = static_cast<AggregateOperatorState *>(task_op_state);
if (agg_output_state->data_block_array_.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resource_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public:
return cpu_count;
}

inline u64 GetCpuResource() { return GetCpuResource(4); }
inline u64 GetCpuResource() { return GetCpuResource(Thread::hardware_concurrency()); }

inline u64 GetMemoryResource(u64 memory_size) {
total_memory_ -= memory_size;
Expand Down
17 changes: 15 additions & 2 deletions src/scheduler/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import query_context;
import plan_fragment;
import fragment_context;
import default_values;
import physical_operator_type;

module task_scheduler;

Expand Down Expand Up @@ -93,8 +94,20 @@ void TaskScheduler::Schedule(QueryContext *query_context, const Vector<FragmentT
// if the first op isn't SCAN op, fragment task source type is kQueue and a task_result_queue need to be created.
// According to the fragment output type to set the correct fragment task sink type.
// Set the queue of parent fragment task.
// ScheduleOneWorkerIfPossible(query_context, tasks, plan_fwosfasdfragment);
ScheduleRoundRobin(query_context, tasks, plan_fragment);
if (plan_fragment->GetOperators().empty()) {
Error<SchedulerException>("Empty fragment");
}
auto *last_operator = plan_fragment->GetOperators()[0];
switch (last_operator->operator_type()) {
case PhysicalOperatorType::kCreateIndexFinish: {
ScheduleRoundRobin(query_context, tasks, plan_fragment);
break;
}
default: {
ScheduleOneWorkerIfPossible(query_context, tasks, plan_fragment);
break;
}
}
}

void TaskScheduler::ScheduleOneWorkerPerQuery(QueryContext *query_context, const Vector<FragmentTask *> &tasks, PlanFragment *plan_fragment) {
Expand Down

0 comments on commit d07eb01

Please sign in to comment.