From 2b87df4c77aea3323930a2dcdfbc315d7ca85677 Mon Sep 17 00:00:00 2001 From: Pierre Avital Date: Fri, 18 Aug 2023 18:50:55 +0200 Subject: [PATCH] implement fragmentation for multicast --- src/protocol/codec/transport.c | 1 + src/transport/multicast/link/rx.c | 3 ++- src/transport/multicast/link/task/read.c | 1 + src/transport/multicast/link/tx.c | 30 +++++++++++++++++++++++- 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 408599c61..4c6fa2f54 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -401,6 +401,7 @@ int8_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t head __auto_type bytes = _z_bytes_wrap((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf)); _z_bytes_copy(&msg->_payload, &bytes); + zbf->_ios._r_pos = zbf->_ios._w_pos; return ret; } diff --git a/src/transport/multicast/link/rx.c b/src/transport/multicast/link/rx.c index 28dab94e0..9211b2e2e 100644 --- a/src/transport/multicast/link/rx.c +++ b/src/transport/multicast/link/rx.c @@ -21,6 +21,7 @@ #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/definitions/transport.h" +#include "zenoh-pico/protocol/iobuf.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -90,7 +91,7 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me } while (false); // The 1-iteration loop to use continue to break the entire loop on error if (ret == _Z_RES_OK) { - _Z_DEBUG(">> \t transport_message_decode\n"); + _Z_DEBUG(">> \t transport_message_decode: %ld\n", _z_zbuf_len(&ztm->_zbuf)); ret = _z_transport_message_decode(t_msg, &ztm->_zbuf); } diff --git a/src/transport/multicast/link/task/read.c b/src/transport/multicast/link/task/read.c index 45753857a..2c4278426 100644 --- a/src/transport/multicast/link/task/read.c +++ b/src/transport/multicast/link/task/read.c @@ -18,6 +18,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/protocol/iobuf.h" #include "zenoh-pico/transport/link/rx.h" #include "zenoh-pico/utils/logging.h" diff --git a/src/transport/multicast/link/tx.c b/src/transport/multicast/link/tx.c index 234929a20..67e4e35bd 100644 --- a/src/transport/multicast/link/tx.c +++ b/src/transport/multicast/link/tx.c @@ -113,7 +113,35 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m ztm->_transmitted = true; // Mark the session that we have transmitted data } } else { - // TODO[protocol]: Fragmentation goes here + // The message does not fit in the current batch, let's fragment it + // Create an expandable wbuf for fragmentation + _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true); + + ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf + if (ret == _Z_RES_OK) { + _Bool is_first = true; // Fragment and send the message + while (_z_wbuf_len(&fbf) > 0) { + if (is_first == false) { // Get the fragment sequence number + sn = __unsafe_z_multicast_get_sn(ztm, reliability); + } + is_first = false; + + // Clear the buffer for serialization + __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + + // Serialize one fragment + ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); + if (ret == _Z_RES_OK) { + // Write the message length in the reserved space if needed + __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + + ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket + if (ret == _Z_RES_OK) { + ztm->_transmitted = true; // Mark the session that we have transmitted data + } + } + } + } } }