Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor link capabilities #280

Merged
merged 24 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
627c7d9
feat: add modular test
jean-roland Oct 20, 2023
c0afb69
build: add cmake generator option for build
jean-roland Oct 24, 2023
a1c44f7
style: run clang-format
jean-roland Oct 24, 2023
f8e809c
fix: keep a command with default cmake generator
jean-roland Oct 24, 2023
2d0bce8
build: remove unnecessary target
jean-roland Oct 25, 2023
3b2c23a
fix: remove old modular test
jean-roland Nov 6, 2023
c29a1c2
build: update actions deps
jean-roland Nov 8, 2023
64d90bc
fix: revert codacy workflow changes
jean-roland Nov 8, 2023
23ab70d
feat: switch capabilities from bitfield to enum
jean-roland Nov 14, 2023
b27a6c2
feat: change unsafe wbuf signatures
jean-roland Nov 14, 2023
da4aaec
feat: update link capabilities init
jean-roland Nov 14, 2023
7ab167c
feat: update capabilities in unicast
jean-roland Nov 14, 2023
a53f7f2
feat: update capabilities in multicast
jean-roland Nov 14, 2023
d86a1bb
feat: update capabilities in common transport
jean-roland Nov 14, 2023
0b247b0
feat: update capabilities in link
jean-roland Nov 14, 2023
18645d9
doc: add missing capabilities enum value
jean-roland Nov 14, 2023
cf76ee3
fix: add needed blocks to switch
jean-roland Nov 14, 2023
b680332
fix: typo
jean-roland Nov 14, 2023
d73b3d1
fix: define all enum values
jean-roland Nov 16, 2023
c525a18
feat: switch to register-like structure
jean-roland Nov 17, 2023
b29b95f
feat: update link files
jean-roland Nov 17, 2023
6e7a3c9
feat: update transport files
jean-roland Nov 17, 2023
16d5e3f
fix: run clang-format
jean-roland Nov 17, 2023
52cdb36
fix: use same time in the struct to reduce size
jean-roland Nov 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions include/zenoh-pico/link/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,44 @@
#include "zenoh-pico/utils/result.h"

/**
* Link capabilities values, defined as a bitmask.
* Link transport capability enum.
*
* Enumerators:
* Z_LINK_CAPABILITY_NONE: Bitmask to define that link has no capabilities.
* Z_LINK_CAPABILITY_RELIABLE: Bitmask to define and check if link is reliable.
* Z_LINK_CAPABILITY_STREAMED: Bitmask to define and check if link is streamed.
* Z_LINK_CAPABILITY_MULTICAST: Bitmask to define and check if link is multicast.
* Z_LINK_CAP_TRANSPORT_UNICAST: Link has unicast capabilities.
* Z_LINK_CAP_TRANSPORT_MULTICAST: Link has multicast capabilities.
*/
typedef enum {
Z_LINK_CAPABILITY_NONE = 0x00, // 0
Z_LINK_CAPABILITY_RELIABLE = 0x01, // 1 << 0
Z_LINK_CAPABILITY_STREAMED = 0x02, // 1 << 1
Z_LINK_CAPABILITY_MULTICAST = 0x04 // 1 << 2
} _z_link_capabilities_t;
Z_LINK_CAP_TRANSPORT_UNICAST = 0,
Z_LINK_CAP_TRANSPORT_MULTICAST = 1,
} _z_link_cap_transport_t;

#define _Z_LINK_IS_RELIABLE(X) ((X & Z_LINK_CAPABILITY_RELIABLE) == Z_LINK_CAPABILITY_RELIABLE)
#define _Z_LINK_IS_STREAMED(X) ((X & Z_LINK_CAPABILITY_STREAMED) == Z_LINK_CAPABILITY_STREAMED)
#define _Z_LINK_IS_MULTICAST(X) ((X & Z_LINK_CAPABILITY_MULTICAST) == Z_LINK_CAPABILITY_MULTICAST)
/**
* Link flow capability enum.
*
* Enumerators:
* Z_LINK_CAP_FLOW_STREAM: Link use datagrams.
* Z_LINK_CAP_FLOW_DATAGRAM: Link use byte stream.
*/
typedef enum {
Z_LINK_CAP_FLOW_DATAGRAM = 0,
Z_LINK_CAP_FLOW_STREAM = 1,
} _z_link_cap_flow_t;

/**
* Link capabilities, stored as a register-like object.
*
* Fields:
* transport: 2 bits, see _z_link_cap_transport_t enum.
* flow: 1 bit, see _z_link_cap_flow_t enum.
* reliable: 1 bit, 1 if the link is reliable (network definition)
* reserved: 4 bits, reserved for futur use
*/
typedef struct _z_link_capabilities_t {
uint8_t _transport : 2;
uint8_t _flow : 1;
uint8_t _is_reliable : 1;
uint8_t _reserved : 4;
} _z_link_capabilities_t;

struct _z_link_t; // Forward declaration to be used in _z_f_link_*

Expand Down Expand Up @@ -104,7 +124,7 @@ typedef struct _z_link_t {
_z_f_link_free _free_f;

uint16_t _mtu;
uint8_t _capabilities;
_z_link_capabilities_t _cap;
} _z_link_t;

void _z_link_clear(_z_link_t *zl);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/transport/transport.h"

void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed);
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);
Expand Down
12 changes: 11 additions & 1 deletion src/link/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,17 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len

int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) {
int8_t ret = _Z_RES_OK;
_Bool link_is_streamed = _Z_LINK_IS_STREAMED(link->_capabilities);
_Bool link_is_streamed = false;

switch (link->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
link_is_streamed = true;
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
link_is_streamed = false;
break;
}
for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) {
_z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i));
size_t n = bs.len;
Expand Down
5 changes: 4 additions & 1 deletion src/link/multicast/bt.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ uint16_t _z_get_link_mtu_bt(void) { return SPP_MAXIMUM_PAYLOAD; }
int8_t _z_new_link_bt(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_STREAMED | Z_LINK_CAPABILITY_MULTICAST;
jean-roland marked this conversation as resolved.
Show resolved Hide resolved
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_bt();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/multicast/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ uint16_t _z_get_link_mtu_udp_multicast(void) {
int8_t _z_new_link_udp_multicast(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_MULTICAST;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_udp_multicast();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/serial.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ uint16_t _z_get_link_mtu_serial(void) { return _Z_SERIAL_MTU_SIZE; }
int8_t _z_new_link_serial(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_NONE;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_serial();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ uint16_t _z_get_link_mtu_tcp(void) {
int8_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE | Z_LINK_CAPABILITY_STREAMED;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM;
zl->_cap._is_reliable = true;

zl->_mtu = _z_get_link_mtu_tcp();

zl->_endpoint = *endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ uint16_t _z_get_link_mtu_udp_unicast(void) {
int8_t _z_new_link_udp_unicast(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_NONE;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_udp_unicast();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/ws.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ uint16_t _z_get_link_mtu_ws(void) {
int8_t _z_new_link_ws(_z_link_t *zl, _z_endpoint_t *endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = true;

zl->_mtu = _z_get_link_mtu_ws();

zl->_endpoint = *endpoint;
Expand Down
47 changes: 26 additions & 21 deletions src/transport/common/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,37 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) {
_z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);
_z_zbuf_reset(&zbf);

if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
// Read the message length
if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) {
size_t len = 0;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8));
}
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
// Read the message length
if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) {
size_t len = 0;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8));
}

size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf);
if (writable >= len) {
// Read enough bytes to decode the message
if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf);
if (writable >= len) {
// Read enough bytes to decode the message
if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
} else {
ret = _Z_ERR_TRANSPORT_NO_SPACE;
}
} else {
ret = _Z_ERR_TRANSPORT_NO_SPACE;
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
} else {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
} else {
if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
break;
default:
ret = _Z_ERR_GENERIC;
break;
}

if (ret == _Z_RES_OK) {
_z_transport_message_t l_t_msg;
ret = _z_transport_message_decode(&l_t_msg, &zbf);
Expand Down
75 changes: 52 additions & 23 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@
* Make sure that the following mutexes are locked before calling this function:
* - ztu->mutex_tx
*/
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) {
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) {
_z_wbuf_reset(buf);

if (is_streamed == true) {
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, 0, i);
}
_z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE);
switch (link_flow_capability) {
// Stream capable links
case Z_LINK_CAP_FLOW_STREAM:
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, 0, i);
}
_z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE);
break;
// Datagram capable links
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
break;
}
}

Expand All @@ -43,12 +50,20 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) {
* Make sure that the following mutexes are locked before calling this function:
* - ztu->mutex_tx
*/
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed) {
if (is_streamed == true) {
size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) {
switch (link_flow_capability) {
// Stream capable links
case Z_LINK_CAP_FLOW_STREAM: {
size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
}
break;
}
// Datagram capable links
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
break;
}
}

Expand All @@ -74,24 +89,38 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m
// Create and prepare the buffer to serialize the message on
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_z_wbuf_t wbf = _z_wbuf_make(mtu, false);
if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, 0, i);
}
_z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE);
}

switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, 0, i);
}
_z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE);
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Encode the session message
ret = _z_transport_message_encode(&wbf, t_msg);
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM: {
// Write the message length in the reserved space if needed
size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
}
break;
}
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}

// Send the wbuf on the socket
ret = _z_link_send_wbuf(zl, &wbf);
}
Expand Down
Loading
Loading