Skip to content

Commit

Permalink
版本 ver 0.2.1
Browse files Browse the repository at this point in the history
增加了获取执行结果的特性
  • Loading branch information
ipconfiger committed Jan 1, 2024
1 parent 93ddc7f commit f93d4ae
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 8 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ Light weight easy to use task manager written by Rust
JSON Body的格式如下:

{
"id": "唯一编码",
"id": "唯一编码, 任务的唯一编号",
"method": "作业执行的类型 GET|POST|EXEC|MAIL_TO",
"delay": 0, # 延迟执行的秒数,0为即时触发
"name": "命令名", # GET|POST的时候是请求地址,EXEC为命令名,MAIL_TO为邮件名
"params": "参数", # GET时为QueryString,POST时是JSON字符串,EXEC时为空格隔开的参数,MAIL_TO的时候为专门定义的JSON字符串
"cc": "1 3", # 空格隔开的指定线程编号,这里指定两个线程,表示最多可以并行执行2个任务,留空表示不限制
"wait": 0 # 可选,大于0表示需要在提交任务后等待执行的结果,数值为等待超时时间
}

MAIL_TO 的params的JSON结构
Expand All @@ -71,3 +72,9 @@ CRON配置文件格式

cron不支持MAIL_TO

### 获取待执行结果:

系统提供long pulling获取执行结果的接口,以提供给需要更长执行时间的任务用于获取执行结果:比如压缩视频

curl http://127.0.0.1:8000/task_resp/任务编号

46 changes: 40 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use redis::{AsyncCommands, Commands};
use std::net::SocketAddr;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use axum::{extract::{Json, State}, Router, routing::post, response::IntoResponse};
use axum::{extract::{Json, path::Path as PathParam, State}, Router, routing::{post, get}, response::IntoResponse};
use std::{env, thread};
use std::path::{Path, PathBuf};
use std::str::FromStr;
Expand All @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use runner::{Task, AppConfig};
use dirs;

use redis::aio::Connection;


#[derive(Clone)]
Expand All @@ -37,7 +37,8 @@ struct WebTask {
delay: i32,
name: String,
params: String,
cc: String
cc: String,
wait: Option<usize>
}

impl WebTask {
Expand All @@ -60,7 +61,8 @@ impl WebTask {
exec_time: 0,
retry: 0,
cc,
error: "".to_string()
error: "".to_string(),
waiting_resp: if let Some(w) = self.wait { w }else { 0usize }
}
}
}
Expand Down Expand Up @@ -297,6 +299,8 @@ async fn main() {
if !if_err.is_empty() {
task.error = if_err.to_string();
println!("执行错误:{}", task.error);

redis_connection.rpush::<String, String, ()>(format!("resp:{}", task.id.clone()), if_err).expect("返回执行结果错误");
if let Ok(save_payload) = serde_json::to_string(&task) {
redis_connection.set::<String, String, ()>(task.id.clone(), save_payload).expect("回存变更错误");
}
Expand Down Expand Up @@ -421,6 +425,7 @@ async fn main() {
};
let app = Router::new()
.route("/task_in_queue", post(handler))
.route("/task_resp/:key", get(waiting))
.with_state(app_state);

println!("server will start at 0.0.0.0:{}", port);
Expand Down Expand Up @@ -459,12 +464,40 @@ pub async fn handler(State(mut state): State<AppState>,
let delay_key = format!("{} {}", task.id, now_ts + web_task.delay as i64);
conn.sadd::<String, String, ()>(TASK_DELAY.to_string(), delay_key.clone()).await.expect("set list error");
}
Json(serde_json::json!({"result":"OK", "reason": ""}))
if let Some(is_wait) = web_task.wait {
if is_wait > 0usize {
let resp = waiting_for_result(&mut conn, task.id.clone(), is_wait).await;
Json(resp)
}else{
Json(serde_json::json!({"result":"OK", "reason": ""}))
}
}else{
Json(serde_json::json!({"result":"OK", "reason": ""}))
}
}else{
Json(serde_json::json!({"result":"Fail", "reason": format!("非法的请求:{}", payload.clone())}))
}
}

async fn waiting(State(mut state): State<AppState>, PathParam(key): PathParam<String>) -> impl IntoResponse {
let mut conn = state.redis_client.get_async_connection().await.unwrap();
let resp = waiting_for_result(&mut conn, key, 60).await;
Json(resp)
}

async fn waiting_for_result(conn: &mut Connection, flag: String, waiting: usize) -> serde_json::Value {
let result_key = format!("resp:{}", flag);
if let Ok(rp) = conn.blpop::<String, String>(result_key, waiting).await {
if rp == "OK" {
serde_json::json!({"result":"OK", "reason": ""})
}else{
serde_json::json!({"result":"Fail", "reason": format!("错误:{}", rp)})
}
}else{
serde_json::json!({"result":"Timeout", "reason": "等待超时"})
}
}

async fn read_crontab_file(config_path: PathBuf, tasks: &mut Vec<String>) -> io::Result<()> {
let file = File::open(config_path).await?;
let reader = BufReader::new(file);
Expand Down Expand Up @@ -543,7 +576,8 @@ fn get_tasks_avaliable(tasks: &Vec<String>, avaliable_tasks: &mut Vec<Task>) {
exec_time:0,
retry:0,
cc: vec![],
error: "".to_string()
error: "".to_string(),
waiting_resp: 0usize
};
avaliable_tasks.insert(0, task);
}
Expand Down
3 changes: 2 additions & 1 deletion src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub struct Task {
pub exec_time: i64,
pub retry: i32,
pub cc: Vec<i32>,
pub error: String
pub error: String,
pub waiting_resp: usize
}

impl Task {
Expand Down

0 comments on commit f93d4ae

Please sign in to comment.