diff --git a/README.md b/README.md index 0891224..641a255 100644 --- a/README.md +++ b/README.md @@ -45,12 +45,13 @@ Light weight easy to use task manager written by Rust JSON Body的格式如下: { - "id": "唯一编码", + "id": "唯一编码, 任务的唯一编号", "method": "作业执行的类型 GET|POST|EXEC|MAIL_TO", "delay": 0, # 延迟执行的秒数,0为即时触发 "name": "命令名", # GET|POST的时候是请求地址,EXEC为命令名,MAIL_TO为邮件名 "params": "参数", # GET时为QueryString,POST时是JSON字符串,EXEC时为空格隔开的参数,MAIL_TO的时候为专门定义的JSON字符串 "cc": "1 3", # 空格隔开的指定线程编号,这里指定两个线程,表示最多可以并行执行2个任务,留空表示不限制 + "wait": 0 # 可选,大于0表示需要在提交任务后等待执行的结果,数值为等待超时时间 } MAIL_TO 的params的JSON结构 @@ -71,3 +72,9 @@ CRON配置文件格式 cron不支持MAIL_TO +### 获取待执行结果: + +系统提供long pulling获取执行结果的接口,以提供给需要更长执行时间的任务用于获取执行结果:比如压缩视频 + + curl http://127.0.0.1:8000/task_resp/任务编号 + diff --git a/src/main.rs b/src/main.rs index e053f2b..03b5aba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use redis::{AsyncCommands, Commands}; use std::net::SocketAddr; use std::net::IpAddr; use std::net::Ipv4Addr; -use axum::{extract::{Json, State}, Router, routing::post, response::IntoResponse}; +use axum::{extract::{Json, path::Path as PathParam, State}, Router, routing::{post, get}, response::IntoResponse}; use std::{env, thread}; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use runner::{Task, AppConfig}; use dirs; - +use redis::aio::Connection; #[derive(Clone)] @@ -37,7 +37,8 @@ struct WebTask { delay: i32, name: String, params: String, - cc: String + cc: String, + wait: Option } impl WebTask { @@ -60,7 +61,8 @@ impl WebTask { exec_time: 0, retry: 0, cc, - error: "".to_string() + error: "".to_string(), + waiting_resp: if let Some(w) = self.wait { w }else { 0usize } } } } @@ -297,6 +299,8 @@ async fn main() { if !if_err.is_empty() { task.error = if_err.to_string(); println!("执行错误:{}", task.error); + + redis_connection.rpush::(format!("resp:{}", task.id.clone()), if_err).expect("返回执行结果错误"); if let Ok(save_payload) = serde_json::to_string(&task) { redis_connection.set::(task.id.clone(), save_payload).expect("回存变更错误"); } @@ -421,6 +425,7 @@ async fn main() { }; let app = Router::new() .route("/task_in_queue", post(handler)) + .route("/task_resp/:key", get(waiting)) .with_state(app_state); println!("server will start at 0.0.0.0:{}", port); @@ -459,12 +464,40 @@ pub async fn handler(State(mut state): State, let delay_key = format!("{} {}", task.id, now_ts + web_task.delay as i64); conn.sadd::(TASK_DELAY.to_string(), delay_key.clone()).await.expect("set list error"); } - Json(serde_json::json!({"result":"OK", "reason": ""})) + if let Some(is_wait) = web_task.wait { + if is_wait > 0usize { + let resp = waiting_for_result(&mut conn, task.id.clone(), is_wait).await; + Json(resp) + }else{ + Json(serde_json::json!({"result":"OK", "reason": ""})) + } + }else{ + Json(serde_json::json!({"result":"OK", "reason": ""})) + } }else{ Json(serde_json::json!({"result":"Fail", "reason": format!("非法的请求:{}", payload.clone())})) } } +async fn waiting(State(mut state): State, PathParam(key): PathParam) -> impl IntoResponse { + let mut conn = state.redis_client.get_async_connection().await.unwrap(); + let resp = waiting_for_result(&mut conn, key, 60).await; + Json(resp) +} + +async fn waiting_for_result(conn: &mut Connection, flag: String, waiting: usize) -> serde_json::Value { + let result_key = format!("resp:{}", flag); + if let Ok(rp) = conn.blpop::(result_key, waiting).await { + if rp == "OK" { + serde_json::json!({"result":"OK", "reason": ""}) + }else{ + serde_json::json!({"result":"Fail", "reason": format!("错误:{}", rp)}) + } + }else{ + serde_json::json!({"result":"Timeout", "reason": "等待超时"}) + } +} + async fn read_crontab_file(config_path: PathBuf, tasks: &mut Vec) -> io::Result<()> { let file = File::open(config_path).await?; let reader = BufReader::new(file); @@ -543,7 +576,8 @@ fn get_tasks_avaliable(tasks: &Vec, avaliable_tasks: &mut Vec) { exec_time:0, retry:0, cc: vec![], - error: "".to_string() + error: "".to_string(), + waiting_resp: 0usize }; avaliable_tasks.insert(0, task); } diff --git a/src/runner.rs b/src/runner.rs index b17b278..630d346 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -35,7 +35,8 @@ pub struct Task { pub exec_time: i64, pub retry: i32, pub cc: Vec, - pub error: String + pub error: String, + pub waiting_resp: usize } impl Task {