diff --git a/CMakeLists.txt b/CMakeLists.txt index f442d55c..60d07175 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,13 +19,76 @@ target_compile_features(boost_redis INTERFACE cxx_std_17) # Dependencies if (BOOST_REDIS_MAIN_PROJECT) - # If we're the root project, error if a dependency is not found - find_package(Boost 1.83 REQUIRED COMPONENTS headers) + # TODO: Understand why we have to list all dependencies below + # instead of + #set(BOOST_INCLUDE_LIBRARIES redis) + #set(BOOST_EXCLUDE_LIBRARIES redis) + #add_subdirectory(../.. boostorg/boost EXCLUDE_FROM_ALL) + + set(deps + system + assert + config + throw_exception + asio + variant2 + mp11 + winapi + predef + align + context + core + coroutine + static_assert + pool + date_time + smart_ptr + exception + integer + move + type_traits + algorithm + utility + io + lexical_cast + numeric/conversion + mpl + range + tokenizer + tuple + array + bind + concept_check + function + iterator + regex + unordered + preprocessor + container + conversion + container_hash + detail + optional + function_types + fusion + intrusive + describe + typeof + functional + test + json + ) + + foreach(dep IN LISTS deps) + add_subdirectory(../${dep} boostorg/${dep}) + endforeach() + find_package(Threads REQUIRED) find_package(OpenSSL REQUIRED) target_link_libraries(boost_redis INTERFACE - Boost::headers + Boost::system + Boost::asio Threads::Threads OpenSSL::Crypto OpenSSL::SSL diff --git a/README.md b/README.md index 69335443..5435e51d 100644 --- a/README.md +++ b/README.md @@ -676,6 +676,28 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ## Changelog +### Boost 1.85 + +* ([Issue 170](https://github.com/boostorg/redis/issues/170)) + Under load and on low-latency networks it is possible to start + receiving responses before the write operation completed and while + the request is still marked as staged and not written. This messes + up with the heuristics that classifies responses as unsolicied or + not. 
+ +* ([Issue 168](https://github.com/boostorg/redis/issues/168)). + Provides a way of passing a custom SSL context to the connection. + The design here differs from that of Boost.Beast and Boost.MySql + since in Boost.Redis the connection owns the context instead of only + storing a reference to a user provided one. This is ok so because + apps need only one connection for their entire application, which + makes the overhead of one ssl-context per connection negligible. + +* ([Issue 169](https://github.com/boostorg/redis/issues/169)). + Allows setting a callback that is called before every attempt to + stablish a connection or reconnection. See `cpp20_intro_tls.cpp` for + an example. + ### Boost 1.84 (First release in Boost) * Deprecates the `async_receive` overload that takes a response. Users diff --git a/doc/on-the-costs-of-async-abstractions.md b/doc/on-the-costs-of-async-abstractions.md new file mode 100644 index 00000000..55615318 --- /dev/null +++ b/doc/on-the-costs-of-async-abstractions.md @@ -0,0 +1,671 @@ +# On the costs of asynchronous abstractions + +The biggest force behind the evolution of +[Boost.Redis](https://github.com/boostorg/redis) was my struggling in +coming up with a high-level connection abstraction that was capable of +multiplexing Redis commands from independent sources while +concurrently handling server pushes. This journey taught me many +important lessons, many of which are related to the design and +performance of asynchronous programs based on Boost.Asio. + +In this article I will share some of the lessons learned, specially +those related to the performance costs of _abstractions_ such as +`async_read_until` that tend to overschedule into the event-loop. In +this context I will also briefly comment on how the topics discussed +here influenced my views on the proposed +[P2300](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2300r7.html) +(a.k.a. 
Senders and Receivers), which is likely to become the basis of +networking in upcoming C++ standards. + +Although the analysis presented here uses the Redis communication +protocol for illustration I expect it to be useful in general since +[RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) shares +many similarities with other widely used protocols such as HTTP. + +## Parsing `\r\n`-delimited messages + +The Redis server communicates with its clients by exchanging data +serialized in +[RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) format. +Among the data types supported by this specification, the +`\r\n`-delimited messages are some of the most frequent in a typical +session. The table below shows some examples + + Command | Response | Wire format | RESP3 name + ---------|----------|---------------|--------------------- + PING | PONG | `+PONG\r\n` | simple-string + INCR | 42 | `:42\r\n` | number + GET | null | `_\r\n` | null + +Redis also supports command pipelines, which provide a way of +optimizing round-trip times by batching commands. A pipeline composed +by the commands shown in the previous table look like this + +``` + | Sent in a | + | single write | ++--------+ | | +-------+ +| | --------> PING + INCR + GET --------> | | +| | | | +| Client | | Redis | +| | | | +| | <-------- "+PONG\r\n:42\r\n_\r\n" <-------- | | ++--------+ |<------>|<---->|<-->| +-------+ + | | + | Responses | +``` + +Messages that use delimiters are so common in networking that a +facility called `async_read_until` for reading them incrementally from +a socket is already part of Boost.Asio. The coroutine below uses it to +print message contents to the screen + +```cpp +awaitable parse_resp3_simple_msgs(tcp::socket socket) +{ + for (std::string buffer;;) { + auto n = co_await async_read_until(socket, dynamic_buffer(buffer), "\r\n"); + + std::cout << buffer.substr(1, n - 3) << std::endl; + + // Consume the buffer. 
+ buffer.erase(0, n); + } +} +``` + +If we pay attention to the buffer content as it is parsed by the code +above we can see it is rotated fairly often, for example + +``` + "+PONG\r\n:100\r\n+OK\r\n_\r\n" + ":100\r\n+OK\r\n_\r\n" + "+OK\r\n_\r\n" + "_\r\n" + "" +``` + +When I first realized these, apparently excessive, buffer rotations I +was concerned they would impact the performance of Boost.Redis in a +severe way. To measure the magnitude of this impact I came up with an +experimental implementation of Asio's `dynamic_buffer` that consumed +the buffer less eagerly than the `std::string::erase` function used +above. For that, the implementation increased a buffer offset up +to a certain threshold and only then triggered a (larger) rotation. +This is illustrated in the diagram below + +``` + |<---- offset threshold ---->| + | | + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + | # Initial message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<------>| # After 1st message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<-------------->| # After 2nd message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<--------------------->| # After 3rd message + + "+PONG\r\n:100\r\n+OK\r\n_\r\n+PONG\r\n" + |<-------------------------->| # 4th message crosses the threashold + + "+PONG\r\n" + | # After rotation +``` + +After comparing the performance differences between the two versions I +was surprised there wasn't any! But that was also very suspicious +since some RESP3 aggregate types contain a considerable number of +separators. For example, a map with two pairs `[(key1, value1), +(key2, value2)]` encoded in RESP3 requires ten rotations in total + +``` + "%2\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + "$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + "key1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + "$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n" + ... 
+``` + +It was evident something more costly was shadowing the buffer +rotations. But it couldn't be the search for the separator since it +performs equivalently to rotations. It is also easy to show that the +overhead is not related to any IO operation since the problem persists +if the buffer is never consumed (which causes the function to be +called with the same string repeatedly). Once these two factors +are removed from the table, we are driven into the conclusion that +calling `async_read_until` has an intrinsic cost, let us see what +that is. + +### Async operations that complete synchronously considered harmful + +Assume the scenario described earlier where `async_read_until` is used +to parse multiple `\r\n`-delimited messages. The following is a +detailed description of what happens behind the scenes + + 1. `async_read_until` calls `socket.async_read_some` repeatedly + until the separator `\r\n` shows up in the buffer + +``` + "" # Read 1: needs more data. + "" # Read 2: needs more data. + "" # Read 3: needs more data. + "" # Read 4: needs more data. + "\r\n" # separator found, done. +``` + + 2. The last call to `socket.async_read_some` happens to read past + the separator `\r\n` (depicted as `` above), + resulting in bonus (maybe incomplete) messages in the buffer + +``` + | 1st async_read_some | 2nd async_read_some | + | | | + "+message content here \r\n:100\r\n+OK\r\n_\r\n+incomplete respo" + | | | | + | Message wanted |<-- bonus msgs --->|<--incomplete-->| + | | msg | + | | | + | |<---------- bonus bytes ----------->| +``` + + 3. The buffer is consumed and `async_read_until` is called again. + However, since the buffer already contains the next message this + is an IO-less call + +``` + ":100\r\n+OK\r\n_\r\n+not enough byt" + | | | + | No IO required | Need more | + | to parse these | data | + | messages. | | +``` + +The fact that step 3. 
doesn't perform any IO implies the operation can +complete synchronously, but because this is an asynchronous function +Boost.Asio by default won't call the continuation before the +function returns. The implementation must therefore enqueue it for +execution, as depicted below + +``` + OP5 ---> OP4 ---> OP3 ---> OP2 ---> OP1 # Reschedules the continuation + | + OP1 schedules its continuation | + +-----------------------------------+ + | + | + OP6 ---> OP5 ---> OP4 ---> OP3 ---> OP2 # Reschedules the continuation + | + OP2 schedules its continuation | + +-----------------------------------+ + | + | + OP7 ---> OP6 ---> OP5 ---> OP4 ---> OP3 +``` + +When summed up, the excessive rescheduling of continuations lead to +performance degradation at scale. But since this is an event-loop +there is no way around rescheduling as doing otherwise would mean +allowing a task to monopolize the event-loop, preventing other tasks +from making progress. The best that can be done is to avoid +_overscheduling_, so let us determine how much rescheduling is too +much. + +## The intrinsic latency of an event-loop + +An event-loop is a design pattern originally used to handle events +external to the application, such as GUIs, networking and other forms +of IO. If we take this literally, it becomes evident that the way +`async_read_until` works is incompatible with an event-loop since +_searching for the separator_ is not an external event and as such +should not have to be enqueued for execution. 
+ +Once we constrain ourselves to events that have an external origin, +such as anything related to IO and including any form of IPC, the +scheduling overhead is reduced considerably since the latency +of the transport layer eclipses whatever time it takes to schedule the +continuation, for example, according to +[these](https://www.boost.org/doc/libs/develop/libs/cobalt/doc/html/index.html#posting_to_an_executor) +benchmarks, the time it takes to schedule a task in the +`asio::io_context ` is approximately `50ns`. + +To give the reader an idea about the magnitude of this number, if +rescheduling alone were to account for 1% of the runtime of an app +that uses asynchronous IO to move around data in chunks of size 128kb, +then this app would have a throughput of approximately 24Gbs. At such +high throughput multiple other factors kick in before any scheduling +overhead even starts to manifest. + +It is therefore safe to say that only asynchronous operations that +don't perform or are not bound to any IO are ever likely to +overschedule in the sense described above. Those cases can be usually +avoided, this is what worked for Boost.Redis + + 1. `async_read_until` was replaced with calls to + `socket.async_read_some` and an incremental parser that does not + do any IO. + + 2. Channel `try_` functions are used to check if send and receive + operations can be called without suspension. For example, + `try_send` before `async_send` and `try_receive` before + `async_receive` ([see also](https://github.com/chriskohlhoff/asio/commit/fe4fd7acf145335eeefdd19708483c46caeb45e5) + `try_send_via_dispatch` for a more aggressive optimization). + + 3. Coalescing of individual requests into a single payload to reduce + the number of necessary writes on the socket,this is only + possible because Redis supports pipelining (good protocols + help!). + + 4. 
Increased the socket read sizes to 4kb to reduce the number of + reads (which is outweighed by the costs of rotating data in the + buffer). + + 5. Dropped the `resp3::async_read` abstraction. When I started + developing Boost.Redis there was convincing precedent for having + a `resp3::async_read` function to read complete RESP3 messages + from a socket + + Name | Description + ---------------------------------------|------------------- + `asio::ip::tcp::async_read` | Reads `n` bytes from a stream. + `beast::http::async_read` | Reads a complete HTTP message. + `beast::websocket::stream::async_read` | Reads a complete Websocket message. + `redis::async_read` | Reads a complete RESP3 message. + + It turns out however that this function is also vulnerable to + immediate completions since in command pipelines multiple + responses show up in the buffer after a call to + `socket.async_read_some`. When that happens each call to + `resp3::async_read` is IO-less. + +Sometimes it is not possible to avoid asynchronous operations that +complete synchronously, in the following sections we will therefore +see how favoring throughput over fairness works in Boost.Asio. + +### Calling the continuation inline + +In Boost.Asio it is possible to customize how an algorithm executes +the continuation when an immediate completion occurs, this includes +the ability of calling it inline, thereby avoiding the costs of +excessive rescheduling. Here is how it works + +```cpp +// (default) The continuation is enqueued for execution, regardless of +// whether it is immediate or not. +async_read_until(socket, buffer, "\r\n", continuation); + +// Immediate completions are executed in exec2 (otherwise equal to the +// version above). The completion is called inline if exec2 is the +same // executor that is running the operation. 
+async_read_until(socket, buffer, "\r\n", bind_immediate_executor(exec2, completion)); +``` + +To compare the performance of both cases I have written a small +function that calls `async_read_until` in a loop with a buffer that is +never consumed so that all completions are immediate. The version +below uses the default behaviour + +```cpp +void read_safe(tcp::socket& s, std::string& buffer) +{ + auto continuation = [&s, &buffer](auto ec, auto n) + { + read_safe(s, buffer); // Recursive call + }; + + // This won't cause stack exhaustion because the continuation is + // not called inline but posted in the event loop. + async_read_until(s, dynamic_buffer(buffer), "\r\n", continuation); +} +``` + +To optimize away some of the rescheduling the version below uses the +`bind_immediate_executor` customization to call the continuation +reentrantly and then breaks the stack from time to time to avoid +exhausting it + +```cpp +void read_reentrant(tcp::socket& s, std::string& buffer) +{ + auto cont = [&](auto, auto) + { + read_reentrant(s, buffer); // Recursive call + }; + + // Breaks the callstack after 16 inline calls. + if (counter % 16 == 0) { + post(s.get_executor(), [cont](){cont({}, 0);}); + return; + } + + // Continuation called reentrantly. + async_read_until(s, dynamic_buffer(buffer), "\r\n", + bind_immediate_executor(s.get_executor(), cont)); +} +``` + +The diagram below shows what the reentrant chain of calls in the code +above look like from the event-loop point of view + +``` + OP5 ---> OP4 ---> OP3 ---> OP2 ---> OP1a # Completes immediately + | + | + ... | + OP1b # Completes immediately + | + Waiting for OP5 to | + reschedule its | + continuation OP1c # Completes immediately + | + | + ... 
| + OP1d # Break the call-stack + | + +-----------------------------------+ + | + OP6 ---> OP5 ---> OP4 ---> OP3 ---> OP2 +``` + +Unsurprisingly, the reentrant code is 3x faster than the one that +relies on the default behaviour (don't forget that this is a best case +scenario, in the general case not all completions are immediate). +Although faster, this strategy has some downsides + + - The overall operation is not as fast as possible since it still + has to reschedule from time to time to break the call stack. The + less it reschedules the higher the risk of exhausting it. + + - It is too easy to forget to break the stack. For example, the + programmer might decide to branch somewhere into another chain of + asynchronous calls that also use this strategy. To avoid + exhaustion all such branches would have to be safeguarded with a + manual rescheduling i.e. `post`. + + - Requires additional layers of complexity such as + `bind_immediate_executor` in addition to `bind_executor`. + + - Not compliat with more strict + [guidelines](https://en.wikipedia.org/wiki/The_Power_of_10:_Rules_for_Developing_Safety-Critical_Code) + that prohibits reentrat code. + + - There is no simple way of choosing the maximum allowed number of + reentrant calls for each function in a way that covers different + use cases and users. Library writers and users would be tempted + into using a small value reducing the performance advantage. + + - If the socket is always ready for reading the task will + monopolize IO for up to `16` interactions which might cause + stutter in unrelated tasks as depicted below + +``` + Unfairness + + +----+----+----+ +----+----+----+ +----+----+----+ +Socket-1 | | | | | | | | | | | | + +----+----+----+----+----+----+----+----+----+----+----+----+ +Socket-2 | | | | | | + +----+ +----+ +----+ +``` + +From the aesthetic point of view the code above is also unpleasant as +it breaks the function asynchronous contract by injecting a reentrant +behaviour. 
It gives me the same kind of feeling I have about
+[recursive
+mutexes](http://www.zaval.org/resources/library/butenhof1.html).
+
+Note: It is worth mentioning here that a similar
+[strategy](https://github.com/NVIDIA/stdexec/blob/6f23dd5b1d523541ce28af32fc2603403ebd36ed/include/exec/trampoline_scheduler.hpp#L52)
+is used to break the call stack of repeating algorithms in
+[stdexec](https://github.com/NVIDIA/stdexec), but this time
+based on
+[P2300](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2300r7.html)
+and not on Boost.Asio.
+
+### Coroutine tail-calls
+
+In the previous section we have seen how to avoid overscheduling by
+instructing the asynchronous operation to call the completion inline
+on immediate completion. It turns out however that coroutine support
+for _tail-calls_ provides a way to completely sidestep this problem.
+This feature is described by
+[Baker](https://lewissbaker.github.io/2020/05/11/understanding_symmetric_transfer)
+as follows
+
+> A tail-call is one where the current stack-frame is popped before
+> the call and the current function’s return address becomes the
+> return-address for the callee. ie. the callee will return directly
+> the the [sic] caller of this function.
+
+This means (at least in principle) that a library capable of using
+tail-calls when an immediate completion occurs neither has to
+reschedule the continuation nor call it inline. To test how this
+feature compares to the other styles I have used Boost.Cobalt. The
+code looks as follows
+
+```cpp
+// Warning: risks unfairness and starvation of other tasks. 
+task read_until_unfair() +{ + for (int i = 0; i != repeat; ++i) { + co_await async_read_until(s, dynamic_buffer(buffer), "\r\n", cobalt::use_op); + } +} +``` + +The result of this comparison as listed in the table below + +Time/s | Style | Configuration | Library +-------|-----------|-----------------------------|------------- + 1,0 | Coroutine | `await_ready` optimization | Boost.Cobalt + 4.8 | Callback | Reentant | Boost.Asio +10.3 | Coroutine | `use_op` | Boost.Cobalt +14.9 | Callback | Regular | Boost.Asio +15.6 | Coroutine | `asio::deferred` | Boost.Asio + +As the reader can see, `cobalt::use_op` ranks 3rd and is considerably +faster (10.3 vs 15.6) than the Asio equivalent that uses +default-rescheduling. However, by trading rescheduling with tail-calls +the code above can now monopolize the event-loop, resulting in +unfairness if the socket happens to receive data at a higher rate +than other tasks. If by chance data is received continuously +on a socket that is always ready for reading, other tasks will starve + +``` + Starvation + + +----+----+----+----+----+----+----+----+----+----+----+----+ +Socket-1 | | | | | | | | | | | | | + +----+----+----+----+----+----+----+----+----+----+----+----+ + +Socket-2 Starving ... + +``` + +To avoid this problem the programmer is forced to reschedule from time +to time, in the same way we did for the reentrant calls + +```cpp +task read_until_fair() +{ + for (int i = 0; i != repeat; ++i) { + if (repeat % 16 == 0) { + // Reschedules to address unfairness and starvation of + // other tasks. + co_await post(cobalt::use_op); + continue; + } + + co_await async_read_until(s, dynamic_buffer(buffer), "\r\n", cobalt::use_op); + } +} +``` + +Delegating fairness-safety to applications is a dangerous game. 
+This is a +[problem](https://tokio.rs/blog/2020-04-preemption) the Tokio +community had to deal with before Tokio runtime started enforcing +rescheduling (after 256 successful operations) + +> If data is received faster than it can be processed, it is possible +> that more data will have already been received by the time the +> processing of a data chunk completes. In this case, .await will +> never yield control back to the scheduler, other tasks will not be +> scheduled, resulting in starvation and large latency variance. + +> Currently, the answer to this problem is that the user of Tokio is +> responsible for adding yield points in both the application and +> libraries. In practice, very few actually do this and end up being +> vulnerable to this sort of problem. + +### Safety in P2300 (Senders and Receivers) + +As of this writing, the C++ standards committee (WG21) has been +pursuing the standardization of a networking library for almost 20 +years. One of the biggest obstacles that prevented it from happening +was a disagreement on what the _asynchronous model_ that underlies +networking should look like. Until 2021 that model was basically +Boost.Asio _executors_, but in this +[poll](https://www.reddit.com/r/cpp/comments/q6tgod/c_committee_polling_results_for_asynchronous/) +the committee decided to abandon that front and concentrate efforts on +the new [P2300](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2300r7.html) +proposal, also known as _senders and receivers_. The decision was +quite [abrupt](https://isocpp.org/files/papers/P2464R0.html) + +> The original plan about a week earlier than the actual writing of +> this paper was to write a paper that makes a case for standardizing +> the Networking TS. 
+ +and opinions turned out to be very strong against Boost.Asio (see +[this](https://api.csswg.org/bikeshed/?force=1&url=https://raw.githubusercontent.com/brycelelbach/wg21_p2459_2022_january_library_evolution_poll_outcomes/main/2022_january_library_evolution_poll_outcomes.bs) +for how each voter backed their vote) + +> The whole concept is completely useless, there's no composed code +> you can write with it. + +The part of that debate that interests us most here is stated in +[P2471](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p2471r1.pdf), +that compares Boost.Asio with P2300 + +> Yes, default rescheduling each operation and default not +> rescheduling each operation, is a poor trade off. IMO both options +> are poor. The one good option that I know of that can prevent stack +> exhaustion is first-class tail-recursion in library or language + +> ASIO has chosen to require that every async operation must schedule +> the completion on a scheduler (every read, every write, etc..). + +> sender/receiver has not decided to +> require that the completion be scheduled. + +> This is why I consider tail-call the only good solution. Scheduling +> solutions are all inferior (give thanks to Lewis for this shift in +> my understanding :) ). + +Although tail-calls solve the problem of stack-exhaustion as we have +seen above, it makes the code vulnerable to unfairness and starvation +and therefore it is not an alternative to default-rescheduling as the +quotation above is implying. To deal with the lack of +default-rescheduling, libraries and applications built on top of P2300 +have to address the aforementioned problems, layer after layer. For +example, +[stdexec](https://github.com/NVIDIA/stdexec) has invented something +called +_[trampoline-scheduler](https://github.com/NVIDIA/stdexec/blob/e7cd275273525dbc693f4bf5f6dc4d4181b639e4/include/exec/trampoline_scheduler.hpp)_ +to protect repeating algorithms such as `repeat_effect_until` from +exhausting the stack. 
This construct however is built around
+reentrancy, allowing
+[sixteen](https://github.com/NVIDIA/stdexec/blob/83cdb92d316e8b3bca1357e2cf49fc39e9bed403/include/exec/trampoline_scheduler.hpp#L52)
+levels of inline calls by default. While in Boost.Asio it is possible to use
+reentrancy as an optimization for corner cases, here it is made its
+_modus operandi_; my opinion about this has already been stated in a
+previous section so I won't repeat it here.
+
+Also the fact that a special scheduler is needed by specific
+algorithms is a problem on its own since it contradicts one of the
+main selling points of P2300 which is that of being _generic_. For
+example, [P2464R0](https://isocpp.org/files/papers/P2464R0.html) uses
+the code below as an example
+
+```cpp
+void
+run_that_io_operation(
+   scheduler auto sched,
+   sender_of auto wrapping_continuation)
+{
+   // snip
+}
+```
+
+and states
+
+> I have no idea what the sched's concrete type is. I have no idea
+> what the wrapping_continuation's concrete type is. They're none of
+> my business, ...
+
+Hence, by being generic, the algorithms built on top of P2300 are also
+unsafe (against stack-exhaustion, unfairness and starvation). Otherwise,
+if library writers require a specific scheduler to ensure safety, then
+the algorithms become automatically non-generic, pick your poison!
+
+The proposers of P2300 claim that it doesn't address safety because it
+should be seen as the low-level building blocks of asynchronous
+programming and that it is the role of higher-level libraries to deal
+with that. This claim however does not hold since, as we have just
+seen, Boost.Asio also provides those building blocks but does so in a
+safe way. In fact during the whole development of Boost.Redis I never
+had to think about these kinds of problems because safety is built
+from the ground up. 
+
+### Avoiding coroutine suspension with `await_ready`
+
+Now let us get back to the first place in the table above, which uses
+the `await_ready` optimization from Boost.Cobalt. This API provides
+users with the ability to avoid coroutine suspension altogether in
+case the separator is already present in the buffer. It works by
+defining a `struct` with the following interface
+
+```cpp
+struct read_until : cobalt::op {
+   ...
+
+   void ready(cobalt::handler handler) override
+   {
+      // Search for the separator in buffer and call the handler if found
+   }
+
+   void initiate(cobalt::completion_handler complete) override
+   {
+      // Regular call to async_read_until.
+      async_read_until(socket, buffer, delim, std::move(complete));
+   }
+};
+```
+
+and the code that uses it
+
+```cpp
+for (int i = 0; i != repeat; ++i) {
+   co_await read_until(socket, dynamic_buffer(buffer));
+}
+```
+
+In essence, what the code above does is to skip a call to
+`async_read_until` by first checking with the ready function whether
+the forthcoming operation is going to complete immediately. The
+nice thing about it is that the programmer can use this optimization
+only when a performance bottleneck is detected, without planning for it
+in advance. The drawback however is that it requires reimplementing
+the search for the separator in the body of the `ready` function,
+defeating the purpose of using `async_read_until` in the first place as
+(again) it would have been simpler to reformulate the operation in
+terms of `socket.async_read_some` directly.
+
+## Acknowledgements
+
+Thanks to Klemens Morgenstern for answering questions about
+Boost.Cobalt. 
+ diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 16531341..a400d21b 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -10,6 +10,9 @@ macro(make_example EXAMPLE_NAME STANDARD) if (${STANDARD} STREQUAL "20") target_link_libraries(${EXAMPLE_NAME} PRIVATE examples_main) endif() + if (${EXAMPLE_NAME} STREQUAL "cpp20_json") + target_link_libraries(${EXAMPLE_NAME} PRIVATE Boost::json Boost::container_hash) + endif() endmacro() macro(make_testable_example EXAMPLE_NAME STANDARD) @@ -46,4 +49,4 @@ endif() if (NOT MSVC) make_example(cpp20_chat_room 20) -endif() \ No newline at end of file +endif() diff --git a/example/cpp20_intro_tls.cpp b/example/cpp20_intro_tls.cpp index b98028ce..9f26ad1e 100644 --- a/example/cpp20_intro_tls.cpp +++ b/example/cpp20_intro_tls.cpp @@ -22,10 +22,16 @@ using boost::redis::connection; auto verify_certificate(bool, asio::ssl::verify_context&) -> bool { - std::cout << "set_verify_callback" << std::endl; + std::cout << "verify_certificate called" << std::endl; return true; } +auto prepare_callback = [](connection::next_layer_type& stream) +{ + stream.set_verify_mode(asio::ssl::verify_peer); + stream.set_verify_callback(verify_certificate); +}; + auto co_main(config cfg) -> asio::awaitable { cfg.use_ssl = true; @@ -35,6 +41,7 @@ auto co_main(config cfg) -> asio::awaitable cfg.addr.port = "6380"; auto conn = std::make_shared(co_await asio::this_coro::executor); + conn->set_prepare_callback(prepare_callback); conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); request req; @@ -42,9 +49,6 @@ auto co_main(config cfg) -> asio::awaitable response resp; - conn->next_layer().set_verify_mode(asio::ssl::verify_peer); - conn->next_layer().set_verify_callback(verify_certificate); - co_await conn->async_exec(req, resp, asio::deferred); conn->cancel(); diff --git a/example/cpp20_json.cpp b/example/cpp20_json.cpp index 2f0674f0..261a3f2f 100644 --- a/example/cpp20_json.cpp +++ b/example/cpp20_json.cpp @@ -15,13 
+15,11 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -#define BOOST_JSON_NO_LIB -#define BOOST_CONTAINER_NO_LIB #include #include #include +#include #include -#include namespace asio = boost::asio; using namespace boost::describe; diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 664c6e5d..e1fbab1c 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -34,6 +34,8 @@ struct reconnection_op { { BOOST_ASIO_CORO_REENTER (coro_) for (;;) { + conn_->m_prepare_callback(conn_->next_layer()); + BOOST_ASIO_CORO_YIELD conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self)); conn_->cancel(operation::receive); @@ -78,6 +80,15 @@ class basic_connection { executor_type get_executor() noexcept { return impl_.get_executor(); } + /// Next layer type. + using next_layer_type = asio::ssl::stream>; + + /** Prepare callback type + * + * See set_prepare_callback for more information. + */ + using prepare_callback_type = std::function; + /// Rebinds the socket type to another executor. template struct rebind_executor @@ -86,13 +97,19 @@ class basic_connection { using other = basic_connection; }; - /// Contructs from an executor. + /** @brief Constructor + * + * @param ex Executor on which connection operation will run. + * @param ctx SSL context. + * @param max_read_size Maximum read size that is passed to + * the internal `asio::dynamic_buffer` constructor. 
+ */ explicit basic_connection( executor_type ex, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()) - : impl_{ex, method, max_read_size} + : impl_{ex, std::move(ctx), max_read_size} , timer_{ex} { } @@ -100,9 +117,9 @@ class basic_connection { explicit basic_connection( asio::io_context& ioc, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()) - : basic_connection(ioc.get_executor(), method, max_read_size) + : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size) { } /** @brief Starts underlying connection operations. @@ -286,10 +303,6 @@ class basic_connection { auto const& get_ssl_context() const noexcept { return impl_.get_ssl_context();} - /// Returns the ssl context. - auto& get_ssl_context() noexcept - { return impl_.get_ssl_context();} - /// Resets the underlying stream. void reset_stream() { impl_.reset_stream(); } @@ -311,6 +324,30 @@ class basic_connection { usage get_usage() const noexcept { return impl_.get_usage(); } + /** @brief Set the prepare callback + * + * This callback is called before every new connect or reconnect + * attempt. 
It is specially useful for SSL connections, for example + * + * @code + * auto verify_certificate(bool, asio::ssl::verify_context&) -> bool + * { + * std::cout << "verify_certificate called" << std::endl; + * return true; + * } + * + * auto prepare_callback = [](connection::next_layer_type& stream) + * { + * stream.set_verify_mode(asio::ssl::verify_peer); + * stream.set_verify_callback(verify_certificate); + * }; + * @endcode + */ + void set_prepare_callback(prepare_callback_type callback) + { + m_prepare_callback = std::move(callback); + } + private: using timer_type = asio::basic_waitable_timer< @@ -323,6 +360,7 @@ class basic_connection { config cfg_; detail::connection_base impl_; timer_type timer_; + prepare_callback_type m_prepare_callback = [](next_layer_type&){ }; }; /** \brief A basic_connection that type erases the executor. @@ -339,18 +377,27 @@ class connection { /// Executor type. using executor_type = asio::any_io_executor; + /// Underlying connection type. + using underlying_type = basic_connection; + + /// Next layer type. + using next_layer_type = underlying_type::next_layer_type; + + /// Prepare callback type + using prepare_callback_type = underlying_type::prepare_callback_type; + /// Contructs from an executor. explicit connection( executor_type ex, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()); /// Contructs from a context. explicit connection( asio::io_context& ioc, - asio::ssl::context::method method = asio::ssl::context::tls_client, + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size = (std::numeric_limits::max)()); /// Returns the underlying executor. @@ -423,6 +470,14 @@ class connection { usage get_usage() const noexcept { return impl_.get_usage(); } + /// Returns the ssl context. 
+ auto const& get_ssl_context() const noexcept + { return impl_.get_ssl_context();} + + /// Calls `boost::redis::basic_connection::set_prepare_callback`. + void set_prepare_callback(prepare_callback_type callback) + { impl_.set_prepare_callback(std::move(callback)); } + private: void async_run_impl( @@ -430,7 +485,7 @@ class connection { logger l, asio::any_completion_handler token); - basic_connection impl_; + underlying_type impl_; }; } // boost::redis diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index a954c0c8..043203bc 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -113,7 +113,7 @@ struct exec_op { asio::coroutine coro{}; template - void operator()(Self& self , system::error_code ec = {}) + void operator()(Self& self , system::error_code ec = {}, std::size_t = 0) { BOOST_ASIO_CORO_REENTER (coro) { @@ -130,7 +130,6 @@ struct exec_op { EXEC_OP_WAIT: BOOST_ASIO_CORO_YIELD info_->async_wait(std::move(self)); - BOOST_ASSERT(ec == asio::error::operation_aborted); if (info_->ec_) { self.complete(info_->ec_, 0); @@ -140,18 +139,18 @@ struct exec_op { if (info_->stop_requested()) { // Don't have to call remove_request as it has already // been by cancel(exec). - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } if (is_cancelled(self)) { - if (info_->is_written()) { + if (!info_->is_waiting()) { using c_t = asio::cancellation_type; auto const c = self.get_cancellation_state().cancelled(); if ((c & c_t::terminal) != c_t::none) { // Cancellation requires closing the connection // otherwise it stays in inconsistent state. conn_->cancel(operation::run); - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } else { // Can't implement other cancelation types, ignoring. 
self.get_cancellation_state().clear(); @@ -163,7 +162,7 @@ struct exec_op { } else { // Cancelation can be honored. conn_->remove_request(info_); - self.complete(ec, 0); + self.complete(asio::error::operation_aborted, 0); return; } } @@ -395,9 +394,9 @@ class connection_base { /// Constructs from an executor. connection_base( executor_type ex, - asio::ssl::context::method method, + asio::ssl::context ctx, std::size_t max_read_size) - : ctx_{method} + : ctx_{std::move(ctx)} , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} , receive_channel_{ex, 256} @@ -412,10 +411,6 @@ class connection_base { auto const& get_ssl_context() const noexcept { return ctx_;} - /// Returns the ssl context. - auto& get_ssl_context() noexcept - { return ctx_;} - /// Resets the underlying stream. void reset_stream() { @@ -516,6 +511,7 @@ class connection_base { using runner_type = runner; using adapter_type = std::function const&, system::error_code&)>; using receiver_adapter_type = std::function const&, system::error_code&)>; + using exec_notifier_type = receive_channel_type; auto use_ssl() const noexcept { return runner_.get_config().use_ssl;} @@ -527,10 +523,10 @@ class connection_base { { BOOST_ASSERT(ptr != nullptr); - if (ptr->is_written()) { - return !ptr->req_->get_config().cancel_if_unresponded; - } else { + if (ptr->is_waiting()) { return !ptr->req_->get_config().cancel_on_connection_lost; + } else { + return !ptr->req_->get_config().cancel_if_unresponded; } }; @@ -544,7 +540,7 @@ class connection_base { reqs_.erase(point, std::end(reqs_)); std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return ptr->reset_status(); + return ptr->mark_waiting(); }); return ret; @@ -555,7 +551,7 @@ class connection_base { auto f = [](auto const& ptr) { BOOST_ASSERT(ptr != nullptr); - return ptr->is_written(); + return !ptr->is_waiting(); }; auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f); @@ -615,25 +611,15 @@ class connection_base { using 
node_type = resp3::basic_node; using wrapped_adapter_type = std::function; - enum class action - { - stop, - proceed, - none, - }; - explicit req_info(request const& req, adapter_type adapter, executor_type ex) - : timer_{ex} - , action_{action::none} + : notifier_{ex, 1} , req_{&req} , adapter_{} , expected_responses_{req.get_expected_responses()} - , status_{status::none} + , status_{status::waiting} , ec_{{}} , read_size_{0} { - timer_.expires_at((std::chrono::steady_clock::time_point::max)()); - adapter_ = [this, adapter](node_type const& nd, system::error_code& ec) { auto const i = req_->get_expected_responses() - expected_responses_; @@ -643,18 +629,16 @@ class connection_base { auto proceed() { - timer_.cancel(); - action_ = action::proceed; + notifier_.try_send(std::error_code{}, 0); } void stop() { - timer_.cancel(); - action_ = action::stop; + notifier_.close(); } - [[nodiscard]] auto is_waiting_write() const noexcept - { return !is_written() && !is_staged(); } + [[nodiscard]] auto is_waiting() const noexcept + { return status_ == status::waiting; } [[nodiscard]] auto is_written() const noexcept { return status_ == status::written; } @@ -668,27 +652,26 @@ class connection_base { void mark_staged() noexcept { status_ = status::staged; } - void reset_status() noexcept - { status_ = status::none; } + void mark_waiting() noexcept + { status_ = status::waiting; } [[nodiscard]] auto stop_requested() const noexcept - { return action_ == action::stop;} + { return !notifier_.is_open();} template auto async_wait(CompletionToken token) { - return timer_.async_wait(std::move(token)); + return notifier_.async_receive(std::move(token)); } //private: enum class status - { none + { waiting , staged , written }; - timer_type timer_; - action action_; + exec_notifier_type notifier_; request const* req_; wrapped_adapter_type adapter_; @@ -716,7 +699,7 @@ class connection_base { void cancel_push_requests() { auto point = std::stable_partition(std::begin(reqs_), 
std::end(reqs_), [](auto const& ptr) { - return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); + return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); }); std::for_each(point, std::end(reqs_), [](auto const& ptr) { @@ -737,7 +720,7 @@ class connection_base { if (info->req_->has_hello_priority()) { auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) { - return e->is_waiting_write(); + return e->is_waiting(); }); std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); @@ -781,7 +764,7 @@ class connection_base { // Coalesces the requests and marks them staged. After a // successful write staged requests will be marked as written. auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) { - return !ri->is_waiting_write(); + return !ri->is_waiting(); }); std::for_each(point, std::cend(reqs_), [this](auto const& ri) { @@ -798,7 +781,14 @@ class connection_base { bool is_waiting_response() const noexcept { - return !std::empty(reqs_) && reqs_.front()->is_written(); + if (std::empty(reqs_)) + return false; + + // Under load and on low-latency networks we might start + // receiving responses before the write operation completed and + // the request is still maked as staged and not written. See + // https://github.com/boostorg/redis/issues/170 + return !reqs_.front()->is_waiting(); } void close() @@ -814,36 +804,39 @@ class connection_base { auto is_next_push() { - // We handle unsolicited events in the following way - // - // 1. Its resp3 type is a push. - // - // 2. A non-push type is received with an empty requests - // queue. I have noticed this is possible (e.g. -MISCONF). - // I expect them to have type push so we can distinguish - // them from responses to commands, but it is a - // simple-error. 
If we are lucky enough to receive them - // when the command queue is empty we can treat them as - // server pushes, otherwise it is impossible to handle - // them properly - // - // 3. The request does not expect any response but we got - // one. This may happen if for example, subscribe with - // wrong syntax. - // - // Useful links: + BOOST_ASSERT(!read_buffer_.empty()); + + // Useful links to understand the heuristics below. // // - https://github.com/redis/redis/issues/11784 // - https://github.com/redis/redis/issues/6426 - // - - BOOST_ASSERT(!read_buffer_.empty()); - - return - (resp3::to_type(read_buffer_.front()) == resp3::type::push) - || reqs_.empty() - || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0) - || !is_waiting_response(); // Added to deal with MONITOR. + // - https://github.com/boostorg/redis/issues/170 + + // The message's resp3 type is a push. + if (resp3::to_type(read_buffer_.front()) == resp3::type::push) + return true; + + // This is non-push type and the requests queue is empty. I have + // noticed this is possible, for example with -MISCONF. I don't + // know why they are not sent with a push type so we can + // distinguish them from responses to commands. If we are lucky + // enough to receive them when the command queue is empty they + // can be treated as server pushes, otherwise it is impossible + // to handle them properly + if (reqs_.empty()) + return true; + + // The request does not expect any response but we got one. This + // may happen if for example, subscribe with wrong syntax. + if (reqs_.front()->expected_responses_ == 0) + return true; + + // Added to deal with MONITOR and also to fix PR170 which + // happens under load and on low-latency networks, where we + // might start receiving responses before the write operation + // completed and the request is still maked as staged and not + // written. 
+ return reqs_.front()->is_waiting(); } auto get_suggested_buffer_growth() const noexcept diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 796573e9..12abc996 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -10,16 +10,16 @@ namespace boost::redis { connection::connection( executor_type ex, - asio::ssl::context::method method, + asio::ssl::context ctx, std::size_t max_read_size) -: impl_{ex, method, max_read_size} +: impl_{ex, std::move(ctx), max_read_size} { } connection::connection( asio::io_context& ioc, - asio::ssl::context::method method, + asio::ssl::context ctx, std::size_t max_read_size) -: impl_{ioc.get_executor(), method, max_read_size} +: impl_{ioc.get_executor(), std::move(ctx), max_read_size} { } void diff --git a/include/boost/redis/request.hpp b/include/boost/redis/request.hpp index ebc94a22..0e62e0a9 100644 --- a/include/boost/redis/request.hpp +++ b/include/boost/redis/request.hpp @@ -47,31 +47,31 @@ class request { public: /// Request configuration options. struct config { - /** \brief If `true` - * `boost::redis::connection::async_exec` will complete with error if the - * connection is lost. Affects only requests that haven't been - * sent yet. + /** \brief If `true` calls to `connection::async_exec` will + * complete with error if the connection is lost while the + * request hasn't been sent yet. */ bool cancel_on_connection_lost = true; - /** \brief If `true` the request will complete with - * boost::redis::error::not_connected if `async_exec` is called before - * the connection with Redis was established. + /** \brief If `true` `connection::async_exec` will complete with + * `boost::redis::error::not_connected` if the call happens + * before the connection with Redis was established. 
*/ bool cancel_if_not_connected = false; - /** \brief If `false` `boost::redis::connection::async_exec` will not + /** \brief If `false` `connection::async_exec` will not * automatically cancel this request if the connection is lost. * Affects only requests that have been written to the socket - * but remained unresponded when `boost::redis::connection::async_run` - * completed. + * but remained unresponded when + * `boost::redis::connection::async_run` completed. */ bool cancel_if_unresponded = true; - /** \brief If this request has a `HELLO` command and this flag is - * `true`, the `boost::redis::connection` will move it to the front of - * the queue of awaiting requests. This makes it possible to - * send `HELLO` and authenticate before other commands are sent. + /** \brief If this request has a `HELLO` command and this flag + * is `true`, the `boost::redis::connection` will move it to the + * front of the queue of awaiting requests. This makes it + * possible to send `HELLO` and authenticate before other + * commands are sent. */ bool hello_with_priority = true; }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6d615c5b..40edb275 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -23,6 +23,7 @@ macro(make_test TEST_NAME STANDARD) boost_redis_src boost_redis_tests_common boost_redis_project_options + Boost::unit_test_framework ) target_compile_features(${EXE_NAME} PRIVATE cxx_std_${STANDARD}) add_test(${EXE_NAME} ${EXE_NAME}) @@ -70,4 +71,4 @@ add_custom_target( COMMAND ${COVERAGE_HTML_COMMAND} COMMENT "Generating coverage report" VERBATIM -) \ No newline at end of file +) diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index 5e135cac..c3f04134 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -6,6 +6,7 @@ #include #include +#include #define BOOST_TEST_MODULE conn-exec #include #include @@ -17,12 +18,13 @@ // container. 
namespace net = boost::asio; +using boost::redis::config; using boost::redis::connection; -using boost::redis::request; -using boost::redis::response; using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::operation; +using boost::redis::request; +using boost::redis::response; // Sends three requests where one of them has a hello with a priority // set, which means it should be executed first. @@ -153,3 +155,36 @@ BOOST_AUTO_TEST_CASE(correct_database) BOOST_CHECK_EQUAL(cfg.database_index.value(), index); } +BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170) +{ + // See https://github.com/boostorg/redis/issues/170 + + std::string payload; + payload.resize(1024); + std::fill(std::begin(payload), std::end(payload), 'A'); + + net::io_context ioc; + auto conn = std::make_shared(ioc); + + auto cfg = make_test_config(); + cfg.health_check_interval = std::chrono::seconds(0); + conn->async_run(cfg, {}, net::detached); + + int counter = 0; + int const repeat = 8000; + + for (int i = 0; i < repeat; ++i) { + auto req = std::make_shared(); + req->push("PING", payload); + conn->async_exec(*req, ignore, [req, &counter, conn](auto ec, auto) { + BOOST_TEST(!ec); + if (++counter == repeat) + conn->cancel(); + }); + } + + ioc.run(); + + BOOST_CHECK_EQUAL(counter, repeat); +} + diff --git a/test/test_conn_exec_retry.cpp b/test/test_conn_exec_retry.cpp index c464b342..99f68c39 100644 --- a/test/test_conn_exec_retry.cpp +++ b/test/test_conn_exec_retry.cpp @@ -57,12 +57,12 @@ BOOST_AUTO_TEST_CASE(request_retry_false) auto c2 = [&](auto ec, auto){ std::cout << "c2" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c1 = [&](auto ec, auto){ std::cout << "c1" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c0 = [&](auto ec, 
auto){ diff --git a/test/test_conn_quit.cpp b/test/test_conn_quit.cpp index fcd580b9..9d5dd2f3 100644 --- a/test/test_conn_quit.cpp +++ b/test/test_conn_quit.cpp @@ -61,7 +61,7 @@ BOOST_AUTO_TEST_CASE(test_async_run_exits) auto c3 = [](auto ec, auto) { std::clog << "c3: " << ec.message() << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c2 = [&](auto ec, auto) diff --git a/test/test_conn_reconnect.cpp b/test/test_conn_reconnect.cpp index 5b45a127..d4605d5a 100644 --- a/test/test_conn_reconnect.cpp +++ b/test/test_conn_reconnect.cpp @@ -99,7 +99,7 @@ auto async_test_reconnect_timeout() -> net::awaitable std::cout << "ccc" << std::endl; - BOOST_CHECK_EQUAL(ec1, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec1, boost::asio::error::operation_aborted); } BOOST_AUTO_TEST_CASE(test_reconnect_and_idle) diff --git a/test/test_conn_tls.cpp b/test/test_conn_tls.cpp index 5e38ef3c..b4779a0f 100644 --- a/test/test_conn_tls.cpp +++ b/test/test_conn_tls.cpp @@ -17,6 +17,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::redis::operation; +using boost::redis::ignore; using boost::system::error_code; bool verify_certificate(bool, net::ssl::verify_context&) @@ -25,7 +26,13 @@ bool verify_certificate(bool, net::ssl::verify_context&) return true; } -BOOST_AUTO_TEST_CASE(ping) +auto prepare_callback = [](connection::next_layer_type& stream) +{ + stream.set_verify_mode(net::ssl::verify_peer); + stream.set_verify_callback(verify_certificate); +}; + +config make_tls_config() { config cfg; cfg.use_ssl = true; @@ -34,7 +41,12 @@ BOOST_AUTO_TEST_CASE(ping) cfg.addr.host = "db.occase.de"; cfg.addr.port = "6380"; //cfg.health_check_interval = std::chrono::seconds{0}; + return cfg; +} +BOOST_AUTO_TEST_CASE(ping_internal_ssl_context) +{ + auto const cfg = make_tls_config(); std::string const in = "Kabuf"; 
request req; @@ -44,8 +56,34 @@ BOOST_AUTO_TEST_CASE(ping) net::io_context ioc; connection conn{ioc}; - conn.next_layer().set_verify_mode(net::ssl::verify_peer); - conn.next_layer().set_verify_callback(verify_certificate); + conn.set_prepare_callback(prepare_callback); + + conn.async_exec(req, resp, [&](auto ec, auto) { + BOOST_TEST(!ec); + conn.cancel(); + }); + + conn.async_run(cfg, {}, [](auto) { }); + + ioc.run(); + + BOOST_CHECK_EQUAL(in, std::get<0>(resp).value()); +} + +BOOST_AUTO_TEST_CASE(ping_custom_ssl_context) +{ + auto const cfg = make_tls_config(); + std::string const in = "Kabuf"; + + request req; + req.push("PING", in); + + response resp; + + net::io_context ioc; + net::ssl::context ctx{boost::asio::ssl::context::tls_client}; + connection conn{ioc, std::move(ctx)}; + conn.set_prepare_callback(prepare_callback); conn.async_exec(req, resp, [&](auto ec, auto) { BOOST_TEST(!ec); @@ -61,12 +99,7 @@ BOOST_AUTO_TEST_CASE(ping) BOOST_AUTO_TEST_CASE(acl_does_not_allow_select) { - config cfg; - cfg.use_ssl = true; - cfg.username = "aedis"; - cfg.password = "aedis"; - cfg.addr.host = "db.occase.de"; - cfg.addr.port = "6380"; + auto cfg = make_tls_config(); cfg.database_index = 22; cfg.reconnect_wait_interval = std::chrono::seconds::zero(); @@ -79,8 +112,7 @@ BOOST_AUTO_TEST_CASE(acl_does_not_allow_select) net::io_context ioc; connection conn{ioc}; - conn.next_layer().set_verify_mode(net::ssl::verify_peer); - conn.next_layer().set_verify_callback(verify_certificate); + conn.set_prepare_callback(prepare_callback); conn.async_exec(req, resp, [&](auto, auto) { // TODO: We should not need this cancel here because @@ -98,3 +130,44 @@ BOOST_AUTO_TEST_CASE(acl_does_not_allow_select) BOOST_TEST(!!ec2); } + +BOOST_AUTO_TEST_CASE(tls_and_reconnection) +{ + net::io_context ioc; + connection conn{ioc}; + + int counter = 0; + auto prepare_callback = [&](auto& stream) + { + ++counter; + }; + + conn.set_prepare_callback(prepare_callback); + + request req; + 
req.get_config().cancel_on_connection_lost = false; + req.push("PING", "str1"); + req.push("QUIT"); + + conn.async_exec(req, ignore, [&](auto ec, auto) { + std::cout << "First: " << ec.message() << std::endl; + BOOST_TEST(!ec); + conn.async_exec(req, ignore, [&](auto ec, auto) { + std::cout << "Second: " << ec.message() << std::endl; + BOOST_TEST(!ec); + conn.async_exec(req, ignore, [&](auto ec, auto) { + std::cout << "Third: " << ec.message() << std::endl; + BOOST_TEST(!ec); + conn.cancel(); + }); + }); + }); + + auto const cfg = make_tls_config(); + conn.async_run(cfg, {}, [](auto) { }); + + ioc.run(); + + BOOST_CHECK_EQUAL(counter, 3); +} +