Skip to content

Commit

Permalink
Replaced run_with_timeout by asio::cancel_after
Browse files Browse the repository at this point in the history
close #351
  • Loading branch information
anarthal authored Oct 4, 2024
1 parent 268aa33 commit 146b555
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 459 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
#include <boost/mysql/detail/connection_pool_fwd.hpp>

#include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
#include <boost/mysql/impl/internal/connection_pool/run_with_timeout.hpp>
#include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>

Expand Down Expand Up @@ -103,6 +102,19 @@ class basic_connection_node : public intrusive::list_base_hook<>,
shared_st_->last_connect_diag = create_connect_diagnostics(ec, connect_diag_);
}

template <class Op, class Self>
void run_with_timeout(Op&& op, std::chrono::steady_clock::duration timeout, Self& self)
{
if (timeout.count() > 0)
{
std::forward<Op>(op)(asio::cancel_after(timer_, timeout, std::move(self)));
}
else
{
std::forward<Op>(op)(std::move(self));
}
}

struct connection_task_op
{
this_type& node_;
Expand All @@ -126,49 +138,44 @@ class basic_connection_node : public intrusive::list_base_hook<>,
// Invoke the sans-io algorithm
last_act_ = node_.resume(ec, col_st);

// Apply the next action. run_with_timeout makes sure that all handlers
// are dispatched using the timer's executor (that is, the pool executor)
// Apply the next action
switch (last_act_)
{
case next_connection_action::connect:
run_with_timeout(
node_.run_with_timeout(
node_.conn_
.async_connect(node_.params_->connect_config, node_.connect_diag_, asio::deferred),
node_.timer_,
node_.params_->connect_timeout,
std::move(self)
self
);
break;
case next_connection_action::sleep_connect_failed:
node_.timer_.expires_after(node_.params_->retry_interval);
node_.timer_.async_wait(std::move(self));
break;
case next_connection_action::ping:
run_with_timeout(
node_.run_with_timeout(
node_.conn_.async_ping(asio::deferred),
node_.timer_,
node_.params_->ping_timeout,
std::move(self)
self
);
break;
case next_connection_action::reset:
run_with_timeout(
node_.run_with_timeout(
node_.conn_.async_run_pipeline(
*node_.reset_pipeline_req_,
node_.reset_pipeline_res_,
asio::deferred
),
node_.timer_,
node_.params_->ping_timeout,
std::move(self)
self
);
break;
case next_connection_action::idle_wait:
run_with_timeout(
node_.run_with_timeout(
node_.collection_timer_.async_wait(asio::deferred),
node_.timer_,
node_.params_->ping_interval,
std::move(self)
self
);
break;
case next_connection_action::none: self.complete(error_code()); break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class basic_pool_impl
obj->maybe_create_connection();

// Wait to be notified, or until a cancellation happens
BOOST_MYSQL_YIELD(resume_point, 2, obj->wait_for_connections(self);)
BOOST_MYSQL_YIELD(resume_point, 2, obj->wait_for_connections(self))

// Remember that we have waited, so completions are dispatched
// correctly
Expand Down
164 changes: 0 additions & 164 deletions include/boost/mysql/impl/internal/connection_pool/run_with_timeout.hpp

This file was deleted.

1 change: 0 additions & 1 deletion test/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ add_executable(
test/execution_processor/static_results_impl.cpp

test/connection_pool/wait_group.cpp
test/connection_pool/run_with_timeout.cpp
test/connection_pool/sansio_connection_node.cpp
test/connection_pool/connection_pool_impl.cpp

Expand Down
1 change: 0 additions & 1 deletion test/unit/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ run
test/execution_processor/static_results_impl.cpp

test/connection_pool/wait_group.cpp
test/connection_pool/run_with_timeout.cpp
test/connection_pool/sansio_connection_node.cpp
test/connection_pool/connection_pool_impl.cpp

Expand Down
15 changes: 13 additions & 2 deletions test/unit/test/connection_pool/connection_pool_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
#include <boost/asio/compose.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/ssl/context.hpp>
Expand Down Expand Up @@ -118,6 +120,15 @@ class mock_connection
asio::experimental::channel<void(error_code, fn_type)> to_test_chan_;
asio::experimental::channel<void(error_code, diagnostics)> from_test_chan_;

// Transforms the channel-specific cancel code into asio::error::operation_aborted,
// which is returned by connections when operations get cancelled
static error_code transform_ec(error_code input)
{
return input == asio::experimental::channel_errc::channel_cancelled
? asio::error::operation_aborted
: input;
}

// Code shared between all mocked ops
struct mocked_op
{
Expand All @@ -138,7 +149,7 @@ class mock_connection
if (ec)
{
// We were cancelled
self.complete(ec);
self.complete(transform_ec(ec));
}
else
{
Expand All @@ -153,7 +164,7 @@ class mock_connection
// Done
if (diag)
*diag = std::move(recv_diag);
self.complete(ec);
self.complete(transform_ec(ec));
}
};

Expand Down
Loading

0 comments on commit 146b555

Please sign in to comment.