Skip to content

workflow exec and maintain

qifeng dai edited this page Oct 11, 2017 · 1 revision

工作流执行,主要目标是完成 "工作流执行、结果查询、日志查询等功能"。

1.执行工作流

说明:手工执行一个工作流包括 "直接运行、补数据" 两种情况,对项目 "有执行权限的用户和项目所有者" 可以发起工作流的执行。这个工作流必须是配置过的。

POST     /executors
Parameters: projectName={projectName}&workflowName={workflowName}&schedule={schedule}&execType={execType}&nodeName={nodeName}&nodeDep={nodeDep}&notifyType={notifyType}&failurePolicy={failurePolicy}&notifyMails={notifyMails}&timeout={timeout}

Response:
Status: 201 Created
{
  "execIds": [1379, 1378, 1377]
}

参数说明:

参数 类型 是否必选 描述 说明
projectName string 项目名称
workflowName string 工作流名称
schedule jsonObject 调度周期信息 补数据场景使用,见: [[调度参数说明
execType enum 运行类型 DIRECT-直接运行, COMPLEMENT_DATA-补数据
failurePolicy enum 失败之后的策略 END 停止所有的 node 运行,CONTINUE-尽量执行,默认为 END
nodeName string 节点名称 表示执行的节点名称, 为空表示执行所有的工作流
nodeDep enum 节点依赖类型 NODE_ONLY-仅节点,NODE_POST-节点及后续,NODE_PRE-节点及前续,默认是 NODE_ONLY
notifyType enum 报警类型
notifyMails jsonArray 报警邮箱列表
timeout int 超时时间 单位: 秒

返回说明:

参数 类型 是否必选 描述 说明
execIds jsonArray 执行 ID,可能是多个 补数据模式下不会返回ID,补数据是异步操作,暂时不能获取到所有的ID

2.直接执行工作流

说明:直接执行一个工作流,运行的时候,会提交工作流的信息,如果工作流名称已经存在,则会报错,必须是不存在的。对项目 "有执行权限的用户和项目所有者" 可以发起工作流的运行。需要特别注意的是,这种方式执行的工作流,后面是没有办法对其进行修改的,另外其日志在超过 24 小时后不保证有效,另外要注意的是这里不支持长任务

POST     /executors/direct
Content-Type 使用:multipart/form-data
Parameters: projectName={projectName}&workflowName={workflowName}&proxyUser={proxyUser}&queue={queue}&data={data}&file={zipfile}&failurePolicy={failurePolicy}&notifyType={notifyType}&notifyMails={notifyMails}&timeout={timeout}&extras={extras}

Response:
Status: 201 Created
{
  "execId": 1379
}

注意,data 和 file 必须至少存在一个,都存在则会 file 有效

3.查询任务运行情况

说明:查询任务的运行情况,所属项目 "有执行权限的用户和项目所有者可以查询"。

GET  /executors?
Parameters: startDate={startDate}&endDate={endDate}&projectName={projectName}&workflowName={workflowName}&status={status}&from={from}&size={size}

Response:
Status: 200 OK
{
  "total": 123,
  "length": 10,
  "executions": [
    {
      "execId": 111,
      "projectName": "the name of project",
      "workflowName": "the name of workflow",
      "execType": "DIRECT",
      "submitTime": 1468391778000,
      "startTime": 1468391778000,
      "endTime": 1468391778000,
      "scheduleTime": 1468391778000,
      "duration": 654,
      "submitUser": "the user of submit workflow",
      "proxyUser": "bdi",
      "queue": "myqueue",
      "status": [2,3],
      "owner": "the owner of the workflow",
      "extras": {...}
    }
  ]
}

参数说明:

参数 类型 是否必选 描述 说明
startDate long 起始时间 按照任务的提交时间计算
endDate long 结束时间 按照任务的提交时间计算
projectName string 项目名称
workflowName jsonArray<string> 工作流名称数组 注意这里采用的是模糊后缀匹配
status jsonArray<int> 默认全部查询
from int 起始记录行 默认为 0
size int 请求记录数 默认为 100, 有效值 (0,1000]

返回说明:

参数 类型 是否必选 描述 说明
total int 记录总数
length int 当前记录数
executions jsonArray 具体的执行结果,是一个 jsonArray 注意返回结果根据 startTime 倒序排序

executions 说明:

参数 类型 是否必选 描述 说明
execId int 执行的 id
projectName string 项目名称
workflowName string 工作流名称
execType enum 执行类型
submitTime long 提交的起始时间
startTime long 执行的起始时间
endTime long 执行的结束时间
duration int 运行时长(单位为: 秒)
submitUser string 提交用户名称
scheduleTime long 调度时间
proxyUser string 执行的代理用户
queue string 执行的队列
status int 运行状态
owner string 工作流的 owner
extras jsonObject 扩展信息 工作流上传的扩展信息

4.查询具体某个任务的运行情况

说明:查询任务的运行情况,所属项目 "有执行权限的用户和项目所有者可以查询"。

GET  /executors/:execId

Response:
Status: 200 OK
[
  {
    "execId": 111,
    "projectName": "the name of project",
    "workflowName": "the name of workflow",
    "execType": "DIRECT",
    "submitTime": 1468391778000,
    "startTime": 1468391778000,
    "endTime": 1468391778000,
    "scheduleTime": 1468391778000,
    "duration": 654,
    "submitUser": "the user of submit workflow",
    "proxyUser": "bdi",
    "queue": "myqueue",
    "status": 2,
    "owner": "the owner of the workflow",
    "data": {
        "nodes": [{
            "name": "node1",
            "desc": "run mapreduce task",
            "type": "MR",
            "startTime": 1468391778000, 
            "endTime": 1468391791000, 
            "parameter": {...},
            "duration": 13, 
            "status": 1, 
            "appLinks": ["http://...", "http://..."],
            "jobId": "mr_12345",
            "dep": ["nodex", "nodey"],
            "extras": {...}
          }
        ],
        "userDefParams": [{
            "prop": "year",
            "value": "$[yyyy]"
          }]
    },
    "extras": {...}
  }
]

data/nodes 说明:

参数 类型 是否必选 描述 说明
name string 节点名称
type string 节点的 job 类型 参见: [[任务描述
startTime long 起始时间
endTime long 结束时间
duration int 持续时间
status int 节点运行状态
appLinks jsonArray<string> 对应 hadoop/spark 的日志链接
jobId string 日志 id 用于查询日志
parameter jsonObject 节点详细参数 节点详细参数
extras jsonObject 扩展信息 节点上传的扩展信息

5.查询日志信息

说明:查询节点的运行日志,所属项目 "有执行权限的用户和项目所有者可以查询"。

GET /executors/:jobId/logs
Parameters: from={from}&size={size}&query={query-string}

Response:
Status: 200 OK
{
  "total": 121,
  "length": 10,
  "took": 15,
  "content": ["[INFO]...", "[ERROR]..."]
}

请求参数说明:

参数 类型 是否必选 描述 说明
from int 起始记录行(注意:不是页行) 默认 0, 有效值 (0, max-int]
size int 请求多少个日志 默认是:100, 有效值 (0, 1000]
query string 查询信息

返回参数说明:

参数 类型 是否必选 描述 说明
total int 总的日志数,行数
length int 日志长度
took int 查询时长,单位(ms),可以忽略
content jsonArray<string> 日志信息,每条记录是一个字符串

6.停止运行

说明:停止某个正在运行的任务,所属项目 "有执行权限的用户和项目所有者可以 kill 正在运行的任务"。

POST    /executors/:execId/kill

Response:
Status: 201 Created

任务状态说明

  • 0: 表示初始化
  • 1: 依赖任务中
  • 2: 依赖资源中
  • 3: 正在运行
  • 4: 运行成功
  • 5: 被 kill
  • 6: 运行失败
  • 7: 依赖失败
  • 8: 任务暂停

运行方式说明

  • DIRECT: 直接运行
  • SCHEDULER: 调度运行
  • COMPLEMENT_DATA: 补数据