diff --git a/README.md b/README.md index e6b70566..c9bedda6 100644 --- a/README.md +++ b/README.md @@ -677,19 +677,30 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ### develop (incorporates changes to conform the boost review and more) * Deprecates the `async_receive` overload that takes a response. Users - should now first call `set_receive_response` to avoid contantly seting - the same response. + should now first call `set_receive_response` to avoid constantly and + unnecessarily setting the same response. * Uses `std::function` to type erase the response adapter. This change should not influence users in any way but allowed important - simplification in the connections internals. This resulted in big - performance improvement where one of my benchmark programs passed - from 190k/s to 473k/s. + simplification in the connections internals. This resulted in + massive performance improvement. * The connection has a new member `get_usage()` that returns the - connection usage information, such as number of bytes writen, + connection usage information, such as number of bytes written, received etc. +* There are massive performance improvements in the consuming of + server pushes which are now communicated with an `asio::channel` and + therefore can be buffered. Batch reads are also supported by means + of `channle.try_send` and buffered messages can be consumed + synchronously with `connection::receive`. The function + `boost::redis::cancel_one` has been added to simplify processing + multiple server pushes contained in the same `generic_response`. + *IMPORTANT*: These changes may result in more than one push in the + response when `connection::async_receive` resumes. The user must + therefore be careful when calling `resp.clear()`: either ensure that + all message have been processed or just use `consume_one`. + ### v1.4.2 (incorporates changes to conform the boost review and more) * Adds `boost::redis::config::database_index` to make it possible to diff --git a/examples/cpp20_subscriber.cpp b/examples/cpp20_subscriber.cpp index ac1cc884..c112d75c 100644 --- a/examples/cpp20_subscriber.cpp +++ b/examples/cpp20_subscriber.cpp @@ -22,9 +22,11 @@ namespace asio = boost::asio; using namespace std::chrono_literals; using boost::redis::request; using boost::redis::generic_response; +using boost::redis::consume_one; using boost::redis::logger; using boost::redis::config; using boost::redis::ignore; +using boost::redis::error; using boost::system::error_code; using boost::redis::connection; using signal_set = asio::deferred_t::as_default_on_t; @@ -58,20 +60,28 @@ receiver(std::shared_ptr conn) -> asio::awaitable // Loop while reconnection is enabled while (conn->will_reconnect()) { - // Reconnect to channels. + // Reconnect to the channels. co_await conn->async_exec(req, ignore, asio::deferred); // Loop reading Redis pushs messages. for (error_code ec;;) { - co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec)); + // First tries to read any buffered pushes. + conn->receive(ec); + if (ec == error::sync_receive_push_failed) { + ec = {}; + co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec)); + } + if (ec) break; // Connection lost, break so we can reconnect to channels. + std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " << resp.value().at(3).value << std::endl; - resp.value().clear(); + + consume_one(resp); } } } diff --git a/include/boost/redis/adapter/detail/adapters.hpp b/include/boost/redis/adapter/detail/adapters.hpp index 82875ec4..a1f91076 100644 --- a/include/boost/redis/adapter/detail/adapters.hpp +++ b/include/boost/redis/adapter/detail/adapters.hpp @@ -139,11 +139,7 @@ class simple_impl { void on_value_available(Result&) {} template - void - operator()( - Result& result, - resp3::basic_node const& n, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& n, system::error_code& ec) { if (is_aggregate(n.data_type)) { ec = redis::error::expects_resp3_simple_type; @@ -164,11 +160,7 @@ class set_impl { { hint_ = std::end(result); } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (nd.data_type != resp3::type::set) @@ -200,11 +192,7 @@ class map_impl { { current_ = std::end(result); } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (element_multiplicity(nd.data_type) != 2) @@ -239,11 +227,7 @@ class vector_impl { void on_value_available(Result& ) { } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { auto const m = element_multiplicity(nd.data_type); @@ -264,11 +248,7 @@ class array_impl { void on_value_available(Result& ) { } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (is_aggregate(nd.data_type)) { if (i_ != -1) { @@ -300,11 +280,7 @@ struct list_impl { void on_value_available(Result& ) { } template - void - operator()( - Result& result, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(Result& result, resp3::basic_node const& nd, system::error_code& ec) { if (!is_aggregate(nd.data_type)) { BOOST_ASSERT(nd.aggregate_size == 1); @@ -397,10 +373,7 @@ class wrapper> { } template - void - operator()( - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(resp3::basic_node const& nd, system::error_code& ec) { BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); diff --git a/include/boost/redis/adapter/detail/response_traits.hpp b/include/boost/redis/adapter/detail/response_traits.hpp index 78bd1e28..3ba5bfec 100644 --- a/include/boost/redis/adapter/detail/response_traits.hpp +++ b/include/boost/redis/adapter/detail/response_traits.hpp @@ -24,8 +24,7 @@ namespace boost::redis::adapter::detail class ignore_adapter { public: template - void - operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) + void operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) { switch (nd.data_type) { case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break; @@ -61,11 +60,7 @@ class static_adapter { { return size;} template - void - operator()( - std::size_t i, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(std::size_t i, resp3::basic_node const& nd, system::error_code& ec) { using std::visit; // I am usure whether this should be an error or an assertion. @@ -91,11 +86,7 @@ class vector_adapter { { return static_cast(-1);} template - void - operator()( - std::size_t, - resp3::basic_node const& nd, - system::error_code& ec) + void operator()(std::size_t, resp3::basic_node const& nd, system::error_code& ec) { adapter_(nd, ec); } diff --git a/include/boost/redis/config.hpp b/include/boost/redis/config.hpp index f2b1c8e8..2bb85d7d 100644 --- a/include/boost/redis/config.hpp +++ b/include/boost/redis/config.hpp @@ -7,9 +7,12 @@ #ifndef BOOST_REDIS_CONFIG_HPP #define BOOST_REDIS_CONFIG_HPP +#include + #include #include #include +#include namespace boost::redis { @@ -80,6 +83,24 @@ struct config { std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1}; }; +struct conn_config { + /// SSL context method + asio::ssl::context::method method = asio::ssl::context::tls_client; + + /// Max read size passed to the internal `asio::dynamic_buffer`. + std::size_t max_read_size = (std::numeric_limits::max)(); + + /** @brief Push maximum buffer size + * + * This value is passed to the underlying `asio::channel`. When + * the queued number of pushes reaches this value the connection + * won't read any more messages from the socket until at least one + * push is consumed from the buffer, this includes pushes and + * responses to commands. + */ + std::size_t push_max_buffer_size = 128; +}; + } // boost::redis #endif // BOOST_REDIS_CONFIG_HPP diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index c1cb7dea..bd8612b7 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -18,7 +18,6 @@ #include #include -#include namespace boost::redis { namespace detail @@ -88,21 +87,15 @@ class basic_connection { /// Contructs from an executor. explicit - basic_connection( - executor_type ex, - asio::ssl::context::method method = asio::ssl::context::tls_client, - std::size_t max_read_size = (std::numeric_limits::max)()) - : impl_{ex, method, max_read_size} + basic_connection(executor_type ex, conn_config const& cfg = {}) + : impl_{ex, cfg} , timer_{ex} { } /// Contructs from a context. explicit - basic_connection( - asio::io_context& ioc, - asio::ssl::context::method method = asio::ssl::context::tls_client, - std::size_t max_read_size = (std::numeric_limits::max)()) - : basic_connection(ioc.get_executor(), method, max_read_size) + basic_connection(asio::io_context& ioc, conn_config const& cfg = {}) + : basic_connection(ioc.get_executor(), cfg) { } /** @brief Starts underlying connection operations. @@ -188,6 +181,23 @@ class basic_connection { auto async_receive(CompletionToken token = CompletionToken{}) { return impl_.async_receive(std::move(token)); } + + /** @brief Receives server pushes synchronously without blocking. + * + * Receives a server push synchronously by calling `try_receive` on + * the underlying channel. If the operation fails because + * `try_receive` returns `false`, `ec` will be set to + * `boost::redis::error::sync_receive_push_failed`. + * + * @param ec Contains the error if any occurred. + * + * @returns The number of bytes read from the socket. + */ + std::size_t receive(system::error_code& ec) + { + return impl_.receive(ec); + } + template < class Response = ignore_t, class CompletionToken = asio::default_completion_token_t @@ -325,18 +335,10 @@ class connection { using executor_type = asio::any_io_executor; /// Contructs from an executor. - explicit - connection( - executor_type ex, - asio::ssl::context::method method = asio::ssl::context::tls_client, - std::size_t max_read_size = (std::numeric_limits::max)()); + explicit connection(executor_type ex, conn_config const& cfg = {}); /// Contructs from a context. - explicit - connection( - asio::io_context& ioc, - asio::ssl::context::method method = asio::ssl::context::tls_client, - std::size_t max_read_size = (std::numeric_limits::max)()); + explicit connection(asio::io_context& ioc, conn_config const& cfg = {}); /// Returns the underlying executor. executor_type get_executor() noexcept @@ -367,6 +369,12 @@ class connection { auto async_receive(CompletionToken token) { return impl_.async_receive(std::move(token)); } + /// Calls `boost::redis::basic_connection::receive`. + std::size_t receive(system::error_code& ec) + { + return impl_.receive(ec); + } + /// Calls `boost::redis::basic_connection::async_exec`. template auto async_exec(request const& req, Response& resp, CompletionToken token) diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index 1c4ff578..ffba6147 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -348,8 +348,10 @@ struct reader_op { } if (res_.first == parse_result::push) { - BOOST_ASIO_CORO_YIELD - conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); + if (!conn_->receive_channel_.try_send(ec, res_.second)) { + BOOST_ASIO_CORO_YIELD + conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); + } if (ec) { logger_.trace("reader-op: error. Exiting ..."); @@ -390,17 +392,13 @@ class connection_base { using this_type = connection_base; - /// Constructs from an executor. - connection_base( - executor_type ex, - asio::ssl::context::method method, - std::size_t max_read_size) - : ctx_{method} + connection_base(executor_type ex, conn_config const& cfg) + : ctx_{cfg.method} , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} - , receive_channel_{ex} + , receive_channel_{ex, cfg.push_max_buffer_size} , runner_{ex, {}} - , dbuf_{read_buffer_, max_read_size} + , dbuf_{read_buffer_, cfg.max_read_size} { set_receive_response(ignore); writer_timer_.expires_at(std::chrono::steady_clock::time_point::max()); @@ -470,6 +468,26 @@ class connection_base { auto async_receive(CompletionToken token) { return receive_channel_.async_receive(std::move(token)); } + std::size_t receive(system::error_code& ec) + { + std::size_t size = 0; + + auto f = [&](system::error_code const& ec2, std::size_t n) + { + ec = ec2; + size = n; + }; + + auto const res = receive_channel_.try_receive(f); + if (ec) + return 0; + + if (!res) + ec = error::sync_receive_push_failed; + + return size; + } + template auto async_run(config const& cfg, Logger l, CompletionToken token) { diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 7424aea7..85b152d2 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -75,6 +75,12 @@ enum class error /// SSL handshake timeout ssl_handshake_timeout, + + /// Can't receive push synchronously without blocking + sync_receive_push_failed, + + /// Incompatible node depth. + incompatible_node_depth, }; /** \internal diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 9c83c145..9cd486c8 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -8,18 +8,12 @@ namespace boost::redis { -connection::connection( - executor_type ex, - asio::ssl::context::method method, - std::size_t max_read_size) -: impl_{ex, method, max_read_size} +connection::connection(executor_type ex, conn_config const& cfg) +: impl_{ex, cfg} { } -connection::connection( - asio::io_context& ioc, - asio::ssl::context::method method, - std::size_t max_read_size) -: impl_{ioc.get_executor(), method, max_read_size} +connection::connection(asio::io_context& ioc, conn_config const& cfg) +: impl_{ioc.get_executor(), cfg} { } void diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index 9f5c06eb..2ac903ea 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -41,6 +41,8 @@ struct error_category_impl : system::error_category { case error::resolve_timeout: return "Resolve timeout."; case error::connect_timeout: return "Connect timeout."; case error::pong_timeout: return "Pong timeout."; + case error::sync_receive_push_failed: return "Can't receive server push synchronously without blocking."; + case error::incompatible_node_depth: return "Incompatible node depth."; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/impl/response.ipp b/include/boost/redis/impl/response.ipp new file mode 100644 index 00000000..c2306e1e --- /dev/null +++ b/include/boost/redis/impl/response.ipp @@ -0,0 +1,48 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include +#include + +namespace boost::redis +{ + +void consume_one(generic_response& r, system::error_code& ec) +{ + if (r.has_error()) + return; // Nothing to consume. + + if (std::empty(r.value())) + return; // Nothing to consume. + + auto const depth = r.value().front().depth; + + // To simplify we will refuse to consume any data-type that is not + // a root node. I think there is no use for that and it is complex + // since it requires updating parent nodes. + if (depth != 0) { + ec = error::incompatible_node_depth; + return; + } + + auto f = [depth](auto const& e) + { return e.depth == depth; }; + + auto match = std::find_if(std::next(std::cbegin(r.value())), std::cend(r.value()), f); + + r.value().erase(std::cbegin(r.value()), match); +} + +void consume_one(generic_response& r) +{ + system::error_code ec; + consume_one(r, ec); + if (ec) + throw system::system_error(ec); +} + +} // boost::redis::resp3 diff --git a/include/boost/redis/response.hpp b/include/boost/redis/response.hpp index 5f6c5c37..b3f76ad5 100644 --- a/include/boost/redis/response.hpp +++ b/include/boost/redis/response.hpp @@ -9,12 +9,14 @@ #include #include +#include #include #include #include -namespace boost::redis { +namespace boost::redis +{ /** @brief Response with compile-time size. * @ingroup high-level-api @@ -32,6 +34,47 @@ using response = std::tuple...>; */ using generic_response = adapter::result>; -} // boost::redis::resp3 +/** @brief Consume on response from a generic response + * + * This function rotates the elements so that the start of the next + * response becomes the new front element. For example the output of + * the following code + * + * @code + * request req; + * req.push("PING", "one"); + * req.push("PING", "two"); + * req.push("PING", "three"); + * + * generic_response resp; + * co_await conn->async_exec(req, resp, asio::deferred); + * + * std::cout << "PING: " << resp.value().front().value << std::endl; + * consume_one(resp); + * std::cout << "PING: " << resp.value().front().value << std::endl; + * consume_one(resp); + * std::cout << "PING: " << resp.value().front().value << std::endl; + * @code + * + * is + * + * @code + * PING: one + * PING: two + * PING: three + * @code + * + * Given that this function rotates elements, it won't be very + * efficient for responses with a large number of elements. It was + * introduced mainly to deal with buffers server pushes as shown in + * the cpp20_subscriber.cpp example. In the future queue-like + * responses might be introduced to consume in O(1) operations. + */ +void consume_one(generic_response& r, system::error_code& ec); + +/// Throwing overload of `consume_one`. +void consume_one(generic_response& r); + +} // boost::redis #endif // BOOST_REDIS_RESPONSE_HPP diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 3a06c3e0..7075bf13 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include diff --git a/tests/common.hpp b/tests/common.hpp index b5bc7dab..bbe3e3f2 100644 --- a/tests/common.hpp +++ b/tests/common.hpp @@ -21,5 +21,5 @@ run( boost::redis::config cfg = {}, boost::system::error_code ec = boost::asio::error::operation_aborted, boost::redis::operation op = boost::redis::operation::receive, - boost::redis::logger::level l = boost::redis::logger::level::info); + boost::redis::logger::level l = boost::redis::logger::level::disabled); diff --git a/tests/test_conn_check_health.cpp b/tests/test_conn_check_health.cpp index 91ece76a..c070787b 100644 --- a/tests/test_conn_check_health.cpp +++ b/tests/test_conn_check_health.cpp @@ -48,8 +48,12 @@ struct push_callback { } BOOST_TEST(resp2->has_value()); - BOOST_TEST(!resp2->value().empty()); - std::clog << "Event> " << resp2->value().front().value << std::endl; + + if (resp2->value().empty()) { + std::clog << "Event> empty" << std::endl; + } else { + std::clog << "Event> " << resp2->value().front().value << std::endl; + } ++i; @@ -74,7 +78,6 @@ BOOST_AUTO_TEST_CASE(check_health) { net::io_context ioc; - connection conn1{ioc}; request req1; @@ -83,10 +86,10 @@ BOOST_AUTO_TEST_CASE(check_health) config cfg1; cfg1.health_check_id = "conn1"; cfg1.reconnect_wait_interval = std::chrono::seconds::zero(); - error_code res1; + error_code ec1; conn1.async_run(cfg1, {}, [&](auto ec) { std::cout << "async_run 1 completed: " << ec.message() << std::endl; - res1 = ec; + ec1 = ec; }); //-------------------------------- @@ -97,10 +100,10 @@ BOOST_AUTO_TEST_CASE(check_health) config cfg2; cfg2.health_check_id = "conn2"; - error_code res2; + error_code ec2; conn2.async_run(cfg2, {}, [&](auto ec){ std::cout << "async_run 2 completed: " << ec.message() << std::endl; - res2 = ec; + ec2 = ec; }); request req2; @@ -119,8 +122,8 @@ BOOST_AUTO_TEST_CASE(check_health) ioc.run(); - BOOST_TEST(!!res1); - BOOST_TEST(!!res2); + BOOST_TEST(!!ec1); + BOOST_TEST(!!ec2); // Waits before exiting otherwise it might cause subsequent tests // to fail. diff --git a/tests/test_conn_echo_stress.cpp b/tests/test_conn_echo_stress.cpp index 0cbf3a4c..95115584 100644 --- a/tests/test_conn_echo_stress.cpp +++ b/tests/test_conn_echo_stress.cpp @@ -27,6 +27,7 @@ using boost::redis::logger; using boost::redis::config; using boost::redis::connection; using boost::redis::usage; +using boost::redis::error; std::ostream& operator<<(std::ostream& os, usage const& u) { @@ -45,8 +46,16 @@ auto push_consumer(std::shared_ptr conn, int expected) -> net::await { int c = 0; for (error_code ec;;) { - co_await conn->async_receive(redirect_error(net::use_awaitable, ec)); + conn->receive(ec); + if (ec == error::sync_receive_push_failed) { + ec = {}; + co_await conn->async_receive(redirect_error(net::use_awaitable, ec)); + } else if (!ec) { + //std::cout << "Skipping suspension." << std::endl; + } + if (ec) { + BOOST_TEST(false); std::cout << "push_consumer error: " << ec.message() << std::endl; co_return; } @@ -61,30 +70,10 @@ auto echo_session( std::shared_ptr conn, std::shared_ptr pubs, - std::string id, int n) -> net::awaitable { - auto ex = co_await net::this_coro::executor; - - request req; - response resp; - - for (auto i = 0; i < n; ++i) { - auto const msg = id + "/" + std::to_string(i); - //std::cout << msg << std::endl; - req.push("HELLO", 3); // Just to mess around. - req.push("PING", msg); - req.push("PING", "lsls"); // TODO: Change to HELLO after fixing issue 105. - boost::system::error_code ec; - co_await conn->async_exec(req, resp, redir(ec)); - - BOOST_TEST(!ec); - BOOST_REQUIRE_EQUAL(msg, std::get<1>(resp).value()); - req.clear(); - std::get<1>(resp).value().clear(); - + for (auto i = 0; i < n; ++i) co_await conn->async_exec(*pubs, ignore, net::deferred); - } } auto async_echo_stress(std::shared_ptr conn) -> net::awaitable @@ -103,19 +92,20 @@ auto async_echo_stress(std::shared_ptr conn) -> net::awaitable // Number of coroutines that will send pings sharing the same // connection to redis. - int const sessions = 500; + int const sessions = 1000; // The number of pings that will be sent by each session. - int const msgs = 1000; + int const msgs = 500; // The number of publishes that will be sent by each session with // each message. - int const n_pubs = 10; + int const n_pubs = 100; // This is the total number of pushes we will receive. int total_pushes = sessions * msgs * n_pubs + 1; auto pubs = std::make_shared(); + pubs->push("PING"); for (int i = 0; i < n_pubs; ++i) pubs->push("PUBLISH", "channel", "payload"); @@ -124,7 +114,7 @@ auto async_echo_stress(std::shared_ptr conn) -> net::awaitable net::co_spawn(ex, push_consumer(conn, total_pushes), net::detached); for (int i = 0; i < sessions; ++i) - net::co_spawn(ex, echo_session(conn, pubs, std::to_string(i), msgs), net::detached); + net::co_spawn(ex, echo_session(conn, pubs, msgs), net::detached); } BOOST_AUTO_TEST_CASE(echo_stress) @@ -134,8 +124,10 @@ BOOST_AUTO_TEST_CASE(echo_stress) net::co_spawn(ioc, async_echo_stress(conn), net::detached); ioc.run(); - std::cout << "-------------------\n" - << conn->get_usage() << std::endl; + std::cout + << "-------------------\n" + << conn->get_usage() + << std::endl; } #else diff --git a/tests/test_conn_push.cpp b/tests/test_conn_push.cpp index ab2572ce..40eff4ea 100644 --- a/tests/test_conn_push.cpp +++ b/tests/test_conn_push.cpp @@ -26,6 +26,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore; using boost::redis::ignore_t; +using boost::system::error_code; using redis::config; using boost::redis::logger; using namespace std::chrono_literals; @@ -49,7 +50,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) auto c3 =[](auto ec, auto...) { - BOOST_TEST(!!ec); + std::cout << "c3: " << ec.message() << std::endl; }; auto c2 =[&, conn](auto ec, auto...) @@ -73,8 +74,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) std::cout << "async_receive" << std::endl; BOOST_TEST(!ec); push_received = true; - conn->cancel(operation::run); - conn->cancel(operation::reconnection); + conn->cancel(); }); ioc.run(); @@ -87,29 +87,45 @@ BOOST_AUTO_TEST_CASE(push_received1) net::io_context ioc; auto conn = std::make_shared(ioc); + // Trick: Uses SUBSCRIBE because this command has no response or + // better said, its response is a server push, which is what we + // want to test. We send two because we want to test both + // async_receive and receive. request req; - //req.push("HELLO", 3); - req.push("SUBSCRIBE", "channel"); + req.push("SUBSCRIBE", "channel1"); + req.push("SUBSCRIBE", "channel2"); conn->async_exec(req, ignore, [conn](auto ec, auto){ std::cout << "async_exec" << std::endl; BOOST_TEST(!ec); }); - run(conn); - - bool push_received = false; + bool push_async_received = false; conn->async_receive([&, conn](auto ec, auto){ - std::cout << "async_receive" << std::endl; + std::cout << "(1) async_receive" << std::endl; + BOOST_TEST(!ec); - push_received = true; - conn->cancel(operation::run); - conn->cancel(operation::reconnection); + push_async_received = true; + + // Receives the second push synchronously. + error_code ec2; + std::size_t res = 0; + res = conn->receive(ec2); + BOOST_TEST(!ec2); + BOOST_TEST(res != std::size_t(0)); + + // Tries to receive a third push synchronously. + ec2 = {}; + res = conn->receive(ec2); + BOOST_CHECK_EQUAL(ec2, boost::redis::make_error_code(boost::redis::error::sync_receive_push_failed)); + + conn->cancel(); }); + run(conn); ioc.run(); - BOOST_TEST(push_received); + BOOST_TEST(push_async_received); } BOOST_AUTO_TEST_CASE(push_filtered_out) diff --git a/tests/test_low_level.cpp b/tests/test_low_level.cpp index 6bfe8d0f..65f7ece0 100644 --- a/tests/test_low_level.cpp +++ b/tests/test_low_level.cpp @@ -30,15 +30,17 @@ namespace resp3 = boost::redis::resp3; using boost::system::error_code; using boost::redis::request; using boost::redis::response; +using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::adapter::result; using boost::redis::resp3::parser; using boost::redis::resp3::parse; +using boost::redis::consume_one; +using boost::redis::error; using boost::redis::adapter::adapt2; using node_type = result; -using vec_node_type = result>; using vec_type = result>; using op_vec_type = result>>; @@ -154,7 +156,7 @@ result> op_bool_ok = true; // TODO: Test a streamed string that is not finished with a string of // size 0 but other command comes in. -vec_node_type streamed_string_e1 +generic_response streamed_string_e1 {{ {boost::redis::resp3::type::streamed_string, 0, 1, ""} , {boost::redis::resp3::type::streamed_string_part, 1, 1, "Hell"} , {boost::redis::resp3::type::streamed_string_part, 1, 1, "o wor"} @@ -162,10 +164,10 @@ vec_node_type streamed_string_e1 , {boost::redis::resp3::type::streamed_string_part, 1, 1, ""} }}; -vec_node_type streamed_string_e2 +generic_response streamed_string_e2 {{{resp3::type::streamed_string, 0UL, 1UL, {}}, {resp3::type::streamed_string_part, 1UL, 1UL, {}} }}; -vec_node_type const push_e1a +generic_response const push_e1a {{ {resp3::type::push, 4UL, 0UL, {}} , {resp3::type::simple_string, 1UL, 1UL, "pubsub"} , {resp3::type::simple_string, 1UL, 1UL, "message"} @@ -173,10 +175,10 @@ vec_node_type const push_e1a , {resp3::type::simple_string, 1UL, 1UL, "some message"} }}; -vec_node_type const push_e1b +generic_response const push_e1b {{{resp3::type::push, 0UL, 0UL, {}}}}; -vec_node_type const set_expected1a +generic_response const set_expected1a {{{resp3::type::set, 6UL, 0UL, {}} , {resp3::type::simple_string, 1UL, 1UL, {"orange"}} , {resp3::type::simple_string, 1UL, 1UL, {"apple"}} @@ -192,7 +194,7 @@ muset_type const set_e1g{{"apple", "one", "orange", "orange", "three", "two"}}; vec_type const set_e1d = {{"orange", "apple", "one", "two", "three", "orange"}}; op_vec_type const set_expected_1e = set_e1d; -vec_node_type const array_e1a +generic_response const array_e1a {{ {resp3::type::array, 3UL, 0UL, {}} , {resp3::type::blob_string, 1UL, 1UL, {"11"}} , {resp3::type::blob_string, 1UL, 1UL, {"22"}} @@ -202,12 +204,12 @@ vec_node_type const array_e1a result> const array_e1b{{11, 22, 3}}; result> const array_e1c{{"11", "22", "3"}}; result> const array_e1d{}; -vec_node_type const array_e1e{{{resp3::type::array, 0UL, 0UL, {}}}}; +generic_response const array_e1e{{{resp3::type::array, 0UL, 0UL, {}}}}; array_type const array_e1f{{11, 22, 3}}; result> const array_e1g{{11, 22, 3}}; result> const array_e1h{{11, 22, 3}}; -vec_node_type const map_expected_1a +generic_response const map_expected_1a {{ {resp3::type::map, 4UL, 0UL, {}} , {resp3::type::blob_string, 1UL, 1UL, {"key1"}} , {resp3::type::blob_string, 1UL, 1UL, {"value1"}} @@ -263,7 +265,7 @@ tuple8_type const map_e1f , std::string{"key3"}, std::string{"value3"} }; -vec_node_type const attr_e1a +generic_response const attr_e1a {{ {resp3::type::attribute, 1UL, 0UL, {}} , {resp3::type::simple_string, 1UL, 1UL, "key-popularity"} , {resp3::type::map, 2UL, 1UL, {}} @@ -273,7 +275,7 @@ vec_node_type const attr_e1a , {resp3::type::doublean, 1UL, 2UL, "0.0012"} } }; -vec_node_type const attr_e1b +generic_response const attr_e1b {{{resp3::type::attribute, 0UL, 0UL, {}} }}; #define S01a "#11\r\n" @@ -407,7 +409,7 @@ vec_node_type const attr_e1b test(make_expected(S04e, array_type2{}, boost::redis::error::incompatible_size));\ test(make_expected(S04e, tuple_int_2{}, boost::redis::error::incompatible_size));\ test(make_expected(S04f, array_type2{}, boost::redis::error::nested_aggregate_not_supported));\ - test(make_expected(S04g, vec_node_type{}, boost::redis::error::exceeeds_max_nested_depth));\ + test(make_expected(S04g, generic_response{}, boost::redis::error::exceeeds_max_nested_depth));\ test(make_expected(S04h, array_e1d));\ test(make_expected(S04h, array_e1e));\ test(make_expected(S04i, set_type{}, boost::redis::error::expects_resp3_set)); \ @@ -418,7 +420,7 @@ vec_node_type const attr_e1b test(make_expected(S09a, set_expected1a)); \ test(make_expected(S09a, set_expected_1e)); \ test(make_expected(S09a, set_type{{"apple", "one", "orange", "three", "two"}})); \ - test(make_expected(S09b, vec_node_type{{{resp3::type::set, 0UL, 0UL, {}}}})); \ + test(make_expected(S09b, generic_response{{{resp3::type::set, 0UL, 0UL, {}}}})); \ test(make_expected(S03c, map_type{}));\ test(make_expected(S11a, node_type{{resp3::type::doublean, 1UL, 0UL, {"1.23"}}}));\ test(make_expected(S11b, node_type{{resp3::type::doublean, 1UL, 0UL, {"inf"}}}));\ @@ -496,7 +498,7 @@ void check_error(char const* name, boost::redis::error ev) static_cast::type>(ev))); } -BOOST_AUTO_TEST_CASE(error) +BOOST_AUTO_TEST_CASE(cover_error) { check_error("boost.redis", boost::redis::error::invalid_data_type); check_error("boost.redis", boost::redis::error::not_a_number); @@ -514,6 +516,11 @@ BOOST_AUTO_TEST_CASE(error) check_error("boost.redis", boost::redis::error::not_a_double); check_error("boost.redis", boost::redis::error::resp3_null); check_error("boost.redis", boost::redis::error::not_connected); + check_error("boost.redis", boost::redis::error::resolve_timeout); + check_error("boost.redis", boost::redis::error::connect_timeout); + check_error("boost.redis", boost::redis::error::pong_timeout); + check_error("boost.redis", boost::redis::error::ssl_handshake_timeout); + check_error("boost.redis", boost::redis::error::sync_receive_push_failed); } std::string get_type_as_str(boost::redis::resp3::type t) @@ -602,3 +609,58 @@ BOOST_AUTO_TEST_CASE(adapter_as) adapter(e, ec); } } + +BOOST_AUTO_TEST_CASE(cancel_one_1) +{ + auto resp = push_e1a; + BOOST_TEST(resp.has_value()); + + consume_one(resp); + BOOST_TEST(resp.value().empty()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_empty) +{ + generic_response resp; + BOOST_TEST(resp.has_value()); + + consume_one(resp); + BOOST_TEST(resp.value().empty()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_has_error) +{ + generic_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}}; + BOOST_TEST(resp.has_error()); + + consume_one(resp); + BOOST_TEST(resp.has_error()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_has_does_not_consume_past_the_end) +{ + auto resp = push_e1a; + BOOST_TEST(resp.has_value()); + resp.value().insert( + std::cend(resp.value()), + std::cbegin(push_e1a.value()), + std::cend(push_e1a.value())); + + consume_one(resp); + + BOOST_CHECK_EQUAL(resp.value().size(), push_e1a.value().size()); +} + +BOOST_AUTO_TEST_CASE(cancel_one_incompatible_depth) +{ + auto resp = streamed_string_e1; + BOOST_TEST(resp.has_value()); + + error_code ec; + consume_one(resp, ec); + + error_code expected = error::incompatible_node_depth; + BOOST_CHECK_EQUAL(ec, expected); + + BOOST_CHECK_EQUAL(resp.value().size(), push_e1a.value().size()); +}