Skip to content

Commit

Permalink
added plant server
Browse files Browse the repository at this point in the history
  • Loading branch information
MedAouadhi committed Jan 16, 2024
1 parent a36ac96 commit 8379ff1
Show file tree
Hide file tree
Showing 12 changed files with 819 additions and 217 deletions.
707 changes: 515 additions & 192 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ anyhow = "1.0"
serde = { version = "1.0.2", features = ["derive"] }
actix-web = { version = "4", features = ["openssl"] }
openssl = "0.10"
toml = "0.7.1"
toml = "0.8.8"
serde_with = { version = "3.0.0", features = ["chrono"] }
chrono = "0.4.26"
async-trait = "0.1.72"
Expand All @@ -27,10 +27,14 @@ tracing-actix-web = "0.7.6"
actix-server = "2.2.0"
socket2 = "0.5.3"
futures = "0.3.28"
llm-chain = "0.12.3"
llm-chain-openai = "0.12.3"
llm-chain = "0.13.0"
llm-chain-openai = "0.13.0"
bot_commands_macro = { path = "./bot_commands_macro" }
enum_dispatch = "0.3.12"
llm-chain-qdrant = "0.13.0"
qdrant-client = "1.4.0"
html2text = "0.11.0"
influxdb = { version = "0.7.1", features = ["derive"] }

[[bin]]
name = "homebot"
Expand All @@ -42,5 +46,5 @@ path = "src/lib/lib.rs"

[dev-dependencies]
actix-rt = "2.8.0"
httpmock = "0.6.8"
httpmock = "0.7.0"
tempfile = "3.7.1"
12 changes: 6 additions & 6 deletions bot_commands_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub fn bot_commands(_args: TokenStream, input: TokenStream) -> TokenStream {
}
}

let attrs_count = vec![&chat_start_cmd, &chat_exit_cmd, &llm_request_cmd]
let attrs_count = [&chat_start_cmd, &chat_exit_cmd, &llm_request_cmd]
.iter()
.filter(|&x| x.is_some())
.count();
Expand All @@ -77,7 +77,7 @@ pub fn bot_commands(_args: TokenStream, input: TokenStream) -> TokenStream {
}

let handler_structs = commands.iter().map(|(command_name, _, _, _, _)| {
let struct_name = get_cmd_struct_name(&command_name);
let struct_name = get_cmd_struct_name(command_name);
quote! {

#[derive(Default)]
Expand All @@ -89,7 +89,7 @@ pub fn bot_commands(_args: TokenStream, input: TokenStream) -> TokenStream {
let handler_impls = commands
.iter()
.map(|(command_name, func_name, _, chat_start, chat_exit)| {
let struct_name = get_cmd_struct_name(&command_name);
let struct_name = get_cmd_struct_name(command_name);
let state = if chat_start == &Some(true) {
quote! {
user.set_chat_mode(true).await;
Expand All @@ -114,7 +114,7 @@ pub fn bot_commands(_args: TokenStream, input: TokenStream) -> TokenStream {
});

let command_insert = commands.iter().map(|(command_name, _, _, _, _)| {
let struct_name = get_cmd_struct_name(&command_name);
let struct_name = get_cmd_struct_name(command_name);
quote! { handlers.insert(#command_name.to_string(), Box::new(#struct_name))}
});

Expand Down Expand Up @@ -209,7 +209,7 @@ pub fn bot_commands(_args: TokenStream, input: TokenStream) -> TokenStream {
new_items.push(Item::Impl(parsed_impl));

// Create new content with the original brace token and the new items
let new_content = Some((brace.clone(), new_items));
let new_content = Some((*brace, new_items));
let new_module = ItemMod {
attrs: module.attrs.clone(),
vis: module.vis.clone(),
Expand Down Expand Up @@ -271,7 +271,7 @@ pub fn handler(args: TokenStream, input: TokenStream) -> TokenStream {
// for now, only check that cmd is present.
let is_cmd_in_args = args.into_iter().any(|e| {
if let proc_macro::TokenTree::Ident(x) = e {
x.to_string() == "cmd".to_string()
x.to_string() == *"cmd"
} else {
false
}
Expand Down
28 changes: 28 additions & 0 deletions src/bot_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod commands {
use polybot::types::{BotUserActions, WeatherProvider};
use polybot::utils::{get_affirmation, get_ip};
use rand::Rng;
use std::io::Cursor;

#[handler(cmd = "/ip")]
async fn ip(_user_tx: impl BotUserActions, _: String) -> String {
Expand Down Expand Up @@ -94,4 +95,31 @@ pub mod commands {
async fn dice(_: impl BotUserActions, _: String) -> String {
rand::thread_rng().gen_range(1..=6).to_string()
}

#[handler(cmd = "/docsearch")]
async fn retrieval(_: impl BotUserActions, request: String) -> String {
if let Ok(agent) = OpenAiModel::try_new() {
if let Ok(answer) = agent.retrieval("mohamed", &request).await {
return answer;
}
"Problem getting the agent response".to_string()
} else {
"Could not create the llm agent, check the API key".to_string()
}
}

#[handler(cmd = "/url")]
async fn url(_: impl BotUserActions, request: String) -> String {
tracing::debug!("getting {}", request);
if let Ok(resp) = reqwest::get(request).await {
let body = resp.text().await.unwrap();
let cursor = Cursor::new(body.into_bytes());
let out = html2text::from_read(cursor, 200);
tracing::debug!("{out}");
// out
"printed it".to_string()
} else {
"Problem getting the url!".to_string()
}
}
}
1 change: 1 addition & 0 deletions src/lib/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod server;
pub mod telegram;
pub mod types;
pub use types::{Bot, BotConfig, Config, ServerConfig};
pub mod plant;
pub mod polybot;
pub mod services;
pub mod utils;
89 changes: 89 additions & 0 deletions src/lib/plant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::sync::Arc;

use crate::Bot;
use actix_web::cookie::time::format_description::parse;
use anyhow::Result;
use chrono::{DateTime, Utc};
use influxdb::InfluxDbWriteable;
use influxdb::{Client, Query, ReadQuery, Timestamp};
use serde::Deserialize;
use tokio::net::UdpSocket;

#[derive(Deserialize, Debug)]
pub struct PlantData {
moisture: u32,
is_watering: bool,
}

struct Plant {
name: String,
moisture_level: u32,
last_watering: DateTime<Utc>,
}

#[derive(InfluxDbWriteable)]
struct PlantReading {
time: DateTime<Utc>,
moisture: u32,
is_watering: bool,
#[influxdb(tag)]
plant_name: String,
}

pub struct PlantServer {
host: String,
port: u32,
chat_id: String,
db_client: Client,
}

impl PlantServer {
pub fn new(host: &str, chat_id: &str, port: u32, db_token: &str) -> Self {
Self {
host: host.to_string(),
chat_id: chat_id.to_string(),
port,
db_client: Client::new("http://localhost:8086", "homebucket").with_token(db_token),
}
}

pub async fn start(&self, bot: Arc<impl Bot>) -> Result<()> {
let socket = UdpSocket::bind(format!("{}:{}", self.host, self.port)).await?;
let mut buf = [0u8; 2048];

let mut avg_moisture: Vec<u32> = vec![];

loop {
let (len, _) = socket.recv_from(&mut buf).await?;
let data = &buf[..len];

match serde_json::from_slice::<PlantData>(data) {
Ok(parsed_json) => {
let write_query = PlantReading {
time: Utc::now(),
moisture: parsed_json.moisture,
is_watering: parsed_json.is_watering,
plant_name: "flowery".to_string(),
}
.into_query("moisture");

self.db_client.query(write_query).await?;

avg_moisture.push(parsed_json.moisture);
tracing::info!("Received {:?}", parsed_json);
if avg_moisture.len() == 12 {
bot.send_message(
&self.chat_id,
&format!("Moisture now is {}.", avg_moisture.iter().sum::<u32>() / 12),
)
.await?;
avg_moisture.clear();
}
}
Err(e) => {
tracing::error!("Error parsing json {}", e);
}
}
}
}
}
26 changes: 20 additions & 6 deletions src/lib/polybot.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::plant::PlantServer;
use crate::server::BotServer;
use crate::utils::{generate_certificate, get_ip};
use crate::{Bot, Config};
Expand Down Expand Up @@ -45,12 +46,16 @@ impl<B: Bot> Polybot<B> {
// explicity handle the result as we are in async block
if let Ok(current_ip) = get_ip().await {
debug!("Current ip = {:?}", current_ip);
if !bot_clone.is_webhook_configured(&current_ip).await.unwrap() {
info!("Certificate is not correclty configured, configuring ...");
if let Ok(configured) = bot_clone.is_webhook_configured(&current_ip).await {
if !configured {
info!("Certificate is not correclty configured, configuring ...");
} else {
// the webhook is already set
tokio::time::sleep(timeout).await;
continue;
}
} else {
// the webhook is already set
tokio::time::sleep(timeout).await;
continue;
error!("Issue with getting the webhook status.");
}

// generate new certificate
Expand Down Expand Up @@ -84,9 +89,14 @@ impl<B: Bot> Polybot<B> {
}
});
}

loop {
let mut server = BotServer::new(self.config.server.clone(), self.bot.clone());
let plant = PlantServer::new(
"192.168.2.132",
&self.config.bot.chat_id,
3333,
&self.config.bot.db_token,
);

// the flow will block here, until one of the branches terminates, which is due to:
// - The server terminates by itself (e.g crash ..)
Expand All @@ -99,6 +109,10 @@ impl<B: Bot> Polybot<B> {
server.stop().await;
continue;
}
e = plant.start(self.bot.clone()) => {
tracing::info!("Plant Server exited {:?}", e);
continue;
}
}
}
Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/lib/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,16 @@ async fn handler(body: web::Bytes, bot: web::Data<Arc<dyn Bot>>) -> impl Respond
error!("Wrong message format received! {:#?}", body.to_vec());
return HttpResponse::BadRequest();
};
if bot.into_inner().handle_message(update).await.is_err() {
error!("Failed to handle the message!");
return HttpResponse::InternalServerError();
if bot
.into_inner()
.handle_message(update.clone())
.await
.is_err()
{
error!(
"Failed to handle the message! {}, continuing anyway!",
update
);
}
HttpResponse::Ok()
}
Expand Down
Loading

0 comments on commit 8379ff1

Please sign in to comment.