diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index bb083a7e3..f9ca414ea 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -225,6 +225,7 @@ // "messages": [ // "put", "delete", "declare_subscriber", // "query", "reply", "declare_queryable", + // "liveliness_token", "liveliness_query", "declare_liveliness_subscriber", // ], // "flows":["egress","ingress"], // "permission": "allow", diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index b3df5e3cb..d0600eb4d 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -170,6 +170,9 @@ pub enum AclMessage { Query, DeclareQueryable, Reply, + LivelinessToken, + DeclareLivelinessSubscriber, + LivelinessQuery, } #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)] diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4771bce62..0c01bffdb 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1252,7 +1252,9 @@ impl SessionInner { primitives.send_interest(Interest { id: pub_state.remote_id, mode: InterestMode::Final, - options: InterestOptions::empty(), + // Note: InterestMode::Final options are undefined in the current protocol specification, + // they are initialized here for internal use by local egress interceptors. + options: InterestOptions::SUBSCRIBERS, wire_expr: None, ext_qos: declare::ext::QoSType::DEFAULT, ext_tstamp: None, @@ -1458,7 +1460,9 @@ impl SessionInner { primitives.send_interest(Interest { id: sub_state.id, mode: InterestMode::Final, - options: InterestOptions::empty(), + // Note: InterestMode::Final options are undefined in the current protocol specification, + // they are initialized here for internal use by local egress interceptors. + options: InterestOptions::TOKENS, wire_expr: None, ext_qos: declare::ext::QoSType::DEFAULT, ext_tstamp: None, diff --git a/zenoh/src/net/routing/dispatcher/token.rs b/zenoh/src/net/routing/dispatcher/token.rs index a34e35af6..397f2304f 100644 --- a/zenoh/src/net/routing/dispatcher/token.rs +++ b/zenoh/src/net/routing/dispatcher/token.rs @@ -156,6 +156,7 @@ pub(crate) fn undeclare_token( { tracing::debug!("{} Undeclare token {} ({})", face, id, res.expr()); } else { - tracing::error!("{} Undeclare unknown token {}", face, id); + // NOTE: This is expected behavior if liveliness tokens are denied with ingress ACL interceptor. + tracing::debug!("{} Undeclare unknown token {}", face, id); } } diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index c534c65ba..0f900d595 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -212,7 +212,9 @@ impl HatInterestTrait for HatCode { Interest { id, mode: InterestMode::Final, - options: InterestOptions::empty(), + // Note: InterestMode::Final options are undefined in the current protocol specification, + // they are initialized here for internal use by local egress interceptors. + options: interest.options, wire_expr: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index b0d7c9842..abe1108f7 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -238,7 +238,9 @@ impl HatInterestTrait for HatCode { Interest { id, mode: InterestMode::Final, - options: InterestOptions::empty(), + // Note: InterestMode::Final options are undefined in the current protocol specification, + // they are initialized here for internal use by local egress interceptors. + options: interest.options, wire_expr: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index 2142d3e0c..170771d54 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -26,7 +26,10 @@ use zenoh_config::{ }; use zenoh_protocol::{ core::ZenohIdProto, - network::{Declare, DeclareBody, NetworkBody, NetworkMessage, Push, Request, Response}, + network::{ + interest::InterestMode, Declare, DeclareBody, Interest, NetworkBody, NetworkMessage, Push, + Request, Response, + }, zenoh::{PushBody, RequestBody}, }; use zenoh_result::ZResult; @@ -334,6 +337,75 @@ impl InterceptorTrait for IngressAclEnforcer { } } } + NetworkBody::Declare(Declare { + body: DeclareBody::DeclareToken(_), + .. + }) => { + if self.action( + AclMessage::LivelinessToken, + "Liveliness Token (ingress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } + + NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareToken(_), + .. + }) => { + // Undeclaration filtering diverges between ingress and egress: + // Undeclarations in ingress are only filtered if the ext_wire_expr is set. + // If it's not set, we let the undeclaration pass, it will be rejected by the routing logic + // if its associated declaration was denied. + if let Some(key_expr) = key_expr { + if !key_expr.is_empty() + && self.action( + AclMessage::LivelinessToken, + "Undeclare Liveliness Token (ingress)", + key_expr, + ) == Permission::Deny + { + return None; + } + } + } + NetworkBody::Interest(Interest { + mode: InterestMode::Current, + options, + .. + }) if options.tokens() => { + if self.action( + AclMessage::LivelinessQuery, + "Liveliness Query (ingress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } + NetworkBody::Interest(Interest { + mode: InterestMode::Future | InterestMode::CurrentFuture, + options, + .. + }) if options.tokens() => { + if self.action( + AclMessage::DeclareLivelinessSubscriber, + "Declare Liveliness Subscriber (ingress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } + NetworkBody::Interest(Interest { + mode: InterestMode::Final, + .. + }) => { + // InterestMode::Final filtering diverges between ingress and egress: + // InterestMode::Final ingress is always allowed, it will be rejected by routing logic if its associated Interest was denied + } // Unfiltered Declare messages NetworkBody::Declare(Declare { body: DeclareBody::DeclareKeyExpr(_), @@ -342,19 +414,11 @@ impl InterceptorTrait for IngressAclEnforcer { | NetworkBody::Declare(Declare { body: DeclareBody::DeclareFinal(_), .. - }) - | NetworkBody::Declare(Declare { - body: DeclareBody::DeclareToken(_), - .. }) => {} // Unfiltered Undeclare messages NetworkBody::Declare(Declare { body: DeclareBody::UndeclareKeyExpr(_), .. - }) - | NetworkBody::Declare(Declare { - body: DeclareBody::UndeclareToken(_), - .. }) => {} // Unfiltered remaining message types NetworkBody::Interest(_) | NetworkBody::OAM(_) | NetworkBody::ResponseFinal(_) => {} @@ -470,6 +534,80 @@ impl InterceptorTrait for EgressAclEnforcer { return None; } } + NetworkBody::Declare(Declare { + body: DeclareBody::DeclareToken(_), + .. + }) => { + if self.action( + AclMessage::LivelinessToken, + "Liveliness Token (egress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } + NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareToken(_), + .. + }) => { + // Undeclaration filtering diverges between ingress and egress: + // in egress the keyexpr has to be provided in the RoutingContext + if self.action( + AclMessage::LivelinessToken, + "Undeclare Liveliness Token (egress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } + NetworkBody::Interest(Interest { + mode: InterestMode::Current, + options, + .. + }) if options.tokens() => { + if self.action( + AclMessage::LivelinessQuery, + "Liveliness Query (egress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } + NetworkBody::Interest(Interest { + mode: InterestMode::Future | InterestMode::CurrentFuture, + options, + .. + }) if options.tokens() => { + if self.action( + AclMessage::DeclareLivelinessSubscriber, + "Declare Liveliness Subscriber (egress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } + NetworkBody::Interest(Interest { + mode: InterestMode::Final, + options, + .. + }) if options.tokens() => { + // Note: options are set for InterestMode::Final for internal use only by egress interceptors. + + // InterestMode::Final filtering diverges between ingress and egress: + // in egress the keyexpr has to be provided in the RoutingContext + if self.action( + AclMessage::DeclareLivelinessSubscriber, + "Undeclare Liveliness Subscriber (egress)", + key_expr?, + ) == Permission::Deny + { + return None; + } + } // Unfiltered Declare messages NetworkBody::Declare(Declare { body: DeclareBody::DeclareKeyExpr(_), @@ -478,19 +616,11 @@ impl InterceptorTrait for EgressAclEnforcer { | NetworkBody::Declare(Declare { body: DeclareBody::DeclareFinal(_), .. - }) - | NetworkBody::Declare(Declare { - body: DeclareBody::DeclareToken(_), - .. }) => {} // Unfiltered Undeclare messages NetworkBody::Declare(Declare { body: DeclareBody::UndeclareKeyExpr(_), .. - }) - | NetworkBody::Declare(Declare { - body: DeclareBody::UndeclareToken(_), - .. }) => {} // Unfiltered remaining message types NetworkBody::Interest(_) | NetworkBody::OAM(_) | NetworkBody::ResponseFinal(_) => {} diff --git a/zenoh/src/net/routing/interceptor/authorization.rs b/zenoh/src/net/routing/interceptor/authorization.rs index ae70f11a4..3f09d90fb 100644 --- a/zenoh/src/net/routing/interceptor/authorization.rs +++ b/zenoh/src/net/routing/interceptor/authorization.rs @@ -188,6 +188,9 @@ struct ActionPolicy { declare_subscriber: PermissionPolicy, declare_queryable: PermissionPolicy, reply: PermissionPolicy, + liveliness_token: PermissionPolicy, + declare_liveliness_sub: PermissionPolicy, + liveliness_query: PermissionPolicy, } impl ActionPolicy { @@ -199,6 +202,9 @@ impl ActionPolicy { AclMessage::Delete => &self.delete, AclMessage::DeclareSubscriber => &self.declare_subscriber, AclMessage::DeclareQueryable => &self.declare_queryable, + AclMessage::LivelinessToken => &self.liveliness_token, + AclMessage::DeclareLivelinessSubscriber => &self.declare_liveliness_sub, + AclMessage::LivelinessQuery => &self.liveliness_query, } } fn action_mut(&mut self, action: AclMessage) -> &mut PermissionPolicy { @@ -209,6 +215,9 @@ impl ActionPolicy { AclMessage::Delete => &mut self.delete, AclMessage::DeclareSubscriber => &mut self.declare_subscriber, AclMessage::DeclareQueryable => &mut self.declare_queryable, + AclMessage::LivelinessToken => &mut self.liveliness_token, + AclMessage::DeclareLivelinessSubscriber => &mut self.declare_liveliness_sub, + AclMessage::LivelinessQuery => &mut self.liveliness_query, } } } diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index da55c191b..68293f4ad 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -16,7 +16,7 @@ #![cfg(target_family = "unix")] mod test { use std::{ - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, time::Duration, }; @@ -56,6 +56,23 @@ mod test { test_reply_allow_then_deny(27449).await; } + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_acl_liveliness() { + zenoh::init_log_from_env_or("error"); + + test_liveliness_allow(27450).await; + test_liveliness_deny(27450).await; + + test_liveliness_allow_deny_token(27450).await; + test_liveliness_deny_allow_token(27450).await; + + test_liveliness_allow_deny_sub(27450).await; + test_liveliness_deny_allow_sub(27450).await; + + test_liveliness_allow_deny_query(27450).await; + test_liveliness_deny_allow_query(27450).await; + } + async fn get_basic_router_config(port: u16) -> Config { let mut config = Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); @@ -805,4 +822,718 @@ mod test { close_sessions(get_session, qbl_session).await; close_router_session(session).await; } + + async fn test_liveliness_deny(port: u16) { + println!("test_liveliness_deny"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [], + "subjects": [], + "policies": [], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } + + async fn test_liveliness_allow(port: u16) { + println!("test_liveliness_allow"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [], + "subjects": [], + "policies": [], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } + + async fn test_liveliness_allow_deny_token(port: u16) { + println!("test_liveliness_allow_deny_token"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + id: "filter token", + permission: "deny", + messages: ["liveliness_token"], + flows: ["ingress", "egress"], + key_exprs: ["test/demo"], + }, + ], + "subjects": [ + { id: "all" }, + ], + "policies": [ + { + "subjects": ["all"], + "rules": ["filter token"], + }, + ], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } + + async fn test_liveliness_deny_allow_token(port: u16) { + println!("test_liveliness_deny_allow_token"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + id: "filter token", + permission: "allow", + messages: ["liveliness_token"], + flows: ["ingress", "egress"], + key_exprs: ["test/demo"], + }, + ], + "subjects": [ + { id: "all" }, + ], + "policies": [ + { + "subjects": ["all"], + "rules": ["filter token"], + }, + ], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } + + async fn test_liveliness_allow_deny_sub(port: u16) { + println!("test_liveliness_allow_deny_sub"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + id: "filter sub", + permission: "deny", + messages: ["declare_liveliness_subscriber"], + flows: ["ingress", "egress"], + key_exprs: ["test/demo"], + }, + ], + "subjects": [ + { id: "all" }, + ], + "policies": [ + { + "subjects": ["all"], + "rules": ["filter sub"], + }, + ], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } + + async fn test_liveliness_deny_allow_sub(port: u16) { + println!("test_liveliness_deny_allow_sub"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + id: "filter sub", + permission: "allow", + messages: ["declare_liveliness_subscriber", "liveliness_token"], + flows: ["ingress", "egress"], + key_exprs: ["test/demo"], + }, + ], + "subjects": [ + { id: "all" }, + ], + "policies": [ + { + "subjects": ["all"], + "rules": ["filter sub"], + }, + ], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } + + async fn test_liveliness_allow_deny_query(port: u16) { + println!("test_liveliness_allow_deny_query"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + id: "filter query", + permission: "deny", + messages: ["liveliness_query"], + flows: ["ingress", "egress"], + key_exprs: ["test/demo"], + }, + ], + "subjects": [ + { id: "all" }, + ], + "policies": [ + { + "subjects": ["all"], + "rules": ["filter query"], + }, + ], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } + + async fn test_liveliness_deny_allow_query(port: u16) { + println!("test_liveliness_deny_allow_query"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + id: "filter query", + permission: "allow", + messages: ["liveliness_query", "liveliness_token"], + flows: ["ingress", "egress"], + key_exprs: ["test/demo"], + }, + ], + "subjects": [ + { id: "all" }, + ], + "policies": [ + { + "subjects": ["all"], + "rules": ["filter query"], + }, + ], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; + } }