Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cross-shard consumption fairness #2294

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/io_tester/ioinfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ int main(int ac, char** av) {
out << YAML::EndMap;

const auto& fg = internal::get_fair_group(ioq, internal::io_direction_and_length::write_idx);
out << YAML::Key << "per_tick_grab_threshold" << YAML::Value << fg.per_tick_grab_threshold();

const auto& tb = fg.token_bucket();
out << YAML::Key << "token_bucket" << YAML::BeginMap;
Expand Down
44 changes: 40 additions & 4 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/util/shared_token_bucket.hh>

#include <chrono>
Expand Down Expand Up @@ -108,6 +109,7 @@ public:
// a 'normalized' form -- converted from floating-point to fixed-point number
// and scaled according to fair-group's token-bucket duration
using capacity_t = uint64_t;
using signed_capacity_t = std::make_signed_t<capacity_t>;
friend class fair_queue;

private:
Expand Down Expand Up @@ -138,6 +140,7 @@ public:
class fair_group {
public:
using capacity_t = fair_queue_entry::capacity_t;
using signed_capacity_t = fair_queue_entry::signed_capacity_t;
using clock_type = std::chrono::steady_clock;

/*
Expand Down Expand Up @@ -189,7 +192,7 @@ public:
* time period for which the speeds from F (in above formula) are taken.
*/

static constexpr float fixed_point_factor = float(1 << 24);
static constexpr float fixed_point_factor = float(1 << 21);
using rate_resolution = std::milli;
using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;

Expand All @@ -215,10 +218,21 @@ private:
*/

token_bucket_t _token_bucket;
const capacity_t _per_tick_threshold;

// Capacities accumulated by queues in this group. Each queue tries not
// to run too far ahead of the others; if it does, it skips its dispatch
// loop until the next tick in the hope that other shards will grab the
// unused disk capacity and move their counters forward.
std::vector<capacity_t> _balance;

public:

// Maximum value a _balance entry can get.
// It's also set when a queue goes idle and doesn't need to participate
// in the balance accounting. This value is still suitable for comparisons
// against "active" queues.
static constexpr capacity_t max_balance = std::numeric_limits<signed_capacity_t>::max();

// Convert internal capacity value back into the real token
static double capacity_tokens(capacity_t cap) noexcept {
return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
Expand Down Expand Up @@ -251,7 +265,6 @@ public:
fair_group(fair_group&&) = delete;

capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
capacity_t grab_capacity(capacity_t cap) noexcept;
clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
void replenish_capacity(clock_type::time_point now) noexcept;
Expand All @@ -265,6 +278,10 @@ public:
}

const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }

capacity_t current_balance() const noexcept;
void update_balance(capacity_t) noexcept;
void reset_balance() noexcept;
};

/// \brief Fair queuing class
Expand Down Expand Up @@ -300,7 +317,7 @@ public:
using class_id = unsigned int;
class priority_class_data;
using capacity_t = fair_group::capacity_t;
using signed_capacity_t = std::make_signed_t<capacity_t>;
using signed_capacity_t = fair_queue_entry::signed_capacity_t;

private:
using clock_type = std::chrono::steady_clock;
Expand Down Expand Up @@ -330,6 +347,23 @@ private:
std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
size_t _nr_classes = 0;
capacity_t _last_accumulated = 0;
capacity_t _total_accumulated = 0;

// Amortize balance checking by assuming that once balance is achieved,
// it will remain so for the "quiescent" duration. Increase this
// duration every time the assumption holds true, but never beyond
// tau. When the balance is lost, reset back to frequent checks.
static constexpr auto minimal_quiscent_duration = std::chrono::microseconds(100);
std::chrono::microseconds _balance_quiscent_duration = minimal_quiscent_duration;
timer<lowres_clock> _balance_timer;
// Maximum capacity by which a queue may lag behind the other shards
//
// This is similar to the priority-class fall-back deviation and is
// calculated as the number of capacity points a group with 1 share
// accumulates over tau
//
// Check the max_deviation math in push_priority_class_from_idle()
const capacity_t _max_imbalance;

/*
* When the shared capacity is over the local queue delays
Expand Down Expand Up @@ -360,6 +394,8 @@ private:
enum class grab_result { grabbed, cant_preempt, pending };
grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;

bool balanced() noexcept;
public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
Expand Down
66 changes: 61 additions & 5 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ module seastar;

namespace seastar {

logger fq_log("fair_queue");

static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
Expand Down Expand Up @@ -107,7 +109,7 @@ fair_group::fair_group(config cfg, unsigned nr_queues)
std::max<capacity_t>(fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), tokens_capacity(cfg.limit_min_tokens)),
tokens_capacity(cfg.min_tokens)
)
, _per_tick_threshold(_token_bucket.limit() / nr_queues)
, _balance(smp::count, max_balance)
{
if (tokens_capacity(cfg.min_tokens) > _token_bucket.threshold()) {
throw std::runtime_error("Fair-group replenisher limit is lower than threshold");
Expand Down Expand Up @@ -137,6 +139,21 @@ auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity
return _token_bucket.deficiency(from);
}

auto fair_group::current_balance() const noexcept -> capacity_t {
    // The group balance is the smallest per-shard accumulated counter,
    // i.e. the progress of the queue that is furthest behind.
    capacity_t lowest = _balance.front();
    for (capacity_t b : _balance) {
        if (b < lowest) {
            lowest = b;
        }
    }
    return lowest;
}

void fair_group::update_balance(capacity_t acc) noexcept {
    // Publish this shard's accumulated value so sibling queues can
    // compare their own progress against it.
    const auto shard = this_shard_id();
    _balance[shard] = acc;
}

void fair_group::reset_balance() noexcept {
    // Reaching this path is treated as an internal error rather than
    // implementing a real reset: a request cost can be up to half a
    // million tokens, so given a 100K IOPS disk and a class with 1 share,
    // the 64-bit accumulating counter would overflow only once in a few
    // years.
    on_internal_error_noexcept(fq_log, "cannot reset group balance");
}

// Priority class, to be used with a given fair_queue
class fair_queue::priority_class_data {
friend class fair_queue;
Expand Down Expand Up @@ -165,7 +182,15 @@ fair_queue::fair_queue(fair_group& group, config cfg)
: _config(std::move(cfg))
, _group(group)
, _group_replenish(clock_type::now())
, _balance_timer([this] {
auto new_qd = _balance_quiscent_duration * 2;
_balance_quiscent_duration = std::min(new_qd, _config.tau);
})
, _max_imbalance(fair_group::fixed_point_factor * fair_group::token_bucket_t::rate_cast(_config.tau).count())
{
if (fair_group::max_balance > std::numeric_limits<capacity_t>::max() - _max_imbalance) {
throw std::runtime_error("Too large tau parameter");
}
}

fair_queue::~fair_queue() {
Expand Down Expand Up @@ -194,6 +219,12 @@ void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept
// over signed maximum (see overflow check below)
pc._accumulated = std::max<signed_capacity_t>(_last_accumulated - max_deviation, pc._accumulated);
_handles.assert_enough_capacity();
if (_handles.empty()) {
capacity_t balance = _group.current_balance();
if (balance != fair_group::max_balance) {
_total_accumulated = std::max<signed_capacity_t>(balance - _max_imbalance, _total_accumulated);
}
}
_handles.push(&pc);
pc._queued = true;
}
Expand Down Expand Up @@ -330,11 +361,29 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
return std::chrono::steady_clock::time_point::max();
}

bool fair_queue::balanced() noexcept {
if (_balance_timer.armed()) {
return true;
}

capacity_t balance = _group.current_balance();
if (_total_accumulated > balance + _max_imbalance) {
_balance_quiscent_duration = minimal_quiscent_duration;
return false;
}

_balance_timer.arm(_balance_quiscent_duration);
return true;
}

void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
capacity_t dispatched = 0;
boost::container::small_vector<priority_class_ptr, 2> preempt;

while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
if (!balanced()) {
return;
}

while (!_handles.empty()) {
priority_class_data& h = *_handles.top();
if (h._queue.empty() || !h._plugged) {
pop_priority_class(h);
Expand Down Expand Up @@ -362,7 +411,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
// has chances to be translated into zero cost which, in turn, will make the
// class show no progress and monopolize the queue.
auto req_cap = req._capacity;
auto req_cost = std::max(req_cap / h._shares, (capacity_t)1);
auto req_cost = std::max((req_cap + h._shares - 1) / h._shares, (capacity_t)1);
// signed overflow check to make push_priority_class_from_idle math work
if (h._accumulated >= std::numeric_limits<signed_capacity_t>::max() - req_cost) {
for (auto& pc : _priority_classes) {
Expand All @@ -376,9 +425,9 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}
_last_accumulated = 0;
}
_total_accumulated += req_cost;
h._accumulated += req_cost;
h._pure_accumulated += req_cap;
dispatched += req_cap;

cb(req);

Expand All @@ -390,6 +439,13 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
for (auto&& h : preempt) {
push_priority_class(*h);
}

if (_total_accumulated >= fair_group::max_balance) [[unlikely]] {
_group.reset_balance();
_total_accumulated = 0;
_balance_quiscent_duration = minimal_quiscent_duration;
}
_group.update_balance(_handles.empty() ? fair_group::max_balance : _total_accumulated);
}

std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
Expand Down
Loading