-
Notifications
You must be signed in to change notification settings - Fork 0
/
ApplyCDC
145 lines (118 loc) · 5.8 KB
/
ApplyCDC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.bac.rctt.apps.flowbot.processors
import java.io.{ InputStream, InputStreamReader }
import com.bac.rctt.apps.flowbot.envelope.{ DataFrameEnvelope, FlowEnvelope }
import com.google.common.base.Charsets
import com.google.common.io.CharStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hive.HiveContext
import com.bac.ecr.hdf.frameworks.logging.HadoopLogger
import com.bac.ecr.hdf.frameworks.logging.HadoopLogFactory
import com.bac.ecr.hdf.frameworks.logging.HadoopLogger.DDI_LAYER
import com.bac.ecr.hdf.components.cdc.driver.ChangeDataCapture
import collection.JavaConverters._
import collection.mutable._
import java.util.HashMap
/*************************ApplyCDC Processor***********************
Processor : ApplyCDC
Description: The ApplyCDC processor can be configured to run HDPF CDC against in-memory dataframes/ Hive tables
Output : dataframe
Sample step:
step-01 {
type=output
name=runCDC
processor=ApplyCDC
enabled=true
cdcSnapshot=db.table/dataframe_name
cdcHistory=db.table/dataframe_name
primaryKeys=dept
cdcRunDateValue="2017-02-24"
cdcArgs="nonComparedKeys=period_dt,start_dt,end_dt"
rowCountEst=100
sla=10 mins
product="test_product"
controlPoint="test_cp"
}
HDPF-CDC supported Pipeline signature : DataFrame ChangeDataCapture.performCDC(DataFrame snapshtDF,DataFrame histDF,String primaryKeys,String cdcRunDateValue,Map<String,String> optionalArgsMap,HiveContext hiveCtx,HadoopLogger hadoopLogger)
ChangeLog:
Initial Version 1.0.0
*************************************************************/
class ApplyCDC extends BaseProcessor {
val PRINT_FROMATTER="******************"
def run(sc: SparkContext, hiveContext: org.apache.spark.sql.hive.HiveContext, flowEnvelope: FlowEnvelope, step: String): FlowEnvelope = {
logger.info(PRINT_FROMATTER+"found output step! Going to pass '" + step + "' to sp.ApplyCDC")
try{
if (!flowEnvelope.hasAttribute(step + ".controlPoint")) flowEnvelope.setAttribute(step + ".controlPoint" , "")
if (!flowEnvelope.hasAttribute(step + ".product")) flowEnvelope.setAttribute(step + ".product" , "")
logger.info(PRINT_FROMATTER+"found ApplyCDC step! Going to pass '" + step + "' to sp.ApplyCDC")
flowEnvelope.mergeAttributes(callHDPFCDC(sc,hiveContext, flowEnvelope, step).getAttributes())
}catch{
case e: Exception => logger.error(PRINT_FROMATTER + "Error occurred while running ApplyCDC."+ e)
throw e
}
flowEnvelope
}
def callHDPFCDC(sc: SparkContext, hiveContext: org.apache.spark.sql.hive.HiveContext, flowEnvelope: FlowEnvelope, step: String): DataFrameEnvelope = {
logger.info(PRINT_FROMATTER+"Running ApplyCDC")
try{
val envelope: DataFrameEnvelope = new DataFrameEnvelope(step)
envelope.setAttribute(step + ".function-uuid", "f1720382-317a-4e4d-b0c3-be878b462786")
//envelope.mergeAttributesWithFilter(flowEnvelope.getAttributes(), step)
envelope.mergeAttributes(flowEnvelope.getAttributes())
envelope.setDataFrame(hiveContext.sql("select * from " + envelope.getAttribute(step + ".cdcSnapshot")))
val snapshotDf = envelope.getDataFrame()
envelope.setDataFrame(hiveContext.sql("select * from " + envelope.getAttribute(step + ".cdcHistory")))
val historyDf = envelope.getDataFrame()
val argsArray : Array[String] = flowEnvelope.getAttribute(step + ".cdcArgs").split(";")
val cdcMap : java.util.Map[String, String] = getArgsMap(argsArray)
val conf: Configuration = new Configuration()
val fs: FileSystem = FileSystem.get(conf)
val jsc = JavaSparkContext.fromSparkContext(sc)
val hadoopLogger: HadoopLogger = HadoopLogFactory.getInstance(getClass().getSimpleName(), DDI_LAYER.OTHER, jsc.applicationId, jsc.hadoopConfiguration())
hadoopLogger.setProduct(flowEnvelope.getAttribute(step + ".product"))
hadoopLogger.setControlPoint(flowEnvelope.getAttribute(step + ".controlPoint"))
logger.info(PRINT_FROMATTER+"Arguments passed to CDC are : " + argsArray.mkString)
envelope.setDataFrame(ChangeDataCapture.performCDC(snapshotDf,historyDf,envelope.getAttribute(step + ".primaryKeys"),envelope.getAttribute(step + ".cdcRunDateValue"),cdcMap,hiveContext,hadoopLogger))
envelope
}catch{
case e: Exception => logger.error(PRINT_FROMATTER + "Error occurred while running ApplyCDC."+ e)
throw e
}
}
def getArgsMap(argsArray : Array[String]) : java.util.Map[String, String] = {
val cdcArgMap : java.util.Map[String, String] = new HashMap()
//@TODO replace with advance scala code
for (e <- argsArray) {
val s = e.split("=", 2)
cdcArgMap.put(s(0), s(1))}
cdcArgMap
}
override def validate(flowEnvelope: FlowEnvelope, step: String): Boolean = {
logger.info("Verifying " + step)
var confGood = true
if (!flowEnvelope.hasAttribute(step + ".cdcSnapshot")) {
logger.error(PRINT_FROMATTER+step + " is missing attribute cdcSnapshot")
confGood = false
}
if (!flowEnvelope.hasAttribute(step + ".cdcHistory")) {
logger.error(PRINT_FROMATTER+step + " is missing attribute cdcHistory")
confGood = false
}
if (!flowEnvelope.hasAttribute(step + ".primaryKeys")) {
logger.error(PRINT_FROMATTER+step + " is missing attribute primaryKeys")
confGood = false
}
if (!flowEnvelope.hasAttribute(step + ".cdcRunDateValue")) {
logger.error(PRINT_FROMATTER+step + " is missing attribute cdcRunDateValue")
confGood = false
}
if (!flowEnvelope.hasAttribute(step + ".cdcArgs")) {
logger.error(PRINT_FROMATTER+step + " is missing attribute cdcArgs")
confGood = false
}
confGood
}
}