Merge 'Implement cross-shard consumption fairness'
scylladb/seastar#2294

* xemul/br-io-queue-cross-shard-fairness:
  test: Add manual test for cross-shard balancing
  fair_queue: Amortize cross-shard balance checking
  fair_queue: Drop per-dispatch-loop threshold
  fair_queue: Introduce group-wide capacity balancing
  fair_queue: Define signed_capacity_t type in fair_group
  fair_queue tests: Remember it is time-based
  fair_queue: Scale fixed-point factor
xemul committed Aug 15, 2024
2 parents ae05c13 + fb850f9 commit 502c703
Showing 5 changed files with 295 additions and 24 deletions.
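In outline, this series makes every shard publish how much disk capacity it has dispatched into a small group-shared array; a shard that has run too far ahead of the slowest shard skips its dispatch loop so the others can catch up and grab the unused capacity. The stand-alone sketch below illustrates only that idea; all names (shared_balance, may_dispatch, the constants) are illustrative and not the Seastar API.

#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// Illustrative sketch of group-wide capacity balancing; not the Seastar API.
using capacity_t = uint64_t;

// Sentinel published by idle shards so they never drag the group minimum down.
constexpr capacity_t idle_balance = std::numeric_limits<int64_t>::max();

struct shared_balance {
    // One accumulated-consumption counter per shard.
    std::vector<capacity_t> per_shard;

    explicit shared_balance(unsigned nr_shards) : per_shard(nr_shards, idle_balance) {}

    // The slowest active shard defines the group's current balance.
    capacity_t current() const {
        return *std::min_element(per_shard.begin(), per_shard.end());
    }

    // Each shard publishes its own accumulated consumption after dispatching.
    void update(unsigned shard, capacity_t accumulated) {
        per_shard[shard] = accumulated;
    }
};

// A shard may dispatch only while it is within max_imbalance capacity
// points of the slowest shard; otherwise it waits for the next tick.
bool may_dispatch(const shared_balance& group, capacity_t my_accumulated, capacity_t max_imbalance) {
    capacity_t slowest = group.current();
    return slowest == idle_balance || my_accumulated <= slowest + max_imbalance;
}

In the actual patch the per-shard slot lives in fair_group::_balance, a queue publishes fair_group::max_balance when it goes idle, and the allowed lead is bounded by _max_imbalance derived from tau, as the diffs below show.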
1 change: 0 additions & 1 deletion apps/io_tester/ioinfo.cc
@@ -75,7 +75,6 @@ int main(int ac, char** av) {
out << YAML::EndMap;

const auto& fg = internal::get_fair_group(ioq, internal::io_direction_and_length::write_idx);
out << YAML::Key << "per_tick_grab_threshold" << YAML::Value << fg.per_tick_grab_threshold();

const auto& tb = fg.token_bucket();
out << YAML::Key << "token_bucket" << YAML::BeginMap;
44 changes: 40 additions & 4 deletions include/seastar/core/fair_queue.hh
@@ -26,6 +26,7 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/util/shared_token_bucket.hh>

#include <chrono>
@@ -108,6 +109,7 @@ public:
// a 'normalized' form -- converted from floating-point to fixed-point number
// and scaled according to fair-group's token-bucket duration
using capacity_t = uint64_t;
using signed_capacity_t = std::make_signed<capacity_t>::type;
friend class fair_queue;

private:
@@ -138,6 +140,7 @@ public:
class fair_group {
public:
using capacity_t = fair_queue_entry::capacity_t;
using signed_capacity_t = fair_queue_entry::signed_capacity_t;
using clock_type = std::chrono::steady_clock;

/*
@@ -189,7 +192,7 @@ public:
* time period for which the speeds from F (in above formula) are taken.
*/

static constexpr float fixed_point_factor = float(1 << 24);
static constexpr float fixed_point_factor = float(1 << 21);
using rate_resolution = std::milli;
using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;

@@ -215,10 +218,21 @@ private:
*/

token_bucket_t _token_bucket;
const capacity_t _per_tick_threshold;

// Capacities accumulated by queues in this group. Each queue tries not
// to run too far ahead of the others; if it does, it skips its dispatch
// loop until the next tick, in the hope that other shards will grab the
// unused disk capacity and move their counters forward.
std::vector<capacity_t> _balance;

public:

// Maximum value a _balance entry can get.
// It's also what a queue publishes when it goes idle and doesn't need to
// participate in the accumulation race. The value still compares correctly
// against "active" queues' balances.
static constexpr capacity_t max_balance = std::numeric_limits<signed_capacity_t>::max();

// Convert internal capacity value back into the real token
static double capacity_tokens(capacity_t cap) noexcept {
return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
@@ -252,7 +266,6 @@ public:
fair_group(fair_group&&) = delete;

capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
capacity_t grab_capacity(capacity_t cap) noexcept;
clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
void replenish_capacity(clock_type::time_point now) noexcept;
@@ -266,6 +279,10 @@ public:
}

const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }

capacity_t current_balance() const noexcept;
void update_balance(capacity_t) noexcept;
void reset_balance() noexcept;
};

/// \brief Fair queuing class
@@ -301,7 +318,7 @@ public:
using class_id = unsigned int;
class priority_class_data;
using capacity_t = fair_group::capacity_t;
using signed_capacity_t = std::make_signed<capacity_t>::type;
using signed_capacity_t = fair_queue_entry::signed_capacity_t;

private:
using clock_type = std::chrono::steady_clock;
@@ -331,6 +348,23 @@ private:
std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
size_t _nr_classes = 0;
capacity_t _last_accumulated = 0;
capacity_t _total_accumulated = 0;

// Amortize balance checking by assuming that once balance is achieved,
// it will remain so for the "quiscent" duration. Increase this duration
// every time the assumption holds, but not beyond tau. When balance is
// lost, reset back to frequent checks.
static constexpr auto minimal_quiscent_duration = std::chrono::microseconds(100);
std::chrono::microseconds _balance_quiscent_duration = minimal_quiscent_duration;
timer<lowres_clock> _balance_timer;
// Maximum capacity by which a queue may stay behind other shards
//
// This is similar to the priority-class fall-back deviation and is
// calculated as the number of capacity points a group with 1 share
// accumulates over tau
//
// Check the max_deviation math in push_priority_class_from_idle()
const capacity_t _max_imbalance;

/*
* When the shared capacity is over the local queue delays
@@ -361,6 +395,8 @@ private:
enum class grab_result { grabbed, cant_preempt, pending };
grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;

bool balanced() noexcept;
public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
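To put the new _max_imbalance bound in perspective: it equals fixed_point_factor multiplied by tau expressed in the group's milliseconds-based rate_resolution. The snippet below just reproduces that arithmetic with an assumed tau of 100 ms (the real value comes from the queue's config), which with the rescaled 2^21 factor gives roughly 2.1e8 capacity points.

#include <chrono>
#include <cstdint>
#include <cstdio>

int main() {
    // Values taken from this diff; tau = 100ms is only an assumed example.
    constexpr double fixed_point_factor = double(1 << 21);
    constexpr auto tau = std::chrono::milliseconds(100);

    // rate_cast() boils down to converting tau into the milli-based
    // rate_resolution, i.e. 100.0 here.
    const double tau_in_rate_units =
        std::chrono::duration<double, std::milli>(tau).count();

    const uint64_t max_imbalance = uint64_t(fixed_point_factor * tau_in_rate_units);
    std::printf("max_imbalance = %llu capacity points\n",
                (unsigned long long)max_imbalance);
    return 0;
}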
70 changes: 65 additions & 5 deletions src/core/fair_queue.cc
@@ -48,6 +48,8 @@ module seastar;

namespace seastar {

logger fq_log("fair_queue");

static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
@@ -107,7 +109,7 @@ fair_group::fair_group(config cfg, unsigned nr_queues)
std::max<capacity_t>(cfg.rate_factor * fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), tokens_capacity(cfg.limit_min_tokens)),
tokens_capacity(cfg.min_tokens)
)
, _per_tick_threshold(_token_bucket.limit() / nr_queues)
, _balance(smp::count, max_balance)
{
if (cfg.rate_factor * fixed_point_factor > _token_bucket.max_rate) {
throw std::runtime_error("Fair-group rate_factor is too large");
@@ -141,6 +143,21 @@ auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity
return _token_bucket.deficiency(from);
}

auto fair_group::current_balance() const noexcept -> capacity_t {
return *std::min_element(_balance.begin(), _balance.end());
}

void fair_group::update_balance(capacity_t acc) noexcept {
_balance[this_shard_id()] = acc;
}

void fair_group::reset_balance() noexcept {
// Request cost can be up to half a million. Given a 100K IOPS disk and a
// class with 1 share, the 64-bit accumulating counter would overflow
// once in a few years.
on_internal_error_noexcept(fq_log, "cannot reset group balance");
}

// Priority class, to be used with a given fair_queue
class fair_queue::priority_class_data {
friend class fair_queue;
@@ -169,7 +186,15 @@ fair_queue::fair_queue(fair_group& group, config cfg)
: _config(std::move(cfg))
, _group(group)
, _group_replenish(clock_type::now())
, _balance_timer([this] {
auto new_qd = _balance_quiscent_duration * 2;
_balance_quiscent_duration = std::min(new_qd, _config.tau);
})
, _max_imbalance(fair_group::fixed_point_factor * fair_group::token_bucket_t::rate_cast(_config.tau).count())
{
if (fair_group::max_balance > std::numeric_limits<capacity_t>::max() - _max_imbalance) {
throw std::runtime_error("Too large tau parameter");
}
}

fair_queue::fair_queue(fair_queue&& other)
@@ -181,6 +206,10 @@ fair_queue::fair_queue(fair_queue&& other)
, _handles(std::move(other._handles))
, _priority_classes(std::move(other._priority_classes))
, _last_accumulated(other._last_accumulated)
, _total_accumulated(other._total_accumulated)
, _balance_quiscent_duration(other._balance_quiscent_duration)
, _balance_timer(std::move(other._balance_timer))
, _max_imbalance(other._max_imbalance)
{
}

@@ -210,6 +239,12 @@ void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept
// over signed maximum (see overflow check below)
pc._accumulated = std::max<signed_capacity_t>(_last_accumulated - max_deviation, pc._accumulated);
_handles.assert_enough_capacity();
if (_handles.empty()) {
capacity_t balance = _group.current_balance();
if (balance != fair_group::max_balance) {
_total_accumulated = std::max<signed_capacity_t>(balance - _max_imbalance, _total_accumulated);
}
}
_handles.push(&pc);
pc._queued = true;
}
@@ -346,11 +381,29 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
return std::chrono::steady_clock::time_point::max();
}

bool fair_queue::balanced() noexcept {
if (_balance_timer.armed()) {
return true;
}

capacity_t balance = _group.current_balance();
if (_total_accumulated > balance + _max_imbalance) {
_balance_quiscent_duration = minimal_quiscent_duration;
return false;
}

_balance_timer.arm(_balance_quiscent_duration);
return true;
}

void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
capacity_t dispatched = 0;
boost::container::small_vector<priority_class_ptr, 2> preempt;

while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
if (!balanced()) {
return;
}

while (!_handles.empty()) {
priority_class_data& h = *_handles.top();
if (h._queue.empty() || !h._plugged) {
pop_priority_class(h);
@@ -378,7 +431,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
// has chances to be translated into zero cost which, in turn, will make the
// class show no progress and monopolize the queue.
auto req_cap = req._capacity;
auto req_cost = std::max(req_cap / h._shares, (capacity_t)1);
auto req_cost = std::max((req_cap + h._shares - 1) / h._shares, (capacity_t)1);
// signed overflow check to make push_priority_class_from_idle math work
if (h._accumulated >= std::numeric_limits<signed_capacity_t>::max() - req_cost) {
for (auto& pc : _priority_classes) {
@@ -392,9 +445,9 @@
}
_last_accumulated = 0;
}
_total_accumulated += req_cost;
h._accumulated += req_cost;
h._pure_accumulated += req_cap;
dispatched += req_cap;

cb(req);

Expand All @@ -406,6 +459,13 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
for (auto&& h : preempt) {
push_priority_class(*h);
}

if (_total_accumulated >= fair_group::max_balance) [[unlikely]] {
_group.reset_balance();
_total_accumulated = 0;
_balance_quiscent_duration = minimal_quiscent_duration;
}
_group.update_balance(_handles.empty() ? fair_group::max_balance : _total_accumulated);
}

std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
(Diffs for the remaining two changed files are not shown here.)
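Finally, the amortization that balanced() adds follows a doubling-with-cap policy: as long as the shards stay balanced, the quiescent period grows from 100 µs up to tau, and any observed imbalance snaps it back to the minimum. The sketch below is a simplified, stand-alone variant of that policy using a plain steady_clock deadline instead of Seastar's timer<lowres_clock>; names are illustrative.

#include <algorithm>
#include <chrono>

// Simplified backoff policy for the amortized cross-shard balance check.
class balance_check_backoff {
    using clock = std::chrono::steady_clock;
    static constexpr auto min_interval = std::chrono::microseconds(100);

    std::chrono::microseconds _interval = min_interval;
    std::chrono::microseconds _cap;                  // plays the role of tau
    clock::time_point _next_check = clock::time_point::min();

public:
    explicit balance_check_backoff(std::chrono::microseconds cap) : _cap(cap) {}

    // While the deadline is in the future the caller skips the group scan,
    // like fair_queue::balanced() returning true when its timer is armed.
    bool may_skip_check(clock::time_point now) const { return now < _next_check; }

    // Balance held: stay quiet for the current interval and let it grow,
    // doubling up to the cap.
    void on_balanced(clock::time_point now) {
        _next_check = now + _interval;
        _interval = std::min(_interval * 2, _cap);
    }

    // Imbalance observed: go back to frequent checks right away.
    void on_imbalanced() {
        _interval = min_interval;
        _next_check = clock::time_point::min();
    }
};

In the queue itself the doubling happens in the _balance_timer callback, and the interval is also reset when the accumulated counter is reset; the sketch collapses those paths into on_balanced()/on_imbalanced().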
