From e06a581843e5682789082e60b48dfdfc7f8241f9 Mon Sep 17 00:00:00 2001 From: Christian M Date: Sun, 15 Oct 2023 09:43:01 +0200 Subject: [PATCH] :sparkles: adds gatt example --- examples/bt_gatt_client.rs | 205 +++++++++++++++++++++++++++++++++++++ src/bluetooth/gatt_srv.rs | 107 +++++++++++++++++-- src/bluetooth/mod.rs | 4 +- 3 files changed, 307 insertions(+), 9 deletions(-) create mode 100644 examples/bt_gatt_client.rs diff --git a/examples/bt_gatt_client.rs b/examples/bt_gatt_client.rs new file mode 100644 index 0000000..cd3f505 --- /dev/null +++ b/examples/bt_gatt_client.rs @@ -0,0 +1,205 @@ +//! Connects to the Bluetooth GATT echo service and tests it. + +use bluer::{gatt::remote::Characteristic, AdapterEvent, Device, Result}; +use futures::{pin_mut, StreamExt}; +use rand::Rng; +use std::time::Duration; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + time::{sleep, timeout}, +}; +use pretty_env_logger::env_logger; + +use ornithology_pi::bluetooth::SERVICE_UUID; +use ornithology_pi::bluetooth::CHARACTERISTIC_UUID; + +async fn find_our_characteristic(device: &Device) -> Result> { + let addr = device.address(); + let uuids = device.uuids().await?.unwrap_or_default(); + println!("Discovered device {} with service UUIDs {:?}", addr, &uuids); + let md = device.manufacturer_data().await?; + println!(" Manufacturer data: {:x?}", &md); + + if uuids.contains(&SERVICE_UUID) { + println!(" Device provides our service!"); + if !device.is_connected().await? { + println!(" Connecting..."); + let mut retries = 2; + loop { + match device.connect().await { + Ok(()) => break, + Err(err) if retries > 0 => { + println!(" Connect error: {}", &err); + retries -= 1; + } + Err(err) => return Err(err), + } + } + println!(" Connected"); + } else { + println!(" Already connected"); + } + + println!(" Enumerating services..."); + for service in device.services().await? { + let uuid = service.uuid().await?; + println!(" Service UUID: {}", &uuid); + if uuid == SERVICE_UUID { + println!(" Found our service!"); + for char in service.characteristics().await? { + let uuid = char.uuid().await?; + println!(" Characteristic UUID: {}", &uuid); + if uuid == CHARACTERISTIC_UUID { + println!(" Found our characteristic!"); + return Ok(Some(char)); + } + } + } + } + + println!(" Not found!"); + } + + Ok(None) +} + +async fn exercise_characteristic(char: &Characteristic) -> Result<()> { + let mut write_io = char.write_io().await?; + println!(" Obtained write IO with MTU {} bytes", write_io.mtu()); + let mut notify_io = char.notify_io().await?; + println!(" Obtained notification IO with MTU {} bytes", notify_io.mtu()); + + // Flush notify buffer. + let mut buf = [0; 1024]; + while let Ok(Ok(_)) = timeout(Duration::from_secs(1), notify_io.read(&mut buf)).await {} + + let mut rng = rand::thread_rng(); + for i in 0..1024 { + let mut len = rng.gen_range(0..20000); + + // Try to trigger packet reordering over EATT. + if i % 10 == 0 { + // Big packet is split into multiple small packets. + // (by L2CAP layer, because GATT MTU is bigger than L2CAP MTU) + len = write_io.mtu(); // 512 + } + if i % 10 == 1 { + // Small packet can use different L2CAP channel when EATT is enabled. + len = 20; + } + // Thus small packet can arrive before big packet. + // The solution is to disable EATT in /etc/bluetooth/main.conf. + + println!(" Test iteration {i} with data size {len}"); + let data: Vec = (0..len).map(|_| rng.gen()).collect(); + + // We must read back the data while sending, otherwise the connection + // buffer will overrun and we will lose data. + let read_task = tokio::spawn(async move { + let mut echo_buf = vec![0u8; len]; + let res = match notify_io.read_exact(&mut echo_buf).await { + Ok(_) => Ok(echo_buf), + Err(err) => Err(err), + }; + (notify_io, res) + }); + + // Note that write_all will automatically split the buffer into + // multiple writes of MTU size. + write_io.write_all(&data).await.expect("write failed"); + + println!(" Waiting for echo"); + let (notify_io_back, res) = read_task.await.unwrap(); + notify_io = notify_io_back; + let echo_buf = res.expect("read failed"); + + if echo_buf != data { + println!(); + println!("Echo data mismatch!"); + println!("Send data: {:x?}", &data); + println!("Received data: {:x?}", &echo_buf); + println!(); + println!("By 512 blocks:"); + for (sent, recv) in data.chunks(512).zip(echo_buf.chunks(512)) { + println!(); + println!( + "Send: {:x?} ... {:x?}", + &sent[0..4.min(sent.len())], + &sent[sent.len().saturating_sub(4)..] + ); + println!( + "Recv: {:x?} ... {:x?}", + &recv[0..4.min(recv.len())], + &recv[recv.len().saturating_sub(4)..] + ); + } + println!(); + + panic!("echoed data does not match sent data"); + } + println!(" Data matches"); + } + + println!(" Test okay"); + Ok(()) +} + +#[tokio::main] +async fn main() -> bluer::Result<()> { + env_logger::init(); + let session = bluer::Session::new().await?; + let binding = session.adapter_names().await?; + let adapter_name = binding.last().unwrap(); + let adapter = session.adapter(adapter_name)?; + adapter.set_powered(true).await?; + + { + println!( + "Discovering on Bluetooth adapter {} with address {}\n", + adapter.name(), + adapter.address().await? + ); + let discover = adapter.discover_devices().await?; + pin_mut!(discover); + let mut done = false; + while let Some(evt) = discover.next().await { + match evt { + AdapterEvent::DeviceAdded(addr) => { + let device = adapter.device(addr)?; + match find_our_characteristic(&device).await { + Ok(Some(char)) => match exercise_characteristic(&char).await { + Ok(()) => { + println!(" Characteristic exercise completed"); + done = true; + } + Err(err) => { + println!(" Characteristic exercise failed: {}", &err); + } + }, + Ok(None) => (), + Err(err) => { + println!(" Device failed: {}", &err); + let _ = adapter.remove_device(device.address()).await; + } + } + match device.disconnect().await { + Ok(()) => println!(" Device disconnected"), + Err(err) => println!(" Device disconnection failed: {}", &err), + } + println!(); + } + AdapterEvent::DeviceRemoved(addr) => { + println!("Device removed {addr}"); + } + _ => (), + } + if done { + break; + } + } + println!("Stopping discovery"); + } + + sleep(Duration::from_secs(1)).await; + Ok(()) +} \ No newline at end of file diff --git a/src/bluetooth/gatt_srv.rs b/src/bluetooth/gatt_srv.rs index 99c54c5..f18c69d 100644 --- a/src/bluetooth/gatt_srv.rs +++ b/src/bluetooth/gatt_srv.rs @@ -2,7 +2,15 @@ use crate::bluetooth::setup_session; use crate::Sighting; use bluer::{ adv::Advertisement, - gatt::local::{Application, Characteristic, CharacteristicRead, Service}, + gatt::{ + local::{ + characteristic_control, Application, ApplicationHandle, Characteristic, + CharacteristicControl, CharacteristicControlEvent, CharacteristicControlHandle, + CharacteristicNotify, CharacteristicNotifyMethod, CharacteristicRead, + CharacteristicWrite, CharacteristicWriteMethod, Service, + }, + CharacteristicReader, CharacteristicWriter, + }, }; use futures::FutureExt; use std::{ @@ -10,14 +18,16 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; +use futures::{future, pin_mut, StreamExt}; use tokio::{ - io::{AsyncBufReadExt, BufReader}, + io::{AsyncBufReadExt, BufReader, AsyncReadExt, AsyncWriteExt}, time::sleep, }; -use super::MANUFACTURER_ID; +use super::{MANUFACTURER_ID, CHARACTERISTIC_UUID}; + +use super::{SERVICE_UUID, CHANNEL, MTU}; -pub const SERVICE_UUID: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00001); pub const LAST_SIGHTING_CHARACTERISTIC: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00003); pub const SIGHTING_COUNT_CHARACTERISTIC: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00004); pub const LAST_SPECIES_CHARACTERISTIC: uuid::Uuid = uuid::Uuid::from_u128(0xF00DC0DE00005); @@ -96,6 +106,23 @@ pub fn last_species_characteristic(sightings: Arc>>) -> Char } } +pub fn stream_characteristic(char_handle: CharacteristicControlHandle, sightings: Arc>>) -> Characteristic { + Characteristic { uuid: CHARACTERISTIC_UUID, + write: Some(CharacteristicWrite { + write_without_response: true, + method: CharacteristicWriteMethod::Io, + ..Default::default() + }), + notify: Some(CharacteristicNotify { + notify: true, + method: CharacteristicNotifyMethod::Io, + ..Default::default() + }), + control_handle: char_handle, + ..Default::default() + } +} + pub async fn run_advertise( adapter: &bluer::Adapter, ) -> bluer::Result { @@ -115,12 +142,17 @@ pub async fn run_advertise( pub async fn run_app( adapter: &bluer::Adapter, sightings: Arc>>, -) -> bluer::Result { +) -> bluer::Result<( + ApplicationHandle, + CharacteristicControl +)> { + let (char_control, char_handle) = characteristic_control(); let app = Application { services: vec![Service { uuid: SERVICE_UUID, primary: true, characteristics: vec![ + stream_characteristic(char_handle, sightings.clone()), last_sighting_characteristic(sightings.clone()), sighting_count_characteristic(sightings.clone()), last_species_characteristic(sightings.clone()), @@ -130,7 +162,62 @@ pub async fn run_app( ..Default::default() }; let app_handle = adapter.serve_gatt_application(app).await?; - Ok(app_handle) + Ok((app_handle, char_control)) +} + +pub async fn listen(char_control: CharacteristicControl, sightings: Arc>>) -> bluer::Result<()> { + let mut read_buf = Vec::new(); + let mut reader_opt: Option = None; + let mut writer_opt: Option = None; + pin_mut!(char_control); + + loop { + tokio::select! { + evt = char_control.next() => { + match evt { + Some(CharacteristicControlEvent::Write(req)) => { + println!("Accepting write request event with MTU {}", req.mtu()); + read_buf = vec![0; req.mtu()]; + reader_opt = Some(req.accept()?); + }, + Some(CharacteristicControlEvent::Notify(notifier)) => { + println!("Accepting notify request event with MTU {}", notifier.mtu()); + writer_opt = Some(notifier); + }, + None => break, + } + }, + read_res = async { + match &mut reader_opt { + Some(reader) if writer_opt.is_some() => reader.read(&mut read_buf).await, + _ => future::pending().await, + } + } => { + match read_res { + Ok(0) => { + println!("Read stream ended"); + reader_opt = None; + } + Ok(n) => { + let value = read_buf[..n].to_vec(); + println!("Echoing {} bytes: {:x?} ... {:x?}", value.len(), &value[0..4.min(value.len())], &value[value.len().saturating_sub(4) ..]); + if value.len() < 512 { + println!(); + } + if let Err(err) = writer_opt.as_mut().unwrap().write_all(&value).await { + println!("Write failed: {}", &err); + writer_opt = None; + } + } + Err(err) => { + println!("Read stream error: {}", &err); + reader_opt = None; + } + } + } + } + } + Ok(()) } pub async fn run(sightings: Arc>>) -> bluer::Result<()> { @@ -139,7 +226,11 @@ pub async fn run(sightings: Arc>>) -> bluer::Result<()> { let adapter = session.default_adapter().await?; let adv_handle = run_advertise(&adapter).await.unwrap(); - let app_handle = run_app(&adapter, sightings.clone()).await.unwrap(); + let (app_handle, char_control) = + run_app(&adapter, sightings.clone()).await.unwrap(); + + + let listen_handle = tokio::spawn(listen(char_control, sightings.clone())); log::info!("Service ready. Press enter to quit."); let stdin = BufReader::new(tokio::io::stdin()); @@ -149,6 +240,8 @@ pub async fn run(sightings: Arc>>) -> bluer::Result<()> { log::info!("Removing service and advertisement"); drop(adv_handle); drop(app_handle); + //drop(char_control); + drop(listen_handle); sleep(Duration::from_secs(1)).await; Ok(()) } diff --git a/src/bluetooth/mod.rs b/src/bluetooth/mod.rs index bbbcb03..dbae737 100644 --- a/src/bluetooth/mod.rs +++ b/src/bluetooth/mod.rs @@ -2,10 +2,10 @@ use std::sync::{Arc, Mutex}; use crate::Sighting; -mod handle_message; +pub mod handle_message; pub use handle_message::handle_message; -mod message; +pub mod message; pub use message::Message; pub mod gatt_srv;