Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accepts as valid responses to staged requests. #171

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,15 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.

## Changelog

### Boost 1.85

* Fixes [issue 170](https://github.com/boostorg/redis/issues/170).
Under load and on low-latency networks it is possible to start
receiving responses before the write operation completed and while
the request is still marked as staged and not written. This messes
up with the heuristics that classifies responses as unsolicied or
not.

### Boost 1.84 (First release in Boost)

* Deprecates the `async_receive` overload that takes a response. Users
Expand Down
131 changes: 64 additions & 67 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct exec_op {
asio::coroutine coro{};

template <class Self>
void operator()(Self& self , system::error_code ec = {})
void operator()(Self& self , system::error_code ec = {}, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER (coro)
{
Expand All @@ -130,7 +130,6 @@ struct exec_op {
EXEC_OP_WAIT:
BOOST_ASIO_CORO_YIELD
info_->async_wait(std::move(self));
BOOST_ASSERT(ec == asio::error::operation_aborted);

if (info_->ec_) {
self.complete(info_->ec_, 0);
Expand All @@ -140,18 +139,18 @@ struct exec_op {
if (info_->stop_requested()) {
// Don't have to call remove_request as it has already
// been by cancel(exec).
return self.complete(ec, 0);
return self.complete(asio::error::operation_aborted, 0);
}

if (is_cancelled(self)) {
if (info_->is_written()) {
if (!info_->is_waiting()) {
using c_t = asio::cancellation_type;
auto const c = self.get_cancellation_state().cancelled();
if ((c & c_t::terminal) != c_t::none) {
// Cancellation requires closing the connection
// otherwise it stays in inconsistent state.
conn_->cancel(operation::run);
return self.complete(ec, 0);
return self.complete(asio::error::operation_aborted, 0);
} else {
// Can't implement other cancelation types, ignoring.
self.get_cancellation_state().clear();
Expand All @@ -163,7 +162,7 @@ struct exec_op {
} else {
// Cancelation can be honored.
conn_->remove_request(info_);
self.complete(ec, 0);
self.complete(asio::error::operation_aborted, 0);
return;
}
}
Expand Down Expand Up @@ -516,6 +515,7 @@ class connection_base {
using runner_type = runner<executor_type>;
using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
using exec_notifier_type = receive_channel_type;

auto use_ssl() const noexcept
{ return runner_.get_config().use_ssl;}
Expand All @@ -527,10 +527,10 @@ class connection_base {
{
BOOST_ASSERT(ptr != nullptr);

if (ptr->is_written()) {
return !ptr->req_->get_config().cancel_if_unresponded;
} else {
if (ptr->is_waiting()) {
return !ptr->req_->get_config().cancel_on_connection_lost;
} else {
return !ptr->req_->get_config().cancel_if_unresponded;
}
};

Expand All @@ -544,7 +544,7 @@ class connection_base {

reqs_.erase(point, std::end(reqs_));
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return ptr->reset_status();
return ptr->mark_waiting();
});

return ret;
Expand All @@ -555,7 +555,7 @@ class connection_base {
auto f = [](auto const& ptr)
{
BOOST_ASSERT(ptr != nullptr);
return ptr->is_written();
return !ptr->is_waiting();
};

auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
Expand Down Expand Up @@ -615,25 +615,15 @@ class connection_base {
using node_type = resp3::basic_node<std::string_view>;
using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;

enum class action
{
stop,
proceed,
none,
};

explicit req_info(request const& req, adapter_type adapter, executor_type ex)
: timer_{ex}
, action_{action::none}
: notifier_{ex, 1}
, req_{&req}
, adapter_{}
, expected_responses_{req.get_expected_responses()}
, status_{status::none}
, status_{status::waiting}
, ec_{{}}
, read_size_{0}
{
timer_.expires_at((std::chrono::steady_clock::time_point::max)());

adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
{
auto const i = req_->get_expected_responses() - expected_responses_;
Expand All @@ -643,18 +633,16 @@ class connection_base {

auto proceed()
{
timer_.cancel();
action_ = action::proceed;
notifier_.try_send(std::error_code{}, 0);
}

void stop()
{
timer_.cancel();
action_ = action::stop;
notifier_.close();
}

[[nodiscard]] auto is_waiting_write() const noexcept
{ return !is_written() && !is_staged(); }
[[nodiscard]] auto is_waiting() const noexcept
{ return status_ == status::waiting; }

[[nodiscard]] auto is_written() const noexcept
{ return status_ == status::written; }
Expand All @@ -668,27 +656,26 @@ class connection_base {
void mark_staged() noexcept
{ status_ = status::staged; }

void reset_status() noexcept
{ status_ = status::none; }
void mark_waiting() noexcept
{ status_ = status::waiting; }

[[nodiscard]] auto stop_requested() const noexcept
{ return action_ == action::stop;}
{ return !notifier_.is_open();}

template <class CompletionToken>
auto async_wait(CompletionToken token)
{
return timer_.async_wait(std::move(token));
return notifier_.async_receive(std::move(token));
}

//private:
enum class status
{ none
{ waiting
, staged
, written
};

timer_type timer_;
action action_;
exec_notifier_type notifier_;
request const* req_;
wrapped_adapter_type adapter_;

Expand Down Expand Up @@ -716,7 +703,7 @@ class connection_base {
void cancel_push_requests()
{
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
});

std::for_each(point, std::end(reqs_), [](auto const& ptr) {
Expand All @@ -737,7 +724,7 @@ class connection_base {

if (info->req_->has_hello_priority()) {
auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
return e->is_waiting_write();
return e->is_waiting();
});

std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
Expand Down Expand Up @@ -781,7 +768,7 @@ class connection_base {
// Coalesces the requests and marks them staged. After a
// successful write staged requests will be marked as written.
auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
return !ri->is_waiting_write();
return !ri->is_waiting();
});

std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
Expand All @@ -798,7 +785,14 @@ class connection_base {

bool is_waiting_response() const noexcept
{
return !std::empty(reqs_) && reqs_.front()->is_written();
if (std::empty(reqs_))
return false;

// Under load and on low-latency networks we might start
// receiving responses before the write operation completed and
// the request is still maked as staged and not written. See
// https://github.com/boostorg/redis/issues/170
return !reqs_.front()->is_waiting();
}

void close()
Expand All @@ -814,36 +808,39 @@ class connection_base {

auto is_next_push()
{
// We handle unsolicited events in the following way
//
// 1. Its resp3 type is a push.
//
// 2. A non-push type is received with an empty requests
// queue. I have noticed this is possible (e.g. -MISCONF).
// I expect them to have type push so we can distinguish
// them from responses to commands, but it is a
// simple-error. If we are lucky enough to receive them
// when the command queue is empty we can treat them as
// server pushes, otherwise it is impossible to handle
// them properly
//
// 3. The request does not expect any response but we got
// one. This may happen if for example, subscribe with
// wrong syntax.
//
// Useful links:
BOOST_ASSERT(!read_buffer_.empty());

// Useful links to understand the heuristics below.
//
// - https://github.com/redis/redis/issues/11784
// - https://github.com/redis/redis/issues/6426
//

BOOST_ASSERT(!read_buffer_.empty());

return
(resp3::to_type(read_buffer_.front()) == resp3::type::push)
|| reqs_.empty()
|| (!reqs_.empty() && reqs_.front()->expected_responses_ == 0)
|| !is_waiting_response(); // Added to deal with MONITOR.
// - https://github.com/boostorg/redis/issues/170

// The message's resp3 type is a push.
if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
return true;

// This is non-push type and the requests queue is empty. I have
// noticed this is possible, for example with -MISCONF. I don't
// know why they are not sent with a push type so we can
// distinguish them from responses to commands. If we are lucky
// enough to receive them when the command queue is empty they
// can be treated as server pushes, otherwise it is impossible
// to handle them properly
if (reqs_.empty())
return true;

// The request does not expect any response but we got one. This
// may happen if for example, subscribe with wrong syntax.
if (reqs_.front()->expected_responses_ == 0)
return true;

// Added to deal with MONITOR and also to fix PR170 which
// happens under load and on low-latency networks, where we
// might start receiving responses before the write operation
// completed and the request is still maked as staged and not
// written.
return reqs_.front()->is_waiting();
}

auto get_suggested_buffer_growth() const noexcept
Expand Down
28 changes: 14 additions & 14 deletions include/boost/redis/request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,31 @@ class request {
public:
/// Request configuration options.
struct config {
/** \brief If `true`
* `boost::redis::connection::async_exec` will complete with error if the
* connection is lost. Affects only requests that haven't been
* sent yet.
/** \brief If `true` calls to `connection::async_exec` will
* complete with error if the connection is lost while the
* request hasn't been sent yet.
*/
bool cancel_on_connection_lost = true;

/** \brief If `true` the request will complete with
* boost::redis::error::not_connected if `async_exec` is called before
* the connection with Redis was established.
/** \brief If `true` `connection::async_exec` will complete with
* `boost::redis::error::not_connected` if the call happens
* before the connection with Redis was established.
*/
bool cancel_if_not_connected = false;

/** \brief If `false` `boost::redis::connection::async_exec` will not
/** \brief If `false` `connection::async_exec` will not
* automatically cancel this request if the connection is lost.
* Affects only requests that have been written to the socket
* but remained unresponded when `boost::redis::connection::async_run`
* completed.
* but remained unresponded when
* `boost::redis::connection::async_run` completed.
*/
bool cancel_if_unresponded = true;

/** \brief If this request has a `HELLO` command and this flag is
* `true`, the `boost::redis::connection` will move it to the front of
* the queue of awaiting requests. This makes it possible to
* send `HELLO` and authenticate before other commands are sent.
/** \brief If this request has a `HELLO` command and this flag
* is `true`, the `boost::redis::connection` will move it to the
* front of the queue of awaiting requests. This makes it
* possible to send `HELLO` and authenticate before other
* commands are sent.
*/
bool hello_with_priority = true;
};
Expand Down
Loading