From cbcb612fe7e7db6b077ead06f93b2cae7b42c5c6 Mon Sep 17 00:00:00 2001 From: Mohamed Aouadhi Date: Sat, 3 Feb 2024 23:42:43 +0100 Subject: [PATCH] Add mqtt client to plant watering logic instread of udp --- Cargo.lock | 99 ++++++++++++++++++++++++++++++++++------- Cargo.toml | 9 +++- deploy.sh | 13 ++++++ src/lib/plant.rs | 96 +++++++++++++++++++++++++-------------- src/lib/polybot.rs | 4 +- src/lib/services/llm.rs | 4 ++ 6 files changed, 173 insertions(+), 52 deletions(-) create mode 100755 deploy.sh diff --git a/Cargo.lock b/Cargo.lock index ccae82b..d990345 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -743,9 +743,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.82" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ "jobserver", "libc", @@ -1180,6 +1180,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1473,6 +1484,7 @@ dependencies = [ "qdrant-client", "rand", "reqwest", + "rumqttc", "serde", "serde_json", "serde_with", @@ -1886,9 +1898,9 @@ checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libm" @@ -2208,6 +2220,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "111.28.1+1.1.1w" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bf7e82ffd6d3d6e6524216a0bfd85509f68b5b28354e8e7800057e44cefa9b4" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.91" @@ -2216,6 +2237,7 @@ checksum = "866b5f16f90776b9bb8dc1e1802ac6f0513de3a7a7465867bfbc563dc737faac" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -2675,12 +2697,44 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", "winapi", ] +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys", +] + +[[package]] +name = "rumqttc" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d8941c6791801b667d52bfe9ff4fc7c968d4f3f9ae8ae7abdaaa1c966feafc8" +dependencies = [ + "bytes", + "flume", + "futures-util", + "log", + "rustls-native-certs", + "rustls-pemfile", + "rustls-webpki", + "thiserror", + "tokio", + "tokio-rustls", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2736,7 +2790,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" dependencies = [ "log", - "ring", + "ring 0.16.20", "rustls-webpki", "sct", ] @@ -2764,12 +2818,12 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.3" +version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring", - "untrusted", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -2814,8 +2868,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -3073,6 +3127,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "string_cache" version = "0.8.7" @@ -3743,6 +3806,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.4.0" @@ -3925,8 +3994,8 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e6f79d9..bca303c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,9 @@ name = "homebot" version = "0.1.0" edition = "2021" +[profile.release] +strip = "debuginfo" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -12,7 +15,7 @@ serde_json = "1.0" anyhow = "1.0" serde = { version = "1.0.2", features = ["derive"] } actix-web = { version = "4", features = ["openssl"] } -openssl = "0.10" +openssl = { version = "0.10", features = ["vendored"] } toml = "0.8.8" serde_with = { version = "3.0.0", features = ["chrono"] } chrono = "0.4.26" @@ -35,6 +38,7 @@ llm-chain-qdrant = "0.13.0" qdrant-client = "1.4.0" html2text = "0.11.0" influxdb = { version = "0.7.1", features = ["derive"] } +rumqttc = "0.23.0" [[bin]] name = "homebot" @@ -48,3 +52,6 @@ path = "src/lib/lib.rs" actix-rt = "2.8.0" httpmock = "0.7.0" tempfile = "3.7.1" + +[package.metadata.cross.target.aarch64-unknown-linux-gnu] +xargo = false diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..9a6cb18 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +set -eux +cross build --release --target aarch64-unknown-linux-gnu + +#stop the service on the pi +ssh mohamed@awax.local sudo systemctl stop homebot.service + +#copy the binary +scp target/aarch64-unknown-linux-gnu/release/homebot mohamed@awax.local:~/homebot/ + +#start the service +ssh mohamed@awax.local sudo systemctl start homebot.service diff --git a/src/lib/plant.rs b/src/lib/plant.rs index 3b19789..75dfe3a 100644 --- a/src/lib/plant.rs +++ b/src/lib/plant.rs @@ -1,21 +1,21 @@ use std::sync::Arc; +use std::time::Duration; use crate::Bot; -use actix_web::cookie::time::format_description::parse; use anyhow::Result; use chrono::{DateTime, Utc}; +use influxdb::Client; use influxdb::InfluxDbWriteable; -use influxdb::{Client, Query, ReadQuery, Timestamp}; +use rumqttc::{AsyncClient, Event}; +use rumqttc::{MqttOptions, Packet}; use serde::Deserialize; -use tokio::net::UdpSocket; #[derive(Deserialize, Debug)] pub struct PlantData { moisture: u32, - is_watering: bool, } -struct Plant { +struct _Plant { name: String, moisture_level: u32, last_watering: DateTime, @@ -25,11 +25,11 @@ struct Plant { struct PlantReading { time: DateTime, moisture: u32, - is_watering: bool, #[influxdb(tag)] plant_name: String, } +#[allow(dead_code)] pub struct PlantServer { host: String, port: u32, @@ -38,50 +38,78 @@ pub struct PlantServer { } impl PlantServer { + const MAX_DRY: u32 = 1900; + const MIN_WET: u32 = 1500; + pub fn new(host: &str, chat_id: &str, port: u32, db_token: &str) -> Self { Self { host: host.to_string(), chat_id: chat_id.to_string(), port, - db_client: Client::new("http://localhost:8086", "homebucket").with_token(db_token), + db_client: Client::new("http://192.168.2.132:8086", "homebucket").with_token(db_token), } } pub async fn start(&self, bot: Arc) -> Result<()> { - let socket = UdpSocket::bind(format!("{}:{}", self.host, self.port)).await?; - let mut buf = [0u8; 2048]; - let mut avg_moisture: Vec = vec![]; + let mut mqttoptions = MqttOptions::new("homebot", "192.168.2.214", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + + client + .subscribe("plants/coleus/moisture", rumqttc::QoS::AtMostOnce) + .await?; loop { - let (len, _) = socket.recv_from(&mut buf).await?; - let data = &buf[..len]; + let notification: Event = eventloop.poll().await?; - match serde_json::from_slice::(data) { - Ok(parsed_json) => { - let write_query = PlantReading { - time: Utc::now(), - moisture: parsed_json.moisture, - is_watering: parsed_json.is_watering, - plant_name: "flowery".to_string(), - } - .into_query("moisture"); + if let Event::Incoming(Packet::Publish(data)) = notification { + match serde_json::from_slice::(&data.payload) { + Ok(parsed_json) => { + let write_query = PlantReading { + time: Utc::now(), + moisture: parsed_json.moisture, + plant_name: "flowery".to_string(), + } + .into_query("moisture"); - self.db_client.query(write_query).await?; + self.db_client.query(write_query).await?; - avg_moisture.push(parsed_json.moisture); - tracing::info!("Received {:?}", parsed_json); - if avg_moisture.len() == 12 { - bot.send_message( - &self.chat_id, - &format!("Moisture now is {}.", avg_moisture.iter().sum::() / 12), - ) - .await?; - avg_moisture.clear(); + avg_moisture.push(parsed_json.moisture); + tracing::info!("Received {:?}", parsed_json); + if avg_moisture.len() == 12 { + let avg = avg_moisture.iter().sum::() / 12; + match avg { + Self::MAX_DRY.. => { + // Inform the user in telegram + bot.send_message( + &self.chat_id, + &format!("Moisture now is {}. Watering the plant!", avg), + ) + .await?; + // water the plant + client + .publish( + "plants/coleus/water", + rumqttc::QoS::AtLeastOnce, + false, + "true", + ) + .await? + } + 0..=Self::MIN_WET => { + bot.send_message(&self.chat_id, "Coleus is too wet!") + .await?; + tracing::info!("Coleus is too wet!") + } + _ => (), + }; + avg_moisture.clear(); + } + } + Err(e) => { + tracing::error!("Error parsing json {}", e); } - } - Err(e) => { - tracing::error!("Error parsing json {}", e); } } } diff --git a/src/lib/polybot.rs b/src/lib/polybot.rs index 05ec0f9..7b96d33 100644 --- a/src/lib/polybot.rs +++ b/src/lib/polybot.rs @@ -92,7 +92,7 @@ impl Polybot { loop { let mut server = BotServer::new(self.config.server.clone(), self.bot.clone()); let plant = PlantServer::new( - "192.168.2.132", + "192.168.2.214", &self.config.bot.chat_id, 3333, &self.config.bot.db_token, @@ -110,7 +110,7 @@ impl Polybot { continue; } e = plant.start(self.bot.clone()) => { - tracing::info!("Plant Server exited {:?}", e); + tracing::error!("Plant Server exited {:?}", e); continue; } } diff --git a/src/lib/services/llm.rs b/src/lib/services/llm.rs index cc2df50..37f3617 100644 --- a/src/lib/services/llm.rs +++ b/src/lib/services/llm.rs @@ -2,9 +2,13 @@ use std::sync::Arc; use anyhow::{bail, Result}; use async_trait::async_trait; + +#[allow(unused)] use llm_chain::document_stores::document_store::DocumentStore; +#[allow(unused)] use llm_chain::tools::tools::VectorStoreTool; use llm_chain::{chains::conversation::Chain, executor, parameters, prompt, step::Step}; +#[allow(unused)] use llm_chain::{ schema::{Document, EmptyMetadata}, traits::{Embeddings, VectorStore},