Skip to content

Commit

Permalink
Added support for Participant discovery and additional QoS policies (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gmartin82 authored Aug 10, 2023
1 parent 499802a commit 99208ae
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 187 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ jobs:
os: ubuntu-20.04,
use-cross: true,
}
- { target: x86_64-pc-windows-msvc, arch: win64, os: windows-2019 }
##
## NOTE: cannot build for Windows since cyclocut was not ported to Windows.
## NOTE: cannon build for Windows GNU as not supported by cyclors
##
# - { target: x86_64-pc-windows-msvc, arch: win64, os: windows-2019 }
# - { target: x86_64-pc-windows-gnu , arch: win64 , os: windows-2019 }
# - { target: x86_64-pc-windows-gnu, arch: win64 , os: windows-2019 }
steps:
- name: Checkout source code
uses: actions/checkout@v2
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ jobs:
steps:
- uses: actions/checkout@v2

- name: Install ACL
if: startsWith(matrix.os,'ubuntu')
run: sudo apt-get -y install acl-dev

- name: Install Rust toolchain
uses: actions-rs/toolchain@v1
with:
Expand Down
28 changes: 16 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async-trait = "0.1.66"
bincode = "1.3.3"
cdr = "0.2.4"
clap = "3.2.23"
cyclors = { git = "https://github.com/kydos/cyclors", branch = "master" }
cyclors = { git = "https://github.com/kydos/cyclors", branch = "master"}
derivative = "2.2.0"
env_logger = "0.10.0"
flume = "0.10.14"
Expand Down
204 changes: 133 additions & 71 deletions zenoh-plugin-dds/src/dds_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task;
use cyclors::qos::{HistoryKind, Qos};
use cyclors::qos::{History, HistoryKind, Qos};
use cyclors::*;
use flume::Sender;
use log::{debug, error, warn};
use serde::{Deserialize, Serialize, Serializer};
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::mem::MaybeUninit;
use std::os::raw;
use std::slice;
Expand Down Expand Up @@ -50,12 +51,37 @@ pub(crate) struct DdsEntity {
pub(crate) routes: HashMap<String, RouteStatus>, // map of routes statuses indexed by partition ("*" only if no partition)
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DdsParticipant {
pub(crate) key: String,
pub(crate) qos: Qos,
}

#[derive(Debug)]
pub(crate) enum DiscoveryEvent {
DiscoveredPublication { entity: DdsEntity },
UndiscoveredPublication { key: String },
DiscoveredSubscription { entity: DdsEntity },
UndiscoveredSubscription { key: String },
DiscoveredParticipant { entity: DdsParticipant },
UndiscoveredParticipant { key: String },
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum DiscoveryType {
Participant,
Publication,
Subscription,
}

impl fmt::Display for DiscoveryType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DiscoveryType::Participant => write!(f, "participant"),
DiscoveryType::Publication => write!(f, "publication"),
DiscoveryType::Subscription => write!(f, "subscription"),
}
}
}

pub(crate) struct DDSRawSample {
Expand Down Expand Up @@ -94,8 +120,9 @@ impl Drop for DDSRawSample {
}

unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let btx = Box::from_raw(arg as *mut (bool, Sender<DiscoveryEvent>));
let pub_discovery: bool = btx.0;
let btx = Box::from_raw(arg as *mut (DiscoveryType, Sender<DiscoveryEvent>));
let discovery_type = btx.0;
let sender = &btx.1;
let dp = dds_get_participant(dr);
let mut dpih: dds_instance_handle_t = 0;
let _ = dds_get_instance_handle(dp, &mut dpih);
Expand All @@ -116,80 +143,103 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let si = si.assume_init();

for i in 0..n {
let sample = samples[i as usize] as *mut dds_builtintopic_endpoint_t;
if (*sample).participant_instance_handle == dpih {
// Ignore discovery of entities created by our own participant
continue;
}
let is_alive = si[i as usize].instance_state == dds_instance_state_DDS_IST_ALIVE;
let key = hex::encode((*sample).key.v);

if is_alive {
let topic_name = match CStr::from_ptr((*sample).topic_name).to_str() {
Ok(s) => s,
Err(e) => {
warn!("Discovery of an invalid topic name: {}", e);
match discovery_type {
DiscoveryType::Publication | DiscoveryType::Subscription => {
let sample = samples[i as usize] as *mut dds_builtintopic_endpoint_t;
if (*sample).participant_instance_handle == dpih {
// Ignore discovery of entities created by our own participant
continue;
}
};
if topic_name.starts_with("DCPS") {
debug!(
"Ignoring discovery of {} ({} is a builtin topic)",
key, topic_name
);
continue;
let is_alive = si[i as usize].instance_state == dds_instance_state_DDS_IST_ALIVE;
let key = hex::encode((*sample).key.v);

if is_alive {
let topic_name = match CStr::from_ptr((*sample).topic_name).to_str() {
Ok(s) => s,
Err(e) => {
warn!("Discovery of an invalid topic name: {}", e);
continue;
}
};
if topic_name.starts_with("DCPS") {
debug!(
"Ignoring discovery of {} ({} is a builtin topic)",
key, topic_name
);
continue;
}

let type_name = match CStr::from_ptr((*sample).type_name).to_str() {
Ok(s) => s,
Err(e) => {
warn!("Discovery of an invalid topic type: {}", e);
continue;
}
};
let participant_key = hex::encode((*sample).participant_key.v);
let keyless = (*sample).key.v[15] == 3 || (*sample).key.v[15] == 4;

debug!(
"Discovered DDS {} {} from Participant {} on {} with type {} (keyless: {})",
discovery_type, key, participant_key, topic_name, type_name, keyless
);

// send a DiscoveryEvent
let entity = DdsEntity {
key: key.clone(),
participant_key: participant_key.clone(),
topic_name: String::from(topic_name),
type_name: String::from(type_name),
keyless,
qos: Qos::from_qos_native((*sample).qos),
routes: HashMap::<String, RouteStatus>::new(),
};

if let DiscoveryType::Publication = discovery_type {
send_discovery_event(
sender,
DiscoveryEvent::DiscoveredPublication { entity },
);
} else {
send_discovery_event(
sender,
DiscoveryEvent::DiscoveredSubscription { entity },
);
}
} else if let DiscoveryType::Publication = discovery_type {
send_discovery_event(sender, DiscoveryEvent::UndiscoveredPublication { key });
} else {
send_discovery_event(sender, DiscoveryEvent::UndiscoveredSubscription { key });
}
}
DiscoveryType::Participant => {
let sample = samples[i as usize] as *mut dds_builtintopic_participant_t;
let is_alive = si[i as usize].instance_state == dds_instance_state_DDS_IST_ALIVE;
let key = hex::encode((*sample).key.v);

let type_name = match CStr::from_ptr((*sample).type_name).to_str() {
Ok(s) => s,
Err(e) => {
warn!("Discovery of an invalid topic type: {}", e);
let mut guid = dds_builtintopic_guid { v: [0; 16] };
let _ = dds_get_guid(dp, &mut guid);
let guid = hex::encode(guid.v);

if key == guid {
// Ignore discovery of entities created by our own participant
continue;
}
};
let participant_key = hex::encode((*sample).participant_key.v);
let keyless = (*sample).key.v[15] == 3 || (*sample).key.v[15] == 4;

debug!(
"Discovered DDS {} {} from Participant {} on {} with type {} (keyless: {})",
if pub_discovery {
"publication"
} else {
"subscription"
},
key,
participant_key,
topic_name,
type_name,
keyless
);

let qos = if pub_discovery {
Qos::from_writer_qos_native((*sample).qos)
} else {
Qos::from_reader_qos_native((*sample).qos)
};
if is_alive {
debug!("Discovered DDS Participant {})", key,);

// send a DiscoveryEvent
let entity = DdsEntity {
key: key.clone(),
participant_key: participant_key.clone(),
topic_name: String::from(topic_name),
type_name: String::from(type_name),
keyless,
qos,
routes: HashMap::<String, RouteStatus>::new(),
};
// Send a DiscoveryEvent
let entity = DdsParticipant {
key: key.clone(),
qos: Qos::from_qos_native((*sample).qos),
};

if pub_discovery {
send_discovery_event(&btx.1, DiscoveryEvent::DiscoveredPublication { entity });
} else {
send_discovery_event(&btx.1, DiscoveryEvent::DiscoveredSubscription { entity });
send_discovery_event(sender, DiscoveryEvent::DiscoveredParticipant { entity });
} else {
send_discovery_event(sender, DiscoveryEvent::UndiscoveredParticipant { key });
}
}
} else if pub_discovery {
send_discovery_event(&btx.1, DiscoveryEvent::UndiscoveredPublication { key });
} else {
send_discovery_event(&btx.1, DiscoveryEvent::UndiscoveredSubscription { key });
}
}
dds_return_loan(
Expand All @@ -211,8 +261,9 @@ fn send_discovery_event(sender: &Sender<DiscoveryEvent>, event: DiscoveryEvent)

pub(crate) fn run_discovery(dp: dds_entity_t, tx: Sender<DiscoveryEvent>) {
unsafe {
let ptx = Box::new((true, tx.clone()));
let stx = Box::new((false, tx));
let ptx = Box::new((DiscoveryType::Publication, tx.clone()));
let stx = Box::new((DiscoveryType::Subscription, tx.clone()));
let dptx = Box::new((DiscoveryType::Participant, tx));
let sub_listener = dds_create_listener(Box::into_raw(ptx) as *mut std::os::raw::c_void);
dds_lset_data_available(sub_listener, Some(on_data));

Expand All @@ -231,6 +282,15 @@ pub(crate) fn run_discovery(dp: dds_entity_t, tx: Sender<DiscoveryEvent>) {
std::ptr::null(),
sub_listener,
);

let sub_listener = dds_create_listener(Box::into_raw(dptx) as *mut std::os::raw::c_void);
dds_lset_data_available(sub_listener, Some(on_data));
let _dpr = dds_create_reader(
dp,
DDS_BUILTIN_TOPIC_DCPSPARTICIPANT,
std::ptr::null(),
sub_listener,
);
}
}

Expand Down Expand Up @@ -322,8 +382,10 @@ pub fn create_forwarding_dds_reader(
}
Some(period) => {
// Use a periodic task that takes data to route from a Reader with KEEP_LAST 1
qos.history.kind = HistoryKind::KEEP_LAST;
qos.history.depth = 1;
qos.history = Some(History {
kind: HistoryKind::KEEP_LAST,
depth: 1,
});
let qos_native = qos.to_qos_native();
let reader = dds_create_reader(dp, t, qos_native, std::ptr::null());
let z_key = z_key.into_owned();
Expand Down
Loading

0 comments on commit 99208ae

Please sign in to comment.