Skip to content

Commit

Permalink
Merge branch 'rolling' into yadu/events
Browse files Browse the repository at this point in the history
  • Loading branch information
Yadunund committed Feb 19, 2024
2 parents cc195f8 + 08f5c30 commit ab2877d
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 46 deletions.
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/// peers, or client can use to establish a zenoh session.
listen: {
endpoints: [
"tcp/localhost:7447"
"tcp/[::]:7447"
],
},
/// Configure the scouting mechanisms and their behaviours
Expand Down
22 changes: 8 additions & 14 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ static void graph_sub_data_handler(
const z_sample_t * sample,
void * data)
{
(void)data;
static_cast<void>(data);

z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
auto free_keystr = rcpputils::make_scope_exit(
[&keystr]() {
z_drop(z_move(keystr));
});

// Get the context impl from data.
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(
Expand All @@ -59,21 +64,16 @@ static void graph_sub_data_handler(
return;
}

// TODO(Yadunund): Avoid this copy.
std::string keyexpr_str(keystr._cstr);

switch (sample->kind) {
case z_sample_kind_t::Z_SAMPLE_KIND_PUT:
context_impl->graph_cache.parse_put(keyexpr_str);
context_impl->graph_cache.parse_put(keystr._cstr);
break;
case z_sample_kind_t::Z_SAMPLE_KIND_DELETE:
context_impl->graph_cache.parse_del(keyexpr_str);
context_impl->graph_cache.parse_del(keystr._cstr);
break;
default:
break;
}

z_drop(z_move(keystr));
}

//==============================================================================
Expand Down Expand Up @@ -269,12 +269,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
zc_liveliness_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()),
z_move(channel.send), NULL);
// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// z_get_options_t opts = z_get_options_default();
// z_get(
// z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send),
// &opts); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply = z_reply_null();
for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply);
call_success = z_call(channel.recv, &reply))
Expand Down
110 changes: 79 additions & 31 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ namespace
// the old string into it. If this becomes a performance problem, we could consider
// modifying the topic_name in place. But this means we need to be much more
// careful about who owns the string.
z_owned_keyexpr_t ros_topic_name_to_zenoh_key(
const char * const topic_name, size_t domain_id, rcutils_allocator_t * allocator)
z_owned_keyexpr_t ros_topic_name_to_zenoh_key(const char * const topic_name, size_t domain_id)
{
std::string d = std::to_string(domain_id);

Expand All @@ -98,17 +97,9 @@ z_owned_keyexpr_t ros_topic_name_to_zenoh_key(
}
}

char * stripped_topic_name = rcutils_strndup(
&topic_name[start_offset], end_offset - start_offset, *allocator);
if (stripped_topic_name == nullptr) {
return z_keyexpr_null();
}

z_owned_keyexpr_t ret = z_keyexpr_join(z_keyexpr(d.c_str()), z_keyexpr(stripped_topic_name));

allocator->deallocate(stripped_topic_name, allocator->state);

return ret;
return z_keyexpr_join(
z_keyexpr(d.c_str()),
zc_keyexpr_from_slice(&topic_name[start_offset], end_offset - start_offset));
}

//==============================================================================
Expand Down Expand Up @@ -556,7 +547,7 @@ rmw_create_publisher(
});

z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key(
topic_name, node->context->actual_domain_id, allocator);
topic_name, node->context->actual_domain_id);
auto always_free_ros_keyexpr = rcpputils::make_scope_exit(
[&keyexpr]() {
z_keyexpr_drop(z_move(keyexpr));
Expand Down Expand Up @@ -1277,7 +1268,7 @@ rmw_create_subscription(

z_owned_closure_sample_t callback = z_closure(sub_data_handler, nullptr, sub_data);
z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key(
topic_name, node->context->actual_domain_id, allocator);
topic_name, node->context->actual_domain_id);
auto always_free_ros_keyexpr = rcpputils::make_scope_exit(
[&keyexpr]() {
z_keyexpr_drop(z_move(keyexpr));
Expand Down Expand Up @@ -1643,6 +1634,69 @@ rmw_take_sequence(
return RMW_RET_UNSUPPORTED;
}

//==============================================================================
static rmw_ret_t __rmw_take_serialized(
const rmw_subscription_t * subscription,
rmw_serialized_message_t * serialized_message,
bool * taken,
rmw_message_info_t * message_info)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(serialized_message, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

*taken = false;

auto sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

if (sub_data->context->impl->is_shutdown) {
return RMW_RET_OK;
}

// RETRIEVE SERIALIZED MESSAGE ===============================================

std::unique_ptr<saved_msg_data> msg_data = sub_data->pop_next_message();
if (msg_data == nullptr) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
return RMW_RET_OK;
}

if (serialized_message->buffer_capacity < msg_data->payload.payload.len) {
rmw_ret_t ret =
rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len);
if (ret != RMW_RET_OK) {
return ret; // Error message already set
}
}
serialized_message->buffer_length = msg_data->payload.payload.len;
memcpy(
serialized_message->buffer, msg_data->payload.payload.start,
msg_data->payload.payload.len);

*taken = true;

// TODO(clalancette): fill in source_timestamp
message_info->source_timestamp = 0;
message_info->received_timestamp = msg_data->recv_timestamp;
// TODO(clalancette): fill in publication_sequence_number
message_info->publication_sequence_number = 0;
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_identifier;
memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, 16);
message_info->from_intra_process = false;

return RMW_RET_OK;
}

//==============================================================================
/// Take an incoming ROS message as a byte stream.
rmw_ret_t
Expand All @@ -1652,11 +1706,11 @@ rmw_take_serialized_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(subscription);
static_cast<void>(serialized_message);
static_cast<void>(taken);
static_cast<void>(allocation);
return RMW_RET_UNSUPPORTED;

rmw_message_info_t dummy_msg_info;

return __rmw_take_serialized(subscription, serialized_message, taken, &dummy_msg_info);
}

//==============================================================================
Expand All @@ -1669,12 +1723,9 @@ rmw_take_serialized_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(subscription);
static_cast<void>(serialized_message);
static_cast<void>(taken);
static_cast<void>(message_info);
static_cast<void>(allocation);
return RMW_RET_UNSUPPORTED;

return __rmw_take_serialized(subscription, serialized_message, taken, message_info);
}

//==============================================================================
Expand Down Expand Up @@ -1911,7 +1962,7 @@ rmw_create_client(
});

client_data->keyexpr = ros_topic_name_to_zenoh_key(
rmw_client->service_name, node->context->actual_domain_id, allocator);
rmw_client->service_name, node->context->actual_domain_id);
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[client_data]() {
z_keyexpr_drop(z_move(client_data->keyexpr));
Expand Down Expand Up @@ -2064,13 +2115,10 @@ static z_owned_bytes_map_t create_map_and_set_sequence_num(

z_bytes_t guid_bytes;
guid_bytes.len = RMW_GID_STORAGE_SIZE;
guid_bytes.start = static_cast<uint8_t *>(malloc(RMW_GID_STORAGE_SIZE));
memcpy(static_cast<void *>(const_cast<uint8_t *>(guid_bytes.start)), guid, RMW_GID_STORAGE_SIZE);
guid_bytes.start = guid;

z_bytes_map_insert_by_copy(&map, z_bytes_new("client_guid"), guid_bytes);

free(const_cast<uint8_t *>(guid_bytes.start));

free_attachment_map.cancel();

return map;
Expand Down Expand Up @@ -2550,7 +2598,7 @@ rmw_create_service(
allocator->deallocate(const_cast<char *>(rmw_service->service_name), allocator->state);
});
service_data->keyexpr = ros_topic_name_to_zenoh_key(
rmw_service->service_name, node->context->actual_domain_id, allocator);
rmw_service->service_name, node->context->actual_domain_id);
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[service_data]() {
if (service_data) {
Expand Down Expand Up @@ -2673,7 +2721,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)

// CLEANUP ================================================================
z_drop(z_move(service_data->keyexpr));
z_drop(z_move(service_data->qable));
z_undeclare_queryable(z_move(service_data->qable));
z_drop(z_move(service_data->token));

RMW_TRY_DESTRUCTOR(
Expand Down

0 comments on commit ab2877d

Please sign in to comment.