From 4547e1ac0792f22be45cd4a2a89b6ebf2f15b57b Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Mon, 4 Sep 2023 14:00:12 +0200 Subject: [PATCH 1/2] First steps with using adapters to process a generic_response. --- .../boost/redis/adapter/detail/adapters.hpp | 36 ++++++++++++------- .../redis/adapter/detail/response_traits.hpp | 12 ++++--- .../redis/adapter/detail/result_traits.hpp | 6 ++-- tests/test_low_level.cpp | 13 +++++++ 4 files changed, 49 insertions(+), 18 deletions(-) diff --git a/include/boost/redis/adapter/detail/adapters.hpp b/include/boost/redis/adapter/detail/adapters.hpp index 43bf6866..82875ec4 100644 --- a/include/boost/redis/adapter/detail/adapters.hpp +++ b/include/boost/redis/adapter/detail/adapters.hpp @@ -92,7 +92,8 @@ class general_aggregate { public: explicit general_aggregate(Result* c = nullptr): result_(c) {} - void operator()(resp3::basic_node const& nd, system::error_code&) + template + void operator()(resp3::basic_node const& nd, system::error_code&) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); switch (nd.data_type) { @@ -114,7 +115,8 @@ class general_simple { public: explicit general_simple(Node* t = nullptr) : result_(t) {} - void operator()(resp3::basic_node const& nd, system::error_code&) + template + void operator()(resp3::basic_node const& nd, system::error_code&) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); switch (nd.data_type) { @@ -136,10 +138,11 @@ class simple_impl { public: void on_value_available(Result&) {} + template void operator()( Result& result, - resp3::basic_node const& n, + resp3::basic_node const& n, system::error_code& ec) { if (is_aggregate(n.data_type)) { @@ -160,10 +163,11 @@ class set_impl { void on_value_available(Result& result) { hint_ = std::end(result); } + template void operator()( Result& result, - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { @@ -195,10 +199,11 @@ class map_impl { void on_value_available(Result& result) { current_ = std::end(result); } + template void operator()( Result& result, - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { @@ -233,10 +238,11 @@ class vector_impl { public: void on_value_available(Result& ) { } + template void operator()( Result& result, - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { @@ -257,10 +263,11 @@ class array_impl { public: void on_value_available(Result& ) { } + template void operator()( Result& result, - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { @@ -292,10 +299,11 @@ struct list_impl { void on_value_available(Result& ) { } + template void operator()( Result& result, - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { if (!is_aggregate(nd.data_type)) { @@ -365,7 +373,8 @@ class wrapper> { response_type* result_; typename impl_map::type impl_; - bool set_if_resp3_error(resp3::basic_node const& nd) noexcept + template + bool set_if_resp3_error(resp3::basic_node const& nd) noexcept { switch (nd.data_type) { case resp3::type::null: @@ -387,9 +396,10 @@ class wrapper> { } } + template void operator()( - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); @@ -414,7 +424,8 @@ class wrapper>> { response_type* result_; typename impl_map::type impl_{}; - bool set_if_resp3_error(resp3::basic_node const& nd) noexcept + template + bool set_if_resp3_error(resp3::basic_node const& nd) noexcept { switch (nd.data_type) { case resp3::type::blob_error: @@ -429,9 +440,10 @@ class wrapper>> { public: explicit wrapper(response_type* o = nullptr) : result_(o) {} + template void operator()( - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); diff --git a/include/boost/redis/adapter/detail/response_traits.hpp b/include/boost/redis/adapter/detail/response_traits.hpp index 919ed255..78bd1e28 100644 --- a/include/boost/redis/adapter/detail/response_traits.hpp +++ b/include/boost/redis/adapter/detail/response_traits.hpp @@ -23,8 +23,9 @@ namespace boost::redis::adapter::detail class ignore_adapter { public: + template void - operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) + operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) { switch (nd.data_type) { case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break; @@ -59,10 +60,11 @@ class static_adapter { auto get_supported_response_size() const noexcept { return size;} + template void operator()( std::size_t i, - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { using std::visit; @@ -88,10 +90,11 @@ class vector_adapter { get_supported_response_size() const noexcept { return static_cast(-1);} + template void operator()( std::size_t, - resp3::basic_node const& nd, + resp3::basic_node const& nd, system::error_code& ec) { adapter_(nd, ec); @@ -142,7 +145,8 @@ class wrapper { public: explicit wrapper(Adapter adapter) : adapter_{adapter} {} - void operator()(resp3::basic_node const& nd, system::error_code& ec) + template + void operator()(resp3::basic_node const& nd, system::error_code& ec) { return adapter_(0, nd, ec); } [[nodiscard]] diff --git a/include/boost/redis/adapter/detail/result_traits.hpp b/include/boost/redis/adapter/detail/result_traits.hpp index 09c3b520..22cdebd3 100644 --- a/include/boost/redis/adapter/detail/result_traits.hpp +++ b/include/boost/redis/adapter/detail/result_traits.hpp @@ -116,7 +116,8 @@ class static_aggregate_adapter> { } } - void count(resp3::basic_node const& nd) + template + void count(resp3::basic_node const& nd) { if (nd.depth == 1) { if (is_aggregate(nd.data_type)) @@ -131,7 +132,8 @@ class static_aggregate_adapter> { ++i_; } - void operator()(resp3::basic_node const& nd, system::error_code& ec) + template + void operator()(resp3::basic_node const& nd, system::error_code& ec) { using std::visit; diff --git a/tests/test_low_level.cpp b/tests/test_low_level.cpp index 44419386..6bfe8d0f 100644 --- a/tests/test_low_level.cpp +++ b/tests/test_low_level.cpp @@ -589,3 +589,16 @@ BOOST_AUTO_TEST_CASE(adapter) BOOST_CHECK_EQUAL(std::get<1>(resp).value(), 42); BOOST_TEST(!ec); } + +// TODO: This was an experiment, I will resume implementing this +// later. +BOOST_AUTO_TEST_CASE(adapter_as) +{ + result> set; + auto adapter = adapt2(set); + + for (auto const& e: set_expected1a.value()) { + error_code ec; + adapter(e, ec); + } +} From 2a4936a9e1520c8ef037ecdebc856457788e051c Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Thu, 7 Sep 2023 00:08:26 +0200 Subject: [PATCH 2/2] Implements batch reads for server pushes. --- README.md | 26 ++++-- examples/cpp20_subscriber.cpp | 16 +++- .../boost/redis/adapter/detail/adapters.hpp | 41 ++------- .../redis/adapter/detail/response_traits.hpp | 15 +-- include/boost/redis/connection.hpp | 23 +++++ .../boost/redis/detail/connection_base.hpp | 28 +++++- include/boost/redis/error.hpp | 6 ++ include/boost/redis/impl/error.ipp | 3 + include/boost/redis/impl/response.ipp | 48 ++++++++++ include/boost/redis/response.hpp | 47 +++++++++- include/boost/redis/src.hpp | 1 + tests/common.hpp | 2 +- tests/test_conn_check_health.cpp | 4 +- tests/test_conn_echo_stress.cpp | 48 ++++------ tests/test_conn_push.cpp | 42 ++++++--- tests/test_low_level.cpp | 91 ++++++++++++++++--- 16 files changed, 323 insertions(+), 118 deletions(-) create mode 100644 include/boost/redis/impl/response.ipp diff --git a/README.md b/README.md index e6b70566..6a69e0db 100644 --- a/README.md +++ b/README.md @@ -674,22 +674,34 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ## Changelog -### develop (incorporates changes to conform the boost review and more) +### develop * Deprecates the `async_receive` overload that takes a response. Users - should now first call `set_receive_response` to avoid contantly seting - the same response. + should now first call `set_receive_response` to avoid constantly and + unnecessarily setting the same response. * Uses `std::function` to type erase the response adapter. This change should not influence users in any way but allowed important - simplification in the connections internals. This resulted in big - performance improvement where one of my benchmark programs passed - from 190k/s to 473k/s. + simplification in the connections internals. This resulted in + massive performance improvement. * The connection has a new member `get_usage()` that returns the - connection usage information, such as number of bytes writen, + connection usage information, such as number of bytes written, received etc. +* There are massive performance improvements in the consuming of + server pushes which are now communicated with an `asio::channel` and + therefore can be buffered which avoids blocking the socket read-loop. + Batch reads are also supported by means of `channel.try_send` and + buffered messages can be consumed synchronously with + `connection::receive`. The function `boost::redis::cancel_one` has + been added to simplify processing multiple server pushes contained + in the same `generic_response`. *IMPORTANT*: These changes may + result in more than one push in the response when + `connection::async_receive` resumes. The user must therefore be + careful when calling `resp.clear()`: either ensure that all message + have been processed or just use `consume_one`. + ### v1.4.2 (incorporates changes to conform the boost review and more) * Adds `boost::redis::config::database_index` to make it possible to diff --git a/examples/cpp20_subscriber.cpp b/examples/cpp20_subscriber.cpp index ac1cc884..c112d75c 100644 --- a/examples/cpp20_subscriber.cpp +++ b/examples/cpp20_subscriber.cpp @@ -22,9 +22,11 @@ namespace asio = boost::asio; using namespace std::chrono_literals; using boost::redis::request; using boost::redis::generic_response; +using boost::redis::consume_one; using boost::redis::logger; using boost::redis::config; using boost::redis::ignore; +using boost::redis::error; using boost::system::error_code; using boost::redis::connection; using signal_set = asio::deferred_t::as_default_on_t; @@ -58,20 +60,28 @@ receiver(std::shared_ptr conn) -> asio::awaitable // Loop while reconnection is enabled while (conn->will_reconnect()) { - // Reconnect to channels. + // Reconnect to the channels. co_await conn->async_exec(req, ignore, asio::deferred); // Loop reading Redis pushs messages. for (error_code ec;;) { - co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec)); + // First tries to read any buffered pushes. + conn->receive(ec); + if (ec == error::sync_receive_push_failed) { + ec = {}; + co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec)); + } + if (ec) break; // Connection lost, break so we can reconnect to channels. + std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " << resp.value().at(3).value << std::endl; - resp.value().clear(); + + consume_one(resp); } } } diff --git a/include/boost/redis/adapter/detail/adapters.hpp b/include/boost/redis/adapter/detail/adapters.hpp index 82875ec4..a1f91076 100644 --- a/include/boost/redis/adapter/detail/adapters.hpp +++ b/include/boost/redis/adapter/detail/adapters.hpp @@ -139,11 +139,7 @@ class simple_impl { void on_value_available(Result&) {} template - void - operator()( - Result& result, - resp3::basic_node const& n, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& n, system::error_code& ec) { if (is_aggregate(n.data_type)) { ec = redis::error::expects_resp3_simple_type; @@ -164,11 +160,7 @@ class set_impl { { hint_ = std::end(result); } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (nd.data_type != resp3::type::set) @@ -200,11 +192,7 @@ class map_impl { { current_ = std::end(result); } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (element_multiplicity(nd.data_type) != 2) @@ -239,11 +227,7 @@ class vector_impl { void on_value_available(Result& ) { } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { auto const m = element_multiplicity(nd.data_type); @@ -264,11 +248,7 @@ class array_impl { void on_value_available(Result& ) { } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (i_ != -1) { @@ -300,11 +280,7 @@ struct list_impl { void on_value_available(Result& ) { } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (!is_aggregate(nd.data_type)) { BOOST_ASSERT(nd.aggregate_size == 1); @@ -397,10 +373,7 @@ class wrapper> { } template - void - operator()( - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(resp3::basic_node const& nd, system::error_code& ec) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); diff --git a/include/boost/redis/adapter/detail/response_traits.hpp b/include/boost/redis/adapter/detail/response_traits.hpp index 78bd1e28..3ba5bfec 100644 --- a/include/boost/redis/adapter/detail/response_traits.hpp +++ b/include/boost/redis/adapter/detail/response_traits.hpp @@ -24,8 +24,7 @@ namespace boost::redis::adapter::detail class ignore_adapter { public: template - void - operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) + void operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) { switch (nd.data_type) { case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break; @@ -61,11 +60,7 @@ class static_adapter { { return size;} template - void - operator()( - std::size_t i, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(std::size_t i, resp3::basic_node const& nd, system::error_code& ec) { using std::visit; // I am usure whether this should be an error or an assertion. @@ -91,11 +86,7 @@ class vector_adapter { { return static_cast(-1);} template - void - operator()( - std::size_t, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) { adapter_(nd, ec); } diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index c1cb7dea..0b0c6267 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -188,6 +188,23 @@ class basic_connection { auto async_receive(CompletionToken token = CompletionToken{}) { return impl_.async_receive(std::move(token)); } + + /** @brief Receives server pushes synchronously without blocking. + * + * Receives a server push synchronously by calling `try_receive` on + * the underlying channel. If the operation fails because + * `try_receive` returns `false`, `ec` will be set to + * `boost::redis::error::sync_receive_push_failed`. + * + * @param ec Contains the error if any occurred. + * + * @returns The number of bytes read from the socket. + */ + std::size_t receive(system::error_code& ec) + { + return impl_.receive(ec); + } + template < class Response = ignore_t, class CompletionToken = asio::default_completion_token_t @@ -367,6 +384,12 @@ class connection { auto async_receive(CompletionToken token) { return impl_.async_receive(std::move(token)); } + /// Calls `boost::redis::basic_connection::receive`. + std::size_t receive(system::error_code& ec) + { + return impl_.receive(ec); + } + /// Calls `boost::redis::basic_connection::async_exec`. template auto async_exec(request const& req, Response& resp, CompletionToken token) diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index 1c4ff578..6afea4cc 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -348,8 +348,10 @@ struct reader_op { } if (res_.first == parse_result::push) { - BOOST_ASIO_CORO_YIELD - conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); + if (!conn_->receive_channel_.try_send(ec, res_.second)) { + BOOST_ASIO_CORO_YIELD + conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); + } if (ec) { logger_.trace("reader-op: error. Exiting ..."); @@ -398,7 +400,7 @@ class connection_base { : ctx_{method} , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} - , receive_channel_{ex} + , receive_channel_{ex, 256} , runner_{ex, {}} , dbuf_{read_buffer_, max_read_size} { @@ -470,6 +472,26 @@ class connection_base { auto async_receive(CompletionToken token) { return receive_channel_.async_receive(std::move(token)); } + std::size_t receive(system::error_code& ec) + { + std::size_t size = 0; + + auto f = [&](system::error_code const& ec2, std::size_t n) + { + ec = ec2; + size = n; + }; + + auto const res = receive_channel_.try_receive(f); + if (ec) + return 0; + + if (!res) + ec = error::sync_receive_push_failed; + + return size; + } + template auto async_run(config const& cfg, Logger l, CompletionToken token) { diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 7424aea7..85b152d2 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -75,6 +75,12 @@ enum class error /// SSL handshake timeout ssl_handshake_timeout, + + /// Can't receive push synchronously without blocking + sync_receive_push_failed, + + /// Incompatible node depth. + incompatible_node_depth, }; /** \internal diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index 9f5c06eb..6a5c8cb8 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -41,6 +41,9 @@ struct error_category_impl : system::error_category { case error::resolve_timeout: return "Resolve timeout."; case error::connect_timeout: return "Connect timeout."; case error::pong_timeout: return "Pong timeout."; + case error::ssl_handshake_timeout: return "SSL handshake timeout."; + case error::sync_receive_push_failed: return "Can't receive server push synchronously without blocking."; + case error::incompatible_node_depth: return "Incompatible node depth."; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/impl/response.ipp b/include/boost/redis/impl/response.ipp new file mode 100644 index 00000000..c2306e1e --- /dev/null +++ b/include/boost/redis/impl/response.ipp @@ -0,0 +1,48 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include +#include + +namespace boost::redis +{ + +void consume_one(generic_response& r, system::error_code& ec) +{ + if (r.has_error()) + return; // Nothing to consume. + + if (std::empty(r.value())) + return; // Nothing to consume. + + auto const depth = r.value().front().depth; + + // To simplify we will refuse to consume any data-type that is not + // a root node. I think there is no use for that and it is complex + // since it requires updating parent nodes. + if (depth != 0) { + ec = error::incompatible_node_depth; + return; + } + + auto f = [depth](auto const& e) + { return e.depth == depth; }; + + auto match = std::find_if(std::next(std::cbegin(r.value())), std::cend(r.value()), f); + + r.value().erase(std::cbegin(r.value()), match); +} + +void consume_one(generic_response& r) +{ + system::error_code ec; + consume_one(r, ec); + if (ec) + throw system::system_error(ec); +} + +} // boost::redis::resp3 diff --git a/include/boost/redis/response.hpp b/include/boost/redis/response.hpp index 5f6c5c37..b3f76ad5 100644 --- a/include/boost/redis/response.hpp +++ b/include/boost/redis/response.hpp @@ -9,12 +9,14 @@ #include #include +#include #include #include #include -namespace boost::redis { +namespace boost::redis +{ /** @brief Response with compile-time size. * @ingroup high-level-api @@ -32,6 +34,47 @@ using response = std::tuple...>; */ using generic_response = adapter::result>; -} // boost::redis::resp3 +/** @brief Consume on response from a generic response + * + * This function rotates the elements so that the start of the next + * response becomes the new front element. For example the output of + * the following code + * + * @code + * request req; + * req.push("PING", "one"); + * req.push("PING", "two"); + * req.push("PING", "three"); + * + * generic_response resp; + * co_await conn->async_exec(req, resp, asio::deferred); + * + * std::cout << "PING: " << resp.value().front().value << std::endl; + * consume_one(resp); + * std::cout << "PING: " << resp.value().front().value << std::endl; + * consume_one(resp); + * std::cout << "PING: " << resp.value().front().value << std::endl; + * @code + * + * is + * + * @code + * PING: one + * PING: two + * PING: three + * @code + * + * Given that this function rotates elements, it won't be very + * efficient for responses with a large number of elements. It was + * introduced mainly to deal with buffers server pushes as shown in + * the cpp20_subscriber.cpp example. In the future queue-like + * responses might be introduced to consume in O(1) operations. + */ +void consume_one(generic_response& r, system::error_code& ec); + +/// Throwing overload of `consume_one`. +void consume_one(generic_response& r); + +} // boost::redis #endif // BOOST_REDIS_RESPONSE_HPP diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 3a06c3e0..7075bf13 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include diff --git a/tests/common.hpp b/tests/common.hpp index b5bc7dab..bbe3e3f2 100644 --- a/tests/common.hpp +++ b/tests/common.hpp @@ -21,5 +21,5 @@ run( boost::redis::config cfg = {}, boost::system::error_code ec = boost::asio::error::operation_aborted, boost::redis::operation op = boost::redis::operation::receive, - boost::redis::logger::level l = boost::redis::logger::level::info); + boost::redis::logger::level l = boost::redis::logger::level::disabled); diff --git a/tests/test_conn_check_health.cpp b/tests/test_conn_check_health.cpp index 91ece76a..3038ae0d 100644 --- a/tests/test_conn_check_health.cpp +++ b/tests/test_conn_check_health.cpp @@ -5,6 +5,7 @@ */ #include +#include #include #define BOOST_TEST_MODULE check-health #include @@ -21,6 +22,7 @@ using boost::redis::ignore; using boost::redis::operation; using boost::redis::generic_response; using boost::redis::logger; +using boost::redis::consume_one; using redis::config; // TODO: Test cancel(health_check) @@ -39,7 +41,6 @@ struct push_callback { { BOOST_ASIO_CORO_REENTER (coro) for (;;) { - resp2->value().clear(); BOOST_ASIO_CORO_YIELD conn2->async_receive(*this); if (ec) { @@ -50,6 +51,7 @@ struct push_callback { BOOST_TEST(resp2->has_value()); BOOST_TEST(!resp2->value().empty()); std::clog << "Event> " << resp2->value().front().value << std::endl; + consume_one(*resp2); ++i; diff --git a/tests/test_conn_echo_stress.cpp b/tests/test_conn_echo_stress.cpp index 0cbf3a4c..95115584 100644 --- a/tests/test_conn_echo_stress.cpp +++ b/tests/test_conn_echo_stress.cpp @@ -27,6 +27,7 @@ using boost::redis::logger; using boost::redis::config; using boost::redis::connection; using boost::redis::usage; +using boost::redis::error; std::ostream& operator<<(std::ostream& os, usage const& u) { @@ -45,8 +46,16 @@ auto push_consumer(std::shared_ptr conn, int expected) -> net::await { int c = 0; for (error_code ec;;) { - co_await conn->async_receive(redirect_error(net::use_awaitable, ec)); + conn->receive(ec); + if (ec == error::sync_receive_push_failed) { + ec = {}; + co_await conn->async_receive(redirect_error(net::use_awaitable, ec)); + } else if (!ec) { + //std::cout << "Skipping suspension." << std::endl; + } + if (ec) { + BOOST_TEST(false); std::cout << "push_consumer error: " << ec.message() << std::endl; co_return; } @@ -61,30 +70,10 @@ auto echo_session( std::shared_ptr conn, std::shared_ptr pubs, - std::string id, int n) -> net::awaitable { - auto ex = co_await net::this_coro::executor; - - request req; - response resp; - - for (auto i = 0; i < n; ++i) { - auto const msg = id + "/" + std::to_string(i); - //std::cout << msg << std::endl; - req.push("HELLO", 3); // Just to mess around. - req.push("PING", msg); - req.push("PING", "lsls"); // TODO: Change to HELLO after fixing issue 105. - boost::system::error_code ec; - co_await conn->async_exec(req, resp, redir(ec)); - - BOOST_TEST(!ec); - BOOST_REQUIRE_EQUAL(msg, std::get<1>(resp).value()); - req.clear(); - std::get<1>(resp).value().clear(); - + for (auto i = 0; i < n; ++i) co_await conn->async_exec(*pubs, ignore, net::deferred); - } } auto async_echo_stress(std::shared_ptr conn) -> net::awaitable @@ -103,19 +92,20 @@ auto async_echo_stress(std::shared_ptr conn) -> net::awaitable // Number of coroutines that will send pings sharing the same // connection to redis. - int const sessions = 500; + int const sessions = 1000; // The number of pings that will be sent by each session. - int const msgs = 1000; + int const msgs = 500; // The number of publishes that will be sent by each session with // each message. - int const n_pubs = 10; + int const n_pubs = 100; // This is the total number of pushes we will receive. int total_pushes = sessions * msgs * n_pubs + 1; auto pubs = std::make_shared(); + pubs->push("PING"); for (int i = 0; i < n_pubs; ++i) pubs->push("PUBLISH", "channel", "payload"); @@ -124,7 +114,7 @@ auto async_echo_stress(std::shared_ptr conn) -> net::awaitable net::co_spawn(ex, push_consumer(conn, total_pushes), net::detached); for (int i = 0; i < sessions; ++i) - net::co_spawn(ex, echo_session(conn, pubs, std::to_string(i), msgs), net::detached); + net::co_spawn(ex, echo_session(conn, pubs, msgs), net::detached); } BOOST_AUTO_TEST_CASE(echo_stress) @@ -134,8 +124,10 @@ BOOST_AUTO_TEST_CASE(echo_stress) net::co_spawn(ioc, async_echo_stress(conn), net::detached); ioc.run(); - std::cout << "-------------------\n" - << conn->get_usage() << std::endl; + std::cout + << "-------------------\n" + << conn->get_usage() + << std::endl; } #else diff --git a/tests/test_conn_push.cpp b/tests/test_conn_push.cpp index ab2572ce..40eff4ea 100644 --- a/tests/test_conn_push.cpp +++ b/tests/test_conn_push.cpp @@ -26,6 +26,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore; using boost::redis::ignore_t; +using boost::system::error_code; using redis::config; using boost::redis::logger; using namespace std::chrono_literals; @@ -49,7 +50,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) auto c3 =[](auto ec, auto...) { - BOOST_TEST(!!ec); + std::cout << "c3: " << ec.message() << std::endl; }; auto c2 =[&, conn](auto ec, auto...) @@ -73,8 +74,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) std::cout << "async_receive" << std::endl; BOOST_TEST(!ec); push_received = true; - conn->cancel(operation::run); - conn->cancel(operation::reconnection); + conn->cancel(); }); ioc.run(); @@ -87,29 +87,45 @@ BOOST_AUTO_TEST_CASE(push_received1) net::io_context ioc; auto conn = std::make_shared(ioc); + // Trick: Uses SUBSCRIBE because this command has no response or + // better said, its response is a server push, which is what we + // want to test. We send two because we want to test both + // async_receive and receive. request req; - //req.push("HELLO", 3); - req.push("SUBSCRIBE", "channel"); + req.push("SUBSCRIBE", "channel1"); + req.push("SUBSCRIBE", "channel2"); conn->async_exec(req, ignore, [conn](auto ec, auto){ std::cout << "async_exec" << std::endl; BOOST_TEST(!ec); }); - run(conn); - - bool push_received = false; + bool push_async_received = false; conn->async_receive([&, conn](auto ec, auto){ - std::cout << "async_receive" << std::endl; + std::cout << "(1) async_receive" << std::endl; + BOOST_TEST(!ec); - push_received = true; - conn->cancel(operation::run); - conn->cancel(operation::reconnection); + push_async_received = true; + + // Receives the second push synchronously. + error_code ec2; + std::size_t res = 0; + res = conn->receive(ec2); + BOOST_TEST(!ec2); + BOOST_TEST(res != std::size_t(0)); + + // Tries to receive a third push synchronously. + ec2 = {}; + res = conn->receive(ec2); + BOOST_CHECK_EQUAL(ec2, boost::redis::make_error_code(boost::redis::error::sync_receive_push_failed)); + + conn->cancel(); }); + run(conn); ioc.run(); - BOOST_TEST(push_received); + BOOST_TEST(push_async_received); } BOOST_AUTO_TEST_CASE(push_filtered_out) diff --git a/tests/test_low_level.cpp b/tests/test_low_level.cpp index 6bfe8d0f..dadbe454 100644 --- a/tests/test_low_level.cpp +++ b/tests/test_low_level.cpp @@ -30,15 +30,17 @@ namespace resp3 = boost::redis::resp3; using boost::system::error_code; using boost::redis::request; using boost::redis::response; +using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::adapter::result; using boost::redis::resp3::parser; using boost::redis::resp3::parse; +using boost::redis::consume_one; +using boost::redis::error; using boost::redis::adapter::adapt2; using node_type = result; -using vec_node_type = result>; using vec_type = result>; using op_vec_type = result>>; @@ -154,7 +156,7 @@ result> op_bool_ok = true; // TODO: Test a streamed string that is not finished with a string of // size 0 but other command comes in. -vec_node_type streamed_string_e1 +generic_response streamed_string_e1 {{ {boost::redis::resp3::type::streamed_string, 0, 1, ""} , {boost::redis::resp3::type::streamed_string_part, 1, 1, "Hell"} , {boost::redis::resp3::type::streamed_string_part, 1, 1, "o wor"} @@ -162,10 +164,10 @@ vec_node_type streamed_string_e1 , {boost::redis::resp3::type::streamed_string_part, 1, 1, ""} }}; -vec_node_type streamed_string_e2 +generic_response streamed_string_e2 {{{resp3::type::streamed_string, 0UL, 1UL, {}}, {resp3::type::streamed_string_part, 1UL, 1UL, {}} }}; -vec_node_type const push_e1a +generic_response const push_e1a {{ {resp3::type::push, 4UL, 0UL, {}} , {resp3::type::simple_string, 1UL, 1UL, "pubsub"} , {resp3::type::simple_string, 1UL, 1UL, "message"} @@ -173,10 +175,10 @@ vec_node_type const push_e1a , {resp3::type::simple_string, 1UL, 1UL, "some message"} }}; -vec_node_type const push_e1b +generic_response const push_e1b {{{resp3::type::push, 0UL, 0UL, {}}}}; -vec_node_type const set_expected1a +generic_response const set_expected1a {{{resp3::type::set, 6UL, 0UL, {}} , {resp3::type::simple_string, 1UL, 1UL, {"orange"}} , {resp3::type::simple_string, 1UL, 1UL, {"apple"}} @@ -192,7 +194,7 @@ muset_type const set_e1g{{"apple", "one", "orange", "orange", "three", "two"}}; vec_type const set_e1d = {{"orange", "apple", "one", "two", "three", "orange"}}; op_vec_type const set_expected_1e = set_e1d; -vec_node_type const array_e1a +generic_response const array_e1a {{ {resp3::type::array, 3UL, 0UL, {}} , {resp3::type::blob_string, 1UL, 1UL, {"11"}} , {resp3::type::blob_string, 1UL, 1UL, {"22"}} @@ -202,12 +204,12 @@ vec_node_type const array_e1a result> const array_e1b{{11, 22, 3}}; result> const array_e1c{{"11", "22", "3"}}; result> const array_e1d{}; -vec_node_type const array_e1e{{{resp3::type::array, 0UL, 0UL, {}}}}; +generic_response const array_e1e{{{resp3::type::array, 0UL, 0UL, {}}}}; array_type const array_e1f{{11, 22, 3}}; result> const array_e1g{{11, 22, 3}}; result> const array_e1h{{11, 22, 3}}; -vec_node_type const map_expected_1a +generic_response const map_expected_1a {{ {resp3::type::map, 4UL, 0UL, {}} , {resp3::type::blob_string, 1UL, 1UL, {"key1"}} , {resp3::type::blob_string, 1UL, 1UL, {"value1"}} @@ -263,7 +265,7 @@ tuple8_type const map_e1f , std::string{"key3"}, std::string{"value3"} }; -vec_node_type const attr_e1a +generic_response const attr_e1a {{ {resp3::type::attribute, 1UL, 0UL, {}} , {resp3::type::simple_string, 1UL, 1UL, "key-popularity"} , {resp3::type::map, 2UL, 1UL, {}} @@ -273,7 +275,7 @@ vec_node_type const attr_e1a , {resp3::type::doublean, 1UL, 2UL, "0.0012"} } }; -vec_node_type const attr_e1b +generic_response const attr_e1b {{{resp3::type::attribute, 0UL, 0UL, {}} }}; #define S01a "#11\r\n" @@ -407,7 +409,7 @@ vec_node_type const attr_e1b test(make_expected(S04e, array_type2{}, boost::redis::error::incompatible_size));\ test(make_expected(S04e, tuple_int_2{}, boost::redis::error::incompatible_size));\ test(make_expected(S04f, array_type2{}, boost::redis::error::nested_aggregate_not_supported));\ - test(make_expected(S04g, vec_node_type{}, boost::redis::error::exceeeds_max_nested_depth));\ + test(make_expected(S04g, generic_response{}, boost::redis::error::exceeeds_max_nested_depth));\ test(make_expected(S04h, array_e1d));\ test(make_expected(S04h, array_e1e));\ test(make_expected(S04i, set_type{}, boost::redis::error::expects_resp3_set)); \ @@ -418,7 +420,7 @@ vec_node_type const attr_e1b test(make_expected(S09a, set_expected1a)); \ test(make_expected(S09a, set_expected_1e)); \ test(make_expected(S09a, set_type{{"apple", "one", "orange", "three", "two"}})); \ - test(make_expected(S09b, vec_node_type{{{resp3::type::set, 0UL, 0UL, {}}}})); \ + test(make_expected(S09b, generic_response{{{resp3::type::set, 0UL, 0UL, {}}}})); \ test(make_expected(S03c, map_type{}));\ test(make_expected(S11a, node_type{{resp3::type::doublean, 1UL, 0UL, {"1.23"}}}));\ test(make_expected(S11b, node_type{{resp3::type::doublean, 1UL, 0UL, {"inf"}}}));\ @@ -496,7 +498,7 @@ void check_error(char const* name, boost::redis::error ev) static_cast::type>(ev))); } -BOOST_AUTO_TEST_CASE(error) +BOOST_AUTO_TEST_CASE(cover_error) { check_error("boost.redis", boost::redis::error::invalid_data_type); check_error("boost.redis", boost::redis::error::not_a_number); @@ -514,6 +516,12 @@ BOOST_AUTO_TEST_CASE(error) check_error("boost.redis", boost::redis::error::not_a_double); check_error("boost.redis", boost::redis::error::resp3_null); check_error("boost.redis", boost::redis::error::not_connected); + check_error("boost.redis", boost::redis::error::resolve_timeout); + check_error("boost.redis", boost::redis::error::connect_timeout); + check_error("boost.redis", boost::redis::error::pong_timeout); + check_error("boost.redis", boost::redis::error::ssl_handshake_timeout); + check_error("boost.redis", boost::redis::error::sync_receive_push_failed); + check_error("boost.redis", boost::redis::error::incompatible_node_depth); } std::string get_type_as_str(boost::redis::resp3::type t) @@ -602,3 +610,58 @@ BOOST_AUTO_TEST_CASE(adapter_as) adapter(e, ec); } } + +BOOST_AUTO_TEST_CASE(cancel_one_1) +{ + auto resp = push_e1a; + BOOST_TEST(resp.has_value()); + + consume_one(resp); + BOOST_TEST(resp.value().empty()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_empty) +{ + generic_response resp; + BOOST_TEST(resp.has_value()); + + consume_one(resp); + BOOST_TEST(resp.value().empty()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_has_error) +{ + generic_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}}; + BOOST_TEST(resp.has_error()); + + consume_one(resp); + BOOST_TEST(resp.has_error()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_has_does_not_consume_past_the_end) +{ + auto resp = push_e1a; + BOOST_TEST(resp.has_value()); + resp.value().insert( + std::cend(resp.value()), + std::cbegin(push_e1a.value()), + std::cend(push_e1a.value())); + + consume_one(resp); + + BOOST_CHECK_EQUAL(resp.value().size(), push_e1a.value().size()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_incompatible_depth) +{ + auto resp = streamed_string_e1; + BOOST_TEST(resp.has_value()); + + error_code ec; + consume_one(resp, ec); + + error_code expected = error::incompatible_node_depth; + BOOST_CHECK_EQUAL(ec, expected); + + BOOST_CHECK_EQUAL(resp.value().size(), push_e1a.value().size()); +}