Skip to content

Commit

Permalink
Merge branch 'main' into feat-retriever
Browse files Browse the repository at this point in the history
  • Loading branch information
dyastremsky authored Jun 29, 2024
2 parents 2d7cce0 + 5b2c1a4 commit 7eb8eb4
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 20 deletions.
57 changes: 45 additions & 12 deletions src/c++/perf_analyzer/inference_profiler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -723,13 +723,22 @@ InferenceProfiler::ProfileHelper(
measurement_perf_status.request_rate = experiment_perf_status.request_rate;
RETURN_IF_ERROR(manager_->CheckHealth());

MeasureConfig measure_config;
if (measurement_mode_ == MeasurementMode::TIME_WINDOWS) {
error.push(
Measure(measurement_perf_status, measurement_window_ms_, false));
measure_config.measurement_window = measurement_window_ms_;
measure_config.is_count_based = false;
} else {
error.push(
Measure(measurement_perf_status, measurement_request_count_, true));
measure_config.measurement_window = measurement_request_count_;
measure_config.is_count_based = true;
}

// When request_count is not 0, the experiment will run for exactly X
// requests. In that case, we are not measuring based on window stability,
// and instead need to clamp the windows to be from the start of the
// first request to the end of the last request of the request count
//
measure_config.clamp_window = (request_count != 0);
error.push(Measure(measurement_perf_status, measure_config));
measurement_perf_statuses.push_back(measurement_perf_status);

if (error.size() > load_parameters_.stability_window) {
Expand Down Expand Up @@ -1169,8 +1178,7 @@ InferenceProfiler::GetServerSideStatus(

// Used for measurement
cb::Error
InferenceProfiler::Measure(
PerfStatus& perf_status, uint64_t measurement_window, bool is_count_based)
InferenceProfiler::Measure(PerfStatus& perf_status, MeasureConfig config)
{
std::map<cb::ModelIdentifier, cb::ModelStatistics> start_status;
std::map<cb::ModelIdentifier, cb::ModelStatistics> end_status;
Expand Down Expand Up @@ -1207,18 +1215,18 @@ InferenceProfiler::Measure(
}
}

if (!is_count_based) {
if (!config.is_count_based) {
// Wait for specified time interval in msec
std::this_thread::sleep_for(
std::chrono::milliseconds((uint64_t)(measurement_window_ms_ * 1.2)));
std::chrono::milliseconds((uint64_t)(config.measurement_window * 1.2)));
} else {
do {
// Check the health of the worker threads.
RETURN_IF_ERROR(manager_->CheckHealth());

// Wait for 1s until enough samples have been collected.
std::this_thread::sleep_for(std::chrono::milliseconds((uint64_t)1000));
} while (manager_->CountCollectedRequests() < measurement_window);
} while (manager_->CountCollectedRequests() < config.measurement_window);
}

uint64_t window_end_ns =
Expand Down Expand Up @@ -1249,7 +1257,7 @@ InferenceProfiler::Measure(

RETURN_IF_ERROR(Summarize(
start_status, end_status, start_stat, end_stat, perf_status,
window_start_ns, window_end_ns));
window_start_ns, window_end_ns, config.clamp_window));

return cb::Error::Success;
}
Expand All @@ -1259,21 +1267,28 @@ InferenceProfiler::Summarize(
const std::map<cb::ModelIdentifier, cb::ModelStatistics>& start_status,
const std::map<cb::ModelIdentifier, cb::ModelStatistics>& end_status,
const cb::InferStat& start_stat, const cb::InferStat& end_stat,
PerfStatus& summary, uint64_t window_start_ns, uint64_t window_end_ns)
PerfStatus& summary, uint64_t window_start_ns, uint64_t window_end_ns,
bool clamp_window)
{
size_t valid_sequence_count = 0;
size_t delayed_request_count = 0;
size_t response_count = 0;

// Get measurement from requests that fall within the time interval
std::pair<uint64_t, uint64_t> valid_range{window_start_ns, window_end_ns};
uint64_t window_duration_ns = valid_range.second - valid_range.first;
std::vector<uint64_t> latencies;
std::vector<RequestRecord> valid_requests{};
ValidLatencyMeasurement(
valid_range, valid_sequence_count, delayed_request_count, &latencies,
response_count, valid_requests);


if (clamp_window) {
auto [start, end] = ClampWindow(valid_requests);
}

uint64_t window_duration_ns = window_end_ns - window_start_ns;

if (should_collect_profile_data_) {
CollectData(
summary, window_start_ns, window_end_ns, std::move(valid_requests));
Expand Down Expand Up @@ -1366,6 +1381,24 @@ InferenceProfiler::ValidLatencyMeasurement(
std::sort(valid_latencies->begin(), valid_latencies->end());
}

std::pair<uint64_t, uint64_t>
InferenceProfiler::ClampWindow(std::vector<RequestRecord>& requests)
{
  // Compute the tightest window covering all requests: from the earliest
  // request start time to the latest final-response timestamp.
  auto earliest_start =
      std::chrono::time_point<std::chrono::system_clock>::max();
  auto latest_end = std::chrono::time_point<std::chrono::system_clock>::min();

  // Iterate by const reference: RequestRecord carries timestamp vectors, so
  // iterating by value (`auto x`) would copy every record.
  for (const auto& request : requests) {
    // Skip records with no responses: calling back() on an empty vector is
    // undefined behavior.
    if (request.response_timestamps_.empty()) {
      continue;
    }
    earliest_start = std::min(earliest_start, request.start_time_);
    latest_end = std::max(latest_end, request.response_timestamps_.back());
  }

  // Return the window edges as clock-duration counts since the epoch.
  // NOTE(review): if `requests` is empty (or all records lack responses) this
  // returns the <max, min> sentinel pair unchanged, as the original did.
  return std::make_pair(
      earliest_start.time_since_epoch().count(),
      latest_end.time_since_epoch().count());
}


void
InferenceProfiler::CollectData(
PerfStatus& summary, uint64_t window_start_ns, uint64_t window_end_ns,
Expand Down
28 changes: 20 additions & 8 deletions src/c++/perf_analyzer/inference_profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ struct LoadStatus {
uint64_t avg_latency = 0;
};

/// Configuration for the Measure function
struct MeasureConfig {
  // Size of the measurement window: a request count when `is_count_based`
  // is true, otherwise a duration in milliseconds.
  uint64_t measurement_window{0};
  // Selects how `measurement_window` is interpreted: true = number of
  // requests to collect, false = time window in milliseconds.
  bool is_count_based{false};
  // When true, the reported window is clamped to span from the start of the
  // first request to the final response (used for fixed-request-count runs).
  bool clamp_window{false};
};

// Holds the total of the timing components of composing models of an
// ensemble.
struct EnsembleDurations {
Expand Down Expand Up @@ -475,14 +482,9 @@ class InferenceProfiler {

/// Helper function to perform measurement.
/// \param status_summary The summary of this measurement.
/// \param measurement_window Indicating the number of requests or the
/// duration in milliseconds to collect requests.
/// \param is_count_based determines whether measurement_window is indicating
/// time or count.
/// \param config The configuration for measurement.
/// \return cb::Error object indicating success or failure.
cb::Error Measure(
PerfStatus& status_summary, uint64_t measurement_window,
bool is_count_based);
cb::Error Measure(PerfStatus& status_summary, MeasureConfig config);

/// Gets the server side statistics
/// \param model_status Returns the status of the models provided by
Expand All @@ -501,12 +503,15 @@ class InferenceProfiler {
/// \param summary Returns the summary of the measurement.
/// \param window_start_ns The window start timestamp in nanoseconds.
/// \param window_end_ns The window end timestamp in nanoseconds.
/// \param clamp_window If true, the actual window range is clamped to span
/// from the start of the first request to the final response.
/// \return cb::Error object indicating success or failure.
cb::Error Summarize(
const std::map<cb::ModelIdentifier, cb::ModelStatistics>& start_status,
const std::map<cb::ModelIdentifier, cb::ModelStatistics>& end_status,
const cb::InferStat& start_stat, const cb::InferStat& end_stat,
PerfStatus& summary, uint64_t window_start_ns, uint64_t window_end_ns);
PerfStatus& summary, uint64_t window_start_ns, uint64_t window_end_ns,
bool clamp_window);

/// \param valid_range The start and end timestamp of the measurement window.
/// \param valid_sequence_count Returns the number of completed sequences
Expand All @@ -522,6 +527,13 @@ class InferenceProfiler {
std::vector<uint64_t>* latencies, size_t& response_count,
std::vector<RequestRecord>& valid_requests);

/// Clamp a window around a set of requests, from the earliest start time to
/// the latest response
/// \param requests A vector of requests to clamp the window around.
/// \return std::pair object containing <start, end> of the window.
std::pair<uint64_t, uint64_t> ClampWindow(
std::vector<RequestRecord>& requests);

/// Add the data from the request records to the Raw Data Collector
/// \param perf_status PerfStatus of the current measurement
/// \param window_start_ns The window start timestamp in nanoseconds.
Expand Down
40 changes: 40 additions & 0 deletions src/c++/perf_analyzer/test_inference_profiler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ class TestInferenceProfiler : public InferenceProfiler {
return ip.IsDoneProfiling(ls, &is_stable);
};

  // Test-only shim forwarding to InferenceProfiler::ClampWindow so unit
  // tests can invoke it directly on this subclass.
  std::pair<uint64_t, uint64_t> ClampWindow(std::vector<RequestRecord>& reqs)
  {
    return InferenceProfiler::ClampWindow(reqs);
  }

cb::Error MergeMetrics(
const std::vector<std::reference_wrapper<const Metrics>>& all_metrics,
Metrics& merged_metrics)
Expand Down Expand Up @@ -1060,6 +1065,41 @@ TEST_CASE(
}
}

TEST_CASE("clamp window")
{
  TestInferenceProfiler tip{};

  using time_point = std::chrono::time_point<std::chrono::system_clock>;
  const time_point clock_epoch{};

  // Build three requests table-driven: {start_ns, response_ns} offsets from
  // the clock epoch. Window must clamp to earliest start (3) and latest
  // response (20), neither of which belongs to the same request.
  const std::vector<std::pair<uint64_t, uint64_t>> request_timings{
      {5, 20}, {3, 15}, {7, 17}};

  std::vector<RequestRecord> reqs{};
  for (const auto& [start_ns, response_ns] : request_timings) {
    reqs.emplace_back(
        clock_epoch + std::chrono::nanoseconds(start_ns),
        std::vector<time_point>{
            clock_epoch + std::chrono::nanoseconds(response_ns)});
  }

  const auto window = tip.ClampWindow(reqs);

  CHECK(window.first == 3);
  CHECK(window.second == 20);
}

TEST_CASE("summarize_client_stat: testing the SummarizeClientStat function")
{
MockInferenceProfiler mock_inference_profiler{};
Expand Down

0 comments on commit 7eb8eb4

Please sign in to comment.