Skip to content

Commit

Permalink
Response sender to check for improper non-decoupled model usage (#363)
Browse files Browse the repository at this point in the history
* Response sender to check for improper non-decoupled model usage

* Force close response sender on exception

* Rename functions
  • Loading branch information
kthui authored Jun 4, 2024
1 parent 9f2865d commit 4961e24
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 32 deletions.
12 changes: 7 additions & 5 deletions src/infer_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ InferRequest::InferRequest(
pb_cancel_ =
std::make_shared<PbCancel>(response_factory_address_, request_address_);
response_sender_ = std::make_shared<ResponseSender>(
request_address_, response_factory_address_,
request_address_, response_factory_address_, nullptr /* is_decoupled */,
Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
#endif
}
Expand Down Expand Up @@ -272,7 +272,8 @@ InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
std::unique_ptr<InferRequest>
InferRequest::LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle)
bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle,
bool const* is_model_decoupled)
{
AllocatedSharedMemory<char> infer_request_shm =
shm_pool->Load<char>(request_handle);
Expand Down Expand Up @@ -328,7 +329,7 @@ InferRequest::LoadFromSharedMemory(
return std::unique_ptr<InferRequest>(new InferRequest(
infer_request_shm, request_id_shm, correlation_id_shm,
requested_output_names_shm, model_name_shm, input_tensors, parameters_shm,
infer_trace_shm));
infer_trace_shm, is_model_decoupled));
}

InferRequest::InferRequest(
Expand All @@ -339,7 +340,8 @@ InferRequest::InferRequest(
std::unique_ptr<PbString>& model_name_shm,
std::vector<std::shared_ptr<PbTensor>>& input_tensors,
std::unique_ptr<PbString>& parameters_shm,
std::unique_ptr<InferenceTrace>& infer_trace_shm)
std::unique_ptr<InferenceTrace>& infer_trace_shm,
bool const* is_model_decoupled)
: infer_request_shm_(std::move(infer_request_shm)),
request_id_shm_(std::move(request_id_shm)),
requested_output_names_shm_(std::move(requested_output_names_shm)),
Expand Down Expand Up @@ -387,7 +389,7 @@ InferRequest::InferRequest(
pb_cancel_ =
std::make_shared<PbCancel>(response_factory_address_, request_address_);
response_sender_ = std::make_shared<ResponseSender>(
request_address_, response_factory_address_,
request_address_, response_factory_address_, is_model_decoupled,
Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
#endif
}
Expand Down
5 changes: 3 additions & 2 deletions src/infer_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class InferRequest {
static std::unique_ptr<InferRequest> LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t request_handle,
bool open_cuda_handle);
bool open_cuda_handle, bool const* is_model_decoupled);

/// Disallow copying the inference request object.
DISALLOW_COPY_AND_ASSIGN(InferRequest);
Expand All @@ -135,7 +135,8 @@ class InferRequest {
std::unique_ptr<PbString>& model_name_shm,
std::vector<std::shared_ptr<PbTensor>>& input_tensors,
std::unique_ptr<PbString>& parameters_shm,
std::unique_ptr<InferenceTrace>& infer_trace_shm);
std::unique_ptr<InferenceTrace>& infer_trace_shm,
bool const* is_model_decoupled);

std::string request_id_;
CorrelationId correlation_id_;
Expand Down
11 changes: 10 additions & 1 deletion src/pb_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,8 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
for (size_t i = 0; i < batch_size; i++) {
std::shared_ptr<InferRequest> infer_request =
InferRequest::LoadFromSharedMemory(
shm_pool_, request_shm_handle[i], true /* open_cuda_handle */);
shm_pool_, request_shm_handle[i], true /* open_cuda_handle */,
&ipc_control_->decoupled /* is_model_decoupled */);
py_request_list.append(infer_request);
}

Expand Down Expand Up @@ -740,6 +741,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
error_string_shm = PbString::Create(shm_pool_, err_message);
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
response_batch_shm_ptr->is_error_set = true;
// Once the error is sent to the backend, the backend is supposed to close
// all response factories if not already closed, so closing all response
// senders if not already closed to prevent the model from sending more
// responses after the factories are closed.
for (py::handle py_request : py_request_list) {
InferRequest* request = py_request.cast<InferRequest*>();
request->GetResponseSender()->Close();
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/python_be.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,8 @@ ModelInstanceState::ExecuteBLSRequest(
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
request_batch.data_.get() + sizeof(RequestBatch));
infer_request = InferRequest::LoadFromSharedMemory(
Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */);
Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */,
nullptr /* is_model_decoupled */);

// If the BLS inputs are in GPU an additional round trip between the
// stub process and the main process is required. The reason is that we
Expand Down
88 changes: 67 additions & 21 deletions src/response_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,31 @@

namespace triton { namespace backend { namespace python {

void
CheckResponseSenderArguments(
const std::shared_ptr<InferResponse>& response, const uint32_t flags)
{
// Check the correctness of the provided flags.
if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
throw PythonBackendException(
"Unable to send response. Unsupported flag provided.");
}

if (flags == 0 && response == nullptr) {
throw PythonBackendException(
"Inference Response object must be provided when the response flags is "
"set to zero.");
}
}

ResponseSender::ResponseSender(
intptr_t request_address, intptr_t response_factory_address,
std::unique_ptr<SharedMemoryManager>& shm_pool,
bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
const std::shared_ptr<PbCancel>& pb_cancel)
: request_address_(request_address),
response_factory_address_(response_factory_address), shm_pool_(shm_pool),
closed_(false), pb_cancel_(pb_cancel)
response_factory_address_(response_factory_address),
is_decoupled_(is_decoupled), shm_pool_(shm_pool), pb_cancel_(pb_cancel),
closed_(false), number_of_response_sent_(0)
{
}

Expand All @@ -54,15 +72,32 @@ ResponseSender::~ResponseSender()
}

void
ResponseSender::Send(
std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
ResponseSender::UpdateStateAndCounters(
const std::shared_ptr<InferResponse>& response, const uint32_t flags)
{
// Release the GIL. This avoids a potential deadlock situation in the parent
// process, where every thread in the thread pool is indirectly waiting for a
// function in the stub process that acquires the GIL. Meanwhile, the current
// thread, which holds the GIL, is also waiting for the parent side to have
// the next available thread to pick up the job during resource contention.
py::gil_scoped_release release;
if (is_decoupled_ == nullptr) {
// TODO: Can a model access the response sender on a BLS infer request?
throw PythonBackendException(
"Unable to send response. Response sender has no reference to the "
"decoupled state of the model.");
}
bool is_decoupled = *is_decoupled_;

std::lock_guard<std::mutex> lk(mu_);

if (!is_decoupled) {
if (response != nullptr && number_of_response_sent_ > 0) {
throw PythonBackendException(
"Unable to send response. Non-decoupled model cannot send more than "
"one response.");
}
if (response == nullptr && flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL &&
number_of_response_sent_ == 0) {
throw PythonBackendException(
"Unable to send response. Non-decoupled model cannot send complete "
"final before sending a response.");
}
}

if (closed_) {
throw PythonBackendException(
Expand All @@ -72,18 +107,22 @@ ResponseSender::Send(
if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
closed_ = true;
}
number_of_response_sent_++;
}

// Check the correctness of the provided flags.
if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
throw PythonBackendException(
"Unable to send response. Unsupported flag provided.");
}
void
ResponseSender::Send(
std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
{
// Release the GIL. This avoids a potential deadlock situation in the parent
// process, where every thread in the thread pool is indirectly waiting for a
// function in the stub process that acquires the GIL. Meanwhile, the current
// thread, which holds the GIL, is also waiting for the parent side to have
// the next available thread to pick up the job during resource contention.
py::gil_scoped_release release;

if (flags == 0 && infer_response == nullptr) {
throw PythonBackendException(
"Inference Response object must be provided when the response flags is "
"set to zero.");
}
CheckResponseSenderArguments(infer_response, flags);
UpdateStateAndCounters(infer_response, flags);

std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();

Expand Down Expand Up @@ -211,4 +250,11 @@ ResponseSender::IsCancelled()
return pb_cancel_->IsCancelled();
}

void
ResponseSender::Close()
{
std::lock_guard<std::mutex> lk(mu_);
closed_ = true;
}

}}} // namespace triton::backend::python
16 changes: 14 additions & 2 deletions src/response_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#pragma once

#include <mutex>

#include "infer_response.h"
#include "pb_cancel.h"
#include "shm_manager.h"
Expand All @@ -36,17 +38,27 @@ class ResponseSender {
public:
ResponseSender(
intptr_t request_address, intptr_t response_factory_address,
std::unique_ptr<SharedMemoryManager>& shm_pool,
bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
const std::shared_ptr<PbCancel>& pb_cancel);
~ResponseSender();
void Send(std::shared_ptr<InferResponse> response, const uint32_t flags);
bool IsCancelled();

// Can be useful at stopping the model from sending any more responses.
void Close();

private:
void UpdateStateAndCounters(
const std::shared_ptr<InferResponse>& response, const uint32_t flags);

intptr_t request_address_;
intptr_t response_factory_address_;
bool const* is_decoupled_;
std::unique_ptr<SharedMemoryManager>& shm_pool_;
bool closed_;
std::shared_ptr<PbCancel> pb_cancel_;

std::mutex mu_;
bool closed_;
size_t number_of_response_sent_;
};
}}} // namespace triton::backend::python

0 comments on commit 4961e24

Please sign in to comment.