Skip to content

Commit

Permalink
Add message pulling
Browse files Browse the repository at this point in the history
  • Loading branch information
ozdoganoguzhan authored Apr 25, 2024
1 parent 7718a04 commit 1b1e6fd
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 4 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Cargo.toml:
onvif = { git = "https://github.com/lumeohq/onvif-rs" }
```

## Troubleshooting
## Troubleshooting

If you have an issue with OpenSSL build under Ubuntu, perform the following actions:

Expand All @@ -41,6 +41,7 @@ cargo run --example discovery
```

To [inspect and control a camera](onvif/examples/camera.rs):

```shell script
cargo run --example camera -- help

Expand All @@ -53,6 +54,12 @@ cargo run --example camera -- set-hostname \
cargo run --example camera -- get-stream-uris --uri=http://192.168.0.2:8000
```

To [pull events](onvif/examples/event.rs) from a camera, adjust credentials in event.rs and run:

```shell script
cargo run --example event
```

## Dependencies

- XSD -> Rust code generation: [xsd-parser-rs](https://github.com/lumeohq/xsd-parser-rs)
Expand Down
1 change: 1 addition & 0 deletions onvif/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ futures-util = "0.3.8"
structopt = "0.3.21"
tokio = { version = "1.0.1", features = ["full"] }
tracing-subscriber = "0.2.20"
b_2 = {path = "../wsdl_rs/b_2"}
105 changes: 105 additions & 0 deletions onvif/examples/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// This example pulls messages related to the RuleEngine topic.
// RuleEngine topic consists of events related to motion detection.
// Tested on Dahua, uniview, reolink and axis ip cameras.
// Don't forget to set the camera's IP address, username and password.

use onvif::soap::client::{ClientBuilder, Credentials};
use schema::event::{self, CreatePullPointSubscription, PullMessages};
use url::Url;

#[derive(Debug, Clone)]
pub struct Camera {
pub device_service_url: String,
pub username: String,
pub password: String,
pub event_service_url: String,
}

impl Default for Camera {
fn default() -> Self {
Camera {
device_service_url: "http://192.168.1.100/onvif/device_service".to_string(),
username: "admin".to_string(),
password: "admin".to_string(),
event_service_url: "http://192.168.1.100/onvif/event_service".to_string(),
}
}
}

#[tokio::main]
async fn main() {
let camera_ip = "192.168.1.50";
let username = "admin";
let password = "admin";

let camera: Camera = Camera {
device_service_url: format!("http://{}/onvif/device_service", camera_ip),
username: username.to_string(),
password: password.to_string(),
event_service_url: format!("http://{}/onvif/event_service", camera_ip),
};

let creds: Credentials = Credentials {
username: camera.username.to_string(),
password: camera.password.to_string(),
};
let event_client = ClientBuilder::new(&Url::parse(&camera.event_service_url).unwrap())
.credentials(Some(creds))
.build();
let create_pull_sub_request = CreatePullPointSubscription {
initial_termination_time: None,
filter: Some(b_2::FilterType {
topic_expression: Some(b_2::TopicExpressionType {
dialect: "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet".to_string(),
inner_text: "tns1:RuleEngine//.".to_string(),
}),
}),
subscription_policy: None,
};
let create_pull_puint_sub_response =
event::create_pull_point_subscription(&event_client, &create_pull_sub_request).await;
let camera_sub = match create_pull_puint_sub_response {
Ok(sub) => sub,
Err(e) => {
println!("Error: {:?}", e);
return;
}
};

let uri: Url = Url::parse(&camera_sub.subscription_reference.address).unwrap();
let creds: Credentials = Credentials {
username: camera.username.to_string(),
password: camera.password.to_string(),
};
let pull_msg_client = ClientBuilder::new(&uri)
.credentials(Some(creds))
.auth_type(onvif::soap::client::AuthType::Digest)
.build();
let pull_messages_request = PullMessages {
message_limit: 256,
timeout: xsd_types::types::Duration {
seconds: 1.0,
..Default::default()
},
};

// Main Loop
loop {
let pull_messages_response =
event::pull_messages(&pull_msg_client, &pull_messages_request).await;
let msg = match pull_messages_response {
Ok(msg) => msg,
Err(e) => {
println!("Error: {:?}", e);
continue;
}
};
if !msg.notification_message.is_empty() {
println!("Notification Message: {:?}", msg.notification_message[0]);
} else {
println!("No new notification message");
}

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
}
}
120 changes: 120 additions & 0 deletions schema/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl transport::Transport for FakeTransport {
}

#[test]
#[cfg(feature = "devicemgmt")]
fn basic_deserialization() {
let response = r#"
<?xml version="1.0" encoding="UTF-8"?>
Expand Down Expand Up @@ -66,6 +67,7 @@ fn basic_deserialization() {
assert_eq!(de.utc_date_time.as_ref().unwrap().time.second, 9);
}

#[cfg(feature = "devicemgmt")]
#[test]
fn basic_serialization() {
let expected = r#"
Expand Down Expand Up @@ -327,6 +329,7 @@ fn duration_deserialization() {
}

#[tokio::test]
#[cfg(feature = "devicemgmt")]
async fn operation_get_system_date_and_time() {
let req: devicemgmt::GetSystemDateAndTime = Default::default();

Expand Down Expand Up @@ -369,6 +372,7 @@ async fn operation_get_system_date_and_time() {
}

#[tokio::test]
#[cfg(feature = "devicemgmt")]
async fn operation_get_device_information() {
let req: devicemgmt::GetDeviceInformation = Default::default();

Expand Down Expand Up @@ -693,3 +697,119 @@ fn media2_configs_name_serialization() {
type_of(&media2::GetAudioDecoderConfigurationOptions::default())
);
}

#[tokio::test]
#[cfg(feature = "event")]
async fn operation_pull_messages() {
let req: event::PullMessages = Default::default();

let transport = FakeTransport {
response: r#"
<tev:PullMessagesResponse
xmlns:tt="http://www.onvif.org/ver10/schema"
xmlns:wsnt="http://docs.oasis-open.org/wsn/b-2"
xmlns:tev="http://www.onvif.org/ver10/events/wsdl"
xmlns:wsa5="http://www.w3.org/2005/08/addressing"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing"
xmlns:wstop="http://docs.oasis-open.org/wsn/t-1"
xmlns:tns1="http://www.onvif.org/ver10/topics">
<tev:CurrentTime>
2023-09-28T16:01:15Z
</tev:CurrentTime>
<tev:TerminationTime>
2023-09-28T16:11:15Z
</tev:TerminationTime>
<wsnt:NotificationMessage>
<wsnt:Topic
Dialect="http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet">
tns1:RuleEngine/CellMotionDetector/Motion
</wsnt:Topic>
<wsnt:Message>
<tt:Message
UtcTime="2023-09-28T16:01:15Z"
PropertyOperation="Initialized">
<tt:Source>
<tt:SimpleItem
Name="VideoSourceConfigurationToken"
Value="00000"/>
<tt:SimpleItem
Name="VideoAnalyticsConfigurationToken"
Value="00000"/>
<tt:SimpleItem
Name="Rule"
Value="00000"/>
</tt:Source>
<tt:Data>
<tt:SimpleItem
Name="IsMotion"
Value="false"/>
</tt:Data>
</tt:Message>
</wsnt:Message>
</wsnt:NotificationMessage>
</tev:PullMessagesResponse>
"#
.into(),
};

let response = event::pull_messages(&transport, &req).await;

let resp = match response {
Ok(resp) => resp,
Err(err) => panic!("Error: {:?}", err),
};

assert_eq!(
resp.notification_message[0].message.msg.source.simple_item[0].name,
"VideoSourceConfigurationToken"
);
assert_eq!(
resp.notification_message[0].message.msg.source.simple_item[0].value,
"00000"
);
assert_eq!(
resp.notification_message[0].message.msg.data.simple_item[0].name,
"IsMotion"
);
assert_eq!(
resp.notification_message[0].message.msg.data.simple_item[0].value,
"false"
);
}

#[tokio::test]
#[cfg(feature = "event")]
async fn operation_create_pullpoint_subscription() {
let req: event::CreatePullPointSubscription = Default::default();

let transport = FakeTransport {
response: r#"
<tev:CreatePullPointSubscriptionResponse
xmlns:tev="http://www.onvif.org/ver10/events/wsdl"
xmlns:wsnt="http://docs.oasis-open.org/wsn/b-2"
xmlns:wsa5="http://www.w3.org/2005/08/addressing">
<tev:SubscriptionReference>
<wsa5:Address>
http://192.168.88.108/onvif/Subscription?Idx=5
</wsa5:Address>
</tev:SubscriptionReference>
<wsnt:CurrentTime>
2023-09-28T16:01:15Z
</wsnt:CurrentTime>
<wsnt:TerminationTime>
2023-09-28T16:11:15Z
</wsnt:TerminationTime>
</tev:CreatePullPointSubscriptionResponse>
"#
.into(),
};

let resp = event::create_pull_point_subscription(&transport, &req)
.await
.unwrap();

assert_eq!(
resp.subscription_reference.address,
"http://192.168.88.108/onvif/Subscription?Idx=5"
);
}
49 changes: 47 additions & 2 deletions wsdl_rs/b_2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ impl Validate for QueryExpressionType {}
pub struct TopicExpressionType {
#[yaserde(attribute, rename = "Dialect")]
pub dialect: String,

#[yaserde(text)]
pub inner_text: String,
}

impl Validate for TopicExpressionType {}
Expand All @@ -36,7 +39,10 @@ impl Validate for TopicExpressionType {}
prefix = "wsnt",
namespace = "wsnt: http://docs.oasis-open.org/wsn/b-2"
)]
pub struct FilterType {}
pub struct FilterType {
#[yaserde(prefix = "wsnt", rename = "TopicExpression")]
pub topic_expression: Option<TopicExpressionType>,
}

impl Validate for FilterType {}

Expand Down Expand Up @@ -131,15 +137,54 @@ pub struct NotificationMessageHolderType {

impl Validate for NotificationMessageHolderType {}

#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[yaserde(prefix = "tt", namespace = "tt: http://www.onvif.org/ver10/schema")]
pub struct SimpleItemType {
// Item name.
#[yaserde(attribute, rename = "Name")]
pub name: String,

// Item value. The type is defined in the corresponding description.
#[yaserde(attribute, rename = "Value")]
pub value: String,
}

pub mod notification_message_holder_type {
use super::*;

#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[yaserde(prefix = "tt", namespace = "tt: http://www.onvif.org/ver10/schema")]
pub struct DataType {
#[yaserde(prefix = "tt", rename = "SimpleItem")]
pub simple_item: Vec<SimpleItemType>,
}

#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[yaserde(prefix = "tt", namespace = "tt: http://www.onvif.org/ver10/schema")]
pub struct SourceType {
#[yaserde(prefix = "tt", rename = "SimpleItem")]
pub simple_item: Vec<SimpleItemType>,
}

#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[yaserde(prefix = "tt", namespace = "tt: http://www.onvif.org/ver10/schema")]
pub struct MessageTypeInner {
#[yaserde(prefix = "tt", rename = "Source")]
pub source: SourceType,

#[yaserde(prefix = "tt", rename = "Data")]
pub data: DataType,
}

#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[yaserde(
prefix = "wsnt",
namespace = "wsnt: http://docs.oasis-open.org/wsn/b-2"
)]
pub struct MessageType {}
pub struct MessageType {
#[yaserde(prefix = "tt", rename = "Message")]
pub msg: MessageTypeInner,
}

impl Validate for MessageType {}
}
Expand Down
2 changes: 1 addition & 1 deletion wsdl_rs/ws_addr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub type EndpointReference = EndpointReferenceType;
)]
pub struct EndpointReferenceType {
#[yaserde(prefix = "tns", rename = "Address")]
pub address: AttributedURIType,
pub address: String,

#[yaserde(prefix = "tns", rename = "ReferenceParameters")]
pub reference_parameters: Option<ReferenceParameters>,
Expand Down

0 comments on commit 1b1e6fd

Please sign in to comment.