Skip to content

Commit

Permalink
http/client: Add timeout argument to make_request()
Browse files Browse the repository at this point in the history
And propagate it down the call stack to the interesting places. Next
patch will make the code obey the provided timeout.

Signed-off-by: Pavel Emelyanov <[email protected]>
  • Loading branch information
xemul committed Jun 26, 2024
1 parent eaf51a3 commit 5b3270e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
13 changes: 8 additions & 5 deletions include/seastar/http/client.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <seastar/http/reply.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/util/modules.hh>

namespace bi = boost::intrusive;
Expand Down Expand Up @@ -169,6 +170,7 @@ class client {
public:
using reply_handler = noncopyable_function<future<>(const reply&, input_stream<char>&& body)>;
using retry_requests = bool_class<struct retry_requests_tag>;
using clock_type = lowres_clock;

private:
friend class http::internal::client_ref;
Expand All @@ -185,17 +187,17 @@ private:

using connection_ptr = seastar::shared_ptr<connection>;

future<connection_ptr> get_connection();
future<connection_ptr> make_connection();
future<connection_ptr> get_connection(clock_type::time_point timeout);
future<connection_ptr> make_connection(clock_type::time_point timeout);
future<> put_connection(connection_ptr con);
future<> shrink_connections();

template <std::invocable<connection&> Fn>
auto with_connection(Fn&& fn);
auto with_connection(Fn&& fn, clock_type::time_point timeout);

template <typename Fn>
requires std::invocable<Fn, connection&>
auto with_new_connection(Fn&& fn);
auto with_new_connection(Fn&& fn, clock_type::time_point timeout);

future<> do_make_request(connection& con, request& req, reply_handler& handle, std::optional<reply::status_type> expected);

Expand Down Expand Up @@ -275,12 +277,13 @@ public:
* \param req -- request to be sent
* \param handle -- the response handler
* \param expected -- the optional expected reply status code, default is std::nullopt
* \param send_timeout -- time point at which make_request will stop waiting for transport
*
* Note that the handle callback should be prepared to be called more than once, because
* client may restart the whole request processing in case server closes the connection
* in the middle of operation
*/
future<> make_request(request req, reply_handler handle, std::optional<reply::status_type> expected = std::nullopt);
future<> make_request(request req, reply_handler handle, std::optional<reply::status_type> expected = std::nullopt, clock_type::time_point send_timeout = clock_type::time_point::max());

/**
* \brief Updates the maximum number of connections a client may have
Expand Down
28 changes: 14 additions & 14 deletions src/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ client::client(std::unique_ptr<connection_factory> f, unsigned max_connections,
{
}

future<client::connection_ptr> client::get_connection() {
future<client::connection_ptr> client::get_connection(clock_type::time_point timeout) {
if (!_pool.empty()) {
connection_ptr con = _pool.front().shared_from_this();
_pool.pop_front();
Expand All @@ -252,15 +252,15 @@ future<client::connection_ptr> client::get_connection() {
}

if (_nr_connections >= _max_connections) {
return _wait_con.wait().then([this] {
return get_connection();
return _wait_con.wait().then([this, timeout] {
return get_connection(timeout);
});
}

return make_connection();
return make_connection(timeout);
}

future<client::connection_ptr> client::make_connection() {
future<client::connection_ptr> client::make_connection(clock_type::time_point timeout) {
_total_new_connections++;
return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable {
http_log.trace("created new http connection {}", cs.local_address());
Expand Down Expand Up @@ -311,8 +311,8 @@ future<> client::set_maximum_connections(unsigned nr) {
}

template <std::invocable<connection&> Fn>
auto client::with_connection(Fn&& fn) {
return get_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable {
auto client::with_connection(Fn&& fn, clock_type::time_point timeout) {
return get_connection(timeout).then([this, fn = std::move(fn)] (connection_ptr con) mutable {
return fn(*con).finally([this, con = std::move(con)] () mutable {
return put_connection(std::move(con));
});
Expand All @@ -321,22 +321,22 @@ auto client::with_connection(Fn&& fn) {

template <typename Fn>
requires std::invocable<Fn, connection&>
auto client::with_new_connection(Fn&& fn) {
return make_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable {
auto client::with_new_connection(Fn&& fn, clock_type::time_point timeout) {
return make_connection(timeout).then([this, fn = std::move(fn)] (connection_ptr con) mutable {
return fn(*con).finally([this, con = std::move(con)] () mutable {
return put_connection(std::move(con));
});
});
}

future<> client::make_request(request req, reply_handler handle, std::optional<reply::status_type> expected) {
return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable {
future<> client::make_request(request req, reply_handler handle, std::optional<reply::status_type> expected, clock_type::time_point send_timeout) {
return do_with(std::move(req), std::move(handle), [this, expected, send_timeout] (request& req, reply_handler& handle) mutable {
auto f = with_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});
}, send_timeout);

if (_retry) {
f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) {
f = f.handle_exception_type([this, &req, &handle, expected, send_timeout] (const std::system_error& ex) {
auto code = ex.code().value();
if ((code != EPIPE) && (code != ECONNABORTED)) {
return make_exception_future<>(ex);
Expand All @@ -347,7 +347,7 @@ future<> client::make_request(request req, reply_handler handle, std::optional<r
// break the limit. That's OK, the 'con' will be closed really soon
return with_new_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});
}, send_timeout);
});
}

Expand Down

0 comments on commit 5b3270e

Please sign in to comment.