Commit 81f346c
remove the condition variable control and increase the number of worker requests to at least 2 per device to avoid deadlock.
yangwang201911 committed Dec 4, 2024
1 parent 47bedb4 commit 81f346c
Showing 4 changed files with 19 additions and 48 deletions.
8 changes: 1 addition & 7 deletions src/plugins/auto/src/auto_schedule.cpp
@@ -212,12 +212,10 @@ void AutoSchedule::init() {
         // initialize containers before run async task
         m_idle_worker_requests[device.device_name];
         m_worker_requests[device.device_name];
-        m_worker_requests_conds[device.device_name];
         m_infer_pipeline_tasks_device_specific[device.device_name] = nullptr;
     }
     m_idle_worker_requests["CPU_HELP"];
     m_worker_requests["CPU_HELP"];
-    m_worker_requests_conds["CPU_HELP"];
     m_infer_pipeline_tasks_device_specific["CPU_HELP"] = nullptr;
     m_executor->run(m_compile_context[CPU].m_task);
     m_executor->run(m_compile_context[ACTUALDEVICE].m_task);
@@ -490,11 +488,7 @@ bool AutoSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
         if (!preferred_device.empty() && (device.device_name != preferred_device)) {
             continue;
         }
-        if (run_pipeline_task(pipeline_task,
-                              m_idle_worker_requests[device.device_name],
-                              preferred_device,
-                              m_worker_requests_conds[device.device_name],
-                              m_worker_infer_mutex)) {
+        if (run_pipeline_task(pipeline_task, m_idle_worker_requests[device.device_name], preferred_device)) {
             return true;
         }
     }
7 changes: 1 addition & 6 deletions src/plugins/auto/src/cumulative_schedule.cpp
@@ -148,7 +148,6 @@ void CumuSchedule::init() {
         // initialize containers before run async task, if not initialized, it will hang during infer
         m_idle_worker_requests[device.device_name];
         m_worker_requests[device.device_name];
-        m_worker_requests_conds[device.device_name];
         m_infer_pipeline_tasks_device_specific[device.device_name] = nullptr;
     }
     // load devices other than CPU first
@@ -248,11 +247,7 @@ bool CumuSchedule::schedule_to_worker_infer_request(ov::threading::Task pipeline
     }
     auto selected_device_name =
         preferred_device.empty() ? schedule_to_next_device(devices, current_device_index) : preferred_device;
-    if (run_pipeline_task(pipeline_task,
-                          m_idle_worker_requests[selected_device_name],
-                          preferred_device,
-                          m_worker_requests_conds[selected_device_name],
-                          m_worker_infer_mutex)) {
+    if (run_pipeline_task(pipeline_task, m_idle_worker_requests[selected_device_name], preferred_device)) {
         return true;
     } else {
         current_device_index++;
43 changes: 15 additions & 28 deletions src/plugins/auto/src/schedule.cpp
@@ -53,20 +53,10 @@ void Schedule::run(ov::threading::Task pipeline_task) {

 bool Schedule::run_pipeline_task(ov::threading::Task& pipeline_task,
                                  NotBusyPriorityWorkerRequests& idle_workerrequests,
-                                 const DeviceName& preferred_device,
-                                 std::condition_variable& idle_workerrequests_cv,
-                                 std::mutex& worker_infer_mutex) {
+                                 const DeviceName& preferred_device) {
     WorkerInferRequest* worker_request_ptr = nullptr;
     std::pair<int, WorkerInferRequest*> worker;
-    {
-        std::unique_lock<std::mutex> lck(worker_infer_mutex);
-        if (!idle_workerrequests.try_pop(worker)) {
-            idle_workerrequests_cv.wait(lck, [&idle_workerrequests, &worker] {
-                return idle_workerrequests.try_pop(worker);
-            });
-        }
-    }
-    if (worker.second) {
+    if (idle_workerrequests.try_pop(worker)) {
         worker_request_ptr = worker.second;
         IdleGuard<NotBusyPriorityWorkerRequests> idle_guard{worker_request_ptr, idle_workerrequests};
         m_this_worker_infer_request = worker_request_ptr;
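Note: with the condition-variable wait removed, run_pipeline_task is non-blocking. If try_pop finds no idle worker request it returns false, the task is parked in a pipeline-task queue, and the completion callback (see the last hunk of this file) drains that queue once a request is pushed back. Below is a minimal, self-contained model of that flow; Queue, Worker, schedule, and on_done are illustrative stand-ins, and the mutex-guarded deque stands in for the plugin's lock-free thread-safe queues:

    #include <cstdio>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <utility>

    using Task = std::function<void()>;
    struct Worker {};  // stands in for WorkerInferRequest

    // Stand-in for the plugin's thread-safe queue (the real one is lock-free).
    template <typename T>
    class Queue {
    public:
        bool try_pop(T& out) {
            std::lock_guard<std::mutex> lock(m_);
            if (q_.empty())
                return false;
            out = std::move(q_.front());
            q_.pop_front();
            return true;
        }
        void push(T v) {
            std::lock_guard<std::mutex> lock(m_);
            q_.push_back(std::move(v));
        }
    private:
        std::mutex m_;
        std::deque<T> q_;
    };

    Queue<Worker*> idle_workers;  // cf. m_idle_worker_requests
    Queue<Task> pending_tasks;    // cf. m_infer_pipeline_tasks

    // cf. run_pipeline_task after this commit: non-blocking, no CV wait.
    bool schedule(Task task) {
        Worker* w = nullptr;
        if (idle_workers.try_pop(w)) {
            task();  // start inference on the popped worker (simplified to a direct call)
            return true;
        }
        return false;  // no idle worker: the caller parks the task instead
    }

    // cf. the completion callback: return the worker to the idle queue,
    // then drain any tasks that were parked while no worker was idle.
    void on_done(Worker* w) {
        idle_workers.push(w);
        Task t;
        while (pending_tasks.try_pop(t)) {
            if (!schedule(t)) {         // lost the race for a worker:
                pending_tasks.push(t);  // put the task back and stop draining
                break;
            }
        }
    }

    int main() {
        Worker w;
        idle_workers.push(&w);
        Task t = [] { std::printf("inference task\n"); };
        if (!schedule(t))  // a worker is idle, so this one runs right away
            pending_tasks.push(t);
        if (!schedule(t))  // nothing idle now: park the task
            pending_tasks.push(t);
        on_done(&w);       // the worker returns and the parked task is drained
        return 0;
    }

In this model, a request popped from the idle queue before being pushed back is exactly the state the commit message warns about; guaranteeing at least two requests per device (the generate_workers change below) keeps one request available to make progress while the other is in flight.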
@@ -95,13 +85,15 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         OPENVINO_THROW("Every device used with AUTO should support query optimal_number_of_infer_requests property from compiled model ",
                        iie.what());
     }
-    auto num_requests =
-        (m_context->m_device_priorities.end() == it_numrequests || it_numrequests->num_requests_per_devices == -1)
-            ? optimal_num
-            : it_numrequests->num_requests_per_devices;
+    auto num_requests = (m_context->m_device_priorities.end() == it_numrequests ||
+                         it_numrequests->num_requests_per_devices == -1) ? optimal_num : it_numrequests->num_requests_per_devices;
+    // If the user creates only one infer request, ensure at least 2 requests per device.
+    // This handles the case where a worker request is popped from the idle queue before
+    // being pushed back. Without at least 2 requests, there could be a moment when no
+    // request is available for inference, leading to potential deadlock.
+    num_requests = num_requests <= 1 ? 2 : num_requests;
     auto& worker_requests = m_worker_requests[device];
     auto& idle_worker_requests = m_idle_worker_requests[device];
-    auto& worker_requests_cv = m_worker_requests_conds[device];
     worker_requests.resize(num_requests);
     m_infer_pipeline_tasks_device_specific[device] = std::unique_ptr<TaskQueue>(new TaskQueue);
     auto* idle_workerrequests_ptr = &(idle_worker_requests);
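The four comment lines and the clamp above are the behavioral core of the commit. Here is a standalone check of the clamp's effect; clamp_num_requests is a hypothetical mirror of the added line, not a function in the plugin:

    #include <cassert>

    // Hypothetical helper mirroring the clamp added in generate_workers():
    // a resolved count of 0 or 1 is raised to 2, so one request can be in
    // flight while another stays available to the scheduler.
    static int clamp_num_requests(int resolved) {
        return resolved <= 1 ? 2 : resolved;
    }

    int main() {
        assert(clamp_num_requests(1) == 2);  // a single request would risk deadlock
        assert(clamp_num_requests(4) == 4);  // explicit or optimal counts are kept
        return 0;
    }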
@@ -111,11 +103,9 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
         worker_request.m_inferrequest = {compiled_model->create_infer_request(), compiled_model._so};
         auto* worker_request_ptr = &worker_request;
         worker_request_ptr->m_index = num++;
-        OPENVINO_ASSERT(
-            idle_worker_requests.try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr)) == true);
+        OPENVINO_ASSERT(idle_worker_requests.try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr)) == true);
         worker_request.m_inferrequest->set_callback(
-            [worker_request_ptr, this, device, idle_workerrequests_ptr, &worker_requests_cv](
-                std::exception_ptr exception_ptr) mutable {
+            [worker_request_ptr, this, device, idle_workerrequests_ptr](std::exception_ptr exception_ptr) mutable {
                 IdleGuard<NotBusyPriorityWorkerRequests> idleGuard{worker_request_ptr, *idle_workerrequests_ptr};
                 worker_request_ptr->m_exception_ptr = std::move(exception_ptr);
                 {
@@ -143,20 +133,17 @@ void Schedule::generate_workers(const std::string& device, const SoCompiledModel
                 } else {
                     stop_retry_and_continue();
                 }
-                std::unique_lock<std::mutex> lck(m_worker_infer_mutex);
-                if (idleGuard.release()->try_push(
-                        std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
-                    // let's try to pop a task, as we know there is at least one idle request, schedule if
-                    // succeeded if no device-agnostic tasks, let's try pop the device specific task, schedule
-                    // if succeeded
+                // try to return the request to the idle list (fails if the overall object destruction has begun)
+                if (idleGuard.release()->try_push(std::make_pair(worker_request_ptr->m_index, worker_request_ptr))) {
+                    // try to pop a task: we know there is at least one idle request, so schedule it if the pop succeeds;
+                    // if there are no device-agnostic tasks, try the device-specific queue and schedule likewise
                     ov::threading::Task t;
                     do {
                         m_infer_pipeline_tasks.try_pop(t);
                     } while (t && schedule_to_worker_infer_request(std::move(t)));
                     do {
                         m_infer_pipeline_tasks_device_specific[device]->try_pop(t);
                     } while (t && schedule_to_worker_infer_request(std::move(t), device));
-                    worker_requests_cv.notify_all();
                 }
             }
         });
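The callback above leans on the IdleGuard/try_push pair: the guard returns the worker to the idle queue on any early exit, while release() disarms it so the callback can push explicitly and detect a failed push during shutdown. A rough sketch of that RAII pattern follows; it is illustrative only, inferred from the usage here, not the plugin's actual IdleGuard:

    #include <utility>

    // Illustrative sketch of the IdleGuard pattern used in the callback above.
    // Worker must expose m_index and Queue must expose try_push, mirroring the
    // usage in schedule.cpp.
    template <typename Queue, typename Worker>
    class IdleGuardSketch {
    public:
        IdleGuardSketch(Worker* worker, Queue& queue) : worker_(worker), queue_(&queue) {}
        ~IdleGuardSketch() {
            if (queue_)  // still armed: return the worker to the idle queue
                queue_->try_push(std::make_pair(worker_->m_index, worker_));
        }
        // Disarm the guard and hand the queue back, so the caller can try_push
        // itself and react if the push fails (e.g. during destruction).
        Queue* release() {
            Queue* q = queue_;
            queue_ = nullptr;
            return q;
        }
    private:
        Worker* worker_;
        Queue* queue_;
    };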
9 changes: 2 additions & 7 deletions src/plugins/auto/src/schedule.hpp
@@ -29,11 +29,8 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa

 protected:
     virtual void init() = 0;
-    static bool run_pipeline_task(ov::threading::Task& pipeline_task,
-                                  NotBusyPriorityWorkerRequests& idle_worker_request,
-                                  const DeviceName& preferred_device,
-                                  std::condition_variable& idle_worker_request_cv,
-                                  std::mutex& mutex);
+    static bool run_pipeline_task(ov::threading::Task& pipeline_task, NotBusyPriorityWorkerRequests& idle_worker_request,
+                                  const DeviceName& preferred_device);
     virtual void generate_workers(const std::string& device, const SoCompiledModel& compiled_model);
     virtual void try_to_compile_model(AutoCompileContext& context, const std::shared_ptr<ov::Model>& model) = 0;
     virtual bool schedule_to_worker_infer_request(ov::threading::Task, DeviceName preferred_device = "") = 0;
@@ -43,7 +40,6 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     std::shared_ptr<ov::threading::IStreamsExecutor> m_executor;
     DeviceMap<NotBusyPriorityWorkerRequests> m_idle_worker_requests;
     DeviceMap<std::vector<WorkerInferRequest>> m_worker_requests;
-    DeviceMap<std::condition_variable> m_worker_requests_conds;
     TaskQueue m_infer_pipeline_tasks;
     DeviceMap<std::unique_ptr<TaskQueue>> m_infer_pipeline_tasks_device_specific;
     SoCompiledModel m_passthrough_compiled_model;
@@ -54,7 +50,6 @@ class Schedule : public std::enable_shared_from_this<Schedule>, public ov::threa
     mutable std::atomic<std::size_t> m_request_id = {0};
     std::mutex m_dev_infer_mutex;
     std::unordered_map<IASyncInferPtr, WorkerInferRequest*> m_dev_infer;
-    std::mutex m_worker_infer_mutex;
 };
 
 }  // namespace auto_plugin