Commit ca9c1fa

Implement periodic concurrency manager/worker and inference profiler workflow
matthewkotila committed Sep 26, 2023
1 parent 930749c commit ca9c1fa
Showing 13 changed files with 463 additions and 37 deletions.
4 changes: 4 additions & 0 deletions src/c++/perf_analyzer/CMakeLists.txt
@@ -69,6 +69,8 @@ set(
sequence_manager.cc
profile_data_collector.cc
profile_data_exporter.cc
periodic_concurrency_manager.cc
periodic_concurrency_worker.cc
)

set(
@@ -109,6 +111,8 @@ set(
request_record.h
profile_data_collector.h
profile_data_exporter.h
periodic_concurrency_manager.h
periodic_concurrency_worker.h
)

add_executable(
9 changes: 8 additions & 1 deletion src/c++/perf_analyzer/command_line_parser.h
@@ -130,7 +130,8 @@ struct PerfAnalyzerParameters {
{
return (
using_concurrency_range || using_old_options ||
!(using_request_rate_range || using_custom_intervals));
!(using_request_rate_range || using_custom_intervals ||
using_periodic_concurrency_mode));
}

// Sets the threshold for PA client overhead.
Expand All @@ -148,6 +149,12 @@ struct PerfAnalyzerParameters {

// The profile export file path.
std::string profile_export_file{""};

// Whether periodic concurrency mode is being used
bool using_periodic_concurrency_mode{false};

Range<uint64_t> periodic_concurrency_range{1, 1, 1};
uint64_t periodic_concurrency_request_period{10};
};

using PAParamsPtr = std::shared_ptr<PerfAnalyzerParameters>;
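
The three new fields give the periodic concurrency mode its shape: a start/end/step range and the number of responses to wait for before taking the next step. A minimal usage sketch, assuming Range<uint64_t> stores start, end, and step in that order (inferred from the {1, 1, 1} default above) and that the request period counts responses per request; neither detail is spelled out in this commit.

// Sketch only (namespace qualifiers and includes follow command_line_parser.h).
// Ramp concurrency from 10 to 30 in steps of 10, stepping after every 20
// responses on each outstanding request; the exact period semantics are an
// assumption, not taken from this diff.
PAParamsPtr params{std::make_shared<PerfAnalyzerParameters>()};
params->using_periodic_concurrency_mode = true;
params->periodic_concurrency_range = {10, 30, 10};
params->periodic_concurrency_request_period = 20;
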
19 changes: 10 additions & 9 deletions src/c++/perf_analyzer/concurrency_manager.h
@@ -95,14 +95,23 @@ class ConcurrencyManager : public LoadManager {
std::shared_ptr<ThreadStat>,
std::shared_ptr<ConcurrencyWorker::ThreadConfig>);

private:
ConcurrencyManager(
const bool async, const bool streaming, const int32_t batch_size,
const size_t max_threads, const size_t max_concurrency,
const SharedMemoryType shared_memory_type, const size_t output_shm_size,
const std::shared_ptr<ModelParser>& parser,
const std::shared_ptr<cb::ClientBackendFactory>& factory);

// The number of worker threads with non-zero concurrencies
size_t active_threads_;

bool execute_;

size_t max_concurrency_;

std::vector<std::shared_ptr<ConcurrencyWorker::ThreadConfig>> threads_config_;

private:
void InitManagerFinalize() override;

// Pause all worker threads that are working on sequences
@@ -118,14 +127,6 @@ class ConcurrencyManager : public LoadManager {
//
void ResumeSequenceWorkers();

// The number of worker threads with non-zero concurrencies
size_t active_threads_;

bool execute_;

size_t max_concurrency_;
std::vector<std::shared_ptr<ConcurrencyWorker::ThreadConfig>> threads_config_;

#ifndef DOCTEST_CONFIG_DISABLE
friend TestConcurrencyManager;

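
The constructor and the worker-bookkeeping members (active_threads_, execute_, max_concurrency_, threads_config_) move out of the private section so that a derived manager, such as the new PeriodicConcurrencyManager, can build on them. A hypothetical sketch of what that access change enables, under the assumption that these members are no longer private and are therefore visible to subclasses; this is not the real PeriodicConcurrencyManager, which lives in periodic_concurrency_manager.h (not shown here).

// Hypothetical subclass used purely to illustrate the loosened access.
// Assumes concurrency_manager.h is included.
class RampingConcurrencyManager : public ConcurrencyManager {
 public:
  RampingConcurrencyManager(
      const bool async, const bool streaming, const int32_t batch_size,
      const size_t max_threads, const size_t max_concurrency,
      const SharedMemoryType shared_memory_type, const size_t output_shm_size,
      const std::shared_ptr<ModelParser>& parser,
      const std::shared_ptr<cb::ClientBackendFactory>& factory)
      : ConcurrencyManager(
            async, streaming, batch_size, max_threads, max_concurrency,
            shared_memory_type, output_shm_size, parser, factory)
  {
  }

  // Register one more worker config starting at a concurrency of 1; actually
  // launching the worker thread is omitted from this sketch.
  void AddOneWorkerConfig()
  {
    threads_config_.push_back(
        std::make_shared<ConcurrencyWorker::ThreadConfig>(
            threads_config_.size(), 1));
    active_threads_++;
  }
};
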
43 changes: 26 additions & 17 deletions src/c++/perf_analyzer/concurrency_worker.cc
@@ -46,31 +46,40 @@ ConcurrencyWorker::Infer()

// run inferencing until receiving exit signal to maintain server load.
do {
HandleExecuteOff();

if (HandleNoConcurrency()) {
return;
if (RunInference()) {
break;
}
} while (true);
}

CreateContextsAsNecessary();
bool
ConcurrencyWorker::RunInference()
{
HandleExecuteOff();

if (HandleExitConditions()) {
return;
}
if (HandleNoConcurrency()) {
return true;
}

SendInferRequests();
CreateContextsAsNecessary();

if (HandleExitConditions()) {
return;
}
if (HandleExitConditions()) {
return true;
}

WaitForResponses();
SendInferRequests();

if (HandleExitConditions()) {
return;
}
if (HandleExitConditions()) {
return true;
}

} while (true);
WaitForResponses();

if (HandleExitConditions()) {
return true;
}

return false;
}

void
23 changes: 14 additions & 9 deletions src/c++/perf_analyzer/concurrency_worker.h
@@ -50,9 +50,11 @@ class NaggyMockConcurrencyWorker;
class ConcurrencyWorker : public LoadWorker {
public:
struct ThreadConfig {
ThreadConfig(size_t thread_id)
: thread_id_(thread_id), concurrency_(0), seq_stat_index_offset_(0),
is_paused_(false)
ThreadConfig(
size_t thread_id, size_t concurrency = 0,
size_t seq_stat_index_offset = 0)
: thread_id_(thread_id), concurrency_(concurrency),
seq_stat_index_offset_(seq_stat_index_offset), is_paused_(false)
{
}

@@ -91,7 +93,15 @@ class ConcurrencyWorker : public LoadWorker {
{
}

void Infer() override;
virtual void Infer() override;

protected:
bool RunInference();

void CreateCtxIdTracker();

// Reserve vector size for contexts
void ReserveContexts();

private:
const size_t max_concurrency_;
Expand All @@ -101,11 +111,6 @@ class ConcurrencyWorker : public LoadWorker {

std::shared_ptr<ThreadConfig> thread_config_;

void CreateCtxIdTracker();

// Reserve vector size for contexts
void ReserveContexts();

// Handle the case where execute_ is false
void HandleExecuteOff();

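
Together with the concurrency_worker.cc change above, this turns the worker's loop body into a reusable, protected RunInference() behind a virtual Infer(), and lets ThreadConfig start with a non-zero concurrency and sequence-stat offset. A hypothetical derived worker, sketched only to show what the new surface allows; the real periodic concurrency worker is in periodic_concurrency_worker.h/.cc, not shown here.

// Hypothetical derived worker: reuses one pass of the inherited send/wait
// cycle per loop iteration and stops when the base class signals an exit
// condition or no assigned concurrency. Assumes concurrency_worker.h is
// included.
class ExampleConcurrencyWorker : public ConcurrencyWorker {
 public:
  using ConcurrencyWorker::ConcurrencyWorker;

  void Infer() override
  {
    do {
      // Extra per-pass bookkeeping for the derived mode would go here.
      if (RunInference()) {
        break;
      }
    } while (true);
  }
};
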
7 changes: 7 additions & 0 deletions src/c++/perf_analyzer/infer_context.cc
@@ -258,6 +258,7 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
return;
}
it->second.response_times_.push_back(std::chrono::system_clock::now());
num_responses_++;
if (is_null_response == true) {
it->second.has_null_last_response_ = true;
}
@@ -267,6 +268,7 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
return;
}
if (is_final_response) {
has_received_final_response_ = is_final_response;
thread_stat_->request_records_.emplace_back(
it->second.start_time_, it->second.response_times_,
it->second.sequence_end_, it->second.delayed_,
@@ -279,8 +281,13 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
}
}

if (worker_callback_) {
worker_callback_(id_);
}

if (is_final_response) {
total_ongoing_requests_--;
num_responses_ = 0;

if (async_callback_finalize_func_ != nullptr) {
async_callback_finalize_func_(id_);
13 changes: 13 additions & 0 deletions src/c++/perf_analyzer/infer_context.h
@@ -116,18 +116,28 @@ class InferContext {
// object and have not returned
uint GetNumOngoingRequests() { return total_ongoing_requests_; }

// Returns the number of responses for the current request
uint64_t GetNumResponsesForCurrentRequest() { return num_responses_; }

// Register a function that will get called after every async request returns
void RegisterAsyncCallbackFinalize(std::function<void(uint32_t)> callback)
{
async_callback_finalize_func_ = callback;
}

void RegisterWorkerCallback(std::function<void(uint32_t)> worker_callback)
{
worker_callback_ = worker_callback;
}

// TODO REFACTOR TMA-1043 this should be in memory class
void SetNumActiveThreads(size_t num_threads)
{
num_active_threads_ = num_threads;
}

bool HasReceivedFinalResponse() { return has_received_final_response_; }

protected:
/// A helper function to issue inference request to the server.
/// \param request_id The unique id to be associated with the request.
@@ -191,6 +201,9 @@ class InferContext {
std::reference_wrapper<const bool> execute_{execute_placeholder_};

std::shared_ptr<SequenceManager> sequence_manager_{nullptr};
uint64_t num_responses_{0};
std::function<void(uint32_t)> worker_callback_{nullptr};
bool has_received_final_response_{false};

#ifndef DOCTEST_CONFIG_DISABLE
friend NaggyMockInferContext;
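
With the additions to infer_context.cc above, every response increments num_responses_ and fires the registered worker callback, and the final response sets has_received_final_response_ before the per-request counters reset. A sketch of how a worker might consume these hooks to notice that a request has produced a given number of responses; the helper function and request_period value are hypothetical, not part of this commit.

// Sketch only: attach a per-response hook to a context so the caller learns
// when the current request has produced request_period responses, and when
// its final response has arrived. Assumes infer_context.h is included.
void
RegisterPeriodHook(InferContext& ctx, uint64_t request_period)
{
  ctx.RegisterWorkerCallback([&ctx, request_period](uint32_t /* ctx_id */) {
    if (ctx.GetNumResponsesForCurrentRequest() == request_period) {
      // e.g. tell the manager that this context hit its request period
    }
    if (ctx.HasReceivedFinalResponse()) {
      // e.g. mark this context as done for the current step
    }
  });
}
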
13 changes: 13 additions & 0 deletions src/c++/perf_analyzer/inference_profiler.h
@@ -43,6 +43,7 @@
#include "metrics_manager.h"
#include "model_parser.h"
#include "mpi_utils.h"
#include "periodic_concurrency_manager.h"
#include "profile_data_collector.h"
#include "request_rate_manager.h"

@@ -306,6 +307,18 @@ class InferenceProfiler {
return cb::Error::Success;
}

cb::Error ProfilePeriodicConcurrencyMode()
{
auto& manager{dynamic_cast<PeriodicConcurrencyManager&>(*manager_)};
std::vector<RequestRecord> request_records{manager.RunExperiment()};

InferenceLoadMode id{1, 0.0};
collector_->AddWindow(id, 0, UINT64_MAX);
collector_->AddData(id, std::move(request_records));

return cb::Error::Success;
}

bool IncludeServerStats() { return include_server_stats_; }

private:
18 changes: 17 additions & 1 deletion src/c++/perf_analyzer/perf_analyzer.cc
@@ -27,6 +27,7 @@
#include "perf_analyzer.h"

#include "perf_analyzer_exception.h"
#include "periodic_concurrency_manager.h"
#include "report_writer.h"
#include "request_rate_manager.h"

@@ -159,6 +160,12 @@ PerfAnalyzer::CreateAnalyzerObjects()
}

std::unique_ptr<pa::LoadManager> manager;
params_->using_periodic_concurrency_mode = true;
params_->periodic_concurrency_range = {
std::stoi(std::getenv("MY_START")), std::stoi(std::getenv("MY_END")),
std::stoi(std::getenv("MY_STEP"))};
params_->periodic_concurrency_request_period =
std::stoi(std::getenv("MY_REQUEST_PERIOD"));

if (params_->targeting_concurrency()) {
if ((parser_->SchedulerType() == pa::ModelParser::SEQUENCE) ||
@@ -209,6 +216,13 @@ PerfAnalyzer::CreateAnalyzerObjects()
factory, &manager),
"failed to create concurrency manager");

} else if (params_->using_periodic_concurrency_mode) {
manager = std::make_unique<pa::PeriodicConcurrencyManager>(
params_->async, params_->streaming, params_->batch_size,
params_->max_threads, params_->max_concurrency,
params_->shared_memory_type, params_->output_shm_size, parser_, factory,
params_->periodic_concurrency_range,
params_->periodic_concurrency_request_period);
} else if (params_->using_request_rate_range) {
if ((params_->sequence_id_range != 0) &&
(params_->sequence_id_range < params_->num_of_sequences)) {
@@ -370,6 +384,8 @@ PerfAnalyzer::Profile()
err = profiler_->Profile<size_t>(
params_->concurrency_range.start, params_->concurrency_range.end,
params_->concurrency_range.step, params_->search_mode, perf_statuses_);
} else if (params_->using_periodic_concurrency_mode) {
err = profiler_->ProfilePeriodicConcurrencyMode();
} else {
err = profiler_->Profile<double>(
params_->request_rate_range[pa::SEARCH_RANGE::kSTART],
@@ -393,7 +409,7 @@
void
PerfAnalyzer::WriteReport()
{
if (!perf_statuses_.size()) {
if (!perf_statuses_.size() || params_->using_periodic_concurrency_mode) {
return;
}

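
CreateAnalyzerObjects() above force-enables the mode and feeds std::stoi directly from std::getenv("MY_START"), "MY_END", "MY_STEP", and "MY_REQUEST_PERIOD". std::getenv returns nullptr when a variable is unset, and constructing the std::string argument for std::stoi from a null pointer is undefined behavior. A defensive sketch for that kind of temporary workflow; the helper name and fallback values are hypothetical, not something this commit adds.

#include <cstdint>
#include <cstdlib>
#include <string>

// Hypothetical helper: parse a numeric environment variable, falling back to
// a default instead of handing a null pointer to std::stoi/std::stoull.
static uint64_t
EnvOrDefault(const char* name, uint64_t default_value)
{
  const char* value{std::getenv(name)};
  return (value != nullptr) ? std::stoull(value) : default_value;
}

// Possible use for the experiment parameters read above:
//   params_->periodic_concurrency_request_period =
//       EnvOrDefault("MY_REQUEST_PERIOD", 10);
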
