From d07eb017d9877800ae6b348a5da632f7cbb86bd5 Mon Sep 17 00:00:00 2001 From: shenyushi Date: Wed, 27 Dec 2023 13:40:00 +0800 Subject: [PATCH] Add tmp schedule strategy. --- src/executor/operator/physical_sink.cpp | 13 ------------- src/main/resource_manager.cppm | 2 +- src/scheduler/task_scheduler.cpp | 17 +++++++++++++++-- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index 0ce94760f5..1cfc3acd6d 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -131,19 +131,6 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate } break; } - case PhysicalOperatorType::kKnnScan: { - throw ExecutorException("KnnScan shouldn't be here"); - KnnScanOperatorState *knn_output_state = static_cast(task_op_state); - if (knn_output_state->data_block_array_.empty()) { - Error("Empty knn scan output"); - } - - for (auto &data_block : knn_output_state->data_block_array_) { - materialize_sink_state->data_block_array_.emplace_back(Move(data_block)); - } - knn_output_state->data_block_array_.clear(); - break; - } case PhysicalOperatorType::kAggregate: { AggregateOperatorState *agg_output_state = static_cast(task_op_state); if (agg_output_state->data_block_array_.empty()) { diff --git a/src/main/resource_manager.cppm b/src/main/resource_manager.cppm index 41380621b2..a8d1192d47 100644 --- a/src/main/resource_manager.cppm +++ b/src/main/resource_manager.cppm @@ -30,7 +30,7 @@ public: return cpu_count; } - inline u64 GetCpuResource() { return GetCpuResource(4); } + inline u64 GetCpuResource() { return GetCpuResource(Thread::hardware_concurrency()); } inline u64 GetMemoryResource(u64 memory_size) { total_memory_ -= memory_size; diff --git a/src/scheduler/task_scheduler.cpp b/src/scheduler/task_scheduler.cpp index 7f305f816d..47b6edf2b1 100644 --- a/src/scheduler/task_scheduler.cpp +++ b/src/scheduler/task_scheduler.cpp @@ -30,6 +30,7 @@ import query_context; import plan_fragment; import fragment_context; import default_values; +import physical_operator_type; module task_scheduler; @@ -93,8 +94,20 @@ void TaskScheduler::Schedule(QueryContext *query_context, const VectorGetOperators().empty()) { + Error("Empty fragment"); + } + auto *last_operator = plan_fragment->GetOperators()[0]; + switch (last_operator->operator_type()) { + case PhysicalOperatorType::kCreateIndexFinish: { + ScheduleRoundRobin(query_context, tasks, plan_fragment); + break; + } + default: { + ScheduleOneWorkerIfPossible(query_context, tasks, plan_fragment); + break; + } + } } void TaskScheduler::ScheduleOneWorkerPerQuery(QueryContext *query_context, const Vector &tasks, PlanFragment *plan_fragment) {