diff --git a/src/config/api.rs b/src/config/api.rs deleted file mode 100644 index 91b407ea..00000000 --- a/src/config/api.rs +++ /dev/null @@ -1,310 +0,0 @@ -use super::core::{ConfigActor, ConfigCmd, ConfigKey, ConfigResult, ListenerItem, ListenerResult}; -use super::utils; -use crate::common::appdata::AppShareData; -use crate::common::web_utils::get_req_body; -use crate::raft::cluster::model::{DelConfigReq, SetConfigReq}; -use crate::utils::select_option_by_clone; -use chrono::Local; -use std::cmp::max; -use std::cmp::min; -use std::sync::Arc; - -use actix_web::{web, HttpRequest, HttpResponse, Responder}; - -use crate::config::config_type::ConfigType; -use actix::prelude::Addr; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use utils::param_utils; - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ConfigWebParams { - pub data_id: Option, - pub group: Option, - pub tenant: Option, - pub content: Option, -} - -impl ConfigWebParams { - pub fn select_option(&self, o: &Self) -> Self { - Self { - data_id: select_option_by_clone(&self.data_id, &o.data_id), - group: select_option_by_clone(&self.group, &o.group), - tenant: select_option_by_clone(&self.tenant, &o.tenant), - content: select_option_by_clone(&self.content, &o.content), - } - } - - pub fn to_confirmed_param(&self) -> Result { - let mut param = ConfigWebConfirmedParam::default(); - if let Some(v) = self.data_id.as_ref() { - if v.is_empty() { - return Err("dataId is empty".to_owned()); - } - param.data_id = v.to_owned(); - } - param.group = self - .group - .as_ref() - .unwrap_or(&"DEFAULT_GROUP".to_owned()) - .to_owned(); - //param.tenant= self.tenant.as_ref().unwrap_or(&"public".to_owned()).to_owned(); - param.tenant = self.tenant.as_ref().unwrap_or(&"".to_owned()).to_owned(); - if param.tenant == "public" { - param.tenant = "".to_owned(); - } - if let Some(v) = self.content.as_ref() { - if !v.is_empty() { - param.content = v.to_owned(); - } - } - Ok(param) - } -} - -#[derive(Debug, Default, Clone)] -pub struct ConfigWebConfirmedParam { - pub data_id: String, - pub group: String, - pub tenant: String, - pub content: String, -} - -pub(crate) async fn add_config( - a: web::Query, - payload: web::Payload, - appdata: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let selected_param = a.select_option(&b); - match param_utils::check_tenant(&selected_param.tenant) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - } - - match param_utils::check_param( - &selected_param.data_id, - &selected_param.group, - &Some(String::from("datumId")), - &selected_param.content, - ) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - } - - let param = selected_param.to_confirmed_param(); - match param { - Ok(p) => { - let req = SetConfigReq::new( - ConfigKey::new(&p.data_id, &p.group, &p.tenant), - Arc::new(p.content.to_owned()), - ); - match appdata.config_route.set_config(req).await { - Ok(_) => HttpResponse::Ok() - .content_type("text/html; charset=utf-8") - .body("true"), - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } - } - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -pub(crate) async fn del_config( - a: web::Query, - payload: web::Payload, - appdata: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - - let selected_param = a.select_option(&b); - match param_utils::check_tenant(&selected_param.tenant) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - } - - match param_utils::check_param( - &selected_param.data_id, - &selected_param.group, - &Some(String::from("datumId")), - &Some(String::from("rm")), - ) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - } - - let param = selected_param.to_confirmed_param(); - match param { - Ok(p) => { - let req = DelConfigReq::new(ConfigKey::new(&p.data_id, &p.group, &p.tenant)); - match appdata.config_route.del_config(req).await { - Ok(_) => HttpResponse::Ok() - .content_type("text/html; charset=utf-8") - .body("true"), - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } - } - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -pub(crate) async fn get_config( - a: web::Query, - config_addr: web::Data>, -) -> impl Responder { - let param = a.to_confirmed_param(); - match param { - Ok(p) => { - let cmd = ConfigCmd::GET(ConfigKey::new(&p.data_id, &p.group, &p.tenant)); - match config_addr.send(cmd).await { - Ok(res) => { - let r: ConfigResult = res.unwrap(); - match r { - ConfigResult::Data { - value: v, - md5, - config_type, - .. - } => HttpResponse::Ok() - .content_type( - config_type - .map(|v| ConfigType::new_by_value(&v)) - .unwrap_or_default() - .get_media_type(), - ) - .insert_header(("content-md5", md5.as_ref().to_string())) - .body(v.as_ref().as_bytes().to_vec()), - _ => HttpResponse::NotFound().body("config data not exist"), - } - } - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } - } - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -#[derive(Serialize, Deserialize)] -pub struct ListenerParams { - #[serde(rename(serialize = "Listening-Configs", deserialize = "Listening-Configs"))] - configs: Option, -} - -impl ListenerParams { - pub fn select_option(&self, o: &Self) -> Self { - Self { - configs: select_option_by_clone(&self.configs, &o.configs), - } - } - - pub fn to_items(&self) -> Vec { - let config = self.configs.as_ref().unwrap_or(&"".to_owned()).to_owned(); - ListenerItem::decode_listener_items(&config) - } -} - -async fn listener_config( - _req: HttpRequest, - a: web::Query, - payload: web::Payload, - config_addr: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let list = a.select_option(&b).to_items(); - if list.is_empty() { - //println!("listener_config error: listener item len == 0"); - return HttpResponse::NoContent() - .content_type("text/html; charset=utf-8") - .body("error:listener empty"); - } - let (tx, rx) = tokio::sync::oneshot::channel(); - let current_time = Local::now().timestamp_millis(); - let mut time_out = 0; - if let Some(_timeout) = _req.headers().get("Long-Pulling-Timeout") { - match _timeout.to_str().unwrap().parse::() { - Ok(v) => { - time_out = current_time + min(max(10000, v), 120000) - 500; - } - Err(_) => { - time_out = 0; - } - } - } - //println!("timeout header:{:?},time_out:{}",_req.headers().get("Long-Pulling-Timeout") ,time_out); - let cmd = ConfigCmd::LISTENER(list, tx, time_out); - let _ = config_addr.send(cmd).await; - let res = rx.await.unwrap(); - let v = match res { - ListenerResult::DATA(list) => { - let mut data = "".to_string(); - for item in list { - data += &item.build_key(); - data += "\x01"; - } - let mut tmp_param = HashMap::new(); - tmp_param.insert("_", data); - let t = serde_urlencoded::to_string(&tmp_param).unwrap(); - t[2..t.len()].to_owned() + "\n" - } - ListenerResult::NULL => "".to_owned(), - }; - HttpResponse::Ok() - .content_type("text/html; charset=utf-8") - .body(v) -} - -pub fn app_config(config: &mut web::ServiceConfig) { - config.service( - web::scope("/nacos/v1/cs") - .service( - web::resource("/configs") - .route(web::get().to(get_config)) - .route(web::post().to(add_config)) - .route(web::put().to(add_config)) - .route(web::delete().to(del_config)), - ) - .service(web::resource("/configs/listener").route(web::post().to(listener_config))), - ); -} diff --git a/src/naming/api.rs b/src/naming/api.rs deleted file mode 100644 index e61f96a2..00000000 --- a/src/naming/api.rs +++ /dev/null @@ -1,688 +0,0 @@ -#![allow(unused_imports, unused_assignments, unused_variables)] -use crate::common::appdata::AppShareData; -use crate::common::web_utils::get_req_body; - -use super::super::utils::{get_bool_from_string, select_option_by_clone}; -use super::api_model::{InstanceVO, QueryListResult, ServiceInfoParam}; -use super::core::{NamingActor, NamingCmd, NamingResult}; -use super::model::{Instance, InstanceUpdateTag, ServiceKey}; -use super::ops::ops_api::query_opt_service_list; -use super::{ - NamingUtils, CLIENT_BEAT_INTERVAL_KEY, LIGHT_BEAT_ENABLED_KEY, RESPONSE_CODE_KEY, - RESPONSE_CODE_OK, -}; - -use actix_web::{ - http::header, middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer, - Responder, -}; - -use actix::prelude::*; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct InstanceWebParams { - pub ip: Option, - pub port: Option, - pub namespace_id: Option, - pub weight: Option, - pub enabled: Option, - pub healthy: Option, - pub ephemeral: Option, - pub metadata: Option, - pub cluster_name: Option, - pub service_name: Option, - pub group_name: Option, -} - -impl InstanceWebParams { - fn select_option(&self, o: &Self) -> Self { - Self { - ip: select_option_by_clone(&self.ip, &o.ip), - port: select_option_by_clone(&self.port, &o.port), - namespace_id: select_option_by_clone(&self.namespace_id, &o.namespace_id), - weight: select_option_by_clone(&self.weight, &o.weight), - enabled: select_option_by_clone(&self.enabled, &o.enabled), - healthy: select_option_by_clone(&self.healthy, &o.healthy), - ephemeral: select_option_by_clone(&self.ephemeral, &o.ephemeral), - metadata: select_option_by_clone(&self.metadata, &o.metadata), - cluster_name: select_option_by_clone(&self.cluster_name, &o.cluster_name), - service_name: select_option_by_clone(&self.service_name, &o.service_name), - group_name: select_option_by_clone(&self.group_name, &o.group_name), - } - } - - fn convert_to_instance(self) -> Result { - let mut instance = Instance { - ip: Arc::new(self.ip.unwrap()), - port: self.port.unwrap(), - weight: self.weight.unwrap_or(1f32), - enabled: get_bool_from_string(&self.enabled, true), - healthy: true, - ephemeral: get_bool_from_string(&self.ephemeral, true), - cluster_name: NamingUtils::default_cluster( - self.cluster_name - .as_ref() - .unwrap_or(&"".to_owned()) - .to_owned(), - ), - namespace_id: Arc::new(NamingUtils::default_namespace( - self.namespace_id - .as_ref() - .unwrap_or(&"".to_owned()) - .to_owned(), - )), - ..Default::default() - }; - - let grouped_name = self.service_name.unwrap(); - if let Some((group_name, service_name)) = - NamingUtils::split_group_and_serivce_name(&grouped_name) - { - instance.service_name = Arc::new(service_name); - instance.group_name = Arc::new(group_name); - } else { - return Err("serivceName is unvaild!".to_owned()); - } - if let Some(group_name) = self.group_name { - if !group_name.is_empty() { - instance.group_name = Arc::new(group_name); - } - } - let metadata_str = self - .metadata - .as_ref() - .unwrap_or(&"{}".to_owned()) - .to_owned(); - if let Ok(metadata) = serde_json::from_str::>(&metadata_str) { - instance.metadata = Arc::new(metadata); - }; - instance.generate_key(); - Ok(instance) - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct InstanceWebQueryListParams { - pub namespace_id: Option, - pub service_name: Option, - pub group_name: Option, - pub clusters: Option, - pub healthy_only: Option, - #[serde(rename = "clientIP")] - pub client_ip: Option, - pub udp_port: Option, -} - -impl InstanceWebQueryListParams { - fn to_clusters_key(&self) -> Result<(ServiceKey, String), String> { - let mut service_name = "".to_owned(); - let mut group_name = "".to_owned(); - let grouped_name = self.service_name.as_ref().unwrap().to_owned(); - if let Some((_group_name, _service_name)) = - NamingUtils::split_group_and_serivce_name(&grouped_name) - { - service_name = _service_name; - group_name = _group_name; - } else { - return Err("serivceName is unvaild!".to_owned()); - } - if let Some(_group_name) = self.group_name.as_ref() { - if !_group_name.is_empty() { - group_name = _group_name.to_owned(); - } - } - let namespace_id = NamingUtils::default_namespace( - self.namespace_id - .as_ref() - .unwrap_or(&"".to_owned()) - .to_owned(), - ); - let key = ServiceKey::new(&namespace_id, &group_name, &service_name); - - /* - let mut clusters = vec![]; - if let Some(cluster_str) = self.clusters.as_ref() { - clusters = cluster_str.split(",").into_iter() - .filter(|e|{e.len()>0}).map(|e|{e.to_owned()}).collect::>(); - } - */ - Ok(( - key, - self.clusters.as_ref().unwrap_or(&"".to_owned()).to_owned(), - )) - } - - fn get_addr(&self) -> Option { - if let Some(port) = &self.udp_port { - if *port == 0u16 { - return None; - } - if let Some(ip_str) = &self.client_ip { - if let Ok(ip) = ip_str.parse() { - return Some(SocketAddr::new(ip, *port)); - } - } - } - None - } -} - -#[derive(Debug, Serialize, Deserialize, Default)] -#[serde(rename_all = "camelCase")] -pub struct BeatRequest { - pub namespace_id: Option, - pub service_name: Option, - pub cluster_name: Option, - pub group_name: Option, - pub ephemeral: Option, - pub beat: Option, -} - -impl BeatRequest { - fn select_option(&self, o: &Self) -> Self { - Self { - namespace_id: select_option_by_clone(&self.namespace_id, &o.namespace_id), - cluster_name: select_option_by_clone(&self.cluster_name, &o.cluster_name), - service_name: select_option_by_clone(&self.service_name, &o.service_name), - group_name: select_option_by_clone(&self.group_name, &o.group_name), - ephemeral: select_option_by_clone(&self.ephemeral, &o.ephemeral), - beat: select_option_by_clone(&self.beat, &o.beat), - } - } - - pub fn convert_to_instance(self) -> Result { - let beat = self.beat.unwrap_or_default(); - let beat_info = match serde_json::from_str::(&beat) { - Ok(v) => v, - Err(err) => { - return Err(err.to_string()); - } - }; - let service_name_option = beat_info.service_name.clone(); - let mut instance = beat_info.convert_to_instance(); - if service_name_option.is_none() { - let grouped_name = self.service_name.unwrap(); - if let Some((group_name, service_name)) = - NamingUtils::split_group_and_serivce_name(&grouped_name) - { - instance.service_name = Arc::new(service_name); - instance.group_name = Arc::new(group_name); - } - if let Some(group_name) = self.group_name.as_ref() { - if !group_name.is_empty() { - instance.group_name = Arc::new(group_name.to_owned()); - } - } - } - instance.ephemeral = get_bool_from_string(&self.ephemeral, true); - instance.cluster_name = NamingUtils::default_cluster( - self.cluster_name - .as_ref() - .unwrap_or(&"".to_owned()) - .to_owned(), - ); - instance.namespace_id = Arc::new(NamingUtils::default_namespace( - self.namespace_id - .as_ref() - .unwrap_or(&"".to_owned()) - .to_owned(), - )); - instance.generate_key(); - Ok(instance) - } -} - -#[derive(Debug, Serialize, Deserialize, Default)] -#[serde(rename_all = "camelCase")] -pub struct BeatInfo { - pub cluster: Option, - pub ip: Option, - pub port: Option, - pub metadata: Option>, - pub period: Option, - pub scheduled: Option, - pub service_name: Option, - pub stopped: Option, - pub weight: Option, -} - -impl BeatInfo { - pub fn convert_to_instance(self) -> Instance { - let mut instance = Instance { - ip: Arc::new(self.ip.unwrap()), - port: self.port.unwrap(), - cluster_name: NamingUtils::default_cluster( - self.cluster.as_ref().unwrap_or(&"".to_owned()).to_owned(), - ), - ..Default::default() - }; - let grouped_name = self.service_name.as_ref().unwrap().to_owned(); - if let Some((group_name, service_name)) = - NamingUtils::split_group_and_serivce_name(&grouped_name) - { - instance.service_name = Arc::new(service_name); - instance.group_name = Arc::new(group_name); - } - if let Some(metadata) = self.metadata { - instance.metadata = Arc::new(metadata); - } - //instance.generate_key(); - instance - } -} - -#[derive(Debug, Serialize, Deserialize, Default)] -#[serde(rename_all = "camelCase")] -pub struct ServiceQueryListRequest { - pub page_no: Option, - pub page_size: Option, - pub namespace_id: Option, - pub group_name: Option, - pub service_name: Option, -} - -#[derive(Debug, Serialize, Deserialize, Default)] -pub struct ServiceQueryListResponce { - pub count: usize, - pub doms: Vec>, -} - -pub async fn get_instance( - param: web::Query, - naming_addr: web::Data>, -) -> impl Responder { - let instance = param.0.convert_to_instance(); - match instance { - Ok(instance) => match naming_addr.send(NamingCmd::Query(instance)).await { - Ok(res) => { - let result: NamingResult = res.unwrap(); - match result { - NamingResult::Instance(v) => { - let vo = InstanceVO::from_instance(&v); - HttpResponse::Ok() - .insert_header(header::ContentType(mime::APPLICATION_JSON)) - .body(serde_json::to_string(&vo).unwrap()) - } - _ => HttpResponse::InternalServerError().body("error"), - } - } - Err(_) => HttpResponse::InternalServerError().body("error"), - }, - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -pub async fn get_instance_list( - param: web::Query, - naming_addr: web::Data>, -) -> impl Responder { - let only_healthy = param.healthy_only.unwrap_or(true); - let addr = param.get_addr(); - match param.to_clusters_key() { - Ok((key, clusters)) => { - match naming_addr - .send(NamingCmd::QueryListString( - key.clone(), - clusters, - only_healthy, - addr, - )) - .await - { - Ok(res) => { - let result: NamingResult = res.unwrap(); - match result { - NamingResult::InstanceListString(v) => HttpResponse::Ok().body(v), - _ => HttpResponse::InternalServerError().body("error"), - } - } - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } - } - Err(err) => HttpResponse::InternalServerError().body(err), - } -} - -pub async fn add_instance( - a: web::Query, - payload: web::Payload, - appdata: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let param = a.select_option(&b); - let update_tag = InstanceUpdateTag { - weight: match ¶m.weight { - Some(v) => *v != 1.0f32, - None => false, - }, - metadata: match ¶m.metadata { - Some(v) => !v.is_empty(), - None => false, - }, - enabled: false, - ephemeral: false, - from_update: false, - }; - let instance = param.convert_to_instance(); - match instance { - Ok(instance) => { - if !instance.check_vaild() { - HttpResponse::InternalServerError().body("instance check is invalid") - } else { - match appdata.naming_route.update_instance(instance, None).await { - Ok(_) => HttpResponse::Ok().body("ok"), - Err(e) => HttpResponse::InternalServerError().body(e.to_string()), - } - } - } - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -pub async fn update_instance( - a: web::Query, - payload: web::Payload, - appdata: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let param = a.select_option(&b); - let update_tag = InstanceUpdateTag { - weight: match ¶m.weight { - Some(v) => *v != 1.0f32, - None => false, - }, - metadata: match ¶m.metadata { - //Some(v) => !v.is_empty() && v!="{}", - Some(v) => !v.is_empty(), - None => false, - }, - enabled: match ¶m.enabled { - Some(v) => true, - None => false, - }, - ephemeral: match ¶m.ephemeral { - Some(v) => true, - None => false, - }, - from_update: true, - }; - let instance = param.convert_to_instance(); - match instance { - Ok(instance) => { - if !instance.check_vaild() { - HttpResponse::InternalServerError().body("instance check is invalid") - } else { - match appdata - .naming_route - .update_instance(instance, Some(update_tag)) - .await - { - Ok(_) => HttpResponse::Ok().body("ok"), - Err(e) => HttpResponse::InternalServerError().body(e.to_string()), - } - } - } - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -pub async fn del_instance( - a: web::Query, - payload: web::Payload, - appdata: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let param = a.select_option(&b); - let instance = param.convert_to_instance(); - match instance { - Ok(instance) => { - if !instance.check_vaild() { - HttpResponse::InternalServerError().body("console: instance check is invalid") - } else { - match appdata.naming_route.delete_instance(instance).await { - Ok(_) => HttpResponse::Ok().body("ok"), - Err(e) => HttpResponse::InternalServerError().body(e.to_string()), - } - } - } - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -pub async fn beat_instance( - a: web::Query, - payload: web::Payload, - appdata: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let param = a.select_option(&b); - //debug - //log::info!("beat request param:{}",serde_json::to_string(¶m).unwrap()); - let instance = param.convert_to_instance(); - match instance { - Ok(instance) => { - if !instance.check_vaild() { - HttpResponse::InternalServerError().body("instance check is invalid") - } else { - let tag = InstanceUpdateTag { - weight: false, - enabled: false, - ephemeral: false, - metadata: false, - from_update: false, - }; - match appdata - .naming_route - .update_instance(instance, Some(tag)) - .await - { - Ok(_) => { - let mut result = HashMap::new(); - result.insert(RESPONSE_CODE_KEY, serde_json::json!(RESPONSE_CODE_OK)); - result.insert(CLIENT_BEAT_INTERVAL_KEY, serde_json::json!(5000)); - //result.insert(LIGHT_BEAT_ENABLED_KEY, serde_json::json!(false)); - let v = serde_json::to_string(&result).unwrap(); - HttpResponse::Ok() - .insert_header(header::ContentType(mime::APPLICATION_JSON)) - .body(v) - } - Err(e) => HttpResponse::InternalServerError().body(e.to_string()), - } - } - } - Err(e) => HttpResponse::InternalServerError().body(e), - } -} - -pub async fn query_service( - param: web::Query, - naming_addr: web::Data>, -) -> impl Responder { - HttpResponse::InternalServerError().body("error,not suport at present") -} - -pub async fn update_service( - a: web::Query, - payload: web::Payload, - naming_addr: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let param = ServiceInfoParam::merge_value(a.0, b); - match param.build_service_info() { - Ok(service_info) => { - let _ = naming_addr - .send(NamingCmd::UpdateService(service_info)) - .await; - HttpResponse::Ok().body("ok") - } - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } -} - -pub async fn remove_service( - a: web::Query, - payload: web::Payload, - naming_addr: web::Data>, -) -> impl Responder { - let body = match get_req_body(payload).await { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let b = match serde_urlencoded::from_bytes(&body) { - Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } - }; - let param = ServiceInfoParam::merge_value(a.0, b); - match param.build_service_info() { - Ok(service_info) => { - let key = service_info.to_service_key(); - match naming_addr.send(NamingCmd::RemoveService(key)).await { - Ok(res) => { - let res: anyhow::Result = res; - match res { - Ok(_) => HttpResponse::Ok().body("ok"), - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } - } - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } - } - Err(err) => HttpResponse::InternalServerError().body(err.to_string()), - } -} - -pub async fn query_service_list( - param: web::Query, - naming_addr: web::Data>, -) -> impl Responder { - let page_size = param.page_size.unwrap_or(0x7fffffff); - let page_index = param.page_no.unwrap_or(1); - let namespace_id = NamingUtils::default_namespace( - param - .namespace_id - .as_ref() - .unwrap_or(&"".to_owned()) - .to_owned(), - ); - let group = NamingUtils::default_group( - param - .group_name - .as_ref() - .unwrap_or(&"".to_owned()) - .to_owned(), - ); - let key = ServiceKey::new(&namespace_id, &group, ""); - match naming_addr - .send(NamingCmd::QueryServicePage(key, page_size, page_index)) - .await - { - Ok(res) => { - let result: NamingResult = res.unwrap(); - match result { - NamingResult::ServicePage((c, v)) => { - let resp = ServiceQueryListResponce { count: c, doms: v }; - HttpResponse::Ok().body(serde_json::to_string(&resp).unwrap()) - } - _ => HttpResponse::InternalServerError().body("error"), - } - } - Err(_) => HttpResponse::InternalServerError().body("error"), - } -} - -pub(crate) async fn mock_operator_metrics() -> impl Responder { - "{\"status\":\"UP\"}" -} - -pub fn app_config(config: &mut web::ServiceConfig) { - config.service( - web::scope("/nacos/v1/ns") - .service( - web::resource("/instance") - .route(web::get().to(get_instance)) - .route(web::post().to(add_instance)) - .route(web::put().to(update_instance)) - .route(web::delete().to(del_instance)), - ) - .service(web::resource("/instance/beat").route(web::put().to(beat_instance))) - .service(web::resource("/instance/list").route(web::get().to(get_instance_list))) - .service( - web::resource("/service") - .route(web::post().to(update_service)) - .route(web::put().to(update_service)) - .route(web::delete().to(remove_service)) - .route(web::get().to(query_service)), - ) - .service(web::resource("/service/list").route(web::get().to(query_service_list))) - //ops - .service(web::resource("/operator/metrics").route(web::get().to(mock_operator_metrics))) - .service( - web::resource("/catalog/services").route(web::get().to(query_opt_service_list)), - ), - ); -}