Skip to content

Commit

Permalink
ver-0.2.2
Browse files Browse the repository at this point in the history
增加获取系统运行状态的接口
  • Loading branch information
ipconfiger committed Jan 2, 2024
1 parent f93d4ae commit 10b27de
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,10 @@ cron不支持MAIL_TO

curl http://127.0.0.1:8000/task_resp/任务编号

### 获取系统当前状态

通过一个GET请求获取当前运行状态,包括正在执行的任务id列表,等待执行的延迟任务id列表,执行报错任务id列表,以及各个工作线程的执行队列长度信息

curl http://127.0.0.1:8000/sys_info

返回JSON对象,方便和运维工具集成,比如队列过长的时候告警
34 changes: 33 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod runner;

extern crate redis;
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use clap::{App, Arg, ArgMatches};
use redis::{AsyncCommands, Commands};
use std::net::SocketAddr;
Expand Down Expand Up @@ -426,6 +426,7 @@ async fn main() {
let app = Router::new()
.route("/task_in_queue", post(handler))
.route("/task_resp/:key", get(waiting))
.route("/sys_info", get(system_info_handler))
.with_state(app_state);

println!("server will start at 0.0.0.0:{}", port);
Expand Down Expand Up @@ -498,6 +499,37 @@ async fn waiting_for_result(conn: &mut Connection, flag: String, waiting: usize)
}
}

async fn system_info_handler(State(mut state): State<AppState>) -> impl IntoResponse {
let mut conn = state.redis_client.get_async_connection().await.unwrap();
let working_keys = if let Ok(_working_keys) = conn.smembers::<String, Vec<String>>(TASK_WORKING.to_string()).await {
_working_keys
}else{
vec![]
};
let delaying_keys = if let Ok(_delaying_keys) = conn.smembers::<String, Vec<String>>(TASK_DELAY.to_string()).await {
_delaying_keys
}else{
vec![]
};
let wronging_keys = if let Ok(_wronging_keys) = conn.smembers::<String, Vec<String>>(TASK_WORKING.to_string()).await {
_wronging_keys
}else{
vec![]
};
let mut chn_map: HashMap<i32, usize> = HashMap::new();
for chn in 0..state.queue.size {
let ct = state.queue.get_queue_len(chn);
chn_map.insert(chn, ct);
}
let payload = serde_json::json!({
"working_keys": working_keys,
"delaying_keys": delaying_keys,
"wronging_keys": wronging_keys,
"channel_sizes": chn_map
});
Json(payload)
}

async fn read_crontab_file(config_path: PathBuf, tasks: &mut Vec<String>) -> io::Result<()> {
let file = File::open(config_path).await?;
let reader = BufReader::new(file);
Expand Down

0 comments on commit 10b27de

Please sign in to comment.