Skip to content

Commit

Permalink
Implements batch reads for server pushes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mzimbres committed Sep 9, 2023
1 parent 4547e1a commit 0726d4d
Show file tree
Hide file tree
Showing 18 changed files with 362 additions and 163 deletions.
23 changes: 17 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,19 +677,30 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.
### develop (incorporates changes to conform the boost review and more)

* Deprecates the `async_receive` overload that takes a response. Users
should now first call `set_receive_response` to avoid contantly seting
the same response.
should now first call `set_receive_response` to avoid constantly and
unnecessarily setting the same response.

* Uses `std::function` to type erase the response adapter. This change
should not influence users in any way but allowed important
simplification in the connections internals. This resulted in big
performance improvement where one of my benchmark programs passed
from 190k/s to 473k/s.
simplification in the connections internals. This resulted in
massive performance improvement.

* The connection has a new member `get_usage()` that returns the
connection usage information, such as number of bytes writen,
connection usage information, such as number of bytes written,
received etc.

* There are massive performance improvements in the consuming of
server pushes which are now communicated with an `asio::channel` and
therefore can be buffered. Batch reads are also supported by means
of `channle.try_send` and buffered messages can be consumed
synchronously with `connection::receive`. The function
`boost::redis::cancel_one` has been added to simplify processing
multiple server pushes contained in the same `generic_response`.
*IMPORTANT*: These changes may result in more than one push in the
response when `connection::async_receive` resumes. The user must
therefore be careful when calling `resp.clear()`: either ensure that
all message have been processed or just use `consume_one`.

### v1.4.2 (incorporates changes to conform the boost review and more)

* Adds `boost::redis::config::database_index` to make it possible to
Expand Down
16 changes: 13 additions & 3 deletions examples/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::consume_one;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::ignore;
using boost::redis::error;
using boost::system::error_code;
using boost::redis::connection;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
Expand Down Expand Up @@ -58,20 +60,28 @@ receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
// Loop while reconnection is enabled
while (conn->will_reconnect()) {

// Reconnect to channels.
// Reconnect to the channels.
co_await conn->async_exec(req, ignore, asio::deferred);

// Loop reading Redis pushs messages.
for (error_code ec;;) {
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
// First tries to read any buffered pushes.
conn->receive(ec);
if (ec == error::sync_receive_push_failed) {
ec = {};
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
}

if (ec)
break; // Connection lost, break so we can reconnect to channels.

std::cout
<< resp.value().at(1).value
<< " " << resp.value().at(2).value
<< " " << resp.value().at(3).value
<< std::endl;
resp.value().clear();

consume_one(resp);
}
}
}
Expand Down
41 changes: 7 additions & 34 deletions include/boost/redis/adapter/detail/adapters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,7 @@ class simple_impl {
void on_value_available(Result&) {}

template <class String>
void
operator()(
Result& result,
resp3::basic_node<String> const& n,
system::error_code& ec)
void operator()(Result& result, resp3::basic_node<String> const& n, system::error_code& ec)
{
if (is_aggregate(n.data_type)) {
ec = redis::error::expects_resp3_simple_type;
Expand All @@ -164,11 +160,7 @@ class set_impl {
{ hint_ = std::end(result); }

template <class String>
void
operator()(
Result& result,
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
if (nd.data_type != resp3::type::set)
Expand Down Expand Up @@ -200,11 +192,7 @@ class map_impl {
{ current_ = std::end(result); }

template <class String>
void
operator()(
Result& result,
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
if (element_multiplicity(nd.data_type) != 2)
Expand Down Expand Up @@ -239,11 +227,7 @@ class vector_impl {
void on_value_available(Result& ) { }

template <class String>
void
operator()(
Result& result,
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
auto const m = element_multiplicity(nd.data_type);
Expand All @@ -264,11 +248,7 @@ class array_impl {
void on_value_available(Result& ) { }

template <class String>
void
operator()(
Result& result,
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
if (i_ != -1) {
Expand Down Expand Up @@ -300,11 +280,7 @@ struct list_impl {
void on_value_available(Result& ) { }

template <class String>
void
operator()(
Result& result,
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (!is_aggregate(nd.data_type)) {
BOOST_ASSERT(nd.aggregate_size == 1);
Expand Down Expand Up @@ -397,10 +373,7 @@ class wrapper<result<Result>> {
}

template <class String>
void
operator()(
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");

Expand Down
15 changes: 3 additions & 12 deletions include/boost/redis/adapter/detail/response_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ namespace boost::redis::adapter::detail
class ignore_adapter {
public:
template <class String>
void
operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
{
switch (nd.data_type) {
case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;
Expand Down Expand Up @@ -61,11 +60,7 @@ class static_adapter {
{ return size;}

template <class String>
void
operator()(
std::size_t i,
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(std::size_t i, resp3::basic_node<String> const& nd, system::error_code& ec)
{
using std::visit;
// I am usure whether this should be an error or an assertion.
Expand All @@ -91,11 +86,7 @@ class vector_adapter {
{ return static_cast<std::size_t>(-1);}

template <class String>
void
operator()(
std::size_t,
resp3::basic_node<String> const& nd,
system::error_code& ec)
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
{
adapter_(nd, ec);
}
Expand Down
21 changes: 21 additions & 0 deletions include/boost/redis/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
#ifndef BOOST_REDIS_CONFIG_HPP
#define BOOST_REDIS_CONFIG_HPP

#include <boost/asio/ssl/context.hpp>

#include <string>
#include <chrono>
#include <optional>
#include <limits>

namespace boost::redis
{
Expand Down Expand Up @@ -80,6 +83,24 @@ struct config {
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
};

struct conn_config {
/// SSL context method
asio::ssl::context::method method = asio::ssl::context::tls_client;

/// Max read size passed to the internal `asio::dynamic_buffer`.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();

/** @brief Push maximum buffer size
*
* This value is passed to the underlying `asio::channel`. When
* the queued number of pushes reaches this value the connection
* won't read any more messages from the socket until at least one
* push is consumed from the buffer, this includes pushes and
* responses to commands.
*/
std::size_t push_max_buffer_size = 128;
};

} // boost::redis

#endif // BOOST_REDIS_CONFIG_HPP
50 changes: 29 additions & 21 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <chrono>
#include <memory>
#include <limits>

namespace boost::redis {
namespace detail
Expand Down Expand Up @@ -88,21 +87,15 @@ class basic_connection {

/// Contructs from an executor.
explicit
basic_connection(
executor_type ex,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
: impl_{ex, method, max_read_size}
basic_connection(executor_type ex, conn_config const& cfg = {})
: impl_{ex, cfg}
, timer_{ex}
{ }

/// Contructs from a context.
explicit
basic_connection(
asio::io_context& ioc,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
: basic_connection(ioc.get_executor(), method, max_read_size)
basic_connection(asio::io_context& ioc, conn_config const& cfg = {})
: basic_connection(ioc.get_executor(), cfg)
{ }

/** @brief Starts underlying connection operations.
Expand Down Expand Up @@ -188,6 +181,23 @@ class basic_connection {
auto async_receive(CompletionToken token = CompletionToken{})
{ return impl_.async_receive(std::move(token)); }


/** @brief Receives server pushes synchronously without blocking.
*
* Receives a server push synchronously by calling `try_receive` on
* the underlying channel. If the operation fails because
* `try_receive` returns `false`, `ec` will be set to
* `boost::redis::error::sync_receive_push_failed`.
*
* @param ec Contains the error if any occurred.
*
* @returns The number of bytes read from the socket.
*/
std::size_t receive(system::error_code& ec)
{
return impl_.receive(ec);
}

template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>
Expand Down Expand Up @@ -325,18 +335,10 @@ class connection {
using executor_type = asio::any_io_executor;

/// Contructs from an executor.
explicit
connection(
executor_type ex,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
explicit connection(executor_type ex, conn_config const& cfg = {});

/// Contructs from a context.
explicit
connection(
asio::io_context& ioc,
asio::ssl::context::method method = asio::ssl::context::tls_client,
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
explicit connection(asio::io_context& ioc, conn_config const& cfg = {});

/// Returns the underlying executor.
executor_type get_executor() noexcept
Expand Down Expand Up @@ -367,6 +369,12 @@ class connection {
auto async_receive(CompletionToken token)
{ return impl_.async_receive(std::move(token)); }

/// Calls `boost::redis::basic_connection::receive`.
std::size_t receive(system::error_code& ec)
{
return impl_.receive(ec);
}

/// Calls `boost::redis::basic_connection::async_exec`.
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
Expand Down
38 changes: 28 additions & 10 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,10 @@ struct reader_op {
}

if (res_.first == parse_result::push) {
BOOST_ASIO_CORO_YIELD
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
BOOST_ASIO_CORO_YIELD
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
}

if (ec) {
logger_.trace("reader-op: error. Exiting ...");
Expand Down Expand Up @@ -390,17 +392,13 @@ class connection_base {

using this_type = connection_base<Executor>;

/// Constructs from an executor.
connection_base(
executor_type ex,
asio::ssl::context::method method,
std::size_t max_read_size)
: ctx_{method}
connection_base(executor_type ex, conn_config const& cfg)
: ctx_{cfg.method}
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
, writer_timer_{ex}
, receive_channel_{ex}
, receive_channel_{ex, cfg.push_max_buffer_size}
, runner_{ex, {}}
, dbuf_{read_buffer_, max_read_size}
, dbuf_{read_buffer_, cfg.max_read_size}
{
set_receive_response(ignore);
writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
Expand Down Expand Up @@ -470,6 +468,26 @@ class connection_base {
auto async_receive(CompletionToken token)
{ return receive_channel_.async_receive(std::move(token)); }

std::size_t receive(system::error_code& ec)
{
std::size_t size = 0;

auto f = [&](system::error_code const& ec2, std::size_t n)
{
ec = ec2;
size = n;
};

auto const res = receive_channel_.try_receive(f);
if (ec)
return 0;

if (!res)
ec = error::sync_receive_push_failed;

return size;
}

template <class Logger, class CompletionToken>
auto async_run(config const& cfg, Logger l, CompletionToken token)
{
Expand Down
Loading

0 comments on commit 0726d4d

Please sign in to comment.