From 22405e6cfbb957e53e870d17707cc1eaa38d3893 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 16 Dec 2023 20:56:22 +0100 Subject: [PATCH 1/7] Accepts as valid responses to staged requests. Before these changes the request had to be marked as written in order to interpret incoming responses as belonging to that request. On fast networks however, like on localhost and underload the responses might arrive before the write operation completed. --- README.md | 9 ++ .../boost/redis/detail/connection_base.hpp | 131 +++++++++--------- include/boost/redis/request.hpp | 28 ++-- test/test_conn_exec.cpp | 39 +++++- test/test_conn_exec_retry.cpp | 4 +- test/test_conn_quit.cpp | 2 +- test/test_conn_reconnect.cpp | 2 +- 7 files changed, 128 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index 69335443..436aa687 100644 --- a/README.md +++ b/README.md @@ -676,6 +676,15 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ## Changelog +### Boost 1.85 + +* Fixes [issue 170](https://github.com/boostorg/redis/issues/170). + Under load and on low-latency networks it is possible to start + receiving responses before the write operation completed and while + the request is still marked as staged and not written. This messes + up with the heuristics that classifies responses as unsolicied or + not. + ### Boost 1.84 (First release in Boost) * Deprecates the `async_receive` overload that takes a response. 
Users diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index a954c0c8..3e9a461a 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -113,7 +113,7 @@ struct exec_op { asio::coroutine coro{}; template - void operator()(Self& self , system::error_code ec = {}) + void operator()(Self& self , system::error_code ec = {}, std::size_t = 0) { BOOST_ASIO_CORO_REENTER (coro) { @@ -130,7 +130,6 @@ struct exec_op { EXEC_OP_WAIT: BOOST_ASIO_CORO_YIELD info_->async_wait(std::move(self)); - BOOST_ASSERT(ec == asio::error::operation_aborted); if (info_->ec_) { self.complete(info_->ec_, 0); @@ -140,18 +139,18 @@ struct exec_op { if (info_->stop_requested()) { // Don't have to call remove_request as it has already // been by cancel(exec). - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } if (is_cancelled(self)) { - if (info_->is_written()) { + if (!info_->is_waiting()) { using c_t = asio::cancellation_type; auto const c = self.get_cancellation_state().cancelled(); if ((c & c_t::terminal) != c_t::none) { // Cancellation requires closing the connection // otherwise it stays in inconsistent state. conn_->cancel(operation::run); - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } else { // Can't implement other cancelation types, ignoring. self.get_cancellation_state().clear(); @@ -163,7 +162,7 @@ struct exec_op { } else { // Cancelation can be honored. 
conn_->remove_request(info_); - self.complete(ec, 0); + self.complete(asio::error::operation_aborted, 0); return; } } @@ -516,6 +515,7 @@ class connection_base { using runner_type = runner; using adapter_type = std::function const&, system::error_code&)>; using receiver_adapter_type = std::function const&, system::error_code&)>; + using exec_notifier_type = receive_channel_type; auto use_ssl() const noexcept { return runner_.get_config().use_ssl;} @@ -527,10 +527,10 @@ class connection_base { { BOOST_ASSERT(ptr != nullptr); - if (ptr->is_written()) { - return !ptr->req_->get_config().cancel_if_unresponded; - } else { + if (ptr->is_waiting()) { return !ptr->req_->get_config().cancel_on_connection_lost; + } else { + return !ptr->req_->get_config().cancel_if_unresponded; } }; @@ -544,7 +544,7 @@ class connection_base { reqs_.erase(point, std::end(reqs_)); std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return ptr->reset_status(); + return ptr->mark_waiting(); }); return ret; @@ -555,7 +555,7 @@ class connection_base { auto f = [](auto const& ptr) { BOOST_ASSERT(ptr != nullptr); - return ptr->is_written(); + return !ptr->is_waiting(); }; auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f); @@ -615,25 +615,15 @@ class connection_base { using node_type = resp3::basic_node; using wrapped_adapter_type = std::function; - enum class action - { - stop, - proceed, - none, - }; - explicit req_info(request const& req, adapter_type adapter, executor_type ex) - : timer_{ex} - , action_{action::none} + : notifier_{ex, 1} , req_{&req} , adapter_{} , expected_responses_{req.get_expected_responses()} - , status_{status::none} + , status_{status::waiting} , ec_{{}} , read_size_{0} { - timer_.expires_at((std::chrono::steady_clock::time_point::max)()); - adapter_ = [this, adapter](node_type const& nd, system::error_code& ec) { auto const i = req_->get_expected_responses() - expected_responses_; @@ -643,18 +633,16 @@ class connection_base { 
auto proceed() { - timer_.cancel(); - action_ = action::proceed; + notifier_.try_send(std::error_code{}, 0); } void stop() { - timer_.cancel(); - action_ = action::stop; + notifier_.close(); } - [[nodiscard]] auto is_waiting_write() const noexcept - { return !is_written() && !is_staged(); } + [[nodiscard]] auto is_waiting() const noexcept + { return status_ == status::waiting; } [[nodiscard]] auto is_written() const noexcept { return status_ == status::written; } @@ -668,27 +656,26 @@ class connection_base { void mark_staged() noexcept { status_ = status::staged; } - void reset_status() noexcept - { status_ = status::none; } + void mark_waiting() noexcept + { status_ = status::waiting; } [[nodiscard]] auto stop_requested() const noexcept - { return action_ == action::stop;} + { return !notifier_.is_open();} template auto async_wait(CompletionToken token) { - return timer_.async_wait(std::move(token)); + return notifier_.async_receive(std::move(token)); } //private: enum class status - { none + { waiting , staged , written }; - timer_type timer_; - action action_; + exec_notifier_type notifier_; request const* req_; wrapped_adapter_type adapter_; @@ -716,7 +703,7 @@ class connection_base { void cancel_push_requests() { auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); + return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); }); std::for_each(point, std::end(reqs_), [](auto const& ptr) { @@ -737,7 +724,7 @@ class connection_base { if (info->req_->has_hello_priority()) { auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) { - return e->is_waiting_write(); + return e->is_waiting(); }); std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); @@ -781,7 +768,7 @@ class connection_base { // Coalesces the requests and marks them staged. 
After a // successful write staged requests will be marked as written. auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) { - return !ri->is_waiting_write(); + return !ri->is_waiting(); }); std::for_each(point, std::cend(reqs_), [this](auto const& ri) { @@ -798,7 +785,14 @@ class connection_base { bool is_waiting_response() const noexcept { - return !std::empty(reqs_) && reqs_.front()->is_written(); + if (std::empty(reqs_)) + return false; + + // Under load and on low-latency networks we might start + // receiving responses before the write operation completed and + // the request is still maked as staged and not written. See + // https://github.com/boostorg/redis/issues/170 + return !reqs_.front()->is_waiting(); } void close() @@ -814,36 +808,39 @@ class connection_base { auto is_next_push() { - // We handle unsolicited events in the following way - // - // 1. Its resp3 type is a push. - // - // 2. A non-push type is received with an empty requests - // queue. I have noticed this is possible (e.g. -MISCONF). - // I expect them to have type push so we can distinguish - // them from responses to commands, but it is a - // simple-error. If we are lucky enough to receive them - // when the command queue is empty we can treat them as - // server pushes, otherwise it is impossible to handle - // them properly - // - // 3. The request does not expect any response but we got - // one. This may happen if for example, subscribe with - // wrong syntax. - // - // Useful links: + BOOST_ASSERT(!read_buffer_.empty()); + + // Useful links to understand the heuristics below. // // - https://github.com/redis/redis/issues/11784 // - https://github.com/redis/redis/issues/6426 - // - - BOOST_ASSERT(!read_buffer_.empty()); - - return - (resp3::to_type(read_buffer_.front()) == resp3::type::push) - || reqs_.empty() - || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0) - || !is_waiting_response(); // Added to deal with MONITOR. 
+ // - https://github.com/boostorg/redis/issues/170 + + // The message's resp3 type is a push. + if (resp3::to_type(read_buffer_.front()) == resp3::type::push) + return true; + + // This is non-push type and the requests queue is empty. I have + // noticed this is possible, for example with -MISCONF. I don't + // know why they are not sent with a push type so we can + // distinguish them from responses to commands. If we are lucky + // enough to receive them when the command queue is empty they + // can be treated as server pushes, otherwise it is impossible + // to handle them properly + if (reqs_.empty()) + return true; + + // The request does not expect any response but we got one. This + // may happen if for example, subscribe with wrong syntax. + if (reqs_.front()->expected_responses_ == 0) + return true; + + // Added to deal with MONITOR and also to fix PR170 which + // happens under load and on low-latency networks, where we + // might start receiving responses before the write operation + // completed and the request is still maked as staged and not + // written. + return reqs_.front()->is_waiting(); } auto get_suggested_buffer_growth() const noexcept diff --git a/include/boost/redis/request.hpp b/include/boost/redis/request.hpp index ebc94a22..0e62e0a9 100644 --- a/include/boost/redis/request.hpp +++ b/include/boost/redis/request.hpp @@ -47,31 +47,31 @@ class request { public: /// Request configuration options. struct config { - /** \brief If `true` - * `boost::redis::connection::async_exec` will complete with error if the - * connection is lost. Affects only requests that haven't been - * sent yet. + /** \brief If `true` calls to `connection::async_exec` will + * complete with error if the connection is lost while the + * request hasn't been sent yet. 
*/ bool cancel_on_connection_lost = true; - /** \brief If `true` the request will complete with - * boost::redis::error::not_connected if `async_exec` is called before - * the connection with Redis was established. + /** \brief If `true` `connection::async_exec` will complete with + * `boost::redis::error::not_connected` if the call happens + * before the connection with Redis was established. */ bool cancel_if_not_connected = false; - /** \brief If `false` `boost::redis::connection::async_exec` will not + /** \brief If `false` `connection::async_exec` will not * automatically cancel this request if the connection is lost. * Affects only requests that have been written to the socket - * but remained unresponded when `boost::redis::connection::async_run` - * completed. + * but remained unresponded when + * `boost::redis::connection::async_run` completed. */ bool cancel_if_unresponded = true; - /** \brief If this request has a `HELLO` command and this flag is - * `true`, the `boost::redis::connection` will move it to the front of - * the queue of awaiting requests. This makes it possible to - * send `HELLO` and authenticate before other commands are sent. + /** \brief If this request has a `HELLO` command and this flag + * is `true`, the `boost::redis::connection` will move it to the + * front of the queue of awaiting requests. This makes it + * possible to send `HELLO` and authenticate before other + * commands are sent. */ bool hello_with_priority = true; }; diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index 5e135cac..c3f04134 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -6,6 +6,7 @@ #include #include +#include #define BOOST_TEST_MODULE conn-exec #include #include @@ -17,12 +18,13 @@ // container. 
namespace net = boost::asio; +using boost::redis::config; using boost::redis::connection; -using boost::redis::request; -using boost::redis::response; using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::operation; +using boost::redis::request; +using boost::redis::response; // Sends three requests where one of them has a hello with a priority // set, which means it should be executed first. @@ -153,3 +155,36 @@ BOOST_AUTO_TEST_CASE(correct_database) BOOST_CHECK_EQUAL(cfg.database_index.value(), index); } +BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170) +{ + // See https://github.com/boostorg/redis/issues/170 + + std::string payload; + payload.resize(1024); + std::fill(std::begin(payload), std::end(payload), 'A'); + + net::io_context ioc; + auto conn = std::make_shared(ioc); + + auto cfg = make_test_config(); + cfg.health_check_interval = std::chrono::seconds(0); + conn->async_run(cfg, {}, net::detached); + + int counter = 0; + int const repeat = 8000; + + for (int i = 0; i < repeat; ++i) { + auto req = std::make_shared(); + req->push("PING", payload); + conn->async_exec(*req, ignore, [req, &counter, conn](auto ec, auto) { + BOOST_TEST(!ec); + if (++counter == repeat) + conn->cancel(); + }); + } + + ioc.run(); + + BOOST_CHECK_EQUAL(counter, repeat); +} + diff --git a/test/test_conn_exec_retry.cpp b/test/test_conn_exec_retry.cpp index c464b342..99f68c39 100644 --- a/test/test_conn_exec_retry.cpp +++ b/test/test_conn_exec_retry.cpp @@ -57,12 +57,12 @@ BOOST_AUTO_TEST_CASE(request_retry_false) auto c2 = [&](auto ec, auto){ std::cout << "c2" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c1 = [&](auto ec, auto){ std::cout << "c1" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c0 = [&](auto ec, 
auto){ diff --git a/test/test_conn_quit.cpp b/test/test_conn_quit.cpp index fcd580b9..9d5dd2f3 100644 --- a/test/test_conn_quit.cpp +++ b/test/test_conn_quit.cpp @@ -61,7 +61,7 @@ BOOST_AUTO_TEST_CASE(test_async_run_exits) auto c3 = [](auto ec, auto) { std::clog << "c3: " << ec.message() << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c2 = [&](auto ec, auto) diff --git a/test/test_conn_reconnect.cpp b/test/test_conn_reconnect.cpp index 5b45a127..d4605d5a 100644 --- a/test/test_conn_reconnect.cpp +++ b/test/test_conn_reconnect.cpp @@ -99,7 +99,7 @@ auto async_test_reconnect_timeout() -> net::awaitable std::cout << "ccc" << std::endl; - BOOST_CHECK_EQUAL(ec1, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec1, boost::asio::error::operation_aborted); } BOOST_AUTO_TEST_CASE(test_reconnect_and_idle) From 2685d44ff2865404fd86f22d1722a5e82317aae0 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 17 Dec 2023 22:22:38 +0100 Subject: [PATCH 2/7] Article about the costs of async abstractions. --- doc/on-the-costs-of-async-abstractions.md | 671 ++++++++++++++++++++++ 1 file changed, 671 insertions(+) create mode 100644 doc/on-the-costs-of-async-abstractions.md diff --git a/doc/on-the-costs-of-async-abstractions.md b/doc/on-the-costs-of-async-abstractions.md new file mode 100644 index 00000000..55615318 --- /dev/null +++ b/doc/on-the-costs-of-async-abstractions.md @@ -0,0 +1,671 @@ +# On the costs of asynchronous abstractions + +The biggest force behind the evolution of +[Boost.Redis](https://github.com/boostorg/redis) was my struggling in +coming up with a high-level connection abstraction that was capable of +multiplexing Redis commands from independent sources while +concurrently handling server pushes. 
This journey taught me many +important lessons, many of which are related to the design and +performance of asynchronous programs based on Boost.Asio. + +In this article I will share some of the lessons learned, specially +those related to the performance costs of _abstractions_ such as +`async_read_until` that tend to overschedule into the event-loop. In +this context I will also briefly comment on how the topics discussed +here influenced my views on the proposed +[P2300](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2300r7.html) +(a.k.a. Senders and Receivers), which is likely to become the basis of +networking in upcoming C++ standards. + +Although the analysis presented here uses the Redis communication +protocol for illustration I expect it to be useful in general since +[RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) shares +many similarities with other widely used protocols such as HTTP. + +## Parsing `\r\n`-delimited messages + +The Redis server communicates with its clients by exchanging data +serialized in +[RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) format. +Among the data types supported by this specification, the +`\r\n`-delimited messages are some of the most frequent in a typical +session. The table below shows some examples + + Command | Response | Wire format | RESP3 name + ---------|----------|---------------|--------------------- + PING | PONG | `+PONG\r\n` | simple-string + INCR | 42 | `:42\r\n` | number + GET | null | `_\r\n` | null + +Redis also supports command pipelines, which provide a way of +optimizing round-trip times by batching commands. 
A pipeline composed +by the commands shown in the previous table look like this + +``` + | Sent in a | + | single write | ++--------+ | | +-------+ +| | --------> PING + INCR + GET --------> | | +| | | | +| Client | | Redis | +| | | | +| | <-------- "+PONG\r\n:42\r\n_\r\n" <-------- | | ++--------+ |<------>|<---->|<-->| +-------+ + | | + | Responses | +``` + +Messages that use delimiters are so common in networking that a +facility called `async_read_until` for reading them incrementally from +a socket is already part of Boost.Asio. The coroutine below uses it to +print message contents to the screen + +```cpp +awaitable parse_resp3_simple_msgs(tcp::socket socket) +{ + for (std::string buffer;;) { + auto n = co_await async_read_until(socket, dynamic_buffer(buffer), "\r\n"); + + std::cout << buffer.substr(1, n - 3) << std::endl; + + // Consume the buffer. + buffer.erase(0, n); + } +} +``` + +If we pay attention to the buffer content as it is parsed by the code +above we can see it is rotated fairly often, for example + +``` + "+PONG\r\n:100\r\n+OK\r\n_\r\n" + ":100\r\n+OK\r\n_\r\n" + "+OK\r\n_\r\n" + "_\r\n" + "" +``` + +When I first realized these, apparently excessive, buffer rotations I +was concerned they would impact the performance of Boost.Redis in a +severe way. To measure the magnitude of this impact I came up with an +experimental implementation of Asio's `dynamic_buffer` that consumed +the buffer less eagerly than the `std::string::erase` function used +above. For that, the implementation increased a buffer offset up +to a certain threshold and only then triggered a (larger) rotation. 
+This is illustrated in the diagram below + +``` + |<---- offset threshold ---->| + | | + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + | # Initial message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<------>| # After 1st message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<-------------->| # After 2nd message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<--------------------->| # After 3rd message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<-------------------------->| # 4th message crosses the threashold + + "+PONG\r\n" + | # After rotation +``` + +After comparing the performance differences between the two versions I +was surprised there wasn't any! But that was also very suspicious +since some RESP3 aggregate types contain a considerable number of +separators. For example, a map with two pairs `[(key1, value1), +(key2, value2)]` encoded in RESP3 requires ten rotations in total + +``` + "%2\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + "$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + "key1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + "$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + ... +``` + +It was evident something more costly was shadowing the buffer +rotations. But it couldn't be the search for the separator since it +performs equivalently to rotations. It is also easy to show that the +overhead is not related to any IO operation since the problem persists +if the buffer is never consumed (which causes the function to be +called with the same string repeatedly). Once these two factors +are removed from the table, we are driven into the conclusion that +calling `async_read_until` has an intrinsic cost, let us see what +that is. + +### Async operations that complete synchronously considered harmful + +Assume the scenario described earlier where `async_read_until` is used +to parse multiple `\r\n`-delimited messages. The following is a +detailed description of what happens behind the scenes + + 1. 
`async_read_until` calls `socket.async_read_some` repeatedly + until the separator `\r\n` shows up in the buffer + +``` + "" # Read 1: needs more data. + "" # Read 2: needs more data. + "" # Read 3: needs more data. + "" # Read 4: needs more data. + "\r\n" # separator found, done. +``` + + 2. The last call to `socket.async_read_some` happens to read past + the separator `\r\n` (depicted as `` above), + resulting in bonus (maybe incomplete) messages in the buffer + +``` + | 1st async_read_some | 2nd async_read_some | + | | | + "+message content here \r\n:100\r\n+OK\r\n_\r\n+incomplete respo" + | | | | + | Message wanted |<-- bonus msgs --->|<--incomplete-->| + | | msg | + | | | + | |<---------- bonus bytes ----------->| +``` + + 3. The buffer is consumed and `async_read_until` is called again. + However, since the buffer already contains the next message this + is an IO-less call + +``` + ":100\r\n+OK\r\n_\r\n+not enough byt" + | | | + | No IO required | Need more | + | to parse these | data | + | messages. | | +``` + +The fact that step 3. doesn't perform any IO implies the operation can +complete synchronously, but because this is an asynchronous function +Boost.Asio by default won't call the continuation before the +function returns. The implementation must therefore enqueue it for +execution, as depicted below + +``` + OP5 ---> OP4 ---> OP3 ---> OP2 ---> OP1 # Reschedules the continuation + | + OP1 schedules its continuation | + +-----------------------------------+ + | + | + OP6 ---> OP5 ---> OP4 ---> OP3 ---> OP2 # Reschedules the continuation + | + OP2 schedules its continuation | + +-----------------------------------+ + | + | + OP7 ---> OP6 ---> OP5 ---> OP4 ---> OP3 +``` + +When summed up, the excessive rescheduling of continuations lead to +performance degradation at scale. 
But since this is an event-loop +there is no way around rescheduling as doing otherwise would mean +allowing a task to monopolize the event-loop, preventing other tasks +from making progress. The best that can be done is to avoid +_overscheduling_, so let us determine how much rescheduling is too +much. + +## The intrinsic latency of an event-loop + +An event-loop is a design pattern originally used to handle events +external to the application, such as GUIs, networking and other forms +of IO. If we take this literally, it becomes evident that the way +`async_read_until` works is incompatible with an event-loop since +_searching for the separator_ is not an external event and as such +should not have to be enqueued for execution. + +Once we constrain ourselves to events that have an external origin, +such as anything related to IO and including any form of IPC, the +scheduling overhead is reduced considerably since the latency +of the transport layer eclipses whatever time it takes to schedule the +continuation, for example, according to +[these](https://www.boost.org/doc/libs/develop/libs/cobalt/doc/html/index.html#posting_to_an_executor) +benchmarks, the time it takes to schedule a task in the +`asio::io_context ` is approximately `50ns`. + +To give the reader an idea about the magnitude of this number, if +rescheduling alone were to account for 1% of the runtime of an app +that uses asynchronous IO to move around data in chunks of size 128kb, +then this app would have a throughput of approximately 24Gbs. At such +high throughput multiple other factors kick in before any scheduling +overhead even starts to manifest. + +It is therefore safe to say that only asynchronous operations that +don't perform or are not bound to any IO are ever likely to +overschedule in the sense described above. Those cases can be usually +avoided, this is what worked for Boost.Redis + + 1. 
`async_read_until` was replaced with calls to + `socket.async_read_some` and an incremental parser that does not + do any IO. + + 2. Channel `try_` functions are used to check if send and receive + operations can be called without suspension. For example, + `try_send` before `async_send` and `try_receive` before + `async_receive` ([see also](https://github.com/chriskohlhoff/asio/commit/fe4fd7acf145335eeefdd19708483c46caeb45e5) + `try_send_via_dispatch` for a more aggressive optimization). + + 3. Coalescing of individual requests into a single payload to reduce + the number of necessary writes on the socket, this is only + possible because Redis supports pipelining (good protocols + help!). + + 4. Increased the socket read sizes to 4kb to reduce the number of + reads (which is outweighed by the costs of rotating data in the + buffer). + + 5. Dropped the `resp3::async_read` abstraction. When I started + developing Boost.Redis there was convincing precedent for having + a `resp3::async_read` function to read complete RESP3 messages + from a socket + + Name | Description + ---------------------------------------|------------------- + `asio::ip::tcp::async_read` | Reads `n` bytes from a stream. + `beast::http::async_read` | Reads a complete HTTP message. + `beast::websocket::stream::async_read` | Reads a complete Websocket message. + `redis::async_read` | Reads a complete RESP3 message. + + It turns out however that this function is also vulnerable to + immediate completions since in command pipelines multiple + responses show up in the buffer after a call to + `socket.async_read_some`. When that happens each call to + `resp3::async_read` is IO-less. + +Sometimes it is not possible to avoid asynchronous operations that +complete synchronously, in the following sections we will therefore +see how favoring throughput over fairness works in Boost.Asio. 
+ +### Calling the continuation inline + +In Boost.Asio it is possible to customize how an algorithm executes +the continuation when an immediate completion occurs, this includes +the ability of calling it inline, thereby avoiding the costs of +excessive rescheduling. Here is how it works + +```cpp +// (default) The continuation is enqueued for execution, regardless of +// whether it is immediate or not. +async_read_until(socket, buffer, "\r\n", continuation); + +// Immediate completions are executed in exec2 (otherwise equal to the +// version above). The completion is called inline if exec2 is the same +// executor that is running the operation. +async_read_until(socket, buffer, "\r\n", bind_immediate_executor(exec2, completion)); +``` + +To compare the performance of both cases I have written a small +function that calls `async_read_until` in a loop with a buffer that is +never consumed so that all completions are immediate. The version +below uses the default behaviour + +```cpp +void read_safe(tcp::socket& s, std::string& buffer) +{ + auto continuation = [&s, &buffer](auto ec, auto n) + { + read_safe(s, buffer); // Recursive call + }; + + // This won't cause stack exhaustion because the continuation is + // not called inline but posted in the event loop. + async_read_until(s, dynamic_buffer(buffer), "\r\n", continuation); +} +``` + +To optimize away some of the rescheduling the version below uses the +`bind_immediate_executor` customization to call the continuation +reentrantly and then breaks the stack from time to time to avoid +exhausting it + +```cpp +void read_reentrant(tcp::socket& s, std::string& buffer) +{ + auto cont = [&](auto, auto) + { + read_reentrant(s, buffer); // Recursive call + }; + + // Breaks the callstack after 16 inline calls. + if (counter % 16 == 0) { + post(s.get_executor(), [cont](){cont({}, 0);}); + return; + } + + // Continuation called reentrantly. 
+ async_read_until(s, dynamic_buffer(buffer), "\r\n", + bind_immediate_executor(s.get_executor(), cont)); +} +``` + +The diagram below shows what the reentrant chain of calls in the code +above looks like from the event-loop point of view + +``` + OP5 ---> OP4 ---> OP3 ---> OP2 ---> OP1a # Completes immediately + | + | + ... | + OP1b # Completes immediately + | + Waiting for OP5 to | + reschedule its | + continuation OP1c # Completes immediately + | + | + ... | + OP1d # Break the call-stack + | + +-----------------------------------+ + | + OP6 ---> OP5 ---> OP4 ---> OP3 ---> OP2 +``` + +Unsurprisingly, the reentrant code is 3x faster than the one that +relies on the default behaviour (don't forget that this is a best case +scenario, in the general case not all completions are immediate). +Although faster, this strategy has some downsides + + - The overall operation is not as fast as possible since it still + has to reschedule from time to time to break the call stack. The + less it reschedules the higher the risk of exhausting it. + + - It is too easy to forget to break the stack. For example, the + programmer might decide to branch somewhere into another chain of + asynchronous calls that also use this strategy. To avoid + exhaustion all such branches would have to be safeguarded with a + manual rescheduling i.e. `post`. + + - Requires additional layers of complexity such as + `bind_immediate_executor` in addition to `bind_executor`. + + - Not compliant with more strict + [guidelines](https://en.wikipedia.org/wiki/The_Power_of_10:_Rules_for_Developing_Safety-Critical_Code) + that prohibit reentrant code. + + - There is no simple way of choosing the maximum allowed number of + reentrant calls for each function in a way that covers different + use cases and users. Library writers and users would be tempted + into using a small value reducing the performance advantage. 
+ + - If the socket is always ready for reading the task will + monopolize IO for up to `16` iterations which might cause + stutter in unrelated tasks as depicted below + +``` + Unfairness + + +----+----+----+ +----+----+----+ +----+----+----+ +Socket-1 | | | | | | | | | | | | + +----+----+----+----+----+----+----+----+----+----+----+----+ +Socket-2 | | | | | | + +----+ +----+ +----+ +``` + +From the aesthetic point of view the code above is also unpleasant as +it breaks the function asynchronous contract by injecting a reentrant +behaviour. It gives me the same kind of feeling I have about +[recursive +mutexes](http://www.zaval.org/resources/library/butenhof1.html). + +Note: It is worth mentioning here that a similar +[strategy](https://github.com/NVIDIA/stdexec/blob/6f23dd5b1d523541ce28af32fc2603403ebd36ed/include/exec/trampoline_scheduler.hpp#L52) +is used to break the call stack of repeating algorithms in +[stdexec](https://github.com/NVIDIA/stdexec), but this time +based on +[P2300](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2300r7.html) +and not on Boost.Asio. + +### Coroutine tail-calls + +In the previous section we have seen how to avoid overscheduling by +instructing the asynchronous operation to call the completion inline +on immediate completion. It turns out however that coroutine support +for _tail-calls_ provides a way to completely sidestep this problem. +This feature is described by +[Baker](https://lewissbaker.github.io/2020/05/11/understanding_symmetric_transfer) +as follows + +> A tail-call is one where the current stack-frame is popped before +> the call and the current function’s return address becomes the +> return-address for the callee. ie. the callee will return directly +> the the [sic] caller of this function. + +This means (at least in principle) that a library capable of using +tail-calls when an immediate completion occurs neither has to +reschedule the continuation nor call it inline. 
To test how this +feature compares to the other styles I have used Boost.Cobalt. The +code looks as follows + +```cpp +// Warning: risks unfairness and starvation of other tasks. +task read_until_unfair() +{ + for (int i = 0; i != repeat; ++i) { + co_await async_read_until(s, dynamic_buffer(buffer), "\r\n", cobalt::use_op); + } +} +``` + +The result of this comparison is listed in the table below + +Time/s | Style | Configuration | Library +-------|-----------|-----------------------------|------------- + 1.0 | Coroutine | `await_ready` optimization | Boost.Cobalt + 4.8 | Callback | Reentrant | Boost.Asio +10.3 | Coroutine | `use_op` | Boost.Cobalt +14.9 | Callback | Regular | Boost.Asio +15.6 | Coroutine | `asio::deferred` | Boost.Asio + +As the reader can see, `cobalt::use_op` ranks 3rd and is considerably +faster (10.3 vs 15.6) than the Asio equivalent that uses +default-rescheduling. However, by trading rescheduling with tail-calls +the code above can now monopolize the event-loop, resulting in +unfairness if the socket happens to receive data at a higher rate +than other tasks. If by chance data is received continuously +on a socket that is always ready for reading, other tasks will starve + +``` + Starvation + + +----+----+----+----+----+----+----+----+----+----+----+----+ +Socket-1 | | | | | | | | | | | | | + +----+----+----+----+----+----+----+----+----+----+----+----+ + +Socket-2 Starving ... + +``` + +To avoid this problem the programmer is forced to reschedule from time +to time, in the same way we did for the reentrant calls + +```cpp +task read_until_fair() +{ + for (int i = 0; i != repeat; ++i) { + if (i % 16 == 0) { + // Reschedules to address unfairness and starvation of + // other tasks. + co_await post(cobalt::use_op); + continue; + } + + co_await async_read_until(s, dynamic_buffer(buffer), "\r\n", cobalt::use_op); + } +} +``` + +Delegating fairness-safety to applications is a dangerous game. 
+This is a +[problem](https://tokio.rs/blog/2020-04-preemption) the Tokio +community had to deal with before Tokio runtime started enforcing +rescheduling (after 256 successful operations) + +> If data is received faster than it can be processed, it is possible +> that more data will have already been received by the time the +> processing of a data chunk completes. In this case, .await will +> never yield control back to the scheduler, other tasks will not be +> scheduled, resulting in starvation and large latency variance. + +> Currently, the answer to this problem is that the user of Tokio is +> responsible for adding yield points in both the application and +> libraries. In practice, very few actually do this and end up being +> vulnerable to this sort of problem. + +### Safety in P2300 (Senders and Receivers) + +As of this writing, the C++ standards committee (WG21) has been +pursuing the standardization of a networking library for almost 20 +years. One of the biggest obstacles that prevented it from happening +was a disagreement on what the _asynchronous model_ that underlies +networking should look like. Until 2021 that model was basically +Boost.Asio _executors_, but in this +[poll](https://www.reddit.com/r/cpp/comments/q6tgod/c_committee_polling_results_for_asynchronous/) +the committee decided to abandon that front and concentrate efforts on +the new [P2300](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2300r7.html) +proposal, also known as _senders and receivers_. The decision was +quite [abrupt](https://isocpp.org/files/papers/P2464R0.html) + +> The original plan about a week earlier than the actual writing of +> this paper was to write a paper that makes a case for standardizing +> the Networking TS. 
+ +and opinions turned out to be very strong against Boost.Asio (see +[this](https://api.csswg.org/bikeshed/?force=1&url=https://raw.githubusercontent.com/brycelelbach/wg21_p2459_2022_january_library_evolution_poll_outcomes/main/2022_january_library_evolution_poll_outcomes.bs) +for how each voter backed their vote) + +> The whole concept is completely useless, there's no composed code +> you can write with it. + +The part of that debate that interests us most here is stated in +[P2471](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p2471r1.pdf), +that compares Boost.Asio with P2300 + +> Yes, default rescheduling each operation and default not +> rescheduling each operation, is a poor trade off. IMO both options +> are poor. The one good option that I know of that can prevent stack +> exhaustion is first-class tail-recursion in library or language + +> ASIO has chosen to require that every async operation must schedule +> the completion on a scheduler (every read, every write, etc..). + +> sender/receiver has not decided to +> require that the completion be scheduled. + +> This is why I consider tail-call the only good solution. Scheduling +> solutions are all inferior (give thanks to Lewis for this shift in +> my understanding :) ). + +Although tail-calls solve the problem of stack-exhaustion as we have +seen above, it makes the code vulnerable to unfairness and starvation +and therefore it is not an alternative to default-rescheduling as the +quotation above is implying. To deal with the lack of +default-rescheduling, libraries and applications built on top of P2300 +have to address the aforementioned problems, layer after layer. For +example, +[stdexec](https://github.com/NVIDIA/stdexec) has invented something +called +_[trampoline-scheduler](https://github.com/NVIDIA/stdexec/blob/e7cd275273525dbc693f4bf5f6dc4d4181b639e4/include/exec/trampoline_scheduler.hpp)_ +to protect repeating algorithms such as `repeat_effect_until` from +exhausting the stack. 
This construct however is built around +reentrancy, allowing +[sixteen](https://github.com/NVIDIA/stdexec/blob/83cdb92d316e8b3bca1357e2cf49fc39e9bed403/include/exec/trampoline_scheduler.hpp#L52) +levels of inline calls by default. While in Boost.Asio it is possible to use +reentrancy as an optimization for corner cases, here it is made its +_modus operandi_; my opinion about this has already been stated in a +previous section so I won't repeat it here. + +Also the fact that a special scheduler is needed by specific +algorithms is a problem on its own since it contradicts one of the +main selling points of P2300 which is that of being _generic_. For +example, [P2464R0](https://isocpp.org/files/papers/P2464R0.html) uses +the code below as an example + +```cpp +void +run_that_io_operation( + scheduler auto sched, + sender_of auto wrapping_continuation) +{ + // snip +} +``` + +and states + +> I have no idea what the sched's concrete type is. I have no idea +> what the wrapping_continuation's concrete type is. They're none of +> my business, ... + +Hence, by being generic, the algorithms built on top of P2300 are also +unsafe (against stack-exhaustion, unfairness and starvation). Otherwise, +if library writers require a specific scheduler to ensure safety, then +the algorithms become automatically non-generic, pick your poison! + +The proposers of P2300 claim that it doesn't address safety because it +should be seen as the low-level building blocks of asynchronous +programming and that it's the role of higher-level libraries to deal +with that. This claim however does not hold since, as we have just +seen, Boost.Asio also provides those building blocks but does so in a +safe way. In fact during the whole development of Boost.Redis I never +had to think about these kinds of problems because safety is built +from the ground up. 
+ +### Avoiding coroutine suspension with `await_ready` + +Now let us get back to the first place in the table above, which uses +the `await_ready` optimization from Boost.Cobalt. This API provides +users with the ability to avoid coroutine suspension altogether in +case the separator is already present in the buffer. It works by +defining a `struct` with the following interface + +```cpp +struct read_until : cobalt::op { + ... + + void ready(cobalt::handler handler) override + { + // Search for the separator in buffer and call the handler if found + } + + void initiate(cobalt::completion_handler complete) override + { + // Regular call to async_read_until. + async_read_until(socket, buffer, delim, std::move(complete)); + } +}; +``` + +and the code that uses it + +```cpp +for (int i = 0; i != repeat; ++i) { + co_await read_until(socket, dynamic_buffer(buffer)); +} +``` + +In essence, what the code above does is to skip a call to +`async_read_until` by first checking with the ready function whether +the forthcoming operation is going to complete immediately. The +nice thing about it is that the programmer can use this optimization +only when a performance bottleneck is detected, without planning for it +in advance. The drawback however is that it requires reimplementing +the search for the separator in the body of the `ready` function, +defeating the purpose of using `async_read_until` in the first place as +(again) it would have been simpler to reformulate the operation in +terms of `socket.async_read_some` directly. + +## Acknowledgements + +Thanks to Klemens Morgenstern for answering questions about +Boost.Cobalt. + From be1a1fb80d081919e9811575d3f1f77e976c1751 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Thu, 28 Dec 2023 23:09:39 +0100 Subject: [PATCH 3/7] In-tree cmake builds instead of FindBoost. 
--- CMakeLists.txt | 69 ++++++++++++++++++++++++++++++++++++++++-- example/CMakeLists.txt | 5 ++- example/cpp20_json.cpp | 4 +-- test/CMakeLists.txt | 3 +- 4 files changed, 73 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f442d55c..60d07175 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,13 +19,76 @@ target_compile_features(boost_redis INTERFACE cxx_std_17) # Dependencies if (BOOST_REDIS_MAIN_PROJECT) - # If we're the root project, error if a dependency is not found - find_package(Boost 1.83 REQUIRED COMPONENTS headers) + # TODO: Understand why we have to list all dependencies below + # instead of + #set(BOOST_INCLUDE_LIBRARIES redis) + #set(BOOST_EXCLUDE_LIBRARIES redis) + #add_subdirectory(../.. boostorg/boost EXCLUDE_FROM_ALL) + + set(deps + system + assert + config + throw_exception + asio + variant2 + mp11 + winapi + predef + align + context + core + coroutine + static_assert + pool + date_time + smart_ptr + exception + integer + move + type_traits + algorithm + utility + io + lexical_cast + numeric/conversion + mpl + range + tokenizer + tuple + array + bind + concept_check + function + iterator + regex + unordered + preprocessor + container + conversion + container_hash + detail + optional + function_types + fusion + intrusive + describe + typeof + functional + test + json + ) + + foreach(dep IN LISTS deps) + add_subdirectory(../${dep} boostorg/${dep}) + endforeach() + find_package(Threads REQUIRED) find_package(OpenSSL REQUIRED) target_link_libraries(boost_redis INTERFACE - Boost::headers + Boost::system + Boost::asio Threads::Threads OpenSSL::Crypto OpenSSL::SSL diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 16531341..a727b75b 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -10,6 +10,9 @@ macro(make_example EXAMPLE_NAME STANDARD) if (${STANDARD} STREQUAL "20") target_link_libraries(${EXAMPLE_NAME} PRIVATE examples_main) endif() + if (${STANDARD} STREQUAL "20") + 
target_link_libraries(${EXAMPLE_NAME} PRIVATE Boost::json) + endif() endmacro() macro(make_testable_example EXAMPLE_NAME STANDARD) @@ -46,4 +49,4 @@ endif() if (NOT MSVC) make_example(cpp20_chat_room 20) -endif() \ No newline at end of file +endif() diff --git a/example/cpp20_json.cpp b/example/cpp20_json.cpp index 2f0674f0..261a3f2f 100644 --- a/example/cpp20_json.cpp +++ b/example/cpp20_json.cpp @@ -15,13 +15,11 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -#define BOOST_JSON_NO_LIB -#define BOOST_CONTAINER_NO_LIB #include #include #include +#include #include -#include namespace asio = boost::asio; using namespace boost::describe; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6d615c5b..40edb275 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -23,6 +23,7 @@ macro(make_test TEST_NAME STANDARD) boost_redis_src boost_redis_tests_common boost_redis_project_options + Boost::unit_test_framework ) target_compile_features(${EXE_NAME} PRIVATE cxx_std_${STANDARD}) add_test(${EXE_NAME} ${EXE_NAME}) @@ -70,4 +71,4 @@ add_custom_target( COMMAND ${COVERAGE_HTML_COMMAND} COMMENT "Generating coverage report" VERBATIM -) \ No newline at end of file +) From e041661b7ed93cedd12e178c6cff3bc67a16387e Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 30 Dec 2023 21:39:31 +0100 Subject: [PATCH 4/7] Adds missing ssl-context getters. --- README.md | 10 +++++++++- include/boost/redis/connection.hpp | 8 ++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 436aa687..aa6d550d 100644 --- a/README.md +++ b/README.md @@ -678,13 +678,21 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ### Boost 1.85 -* Fixes [issue 170](https://github.com/boostorg/redis/issues/170). 
+* ([Issue 170](https://github.com/boostorg/redis/issues/170)) Under load and on low-latency networks it is possible to start receiving responses before the write operation completed and while the request is still marked as staged and not written. This messes up with the heuristics that classifies responses as unsolicied or not. +* ([Issue 168](https://github.com/boostorg/redis/issues/168)). + Provides SSL context getters. The user wants to be able to pass the + `ssl::context` in the connection constructor as is done in Boost.Beast + and Boost.MySql. However, doing so would penalize users on plain + connections, which would have to start passing a dummy context on + every instantiation. If there is more convincing argument I will + change this in the future. + ### Boost 1.84 (First release in Boost) * Deprecates the `async_receive` overload that takes a response. Users diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 664c6e5d..36131c3a 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -423,6 +423,14 @@ class connection { usage get_usage() const noexcept { return impl_.get_usage(); } + /// Returns the ssl context. + auto const& get_ssl_context() const noexcept + { return impl_.get_ssl_context();} + + /// Returns the ssl context. + auto& get_ssl_context() noexcept + { return impl_.get_ssl_context();} + private: void async_run_impl( From 587c5feabab23edf4565b1225b741812c2a7f189 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 31 Dec 2023 16:15:41 +0100 Subject: [PATCH 5/7] Provides a way of passing a custom ssl context to the connection. 
--- README.md | 12 +++--- include/boost/redis/connection.hpp | 28 ++++++------- .../boost/redis/detail/connection_base.hpp | 8 +--- include/boost/redis/impl/connection.ipp | 8 ++-- test/test_conn_tls.cpp | 42 +++++++++++++++---- 5 files changed, 60 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index aa6d550d..25e5817b 100644 --- a/README.md +++ b/README.md @@ -686,12 +686,12 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. not. * ([Issue 168](https://github.com/boostorg/redis/issues/168)). - Provides SSL context getters. The user wants to be able to pass the - `ssl::context` in the connection constructor as is done in Boost.Beast - and Boost.MySql. However, doing so would penalize users on plain - connections, which would have to start passing a dummy context on - every instantiation. If there is more convincing argument I will - change this in the future. + Provides a way of passing a custom SSL context to the connection. + The design here differs from that of Boost.Beast and Boost.MySql + since in Boost.Redis the connection owns the context instead of only + storing a reference to a user provided one. This is ok so because + apps need only one connection for their entire application, which + makes the overhead of one ssl-context per connection negligible. ### Boost 1.84 (First release in Boost) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 36131c3a..71348f33 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -86,13 +86,19 @@ class basic_connection { using other = basic_connection; }; - /// Contructs from an executor. + /** @brief Constructor + * + * @param ex Executor on which connection operation will run. + * @param ctx SSL context. + * @param max_read_size Maximum read size that is passed to + * the internal `asio::dynamic_buffer` constructor. 
+ */ explicit basic_connection( executor_type ex, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()) - : impl_{ex, method, max_read_size} + : impl_{ex, std::move(ctx), max_read_size} , timer_{ex} { } @@ -100,9 +106,9 @@ class basic_connection { explicit basic_connection( asio::io_context& ioc, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()) - : basic_connection(ioc.get_executor(), method, max_read_size) + : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size) { } /** @brief Starts underlying connection operations. @@ -286,10 +292,6 @@ class basic_connection { auto const& get_ssl_context() const noexcept { return impl_.get_ssl_context();} - /// Returns the ssl context. - auto& get_ssl_context() noexcept - { return impl_.get_ssl_context();} - /// Resets the underlying stream. void reset_stream() { impl_.reset_stream(); } @@ -343,14 +345,14 @@ class connection { explicit connection( executor_type ex, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()); /// Contructs from a context. explicit connection( asio::io_context& ioc, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()); /// Returns the underlying executor. @@ -427,10 +429,6 @@ class connection { auto const& get_ssl_context() const noexcept { return impl_.get_ssl_context();} - /// Returns the ssl context. 
- auto& get_ssl_context() noexcept - { return impl_.get_ssl_context();} - private: void async_run_impl( diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index 3e9a461a..043203bc 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -394,9 +394,9 @@ class connection_base { /// Constructs from an executor. connection_base( executor_type ex, - asio::ssl::context::method method, + asio::ssl::context ctx, std::size_t max_read_size) - : ctx_{method} + : ctx_{std::move(ctx)} , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} , receive_channel_{ex, 256} @@ -411,10 +411,6 @@ class connection_base { auto const& get_ssl_context() const noexcept { return ctx_;} - /// Returns the ssl context. - auto& get_ssl_context() noexcept - { return ctx_;} - /// Resets the underlying stream. void reset_stream() { diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 796573e9..12abc996 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -10,16 +10,16 @@ namespace boost::redis { connection::connection( executor_type ex, - asio::ssl::context::method method, + asio::ssl::context ctx, std::size_t max_read_size) -: impl_{ex, method, max_read_size} +: impl_{ex, std::move(ctx), max_read_size} { } connection::connection( asio::io_context& ioc, - asio::ssl::context::method method, + asio::ssl::context ctx, std::size_t max_read_size) -: impl_{ioc.get_executor(), method, max_read_size} +: impl_{ioc.get_executor(), std::move(ctx), max_read_size} { } void diff --git a/test/test_conn_tls.cpp b/test/test_conn_tls.cpp index 5e38ef3c..eb45dde3 100644 --- a/test/test_conn_tls.cpp +++ b/test/test_conn_tls.cpp @@ -25,7 +25,7 @@ bool verify_certificate(bool, net::ssl::verify_context&) return true; } -BOOST_AUTO_TEST_CASE(ping) +config make_tls_config() { config cfg; cfg.use_ssl = true; @@ 
-34,7 +34,12 @@ BOOST_AUTO_TEST_CASE(ping) cfg.addr.host = "db.occase.de"; cfg.addr.port = "6380"; //cfg.health_check_interval = std::chrono::seconds{0}; + return cfg; +} +BOOST_AUTO_TEST_CASE(ping_internal_ssl_context) +{ + auto const cfg = make_tls_config(); std::string const in = "Kabuf"; request req; @@ -59,14 +64,37 @@ BOOST_AUTO_TEST_CASE(ping) BOOST_CHECK_EQUAL(in, std::get<0>(resp).value()); } +BOOST_AUTO_TEST_CASE(ping_custom_ssl_context) +{ + auto const cfg = make_tls_config(); + std::string const in = "Kabuf"; + + request req; + req.push("PING", in); + + response resp; + + net::io_context ioc; + net::ssl::context ctx{boost::asio::ssl::context::tls_client}; + connection conn{ioc, std::move(ctx)}; + conn.next_layer().set_verify_mode(net::ssl::verify_peer); + conn.next_layer().set_verify_callback(verify_certificate); + + conn.async_exec(req, resp, [&](auto ec, auto) { + BOOST_TEST(!ec); + conn.cancel(); + }); + + conn.async_run(cfg, {}, [](auto) { }); + + ioc.run(); + + BOOST_CHECK_EQUAL(in, std::get<0>(resp).value()); +} + BOOST_AUTO_TEST_CASE(acl_does_not_allow_select) { - config cfg; - cfg.use_ssl = true; - cfg.username = "aedis"; - cfg.password = "aedis"; - cfg.addr.host = "db.occase.de"; - cfg.addr.port = "6380"; + auto cfg = make_tls_config(); cfg.database_index = 22; cfg.reconnect_wait_interval = std::chrono::seconds::zero(); From 1160347f6aabf409a5623103bb379d003aff9f97 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Mon, 1 Jan 2024 12:35:16 +0100 Subject: [PATCH 6/7] Fixes the CMake file. 
--- example/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index a727b75b..a400d21b 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -10,8 +10,8 @@ macro(make_example EXAMPLE_NAME STANDARD) if (${STANDARD} STREQUAL "20") target_link_libraries(${EXAMPLE_NAME} PRIVATE examples_main) endif() - if (${STANDARD} STREQUAL "20") - target_link_libraries(${EXAMPLE_NAME} PRIVATE Boost::json) + if (${EXAMPLE_NAME} STREQUAL "cpp20_json") + target_link_libraries(${EXAMPLE_NAME} PRIVATE Boost::json Boost::container_hash) endif() endmacro() From 7ff82f319b7de2cd79b8a2c4ab10aa5e2d3346e0 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Wed, 3 Jan 2024 22:34:13 +0100 Subject: [PATCH 7/7] Fixes TLS reconnection. --- README.md | 5 +++ example/cpp20_intro_tls.cpp | 12 ++++--- include/boost/redis/connection.hpp | 51 +++++++++++++++++++++++++- test/test_conn_tls.cpp | 57 ++++++++++++++++++++++++++---- 4 files changed, 114 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 25e5817b..5435e51d 100644 --- a/README.md +++ b/README.md @@ -693,6 +693,11 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. apps need only one connection for their entire application, which makes the overhead of one ssl-context per connection negligible. +* ([Issue 169](https://github.com/boostorg/redis/issues/169)). + Allows setting a callback that is called before every attempt to + stablish a connection or reconnection. See `cpp20_intro_tls.cpp` for + an example. + ### Boost 1.84 (First release in Boost) * Deprecates the `async_receive` overload that takes a response. 
Users diff --git a/example/cpp20_intro_tls.cpp b/example/cpp20_intro_tls.cpp index b98028ce..9f26ad1e 100644 --- a/example/cpp20_intro_tls.cpp +++ b/example/cpp20_intro_tls.cpp @@ -22,10 +22,16 @@ using boost::redis::connection; auto verify_certificate(bool, asio::ssl::verify_context&) -> bool { - std::cout << "set_verify_callback" << std::endl; + std::cout << "verify_certificate called" << std::endl; return true; } +auto prepare_callback = [](connection::next_layer_type& stream) +{ + stream.set_verify_mode(asio::ssl::verify_peer); + stream.set_verify_callback(verify_certificate); +}; + auto co_main(config cfg) -> asio::awaitable { cfg.use_ssl = true; @@ -35,6 +41,7 @@ auto co_main(config cfg) -> asio::awaitable cfg.addr.port = "6380"; auto conn = std::make_shared(co_await asio::this_coro::executor); + conn->set_prepare_callback(prepare_callback); conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); request req; @@ -42,9 +49,6 @@ auto co_main(config cfg) -> asio::awaitable response resp; - conn->next_layer().set_verify_mode(asio::ssl::verify_peer); - conn->next_layer().set_verify_callback(verify_certificate); - co_await conn->async_exec(req, resp, asio::deferred); conn->cancel(); diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 71348f33..e1fbab1c 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -34,6 +34,8 @@ struct reconnection_op { { BOOST_ASIO_CORO_REENTER (coro_) for (;;) { + conn_->m_prepare_callback(conn_->next_layer()); + BOOST_ASIO_CORO_YIELD conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self)); conn_->cancel(operation::receive); @@ -78,6 +80,15 @@ class basic_connection { executor_type get_executor() noexcept { return impl_.get_executor(); } + /// Next layer type. + using next_layer_type = asio::ssl::stream>; + + /** Prepare callback type + * + * See set_prepare_callback for more information. 
+ */ + using prepare_callback_type = std::function; + /// Rebinds the socket type to another executor. template struct rebind_executor @@ -313,6 +324,30 @@ class basic_connection { usage get_usage() const noexcept { return impl_.get_usage(); } + /** @brief Set the prepare callback + * + * This callback is called before every new connect or reconnect + * attempt. It is specially useful for SSL connections, for example + * + * @code + * auto verify_certificate(bool, asio::ssl::verify_context&) -> bool + * { + * std::cout << "verify_certificate called" << std::endl; + * return true; + * } + * + * auto prepare_callback = [](connection::next_layer_type& stream) + * { + * stream.set_verify_mode(asio::ssl::verify_peer); + * stream.set_verify_callback(verify_certificate); + * }; + * @endcode + */ + void set_prepare_callback(prepare_callback_type callback) + { + m_prepare_callback = std::move(callback); + } + private: using timer_type = asio::basic_waitable_timer< @@ -325,6 +360,7 @@ class basic_connection { config cfg_; detail::connection_base impl_; timer_type timer_; + prepare_callback_type m_prepare_callback = [](next_layer_type&){ }; }; /** \brief A basic_connection that type erases the executor. @@ -341,6 +377,15 @@ class connection { /// Executor type. using executor_type = asio::any_io_executor; + /// Underlying connection type. + using underlying_type = basic_connection; + + /// Next layer type. + using next_layer_type = underlying_type::next_layer_type; + + /// Prepare callback type + using prepare_callback_type = underlying_type::prepare_callback_type; + /// Contructs from an executor. explicit connection( @@ -429,6 +474,10 @@ class connection { auto const& get_ssl_context() const noexcept { return impl_.get_ssl_context();} + /// Calls `boost::redis::basic_connection::set_prepare_callback`. 
+ void set_prepare_callback(prepare_callback_type callback) + { impl_.set_prepare_callback(std::move(callback)); } + private: void async_run_impl( @@ -436,7 +485,7 @@ class connection { logger l, asio::any_completion_handler token); - basic_connection impl_; + underlying_type impl_; }; } // boost::redis diff --git a/test/test_conn_tls.cpp b/test/test_conn_tls.cpp index eb45dde3..b4779a0f 100644 --- a/test/test_conn_tls.cpp +++ b/test/test_conn_tls.cpp @@ -17,6 +17,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::redis::operation; +using boost::redis::ignore; using boost::system::error_code; bool verify_certificate(bool, net::ssl::verify_context&) @@ -25,6 +26,12 @@ bool verify_certificate(bool, net::ssl::verify_context&) return true; } +auto prepare_callback = [](connection::next_layer_type& stream) +{ + stream.set_verify_mode(net::ssl::verify_peer); + stream.set_verify_callback(verify_certificate); +}; + config make_tls_config() { config cfg; @@ -49,8 +56,7 @@ BOOST_AUTO_TEST_CASE(ping_internal_ssl_context) net::io_context ioc; connection conn{ioc}; - conn.next_layer().set_verify_mode(net::ssl::verify_peer); - conn.next_layer().set_verify_callback(verify_certificate); + conn.set_prepare_callback(prepare_callback); conn.async_exec(req, resp, [&](auto ec, auto) { BOOST_TEST(!ec); @@ -77,8 +83,7 @@ BOOST_AUTO_TEST_CASE(ping_custom_ssl_context) net::io_context ioc; net::ssl::context ctx{boost::asio::ssl::context::tls_client}; connection conn{ioc, std::move(ctx)}; - conn.next_layer().set_verify_mode(net::ssl::verify_peer); - conn.next_layer().set_verify_callback(verify_certificate); + conn.set_prepare_callback(prepare_callback); conn.async_exec(req, resp, [&](auto ec, auto) { BOOST_TEST(!ec); @@ -107,8 +112,7 @@ BOOST_AUTO_TEST_CASE(acl_does_not_allow_select) net::io_context ioc; connection conn{ioc}; - conn.next_layer().set_verify_mode(net::ssl::verify_peer); - 
conn.next_layer().set_verify_callback(verify_certificate); + conn.set_prepare_callback(prepare_callback); conn.async_exec(req, resp, [&](auto, auto) { // TODO: We should not need this cancel here because @@ -126,3 +130,44 @@ BOOST_AUTO_TEST_CASE(acl_does_not_allow_select) BOOST_TEST(!!ec2); } + +BOOST_AUTO_TEST_CASE(tls_and_reconnection) +{ + net::io_context ioc; + connection conn{ioc}; + + int counter = 0; + auto prepare_callback = [&](auto& stream) + { + ++counter; + }; + + conn.set_prepare_callback(prepare_callback); + + request req; + req.get_config().cancel_on_connection_lost = false; + req.push("PING", "str1"); + req.push("QUIT"); + + conn.async_exec(req, ignore, [&](auto ec, auto) { + std::cout << "First: " << ec.message() << std::endl; + BOOST_TEST(!ec); + conn.async_exec(req, ignore, [&](auto ec, auto) { + std::cout << "Second: " << ec.message() << std::endl; + BOOST_TEST(!ec); + conn.async_exec(req, ignore, [&](auto ec, auto) { + std::cout << "Third: " << ec.message() << std::endl; + BOOST_TEST(!ec); + conn.cancel(); + }); + }); + }); + + auto const cfg = make_tls_config(); + conn.async_run(cfg, {}, [](auto) { }); + + ioc.run(); + + BOOST_CHECK_EQUAL(counter, 3); +} +