diff --git a/Cargo.lock b/Cargo.lock index 7251408..5a60f70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,7 +56,6 @@ dependencies = [ "aquila_store", "async-stream", "async-trait", - "clokwerk", "dotenv", "futures", "json_env_logger2", @@ -462,15 +461,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "clokwerk" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd108d365fcb6d7eddf17a6718eb6a33db18ba4178f8cc6b667f480710f10d76" -dependencies = [ - "chrono", -] - [[package]] name = "combine" version = "4.6.7" diff --git a/Cargo.toml b/Cargo.toml index f2f2a21..7f4b992 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ futures = "0.3" serde_json = { version = "1.0" } json_env_logger2 = "0.2" log = "0.4.22" -clokwerk = { version = "0.4", features = ["async"] } tucana = { version = "0.0.13", features = ["sagittarius", "aquila"] } dotenv = "0.15.0" rabbitmq-stream-client = "0.7.0" diff --git a/aquila/Cargo.toml b/aquila/Cargo.toml index af3b331..08a8d07 100644 --- a/aquila/Cargo.toml +++ b/aquila/Cargo.toml @@ -8,7 +8,6 @@ aquila_cache = { workspace = true } aquila_store = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } -clokwerk = { workspace = true } dotenv = { workspace = true } futures = { workspace = true } log = { workspace = true } diff --git a/aquila/src/client/sagittarius/flow_client.rs b/aquila/src/client/sagittarius/flow_client.rs index 70f53f6..8403f85 100644 --- a/aquila/src/client/sagittarius/flow_client.rs +++ b/aquila/src/client/sagittarius/flow_client.rs @@ -1,16 +1,14 @@ use aquila_store::{FlowService, FlowServiceBase}; use async_trait::async_trait; use futures::StreamExt; -use log::{error, info}; +use log::{debug, error}; use std::sync::Arc; use tokio::sync::Mutex; use tonic::transport::Channel; use tonic::Request; +use tucana::sagittarius::flow_response::Data; use tucana::sagittarius::flow_service_client::FlowServiceClient; -use tucana::sagittarius::{FlowCommandType, FlowGetRequest, FlowLogonRequest, FlowResponse}; - -const INSERT: i32 = FlowCommandType::Insert as i32; -const DELETE: i32 = FlowCommandType::Delete as i32; +use tucana::sagittarius::{FlowLogonRequest, FlowResponse}; /// Struct representing a service for receiving flows from `Sagittarius`. #[derive(Clone)] @@ -26,8 +24,7 @@ pub trait SagittariusFlowClient { sagittarius_url: String, flow_service: Arc>, ) -> SagittariusFlowClientBase; - async fn send_flow_update_request(&mut self); - async fn send_start_request(&mut self); + async fn init_flow_stream(&mut self); } /// Implementation for a service for receiving flows from `Sagittarius`. @@ -55,38 +52,10 @@ impl SagittariusFlowClient for SagittariusFlowClientBase { } } - /// Will send a request `FlowGetRequest` to `Sagittarius` - /// Inserts/Deletes flows contained in the response into Redis - async fn send_flow_update_request(&mut self) { - let mut flow_service = self.flow_service.lock().await; - let flow_ids = match flow_service.get_all_flow_ids().await { - Ok(result) => result, - Err(redis_error) => { - error!("Service wasn't able to get ids {}", redis_error); - return; - } - }; - - let request = Request::new(FlowGetRequest { flow_ids }); - - let response = match self.client.get(request).await { - Ok(res) => res.into_inner(), - Err(status) => { - error!("Received a {status}, can't retrieve flows from Sagittarius"); - return; - } - }; - - let update_flows = response.updated_flows; - let deleted_flow_ids = response.deleted_flow_ids; - flow_service.insert_flows(update_flows).await; - flow_service.delete_flows(deleted_flow_ids).await - } - /// Will send a request `FlowLogonRequest` to `Sagittarius` /// Will establish a stream. /// `Sagittarius` will send update/delete commands and the flow to do that with. - async fn send_start_request(&mut self) { + async fn init_flow_stream(&mut self) { let request = Request::new(FlowLogonRequest {}); let response = match self.client.update(request).await { Ok(res) => res, @@ -102,33 +71,40 @@ impl SagittariusFlowClient for SagittariusFlowClientBase { response: FlowResponse, flow_service: Arc>, ) { - match response.r#type { - INSERT => { - let flow = response.updated_flow; - if flow.is_none() { - info!("Received insert request without any flow"); - return; - } - - { - let mut flow_service = flow_service.lock().await; - flow_service.insert_flow(flow.unwrap()).await; - } + let data = match response.data { + Some(data) => data, + None => { + debug!("Received a FlowLogonResponse but no FlowLogonResponse"); + return; } - DELETE => { - let flow_id = response.deleted_flow_id; - if flow_id.is_none() { - info!("Received delete request without any flow"); - return; - } + }; - { - let mut flow_service = flow_service.lock().await; - flow_service.delete_flow(flow_id.unwrap()).await; - } + match data { + // Will delete the flow id it receives + Data::DeletedFlowId(id) => { + let mut flow_service = flow_service.lock().await; + flow_service.delete_flow(id).await; + } + //Will update the flow it receives + Data::UpdatedFlow(flow) => { + let mut flow_service = flow_service.lock().await; + flow_service.insert_flow(flow).await; } - _ => { - error!("Received unknown response type") + //WIll drop all flows that it holds and insert all new ones + Data::Flows(flows) => { + let mut flow_service = flow_service.lock().await; + let result_ids = flow_service.get_all_flow_ids().await; + + let ids = match result_ids { + Ok(ids) => ids, + Err(err) => { + error!("Service wasn't able to get ids {}", err); + return; + } + }; + + flow_service.delete_flows(ids).await; + flow_service.insert_flows(flows.flows).await; } } } @@ -149,309 +125,5 @@ impl SagittariusFlowClient for SagittariusFlowClientBase { #[cfg(test)] mod tests { - use crate::client::sagittarius::action_client::{ - SagittariusActionClient, SagittariusActionClientBase, - }; - use crate::client::sagittarius::flow_client::{ - SagittariusFlowClient, SagittariusFlowClientBase, - }; - use aquila_container::setup_redis_test_container; - use aquila_store::{FlowService, FlowServiceBase}; - use async_trait::async_trait; - use std::pin::Pin; - use std::sync::Arc; - use tokio::net::TcpListener; - use tokio::sync::{oneshot, Mutex}; - use tokio::task::JoinHandle; - use tonic::codegen::tokio_stream::wrappers::TcpListenerStream; - use tonic::codegen::tokio_stream::Stream; - use tonic::transport::Server; - use tonic::{Request, Response, Status}; - use tucana::sagittarius::flow_service_server::{ - FlowService as SagittariusFlowService, FlowServiceServer, - }; - use tucana::sagittarius::{ - Flow, FlowGetRequest, FlowGetResponse, FlowLogonRequest, FlowResponse, - }; - - struct MockFlowService { - flow_get_result: FlowGetResponse, - } - - #[derive(Default)] - struct BrokenMockFlowService; - - impl MockFlowService { - pub fn new(flow_get_result: FlowGetResponse) -> Self { - MockFlowService { flow_get_result } - } - } - - #[async_trait] - impl SagittariusFlowService for MockFlowService { - async fn get( - &self, - _request: Request, - ) -> Result, Status> { - Ok(Response::new(self.flow_get_result.clone())) - } - - type UpdateStream = Pin> + Send>>; - - async fn update( - &self, - _request: Request, - ) -> Result, Status> { - let flow = Flow { - flow_id: 1, - start_node: None, - definition: None, - }; - - let response_stream = async_stream::try_stream! { - yield FlowResponse { - updated_flow: Some(flow), - deleted_flow_id: None, - r#type: 0, - }; - }; - - Ok(Response::new( - Box::pin(response_stream) as Self::UpdateStream - )) - } - } - - #[async_trait] - impl SagittariusFlowService for BrokenMockFlowService { - async fn get( - &self, - _request: Request, - ) -> Result, Status> { - Err(Status::internal("An unhandled error occurred!")) - } - - type UpdateStream = Pin> + Send>>; - - async fn update( - &self, - _request: Request, - ) -> Result, Status> { - Err(Status::internal("An unhandled error occurred!")) - } - } - - async fn setup_sagittarius_mock( - flow_get_response: FlowGetResponse, - ) -> (JoinHandle<()>, oneshot::Sender<()>, String) { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let incoming = TcpListenerStream::new(listener); - - let mock_service = MockFlowService::new(flow_get_response); - - let server_handle = tokio::spawn(async move { - Server::builder() - .add_service(FlowServiceServer::new(mock_service)) - .serve_with_incoming_shutdown(incoming, async { - shutdown_rx.await.ok(); - }) - .await - .unwrap(); - }); - - (server_handle, shutdown_tx, format!("http://{}", addr)) - } - - async fn setup_broken_sagittarius_mock() -> (JoinHandle<()>, oneshot::Sender<()>, String) { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let incoming = TcpListenerStream::new(listener); - - let mock_service = BrokenMockFlowService::default(); - - let server_handle = tokio::spawn(async move { - Server::builder() - .add_service(FlowServiceServer::new(mock_service)) - .serve_with_incoming_shutdown(incoming, async { - shutdown_rx.await.ok(); - }) - .await - .unwrap(); - }); - - (server_handle, shutdown_tx, format!("http://{}", addr)) - } - - #[tokio::test] - async fn test_get_flow_insert_successfully() { - let response = FlowGetResponse { - updated_flows: vec![], - deleted_flow_ids: vec![], - }; - - let (connection, _container) = setup_redis_test_container().await; - let (server_handle, shutdown, url) = setup_sagittarius_mock(response).await; - - let redis_client = Arc::new(Mutex::new(Box::new(connection))); - let service = FlowServiceBase::new(redis_client.clone()).await; - let service_arc = Arc::new(Mutex::new(service)); - - let mut client = SagittariusFlowClientBase::new(url.clone(), service_arc.clone()).await; - client.send_start_request().await; - - let data_after = { - let mut current_service = service_arc.lock().await; - current_service.get_all_flow_ids().await - }; - - assert!(data_after.is_ok()); - assert_eq!(data_after.unwrap().len(), 1); - - shutdown.send(()).expect("Failed to send shutdown signal"); - server_handle.await.expect("Failed to await server handle"); - } - - #[tokio::test] - async fn test_delete_flows_empty_list_not_crash() { - let (connection, _container) = setup_redis_test_container().await; - let (_server_handle, shutdown, url) = setup_broken_sagittarius_mock().await; - - let redis_client = Arc::new(Mutex::new(Box::new(connection))); - let service = FlowServiceBase::new(redis_client.clone()).await; - let service_arc = Arc::new(Mutex::new(service)); - - let mut client = SagittariusFlowClientBase::new(url, service_arc.clone()).await; - client.send_start_request().await; - - shutdown.send(()).expect("Failed to send shutdown signal"); - } - - #[tokio::test] - async fn test_get_flows_update_only() { - let response = FlowGetResponse { - updated_flows: vec![ - Flow { - flow_id: 1, - start_node: None, - definition: None, - }, - Flow { - flow_id: 2, - start_node: None, - definition: None, - }, - Flow { - flow_id: 3, - start_node: None, - definition: None, - }, - ], - deleted_flow_ids: vec![], - }; - - let (connection, _container) = setup_redis_test_container().await; - let (server_handle, shutdown, url) = setup_sagittarius_mock(response).await; - - let redis_client = Arc::new(Mutex::new(Box::new(connection))); - let service = FlowServiceBase::new(redis_client.clone()).await; - let service_arc = Arc::new(Mutex::new(service)); - - let mut client = SagittariusFlowClientBase::new(url.clone(), service_arc.clone()).await; - client.send_flow_update_request().await; - - let data_after = { - let mut current_service = service_arc.lock().await; - current_service.get_all_flow_ids().await - }; - - assert!(data_after.is_ok()); - assert_eq!(data_after.unwrap().len(), 3); - - shutdown.send(()).expect("Failed to send shutdown signal"); - server_handle.await.expect("Failed to await server handle"); - } - - #[tokio::test] - async fn test_get_flows_update_and_delete() { - let (connection, _container) = setup_redis_test_container().await; - let redis_client = Arc::new(Mutex::new(Box::new(connection))); - - { - let response = FlowGetResponse { - updated_flows: vec![ - Flow { - flow_id: 1, - start_node: None, - definition: None, - }, - Flow { - flow_id: 2, - start_node: None, - definition: None, - }, - Flow { - flow_id: 3, - start_node: None, - definition: None, - }, - ], - deleted_flow_ids: vec![], - }; - - let (server_handle, shutdown, url) = setup_sagittarius_mock(response).await; - let service = FlowServiceBase::new(redis_client.clone()).await; - let service_arc = Arc::new(Mutex::new(service)); - - let mut client = SagittariusFlowClientBase::new(url.clone(), service_arc.clone()).await; - client.send_flow_update_request().await; - - let data_after = { - let mut current_service = service_arc.lock().await; - current_service.get_all_flow_ids().await - }; - - assert!(data_after.is_ok()); - assert_eq!(data_after.unwrap().len(), 3); - - shutdown.send(()).expect("Failed to send shutdown signal"); - server_handle.await.expect("Failed to await server handle"); - }; - - { - let response = FlowGetResponse { - updated_flows: vec![], - deleted_flow_ids: vec![1, 2], - }; - - let (server_handle, shutdown, url) = setup_sagittarius_mock(response).await; - let service = FlowServiceBase::new(redis_client.clone()).await; - let service_arc = Arc::new(Mutex::new(service)); - - let mut client = SagittariusFlowClientBase::new(url.clone(), service_arc.clone()).await; - client.send_flow_update_request().await; - - let data_after = { - let mut current_service = service_arc.lock().await; - current_service.get_all_flow_ids().await - }; - - assert!(data_after.is_ok()); - assert_eq!(data_after.unwrap().len(), 1); - - shutdown.send(()).expect("Failed to send shutdown signal"); - server_handle.await.expect("Failed to await server handle"); - }; - } - - #[tokio::test] - #[should_panic(expected = "Can't start client")] - async fn test_sagittarius_action_client_new_should_panic() { - let sagittarius_url = "http://127.0.0.1:25565".to_string(); - let _client = SagittariusActionClientBase::new(sagittarius_url).await; - } + //TODO: rewrite tests :( } diff --git a/aquila/src/configuration/mode.rs b/aquila/src/configuration/mode.rs index 5e4b9f9..9e9eea2 100644 --- a/aquila/src/configuration/mode.rs +++ b/aquila/src/configuration/mode.rs @@ -11,7 +11,6 @@ pub enum Mode { STATIC, DYNAMIC, - HYBRID, } impl Mode { @@ -19,7 +18,6 @@ impl Mode { match string.to_lowercase().as_str() { "static" => Mode::STATIC, "dynamic" => Mode::DYNAMIC, - "hybrid" => Mode::HYBRID, _ => Mode::STATIC, } } @@ -29,7 +27,6 @@ impl PartialEq for &Mode { fn eq(&self, other: &Mode) -> bool { match (*self, other) { (Mode::STATIC, Mode::STATIC) => true, - (Mode::HYBRID, Mode::HYBRID) => true, (Mode::DYNAMIC, Mode::DYNAMIC) => true, _ => false } diff --git a/aquila/src/configuration/start_configuration.rs b/aquila/src/configuration/start_configuration.rs index 8c8b2cd..6196698 100644 --- a/aquila/src/configuration/start_configuration.rs +++ b/aquila/src/configuration/start_configuration.rs @@ -3,8 +3,6 @@ use crate::configuration::config::Config; use crate::configuration::mode::Mode; use aquila_store::{FlowService, FlowServiceBase}; use async_trait::async_trait; -use clokwerk::AsyncScheduler; -use clokwerk::Interval::Seconds; use log::{debug, error, info}; use redis::aio::MultiplexedConnection; use serde_json::from_str; @@ -42,7 +40,7 @@ impl StartConfiguration for StartConfigurationBase { } } - /// Function to initialize the connection to `Sagittarius` to receive latest flows. + /// Function to initialize the connection to `Sagittarius` to receive flows. /// /// Behavior: /// If scheduling is disabled a request will be sent once. @@ -52,33 +50,16 @@ impl StartConfiguration for StartConfigurationBase { /// - `Sagittarius` connection buildup fails /// - Redis connection buildup fails async fn init_flows_from_sagittarius(&mut self) { + if &self.config.mode != Mode::DYNAMIC { + return; + } + let flow_service = FlowServiceBase::new(self.connection_arc.clone()).await; let flow_service_arc = Arc::new(Mutex::new(flow_service)); let mut sagittarius_client = SagittariusFlowClientBase::new(self.config.backend_url.clone(), flow_service_arc).await; - if &self.config.mode == Mode::STATIC { - return; - } - - if &self.config.mode == Mode::DYNAMIC { - info!("Receiving flows from sagittarius once"); - sagittarius_client.send_start_request().await; - return; - } - - info!("Receiving flows from sagittarius on a scheduled basis."); - let schedule_interval = self.config.update_schedule_interval; - let mut scheduler = AsyncScheduler::new(); - - scheduler.every(Seconds(schedule_interval)).run(move || { - let local_flow_client = Arc::new(Mutex::new(sagittarius_client.clone())); - - async move { - let mut current_flow_client = local_flow_client.lock().await; - current_flow_client.send_start_request().await - } - }); + sagittarius_client.init_flow_stream().await } /// Function to start `Aquila` from a JSON containing the flows. diff --git a/aquila/src/main.rs b/aquila/src/main.rs index 3ce3bc1..cc67a58 100644 --- a/aquila/src/main.rs +++ b/aquila/src/main.rs @@ -22,7 +22,14 @@ async fn main() { // Redis connection let client = build_connection(config.redis_url.clone()); - let con = client.get_multiplexed_async_connection().await.unwrap(); + + let con = match client.get_multiplexed_async_connection().await { + Ok(con) => con, + Err(err) => { + panic!("Failed to connect to server: {}", err); + } + }; + let connection = Arc::new(Mutex::new(Box::new(con))); // Startup