-
Notifications
You must be signed in to change notification settings - Fork 0
/
OrderedStepFlow
89 lines (72 loc) · 4.07 KB
/
OrderedStepFlow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package flowbot.flows
import java.util.concurrent.TimeUnit
import com.bac.rctt.apps.flowbot.envelope.FlowEnvelope
import com.bac.rctt.apps.flowbot.util.{FormatUtils, Functions}
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
/**
 * Flow that validates every configured step and then executes the steps one by one.
 *
 * Step names are the keys returned by `flowEnvelope.getSteps("flow.steps.step-")`. They are
 * executed in the natural (lexicographic) `String` ordering of a `SortedSet`.
 * NOTE(review): lexicographic ordering places e.g. "...step-10" before "...step-2" unless the
 * step numbers are zero-padded -- confirm the naming convention used in the flow configs.
 *
 * For each step the implementation class name is read from the `<step>.action` attribute, or
 * from `<step>.processor` when `.action` is absent, and is loaded reflectively from the
 * `com.bac.rctt.apps.flowbot.processors` package.
 */
class OrderedStepFlow() extends Flow {
  // The processor classes are only known by name at runtime, so they are invoked through
  // structural types; this import acknowledges the reflective calls to the compiler.
  import scala.language.reflectiveCalls

  /** Shape a processor class must have in order to be executed as a step. */
  private type StepProcessor = {
    def run(sc: SparkContext, hiveContext: org.apache.spark.sql.hive.HiveContext, flowEnvelope: FlowEnvelope, step: String): FlowEnvelope
  }

  /** Shape a processor class must have in order to be validated before the flow starts. */
  private type StepValidator = {
    def validate(flowEnvelope: FlowEnvelope, step: String): Boolean
  }

  private val uuid: String = "db64f53e-3574-4eea-a280-ec505f20558b"
  private val logger = Logger.getLogger(getClass().getName())

  private val StepPrefix = "flow.steps.step-"
  private val SkipStepsKey = "flow.skipSteps"
  private val ProcessorPackage = "com.bac.rctt.apps.flowbot.processors."

  /**
   * Validates all configured steps, then runs every step that is neither disabled nor listed
   * in `flow.skipSteps`.
   *
   * Skipped steps get `<step>.skipped = "true"`. Executed steps get `<step>.step_ts_start` and
   * `<step>.step_ts_end`, plus `<step>.sla_runtime` / `<step>.sla_result` when `<step>.sla` is
   * configured.
   *
   * @param sc           Spark context handed to each step processor
   * @param hiveContext  Hive context handed to each step processor
   * @param flowEnvelope attribute carrier for the flow; read and updated in place
   * @throws Exception when validation reports a configuration error, or when `flow.status`
   *                   equals "failure" after a step has run
   */
  def run(sc: SparkContext, hiveContext: org.apache.spark.sql.hive.HiveContext, flowEnvelope: FlowEnvelope): Unit = {
    logger.debug(s"step processor loaded conf=${flowEnvelope.getAttributes()}")
    logger.info("Starting OrderedStepFlow")
    flowEnvelope.setAttribute("flow.uuid", uuid)

    val stepNames = flowEnvelope.getSteps(StepPrefix).keySet
    val sortedSteps: Set[String] = collection.immutable.SortedSet[String]() ++ stepNames

    logger.info("Validating steps...")
    validateSteps(flowEnvelope)
    logger.info("Finished validating steps")

    for (stepName <- sortedSteps) {
      logger.info(s"Processing step: $stepName")
      if (isSkipped(flowEnvelope, stepName)) {
        logger.info(s"found DISABLED input step! Skipping $stepName")
        flowEnvelope.setAttribute(stepName + ".skipped", "true")
      } else {
        runStep(sc, hiveContext, flowEnvelope, stepName)
      }
    }
  }

  /** True when the step is disabled via `<step>.enabled` or is listed in `flow.skipSteps`. */
  private def isSkipped(flowEnvelope: FlowEnvelope, stepName: String): Boolean =
    !flowEnvelope.getAttribute(stepName + ".enabled").toBoolean ||
      isListedInSkipSteps(flowEnvelope, stepName)

  /**
   * True when `stepName` appears as a complete entry in the `flow.skipSteps` attribute.
   *
   * A plain substring test would also skip "...step-1" when only "...step-10" is listed, so
   * the name must not be directly preceded or followed by a name character (letter, digit,
   * '_', '.' or '-'). A missing or null attribute means that no step is skipped.
   * NOTE(review): assumes entries are separated by characters outside that set (comma,
   * whitespace, ';', ...) -- confirm against the flow configuration format.
   */
  private def isListedInSkipSteps(flowEnvelope: FlowEnvelope, stepName: String): Boolean =
    flowEnvelope.hasAttribute(SkipStepsKey) &&
      Option(flowEnvelope.getAttribute(SkipStepsKey)).exists { skipList =>
        val wholeName =
          """(?<![\w.-])""" + java.util.regex.Pattern.quote(stepName) + """(?![\w.-])"""
        java.util.regex.Pattern.compile(wholeName).matcher(skipList).find()
      }

  /** Reads the class name of a step: `<step>.action` if present, else `<step>.processor`. */
  private def stepClassName(flowEnvelope: FlowEnvelope, stepName: String): String =
    if (flowEnvelope.hasAttribute(stepName + ".action")) {
      flowEnvelope.getAttribute(stepName + ".action")
    } else {
      flowEnvelope.getAttribute(stepName + ".processor")
    }

  /** Instantiates the step's class from the processors package via its no-arg constructor. */
  private def newStepInstance(flowEnvelope: FlowEnvelope, stepName: String): Any =
    Class.forName(ProcessorPackage + stepClassName(flowEnvelope, stepName)).newInstance()

  /**
   * Executes one step: stamps the start time, runs the processor and merges the attributes of
   * the envelope it returns, stamps the end time, records the SLA outcome and finally fails
   * the flow if the step left `flow.status` set to "failure".
   */
  private def runStep(sc: SparkContext, hiveContext: org.apache.spark.sql.hive.HiveContext, flowEnvelope: FlowEnvelope, stepName: String): Unit = {
    flowEnvelope.setAttribute(stepName + ".step_ts_start", Functions.getCurrentMillis())

    val processor = newStepInstance(flowEnvelope, stepName).asInstanceOf[StepProcessor]
    flowEnvelope.mergeAttributes(processor.run(sc, hiveContext, flowEnvelope, stepName).getAttributes)

    flowEnvelope.setAttribute(stepName + ".step_ts_end", Functions.getCurrentMillis())

    if (flowEnvelope.hasAttribute(stepName + ".sla")) {
      recordSla(flowEnvelope, stepName)
    }

    // The SLA attributes are written before this check, so they are present even for a step
    // that ends up failing the flow.
    if (flowEnvelope.getAttribute("flow.status") == "failure") {
      throw new Exception("Step Failed: " + stepName)
    }
  }

  /**
   * Compares the step's elapsed time (end minus start timestamp, read back from the envelope)
   * with the configured `<step>.sla` duration and stores `<step>.sla_runtime` plus
   * `<step>.sla_result` ("missed" when the elapsed time exceeds the SLA, otherwise "made").
   */
  private def recordSla(flowEnvelope: FlowEnvelope, stepName: String): Unit = {
    val stepTime = flowEnvelope.getAttribute(stepName + ".step_ts_end").toLong -
      flowEnvelope.getAttribute(stepName + ".step_ts_start").toLong
    val sla = FormatUtils.getTimeDuration(flowEnvelope.getAttribute(stepName + ".sla"), TimeUnit.MILLISECONDS)

    flowEnvelope.setAttribute(
      stepName + ".sla_runtime",
      FormatUtils.formatHoursMinutesSeconds(stepTime, TimeUnit.MILLISECONDS).toString())

    val slaResult = if (stepTime > sla) "missed" else "made"
    flowEnvelope.setAttribute(stepName + ".sla_result", slaResult)
  }

  /**
   * Checks every configured step before anything is executed. A step is invalid when it has
   * neither an `.action` nor a `.processor` attribute, or when its class's `validate` method
   * returns false. All steps are checked before the result is reported.
   *
   * @throws Exception when at least one step is invalid
   */
  private def validateSteps(flowEnvelope: FlowEnvelope): Unit = {
    logger.info("Verifying config")
    var confGood = true
    for (stepName <- flowEnvelope.getSteps(StepPrefix).keySet) {
      logger.info(s"Checking $stepName")
      val hasClassName =
        flowEnvelope.hasAttribute(stepName + ".action") || flowEnvelope.hasAttribute(stepName + ".processor")
      if (!hasClassName) {
        logger.error(s"$stepName is missing attribute action or processor. Unable to further validate $stepName")
        confGood = false
      } else {
        val plugin = newStepInstance(flowEnvelope, stepName).asInstanceOf[StepValidator]
        if (!plugin.validate(flowEnvelope, stepName)) confGood = false
      }
    }
    if (!confGood) {
      throw new Exception("Errors exist in configuration file")
    }
  }
}