Skip to content

Commit

Permalink
ref: moved aquila into its own workspace module
Browse files Browse the repository at this point in the history
  • Loading branch information
raphael-goetz committed Dec 19, 2024
1 parent 1c01050 commit 93c9472
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 65 deletions.
12 changes: 6 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!src/main/**/build/
!src/test/**/build/
!aquila/src/main/**/build/
!aquila/src/test/**/build/

### IntelliJ IDEA ###
.idea/modules.xml
Expand All @@ -13,8 +13,8 @@ build/
*.iml
*.ipr
out/
!src/main/**/out/
!src/test/**/out/
!aquila/src/main/**/out/
!aquila/src/test/**/out/

### Eclipse ###
.apt_generated
Expand All @@ -25,8 +25,8 @@ out/
.springBeans
.sts4-cache
bin/
!src/main/**/bin/
!src/test/**/bin/
!aquila/src/main/**/bin/
!aquila/src/test/**/bin/

### NetBeans ###
/nbproject/private/
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::service::flow_service::{FlowService, FlowServiceBase};
use aquila_store::{FlowService, FlowServiceBase};
use async_trait::async_trait;
use futures::StreamExt;
use log::{error, info};
Expand All @@ -22,7 +22,10 @@ pub struct SagittariusFlowClientBase {
/// Trait representing a service for receiving flows from `Sagittarius`.
#[async_trait]
pub trait SagittariusFlowClient {
async fn new(sagittarius_url: String, flow_service: Arc<Mutex<FlowServiceBase>>) -> SagittariusFlowClientBase;
async fn new(
sagittarius_url: String,
flow_service: Arc<Mutex<FlowServiceBase>>,
) -> SagittariusFlowClientBase;
async fn send_flow_update_request(&mut self);
async fn send_start_request(&mut self);
}
Expand All @@ -35,15 +38,21 @@ impl SagittariusFlowClient for SagittariusFlowClientBase {
///
/// Behavior:
/// Will panic when a connection can`t be established
async fn new(sagittarius_url: String, flow_service: Arc<Mutex<FlowServiceBase>>) -> SagittariusFlowClientBase {
async fn new(
sagittarius_url: String,
flow_service: Arc<Mutex<FlowServiceBase>>,
) -> SagittariusFlowClientBase {
let client = match FlowServiceClient::connect(sagittarius_url).await {
Ok(res) => res,
Err(start_error) => {
panic!("Can't start client {}", start_error);
}
};

SagittariusFlowClientBase { flow_service, client }
SagittariusFlowClientBase {
flow_service,
client,
}
}

/// Will send a request `FlowGetRequest` to `Sagittarius`
Expand Down Expand Up @@ -89,7 +98,10 @@ impl SagittariusFlowClient for SagittariusFlowClientBase {

let mut stream = response.into_inner();

async fn handle_response(response: FlowResponse, flow_service: Arc<Mutex<FlowServiceBase>>) {
async fn handle_response(
response: FlowResponse,
flow_service: Arc<Mutex<FlowServiceBase>>,
) {
match response.r#type {
INSERT => {
let flow = response.updated_flow;
Expand Down Expand Up @@ -137,11 +149,14 @@ impl SagittariusFlowClient for SagittariusFlowClientBase {

#[cfg(test)]
mod tests {
use crate::client::sagittarius::action_client::{SagittariusActionClient, SagittariusActionClientBase};
use crate::client::sagittarius::flow_client::{SagittariusFlowClient, SagittariusFlowClientBase};
use crate::data::redis::setup_redis_test_container;
use crate::service::flow_service::FlowService;
use crate::service::flow_service::FlowServiceBase;
use crate::client::sagittarius::action_client::{
SagittariusActionClient, SagittariusActionClientBase,
};
use crate::client::sagittarius::flow_client::{
SagittariusFlowClient, SagittariusFlowClientBase,
};
use aquila_container::setup_redis_test_container;
use aquila_store::{FlowService, FlowServiceBase};
use async_trait::async_trait;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -152,8 +167,12 @@ mod tests {
use tonic::codegen::tokio_stream::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tucana::sagittarius::flow_service_server::{FlowService as SagittariusFlowService, FlowServiceServer};
use tucana::sagittarius::{Flow, FlowGetRequest, FlowGetResponse, FlowLogonRequest, FlowResponse};
use tucana::sagittarius::flow_service_server::{
FlowService as SagittariusFlowService, FlowServiceServer,
};
use tucana::sagittarius::{
Flow, FlowGetRequest, FlowGetResponse, FlowLogonRequest, FlowResponse,
};

struct MockFlowService {
flow_get_result: FlowGetResponse,
Expand All @@ -170,13 +189,19 @@ mod tests {

#[async_trait]
impl SagittariusFlowService for MockFlowService {
async fn get(&self, _request: Request<FlowGetRequest>) -> Result<Response<FlowGetResponse>, Status> {
async fn get(
&self,
_request: Request<FlowGetRequest>,
) -> Result<Response<FlowGetResponse>, Status> {
Ok(Response::new(self.flow_get_result.clone()))
}

type UpdateStream = Pin<Box<dyn Stream<Item=Result<FlowResponse, Status>> + Send>>;
type UpdateStream = Pin<Box<dyn Stream<Item = Result<FlowResponse, Status>> + Send>>;

async fn update(&self, _request: Request<FlowLogonRequest>) -> Result<Response<Self::UpdateStream>, Status> {
async fn update(
&self,
_request: Request<FlowLogonRequest>,
) -> Result<Response<Self::UpdateStream>, Status> {
let flow = Flow {
flow_id: 1,
start_node: None,
Expand All @@ -191,24 +216,34 @@ mod tests {
};
};

Ok(Response::new(Box::pin(response_stream) as Self::UpdateStream))
Ok(Response::new(
Box::pin(response_stream) as Self::UpdateStream
))
}
}

#[async_trait]
impl SagittariusFlowService for BrokenMockFlowService {
async fn get(&self, _request: Request<FlowGetRequest>) -> Result<Response<FlowGetResponse>, Status> {
async fn get(
&self,
_request: Request<FlowGetRequest>,
) -> Result<Response<FlowGetResponse>, Status> {
Err(Status::internal("An unhandled error occurred!"))
}

type UpdateStream = Pin<Box<dyn Stream<Item=Result<FlowResponse, Status>> + Send>>;
type UpdateStream = Pin<Box<dyn Stream<Item = Result<FlowResponse, Status>> + Send>>;

async fn update(&self, _request: Request<FlowLogonRequest>) -> Result<Response<Self::UpdateStream>, Status> {
async fn update(
&self,
_request: Request<FlowLogonRequest>,
) -> Result<Response<Self::UpdateStream>, Status> {
Err(Status::internal("An unhandled error occurred!"))
}
}

async fn setup_sagittarius_mock(flow_get_response: FlowGetResponse) -> (JoinHandle<()>, oneshot::Sender<()>, String) {
async fn setup_sagittarius_mock(
flow_get_response: FlowGetResponse,
) -> (JoinHandle<()>, oneshot::Sender<()>, String) {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
Expand Down Expand Up @@ -300,9 +335,21 @@ mod tests {
async fn test_get_flows_update_only() {
let response = FlowGetResponse {
updated_flows: vec![
Flow { flow_id: 1, start_node: None, definition: None },
Flow { flow_id: 2, start_node: None, definition: None },
Flow { flow_id: 3, start_node: None, definition: None },
Flow {
flow_id: 1,
start_node: None,
definition: None,
},
Flow {
flow_id: 2,
start_node: None,
definition: None,
},
Flow {
flow_id: 3,
start_node: None,
definition: None,
},
],
deleted_flow_ids: vec![],
};
Expand Down Expand Up @@ -337,9 +384,21 @@ mod tests {
{
let response = FlowGetResponse {
updated_flows: vec![
Flow { flow_id: 1, start_node: None, definition: None },
Flow { flow_id: 2, start_node: None, definition: None },
Flow { flow_id: 3, start_node: None, definition: None },
Flow {
flow_id: 1,
start_node: None,
definition: None,
},
Flow {
flow_id: 2,
start_node: None,
definition: None,
},
Flow {
flow_id: 3,
start_node: None,
definition: None,
},
],
deleted_flow_ids: vec![],
};
Expand Down Expand Up @@ -395,4 +454,4 @@ mod tests {
let sagittarius_url = "http://127.0.0.1:25565".to_string();
let _client = SagittariusActionClientBase::new(sagittarius_url).await;
}
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::env;
use std::fmt::{Debug, Display};
use std::str::FromStr;
use dotenv::from_filename;
use log::{error, info};
use crate::configuration::environment::Environment;

Expand Down Expand Up @@ -48,7 +49,7 @@ pub struct Config {
/// Searches for the env. file at root level. Filename: `.env`
impl Config {
pub fn new() -> Self {
let result = dotenv::from_filename(".env");
let result = from_filename("../../../.env");
match result {
Ok(_) => info!(".env file loaded successfully"),
Err(e) => error!("Error loading .env file: {}", e),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ impl Environment {
_ => Environment::Development,
}
}
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::fs::File;
use std::io::Read;
use std::sync::Arc;
use crate::client::sagittarius::flow_client::{SagittariusFlowClient, SagittariusFlowClientBase};
use crate::configuration::config::Config;
use aquila_store::{FlowService, FlowServiceBase};
use async_trait::async_trait;
use clokwerk::{AsyncScheduler};
use clokwerk::AsyncScheduler;
use clokwerk::Interval::Seconds;
use log::{debug, error, info};
use redis::aio::MultiplexedConnection;
use serde_json::from_str;
use std::fs::File;
use std::io::Read;
use std::sync::Arc;
use tokio::sync::Mutex;
use tucana::sagittarius::Flow;
use crate::client::sagittarius::flow_client::{SagittariusFlowClient, SagittariusFlowClientBase};
use crate::configuration::config::Config;
use crate::service::flow_service::{FlowService, FlowServiceBase};

pub struct StartConfigurationBase {
connection_arc: Arc<Mutex<Box<MultiplexedConnection>>>,
Expand All @@ -19,16 +20,25 @@ pub struct StartConfigurationBase {

#[async_trait]
pub trait StartConfiguration {
async fn new(connection_arc: Arc<Mutex<Box<MultiplexedConnection>>>, config: Config) -> StartConfigurationBase;
async fn new(
connection_arc: Arc<Mutex<Box<MultiplexedConnection>>>,
config: Config,
) -> StartConfigurationBase;
async fn init_flows_from_sagittarius(&mut self);
async fn init_flows_from_json(mut self);
}

/// `Aquila's` startup configuration logic.
#[async_trait]
impl StartConfiguration for StartConfigurationBase {
async fn new(connection_arc: Arc<Mutex<Box<MultiplexedConnection>>>, config: Config) -> StartConfigurationBase {
StartConfigurationBase { connection_arc, config }
async fn new(
connection_arc: Arc<Mutex<Box<MultiplexedConnection>>>,
config: Config,
) -> StartConfigurationBase {
StartConfigurationBase {
connection_arc,
config,
}
}

/// Function to initialize the connection to `Sagittarius` to receive latest flows.
Expand All @@ -43,7 +53,8 @@ impl StartConfiguration for StartConfigurationBase {
async fn init_flows_from_sagittarius(&mut self) {
let flow_service = FlowServiceBase::new(self.connection_arc.clone()).await;
let flow_service_arc = Arc::new(Mutex::new(flow_service));
let mut sagittarius_client = SagittariusFlowClientBase::new(self.config.backend_url.clone(), flow_service_arc).await;
let mut sagittarius_client =
SagittariusFlowClientBase::new(self.config.backend_url.clone(), flow_service_arc).await;

if !self.config.enable_scheduled_update {
info!("Receiving flows from sagittarius once");
Expand All @@ -55,16 +66,14 @@ impl StartConfiguration for StartConfigurationBase {
let schedule_interval = self.config.update_schedule_interval;
let mut scheduler = AsyncScheduler::new();

scheduler
.every(Seconds(schedule_interval))
.run(move || {
let local_flow_client = Arc::new(Mutex::new(sagittarius_client.clone()));
scheduler.every(Seconds(schedule_interval)).run(move || {
let local_flow_client = Arc::new(Mutex::new(sagittarius_client.clone()));

async move {
let mut current_flow_client = local_flow_client.lock().await;
current_flow_client.send_start_request().await
}
});
async move {
let mut current_flow_client = local_flow_client.lock().await;
current_flow_client.send_start_request().await
}
});
}

/// Function to start `Aquila` from a JSON containing the flows.
Expand Down Expand Up @@ -105,15 +114,18 @@ impl StartConfiguration for StartConfigurationBase {
}
}

let flows: Vec<Flow> = match serde_json::from_str(&data) {
let flows: Vec<Flow> = match from_str(&data) {
Ok(flows) => flows,
Err(error) => {
error!("Error deserializing json file {}", error);
panic!("There was a problem deserializing the json file: {:?}", error);
panic!(
"There was a problem deserializing the json file: {:?}",
error
);
}
};

info!("Loaded {} Flows!", &flows.len());
flow_service.insert_flows(flows).await;
}
}
}
Loading

0 comments on commit 93c9472

Please sign in to comment.