diff --git a/README.md b/README.md index 69335443..436aa687 100644 --- a/README.md +++ b/README.md @@ -676,6 +676,15 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ## Changelog +### Boost 1.85 + +* Fixes [issue 170](https://github.com/boostorg/redis/issues/170). + Under load and on low-latency networks it is possible to start + receiving responses before the write operation completed and while + the request is still marked as staged and not written. This interferes + with the heuristics that classify responses as unsolicited or + not. + ### Boost 1.84 (First release in Boost) * Deprecates the `async_receive` overload that takes a response. Users diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index a954c0c8..3e9a461a 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -113,7 +113,7 @@ struct exec_op { asio::coroutine coro{}; template - void operator()(Self& self , system::error_code ec = {}) + void operator()(Self& self , system::error_code ec = {}, std::size_t = 0) { BOOST_ASIO_CORO_REENTER (coro) { @@ -130,7 +130,6 @@ struct exec_op { EXEC_OP_WAIT: BOOST_ASIO_CORO_YIELD info_->async_wait(std::move(self)); - BOOST_ASSERT(ec == asio::error::operation_aborted); if (info_->ec_) { self.complete(info_->ec_, 0); @@ -140,18 +139,18 @@ struct exec_op { if (info_->stop_requested()) { // Don't have to call remove_request as it has already // been by cancel(exec). - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } if (is_cancelled(self)) { - if (info_->is_written()) { + if (!info_->is_waiting()) { using c_t = asio::cancellation_type; auto const c = self.get_cancellation_state().cancelled(); if ((c & c_t::terminal) != c_t::none) { // Cancellation requires closing the connection // otherwise it stays in inconsistent state.
conn_->cancel(operation::run); - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } else { // Can't implement other cancelation types, ignoring. self.get_cancellation_state().clear(); @@ -163,7 +162,7 @@ struct exec_op { } else { // Cancelation can be honored. conn_->remove_request(info_); - self.complete(ec, 0); + self.complete(asio::error::operation_aborted, 0); return; } } @@ -516,6 +515,7 @@ class connection_base { using runner_type = runner; using adapter_type = std::function const&, system::error_code&)>; using receiver_adapter_type = std::function const&, system::error_code&)>; + using exec_notifier_type = receive_channel_type; auto use_ssl() const noexcept { return runner_.get_config().use_ssl;} @@ -527,10 +527,10 @@ class connection_base { { BOOST_ASSERT(ptr != nullptr); - if (ptr->is_written()) { - return !ptr->req_->get_config().cancel_if_unresponded; - } else { + if (ptr->is_waiting()) { return !ptr->req_->get_config().cancel_on_connection_lost; + } else { + return !ptr->req_->get_config().cancel_if_unresponded; } }; @@ -544,7 +544,7 @@ class connection_base { reqs_.erase(point, std::end(reqs_)); std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return ptr->reset_status(); + return ptr->mark_waiting(); }); return ret; @@ -555,7 +555,7 @@ class connection_base { auto f = [](auto const& ptr) { BOOST_ASSERT(ptr != nullptr); - return ptr->is_written(); + return !ptr->is_waiting(); }; auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f); @@ -615,25 +615,15 @@ class connection_base { using node_type = resp3::basic_node; using wrapped_adapter_type = std::function; - enum class action - { - stop, - proceed, - none, - }; - explicit req_info(request const& req, adapter_type adapter, executor_type ex) - : timer_{ex} - , action_{action::none} + : notifier_{ex, 1} , req_{&req} , adapter_{} , expected_responses_{req.get_expected_responses()} - , status_{status::none} + , 
status_{status::waiting} , ec_{{}} , read_size_{0} { - timer_.expires_at((std::chrono::steady_clock::time_point::max)()); - adapter_ = [this, adapter](node_type const& nd, system::error_code& ec) { auto const i = req_->get_expected_responses() - expected_responses_; @@ -643,18 +633,16 @@ class connection_base { auto proceed() { - timer_.cancel(); - action_ = action::proceed; + notifier_.try_send(std::error_code{}, 0); } void stop() { - timer_.cancel(); - action_ = action::stop; + notifier_.close(); } - [[nodiscard]] auto is_waiting_write() const noexcept - { return !is_written() && !is_staged(); } + [[nodiscard]] auto is_waiting() const noexcept + { return status_ == status::waiting; } [[nodiscard]] auto is_written() const noexcept { return status_ == status::written; } @@ -668,27 +656,26 @@ class connection_base { void mark_staged() noexcept { status_ = status::staged; } - void reset_status() noexcept - { status_ = status::none; } + void mark_waiting() noexcept + { status_ = status::waiting; } [[nodiscard]] auto stop_requested() const noexcept - { return action_ == action::stop;} + { return !notifier_.is_open();} template auto async_wait(CompletionToken token) { - return timer_.async_wait(std::move(token)); + return notifier_.async_receive(std::move(token)); } //private: enum class status - { none + { waiting , staged , written }; - timer_type timer_; - action action_; + exec_notifier_type notifier_; request const* req_; wrapped_adapter_type adapter_; @@ -716,7 +703,7 @@ class connection_base { void cancel_push_requests() { auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); + return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); }); std::for_each(point, std::end(reqs_), [](auto const& ptr) { @@ -737,7 +724,7 @@ class connection_base { if (info->req_->has_hello_priority()) { auto rend = std::partition_point(std::rbegin(reqs_), 
std::rend(reqs_), [](auto const& e) { - return e->is_waiting_write(); + return e->is_waiting(); }); std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); @@ -781,7 +768,7 @@ class connection_base { // Coalesces the requests and marks them staged. After a // successful write staged requests will be marked as written. auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) { - return !ri->is_waiting_write(); + return !ri->is_waiting(); }); std::for_each(point, std::cend(reqs_), [this](auto const& ri) { @@ -798,7 +785,14 @@ class connection_base { bool is_waiting_response() const noexcept { - return !std::empty(reqs_) && reqs_.front()->is_written(); + if (std::empty(reqs_)) + return false; + + // Under load and on low-latency networks we might start + // receiving responses before the write operation completed and + // the request is still marked as staged and not written. See + // https://github.com/boostorg/redis/issues/170 + return !reqs_.front()->is_waiting(); } void close() @@ -814,36 +808,39 @@ class connection_base { auto is_next_push() { - // We handle unsolicited events in the following way - // - // 1. Its resp3 type is a push. - // - // 2. A non-push type is received with an empty requests - // queue. I have noticed this is possible (e.g. -MISCONF). - // I expect them to have type push so we can distinguish - // them from responses to commands, but it is a - // simple-error. If we are lucky enough to receive them - // when the command queue is empty we can treat them as - // server pushes, otherwise it is impossible to handle - // them properly - // - // 3. The request does not expect any response but we got - // one. This may happen if for example, subscribe with - // wrong syntax. - // - // Useful links: + BOOST_ASSERT(!read_buffer_.empty()); + + // Useful links to understand the heuristics below.
// // - https://github.com/redis/redis/issues/11784 // - https://github.com/redis/redis/issues/6426 - // - - BOOST_ASSERT(!read_buffer_.empty()); - - return - (resp3::to_type(read_buffer_.front()) == resp3::type::push) - || reqs_.empty() - || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0) - || !is_waiting_response(); // Added to deal with MONITOR. + // - https://github.com/boostorg/redis/issues/170 + + // The message's resp3 type is a push. + if (resp3::to_type(read_buffer_.front()) == resp3::type::push) + return true; + + // This is a non-push type and the requests queue is empty. I have + // noticed this is possible, for example with -MISCONF. I don't + // know why they are not sent with a push type so we can + // distinguish them from responses to commands. If we are lucky + // enough to receive them when the command queue is empty they + // can be treated as server pushes, otherwise it is impossible + // to handle them properly. + if (reqs_.empty()) + return true; + + // The request does not expect any response but we got one. This + // may happen, for example, on a subscribe with wrong syntax. + if (reqs_.front()->expected_responses_ == 0) + return true; + + // Added to deal with MONITOR and also to fix issue 170 which + // happens under load and on low-latency networks, where we + // might start receiving responses before the write operation + // completed and the request is still marked as staged and not + // written. + return reqs_.front()->is_waiting(); } auto get_suggested_buffer_growth() const noexcept diff --git a/include/boost/redis/request.hpp b/include/boost/redis/request.hpp index ebc94a22..0e62e0a9 100644 --- a/include/boost/redis/request.hpp +++ b/include/boost/redis/request.hpp @@ -47,31 +47,31 @@ class request { public: /// Request configuration options. struct config { - /** \brief If `true` - * `boost::redis::connection::async_exec` will complete with error if the - * connection is lost. Affects only requests that haven't been - * sent yet.
+ /** \brief If `true` calls to `connection::async_exec` will + * complete with error if the connection is lost while the + * request hasn't been sent yet. */ bool cancel_on_connection_lost = true; - /** \brief If `true` the request will complete with - * boost::redis::error::not_connected if `async_exec` is called before - * the connection with Redis was established. + /** \brief If `true` `connection::async_exec` will complete with + * `boost::redis::error::not_connected` if the call happens + * before the connection with Redis was established. */ bool cancel_if_not_connected = false; - /** \brief If `false` `boost::redis::connection::async_exec` will not + /** \brief If `false` `connection::async_exec` will not * automatically cancel this request if the connection is lost. * Affects only requests that have been written to the socket - * but remained unresponded when `boost::redis::connection::async_run` - * completed. + * but remained unresponded when + * `boost::redis::connection::async_run` completed. */ bool cancel_if_unresponded = true; - /** \brief If this request has a `HELLO` command and this flag is - * `true`, the `boost::redis::connection` will move it to the front of - * the queue of awaiting requests. This makes it possible to - * send `HELLO` and authenticate before other commands are sent. + /** \brief If this request has a `HELLO` command and this flag + * is `true`, the `boost::redis::connection` will move it to the + * front of the queue of awaiting requests. This makes it + * possible to send `HELLO` and authenticate before other + * commands are sent. */ bool hello_with_priority = true; }; diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index 5e135cac..c3f04134 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -6,6 +6,7 @@ #include #include +#include #define BOOST_TEST_MODULE conn-exec #include #include @@ -17,12 +18,13 @@ // container. 
namespace net = boost::asio; +using boost::redis::config; using boost::redis::connection; -using boost::redis::request; -using boost::redis::response; using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::operation; +using boost::redis::request; +using boost::redis::response; // Sends three requests where one of them has a hello with a priority // set, which means it should be executed first. @@ -153,3 +155,36 @@ BOOST_AUTO_TEST_CASE(correct_database) BOOST_CHECK_EQUAL(cfg.database_index.value(), index); } +BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170) +{ + // See https://github.com/boostorg/redis/issues/170 + + std::string payload; + payload.resize(1024); + std::fill(std::begin(payload), std::end(payload), 'A'); + + net::io_context ioc; + auto conn = std::make_shared(ioc); + + auto cfg = make_test_config(); + cfg.health_check_interval = std::chrono::seconds(0); + conn->async_run(cfg, {}, net::detached); + + int counter = 0; + int const repeat = 8000; + + for (int i = 0; i < repeat; ++i) { + auto req = std::make_shared(); + req->push("PING", payload); + conn->async_exec(*req, ignore, [req, &counter, conn](auto ec, auto) { + BOOST_TEST(!ec); + if (++counter == repeat) + conn->cancel(); + }); + } + + ioc.run(); + + BOOST_CHECK_EQUAL(counter, repeat); +} + diff --git a/test/test_conn_exec_retry.cpp b/test/test_conn_exec_retry.cpp index c464b342..99f68c39 100644 --- a/test/test_conn_exec_retry.cpp +++ b/test/test_conn_exec_retry.cpp @@ -57,12 +57,12 @@ BOOST_AUTO_TEST_CASE(request_retry_false) auto c2 = [&](auto ec, auto){ std::cout << "c2" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c1 = [&](auto ec, auto){ std::cout << "c1" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c0 = [&](auto ec, 
auto){ diff --git a/test/test_conn_quit.cpp b/test/test_conn_quit.cpp index fcd580b9..9d5dd2f3 100644 --- a/test/test_conn_quit.cpp +++ b/test/test_conn_quit.cpp @@ -61,7 +61,7 @@ BOOST_AUTO_TEST_CASE(test_async_run_exits) auto c3 = [](auto ec, auto) { std::clog << "c3: " << ec.message() << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c2 = [&](auto ec, auto) diff --git a/test/test_conn_reconnect.cpp b/test/test_conn_reconnect.cpp index 5b45a127..d4605d5a 100644 --- a/test/test_conn_reconnect.cpp +++ b/test/test_conn_reconnect.cpp @@ -99,7 +99,7 @@ auto async_test_reconnect_timeout() -> net::awaitable std::cout << "ccc" << std::endl; - BOOST_CHECK_EQUAL(ec1, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec1, boost::asio::error::operation_aborted); } BOOST_AUTO_TEST_CASE(test_reconnect_and_idle)