From fcdfaefe9dcdc6a6f512623377b3b89d44ab7fc6 Mon Sep 17 00:00:00 2001 From: Tuomo Kriikkula Date: Tue, 19 Dec 2023 17:21:35 +0200 Subject: [PATCH] Refactor multipart handling in C++ --- tests/umb_echo_server.cpp | 102 ++++++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/tests/umb_echo_server.cpp b/tests/umb_echo_server.cpp index 8a46501..1a13e2d 100644 --- a/tests/umb_echo_server.cpp +++ b/tests/umb_echo_server.cpp @@ -94,6 +94,8 @@ void print_header(const Header& header) std::cout << std::format("type: {}\n", mt_str); } +// TODO: unify error codes! +// https://www.boost.org/doc/libs/1_76_0/libs/system/doc/html/system.html boost::asio::experimental::coro> read_header( tcp::socket& socket, @@ -333,62 +335,75 @@ handle_multi_part_msg( } const auto bytes_out = msg->to_bytes(); + // Only count the number of payload bytes left here. + auto bytes_left = bytes_out.size() - umb::g_header_size; + auto it_bytes_out = bytes_out.cbegin(); - const auto num_hdr_bytes = static_cast(std::ceil( - static_cast(bytes_out.size() - umb::g_header_size) / - static_cast(umb::g_packet_size)) * umb::g_header_size); - const auto num_parts_out = static_cast(std::ceil( - static_cast(num_hdr_bytes + (bytes_out.size() - umb::g_header_size)) / - static_cast(umb::g_packet_size))); - const auto total_bytes_to_send = bytes_out.size() + num_hdr_bytes - umb::g_header_size; - + // Cache message type values, will be reused for all headers. const auto mt0 = bytes_out[2]; const auto mt1 = bytes_out[3]; + std::array send_buf{}; + umb::byte send_size = 0; + umb::byte send_part = 0; + unsigned num_from_buf = 0; + + // Send full parts while we have enough bytes left for them. + while ((bytes_left + umb::g_header_size) > umb::g_packet_size) + { + send_size = static_cast(umb::g_packet_size); + send_buf[0] = send_size; + send_buf[1] = send_part++; + send_buf[2] = mt0; + send_buf[3] = mt1; + + num_from_buf = send_size - umb::g_header_size; + std::copy_n(it_bytes_out, num_from_buf, send_buf.begin() + umb::g_header_size); + it_bytes_out += send_size; - unsigned bytes_sent = 0; - unsigned offset = umb::g_header_size; + std::cout << std::format( + "sending {} bytes, send_part: {}, bytes_left: {}, num_from_buf: {}\n", + send_size, send_part, bytes_left, num_from_buf); + // TODO: check ec. + co_await async_write( + socket, + boost::asio::buffer(send_buf, send_size), + deferred); - std::cout << std::format( - "bytes_out: {}, num_hdr_bytes: {}, num_parts_out: {}, total_bytes_to_send: {}\n", - bytes_out.size(), num_hdr_bytes, num_parts_out, total_bytes_to_send); + bytes_left -= send_size; + } - // TODO: rethink this. We want a neat way of doing this... Is this neat? - for (size_t part_out = 0; part_out < num_parts_out; ++part_out) + // Send the final non-full part. + if ((bytes_left > 0) && (bytes_left <= (umb::g_packet_size - umb::g_header_size))) { - if (part_out == (num_parts_out - 1)) - { - part_out = umb::g_part_multi_part_end; - } + send_size = static_cast(bytes_left) + umb::g_header_size; + send_part = umb::g_part_multi_part_end; + send_buf[0] = send_size; + send_buf[1] = send_part; + send_buf[2] = mt0; + send_buf[3] = mt1; - const auto num_to_send = std::min(umb::g_packet_size, total_bytes_to_send - bytes_sent); - const auto num_to_take_from_buffer = num_to_send - umb::g_header_size; - std::cout << std::format( - "sending {} bytes, offset: {}, part_out: {}, num_to_take_from_buffer: {}\n", - num_to_send, offset, part_out, num_to_take_from_buffer); - - // Header. - auto it_send_buf = send_buf.begin(); - *it_send_buf++ = num_to_send; - *it_send_buf++ = part_out; - *it_send_buf++ = mt0; - *it_send_buf++ = mt1; - - // Grab the bytes we need for this part from the buffer containing - // all message bytes for the multipart message. - const std::span send_data{bytes_out.cbegin() + offset, - bytes_out.cend()}; - std::copy_n(send_data.cbegin(), num_to_take_from_buffer, it_send_buf); + num_from_buf = send_size - umb::g_header_size; + std::copy_n(it_bytes_out, num_from_buf, send_buf.begin() + umb::g_header_size); + it_bytes_out += send_size; + std::cout << std::format( + "sending {} bytes, send_part: {}, bytes_left: {}, num_from_buf: {}\n", + send_size, send_part, bytes_left, num_from_buf); // TODO: check ec. co_await async_write( socket, - boost::asio::buffer(send_buf, num_to_send), + boost::asio::buffer(send_buf, send_size), deferred); - - bytes_sent += num_to_send; - offset += num_to_send - umb::g_header_size; } + else + { + std::cout << std::format( + "ERROR: invalid number of bytes left for last part: {}!\n", + bytes_left); + } + + // TODO: check bytes left is 0 here. } // TODO: close connection on bad data, error, etc.? @@ -421,8 +436,9 @@ awaitable echo(tcp::socket socket) { const auto err = result.error(); // TODO: error messages. - std::cout << std::format("error: {}\n", static_cast(err)); - continue; // TODO: close conn? + std::cout << std::format("error: {}, closing connection\n", + static_cast(err)); + break; } const auto header = *result;