Skip to content

Commit

Permalink
feat: reimplemented flow handling
Browse files Browse the repository at this point in the history
  • Loading branch information
raphael-goetz committed Dec 28, 2024
1 parent 70dc37d commit cc05c92
Showing 1 changed file with 36 additions and 60 deletions.
96 changes: 36 additions & 60 deletions aquila/src/client/sagittarius/flow_client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use aquila_store::{FlowService, FlowServiceBase};
use async_trait::async_trait;
use futures::StreamExt;
use log::{error, info};
use log::{debug, error};
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tonic::Request;
use tucana::sagittarius::flow_response::Data;
use tucana::sagittarius::flow_service_client::FlowServiceClient;
use tucana::sagittarius::{FlowCommandType, FlowGetRequest, FlowLogonRequest, FlowResponse};

const INSERT: i32 = FlowCommandType::Insert as i32;
const DELETE: i32 = FlowCommandType::Delete as i32;
use tucana::sagittarius::{FlowLogonRequest, FlowResponse};

/// Struct representing a service for receiving flows from `Sagittarius`.
#[derive(Clone)]
Expand All @@ -26,8 +24,7 @@ pub trait SagittariusFlowClient {
sagittarius_url: String,
flow_service: Arc<Mutex<FlowServiceBase>>,
) -> SagittariusFlowClientBase;
async fn send_flow_update_request(&mut self);
async fn send_start_request(&mut self);
async fn init_flow_stream(&mut self);
}

/// Implementation for a service for receiving flows from `Sagittarius`.
Expand Down Expand Up @@ -55,38 +52,10 @@ impl SagittariusFlowClient for SagittariusFlowClientBase {
}
}

/// Will send a request `FlowGetRequest` to `Sagittarius`
/// Inserts/Deletes flows contained in the response into Redis
async fn send_flow_update_request(&mut self) {
let mut flow_service = self.flow_service.lock().await;
let flow_ids = match flow_service.get_all_flow_ids().await {
Ok(result) => result,
Err(redis_error) => {
error!("Service wasn't able to get ids {}", redis_error);
return;
}
};

let request = Request::new(FlowGetRequest { flow_ids });

let response = match self.client.get(request).await {
Ok(res) => res.into_inner(),
Err(status) => {
error!("Received a {status}, can't retrieve flows from Sagittarius");
return;
}
};

let update_flows = response.updated_flows;
let deleted_flow_ids = response.deleted_flow_ids;
flow_service.insert_flows(update_flows).await;
flow_service.delete_flows(deleted_flow_ids).await
}

/// Will send a request `FlowLogonRequest` to `Sagittarius`
/// Will establish a stream.
/// `Sagittarius` will send update/delete commands and the flow to do that with.
async fn send_start_request(&mut self) {
async fn init_flow_stream(&mut self) {
let request = Request::new(FlowLogonRequest {});
let response = match self.client.update(request).await {
Ok(res) => res,
Expand All @@ -102,33 +71,40 @@ impl SagittariusFlowClient for SagittariusFlowClientBase {
response: FlowResponse,
flow_service: Arc<Mutex<FlowServiceBase>>,
) {
match response.r#type {
INSERT => {
let flow = response.updated_flow;
if flow.is_none() {
info!("Received insert request without any flow");
return;
}

{
let mut flow_service = flow_service.lock().await;
flow_service.insert_flow(flow.unwrap()).await;
}
let data = match response.data {
Some(data) => data,
None => {
debug!("Received a FlowLogonResponse but no FlowLogonResponse");
return;
}
DELETE => {
let flow_id = response.deleted_flow_id;
if flow_id.is_none() {
info!("Received delete request without any flow");
return;
}
};

{
let mut flow_service = flow_service.lock().await;
flow_service.delete_flow(flow_id.unwrap()).await;
}
match data {
// Will delete the flow id it receives
Data::DeletedFlowId(id) => {
let mut flow_service = flow_service.lock().await;
flow_service.delete_flow(id).await;
}
//Will update the flow it receives
Data::UpdatedFlow(flow) => {
let mut flow_service = flow_service.lock().await;
flow_service.insert_flow(flow).await;
}
_ => {
error!("Received unknown response type")
//WIll drop all flows that it holds and insert all new ones
Data::Flows(flows) => {
let mut flow_service = flow_service.lock().await;
let result_ids = flow_service.get_all_flow_ids().await;

let ids = match result_ids {
Ok(ids) => ids,
Err(err) => {
error!("Service wasn't able to get ids {}", err);
return;
}
};

flow_service.delete_flows(ids).await;
flow_service.insert_flows(flows.flows).await;
}
}
}
Expand Down

0 comments on commit cc05c92

Please sign in to comment.