-
Notifications
You must be signed in to change notification settings - Fork 0
/
RDBMSSumCheckUtil
256 lines (215 loc) · 14.7 KB
/
RDBMSSumCheckUtil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package com.bac.rctt.apps.flowbot.util
/**
* This code is used to fetch the sqlLiteral or sqlFile from configuration file
* and execute those HQL's and return the dataframeEnvelope.
*
* Date Developer Change
* ========= ========================== ==============================================
* 31-Aug-17 Sundarrajan Raman This Util class is used to accomodate all the
* helper functions needed for RDBMS Sum Check.
* While checkSumValidation is the entry point to the Sum Check
* It inturn uses getDBSumCheckSourceDataFrame, getDBSumCheckTargetDataFrame, addSumCheckStatusToMetaData, sumCheckValues
* todo the SUm Validation.
* getDBSumCheckSourceDataFrame========>
* ||===>sumCheckValues=>checkSumValidation===>Update Flow Envelope Status attributes
* getDBSumCheckTargetDataFrame========>
*
*/
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Actor
import akka.util.Timeout
import com.bac.rctt.apps.flowbot.envelope._
import org.apache.log4j.Logger
import org.apache.spark.sql.hive.HiveContext
import util.SchemaUtil
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.Decimal
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.types.FloatType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.LongType
import scala.collection.immutable.StringOps
class RDBMSSumCheckUtil {
val logger: Logger = Logger.getLogger(getClass.getName)
object Status extends Enumeration {
type Status = Value
val FAIL = Value("FAIL")
val PASS = Value("PASS")
}
/** This method get the Sum value for all the SUM Check columns from the Database using query.
* sumCheckColumns="user age salary" ===> Query would be "select sum(age) as age_SUM, sum(salary) as salary_SUM from source".
*
* Return the DataFrame that represents the above query.
*/
def getDBSumCheckSourceDataFrame(hiveContext: org.apache.spark.sql.hive.HiveContext, step: String, envelope: DataFrameEnvelope) : DataFrame = {
//Get the sum of all the sum columns
val decryptedPassword = new EncryptionUtil().decryptPassword(envelope, step)
var sumDataFrameSource:DataFrame = null
envelope.getAttribute(step + ".dbtype") match {
case "teradata" => {
sumDataFrameSource = hiveContext.read.format("jdbc")
.option("url", "jdbc:" + envelope.getAttribute(step + ".dbtype") + "://" + envelope.getAttribute(step + ".hostname"))
.option("driver", envelope.getAttribute(step + ".driver"))
.option("dbtable", envelope.getAttribute(step + ".sumCheckQuery"))
.option("user", envelope.getAttribute(step + ".username"))
.option("password", decryptedPassword).load()
}
case "sqlserver" => {
sumDataFrameSource = hiveContext.read.format("jdbc")
.option("url", "jdbc:" + envelope.getAttribute(step + ".dbtype") + "://" + envelope.getAttribute(step + ".hostname") + ":" + envelope.getAttribute(step + ".port"))
.option("driver", envelope.getAttribute(step + ".driver"))
.option("dbtable", envelope.getAttribute(step + ".sumCheckQuery"))
.option("user", envelope.getAttribute(step + ".username"))
.option("password", decryptedPassword).load()
}
case _ => {
sumDataFrameSource = hiveContext.read.format("jdbc")
.option("url", "jdbc:" + envelope.getAttribute(step + ".dbtype") + "://" + envelope.getAttribute(step + ".hostname") + ":" + envelope.getAttribute(step + ".port") + "/" + envelope.getAttribute(step + ".database"))
.option("driver", envelope.getAttribute(step + ".driver"))
.option("dbtable", envelope.getAttribute(step + ".sumCheckQuery"))
.option("user", envelope.getAttribute(step + ".username"))
.option("password", decryptedPassword).load()
}
}
sumDataFrameSource
}
/**
* This Method aggregates the sum check Columns obtained from the Data Frame which is computed in the Trget environment where
* this Job is running.
*
* return the DataFrame that will have all the sumcheck columns as aggregated columns
*/
def getDBSumCheckTargetDataFrame(step: String, envelope: DataFrameEnvelope) : DataFrame = {
val sumCheckColumns = envelope.getAttribute(step + ".sumCheckDataFrameExpression")
var sumColumnExprMap = scala.collection.mutable.Map[String, String]()
sumCheckColumns.split(",").map(sumCheckItems => {
sumColumnExprMap += (sumCheckItems -> "SUM")
})
logger.info("*****************sumCheckDataFrameExpression::::" + envelope.getAttribute(step + ".sumCheckDataFrameExpression"))
var sumDataFrameDestination = envelope.getDataFrame().agg(sumColumnExprMap.toMap)
sumCheckColumns.split(",").map(sumCheckItems => {
logger.info("****renaming columns ::::" + sumCheckItems + "** to ::: " + sumCheckItems + "_SUM")
sumDataFrameDestination = sumDataFrameDestination.withColumnRenamed("sum(" + sumCheckItems + ")", sumCheckItems + "_SUM")
})
sumDataFrameDestination
}
/**
* This method is helper method to do the MetaData additions for the Sum Values obtained.
*/
def addSumCheckStatusToMetaData(sumDataFrameSource:DataFrame, sumDataFrameDestination:DataFrame,
step:String, envelope: DataFrameEnvelope, sourcecolumnIndex: Int, destcolumnIndex: Int, status:String) : DataFrameEnvelope = {
envelope.setAttribute(step + ".dataIntegrity.datasetSum." + envelope.getAttribute(step + ".name") + "." + extractColumnName(sumDataFrameDestination.schema.fieldNames(destcolumnIndex)), if (sumDataFrameDestination.first().get(destcolumnIndex) == null) "Null" else sumDataFrameDestination.first().get(destcolumnIndex).toString())
envelope.setAttribute(step + ".dataIntegrity.querySum." + envelope.getAttribute(step + ".name") + "." + extractColumnName(sumDataFrameSource.schema.fieldNames(sourcecolumnIndex)), if (sumDataFrameSource.first().get(sourcecolumnIndex) == null) "null" else sumDataFrameSource.first().get(sourcecolumnIndex).toString())
envelope.setAttribute(step + ".dataIntegrity.sumCheckStatus", status)
envelope
}
/**
* This method applies precision given and return both source and target sum. If the Precision can not be applied exception will be returned.
*
* returns Source SUm and Target SUM vaues with precision applied.
*/
def sumCheckValues(step:String, envelope: DataFrameEnvelope,sumDataFrameSource:DataFrame, sumDataFrameDestination:DataFrame, sourceColumnIndex: Int,destColumnIndex: Int) = {
logger.info("*****" + "Source Column Name : " + sumDataFrameSource.schema.apply(sourceColumnIndex).name + " Source dataType : " + sumDataFrameSource.schema.apply(sourceColumnIndex).dataType)
logger.info("*****" + "Destination Column Name : " + sumDataFrameDestination.schema.apply(destColumnIndex).name + " Source dataType : " + sumDataFrameDestination.schema.apply(destColumnIndex).dataType)
var sumValueSourceAsDecimal : Decimal = null
var sumValueDestinationAsDecimal : Decimal = null
sumDataFrameSource.schema.apply(sourceColumnIndex).dataType match{
case DecimalType() => {
sumValueSourceAsDecimal = Decimal.apply(sumDataFrameSource.first().getDecimal(sourceColumnIndex))
}
case DoubleType => {
sumValueSourceAsDecimal = Decimal.apply(sumDataFrameSource.first().getDouble(sourceColumnIndex))
}
case FloatType => {
sumValueSourceAsDecimal = Decimal.apply(sumDataFrameSource.first().getFloat(sourceColumnIndex))
}
case IntegerType => {
sumValueSourceAsDecimal = Decimal.apply(sumDataFrameSource.first().getInt(sourceColumnIndex))
}
case LongType => {
sumValueSourceAsDecimal = Decimal.apply(sumDataFrameSource.first().getLong(sourceColumnIndex))
}
}
sumDataFrameDestination.schema.apply(destColumnIndex).dataType match{
case DecimalType() => {
sumValueDestinationAsDecimal = Decimal.apply(sumDataFrameDestination.first().getDecimal(destColumnIndex))
}
case DoubleType => {
sumValueDestinationAsDecimal = Decimal.apply(sumDataFrameDestination.first().getDouble(destColumnIndex))
}
case FloatType => {
sumValueDestinationAsDecimal = Decimal.apply(sumDataFrameDestination.first().getFloat(destColumnIndex))
}
case IntegerType => {
sumValueDestinationAsDecimal = Decimal.apply(sumDataFrameDestination.first().getInt(destColumnIndex))
}
case LongType => {
sumValueDestinationAsDecimal = Decimal.apply(sumDataFrameDestination.first().getLong(destColumnIndex))
}
}
logger.info("Source Value : " + sumValueSourceAsDecimal)
logger.info("target Value : " + sumValueDestinationAsDecimal)
//If there is a precision then apply the same here before comaparing
if (envelope.hasAttribute(step + ".sumcheckprecision")){
val precision:Int = envelope.getAttribute(step + ".sumcheckprecision").split(",")(0).trim.toInt
val scale:Int = envelope.getAttribute(step + ".sumcheckprecision").split(",")(1).trim.toInt
if (! sumValueSourceAsDecimal.changePrecision(precision, scale)){
addSumCheckStatusToMetaData(sumDataFrameSource, sumDataFrameDestination, step, envelope, sourceColumnIndex, destColumnIndex, Status.FAIL.toString())
throw new Exception("Not able to apply the precision to Source Sum Column : " + extractColumnName(sumDataFrameSource.schema.fieldNames(sourceColumnIndex)))
}
if (! sumValueDestinationAsDecimal.changePrecision(precision, scale)){
addSumCheckStatusToMetaData(sumDataFrameSource, sumDataFrameDestination, step, envelope, sourceColumnIndex, destColumnIndex, Status.FAIL.toString())
throw new Exception("Not able to apply the precision to Destination Sum Column : " + extractColumnName(sumDataFrameDestination.schema.fieldNames(destColumnIndex)))
}
}
(sumValueSourceAsDecimal,sumValueDestinationAsDecimal)
}
/** **/
def extractColumnName(colNameWithSum:String) : String = {
if (colNameWithSum == null)
return ""
val lastIndex = colNameWithSum.lastIndexOf("_")
if (lastIndex >= 0)
colNameWithSum.substring(0, lastIndex)
else
colNameWithSum
}
/**
* This method does the SUM Check on the Sum Check Columns and throw Exception if the SUM Check Fails.
* The Metadaa status will also be updated with the Status and SOurce/Target Sum Values.
*/
def checkSumValidation(hiveContext: org.apache.spark.sql.hive.HiveContext, step: String, envelope: DataFrameEnvelope): Unit = {
val sumDataFrameSource:DataFrame = getDBSumCheckSourceDataFrame(hiveContext, step, envelope)
val sumDataFrameDestination = getDBSumCheckTargetDataFrame(step, envelope)
val sumColumnLength = sumDataFrameDestination.columns.length
logger.info("*****Total Columns on which Sum Check to be done : " + sumColumnLength)
val sumCheckColumns = envelope.getAttribute(step + ".sumCheckDataFrameExpression")
sumCheckColumns.split(",").map(sumCheckItems => {
//for (i <- 0 to sumColumnLength-1){ //Loop through each of the columns to get the Sum and compare
//Get Source DF Column Name and Destination Column Names
logger.info("Source Schema : " + sumDataFrameSource.schema)
logger.info("Destination Schema : " + sumDataFrameDestination.schema)
val sourceDFColIndex = sumDataFrameSource.schema.fieldIndex(sumCheckItems + "_SUM")
val destDFColIndex = sumDataFrameDestination.schema.fieldIndex(sumCheckItems + "_SUM")
if(sumDataFrameSource.first().get(sourceDFColIndex) == null || sumDataFrameDestination.first().get(destDFColIndex) == null){
logger.info("*****Validating sum value for " + extractColumnName(sumDataFrameSource.schema.fieldNames(sourceDFColIndex)))
logger.info("Found Source or Target Sum Column Values to be null.*****sumValueSource:: " + sumDataFrameSource.first().get(sourceDFColIndex) + " ::*****sumValueDestination:: " + sumDataFrameDestination.first().get(destDFColIndex) )
addSumCheckStatusToMetaData(sumDataFrameSource, sumDataFrameDestination, step, envelope, sourceDFColIndex,destDFColIndex, Status.FAIL.toString())
throw new Exception("Found Source or Target Sum Column Values to be null for " + sumDataFrameSource.schema.fieldNames(sourceDFColIndex) + ": *****sumValueSource:: " + sumDataFrameSource.first().get(sourceDFColIndex) + " ::*****sumValueDestination:: " + sumDataFrameDestination.first().get(destDFColIndex))
}
val(sumValueSourceAsDecimal, sumValueDestinationAsDecimal) = sumCheckValues(step,envelope,sumDataFrameSource,sumDataFrameDestination,sourceDFColIndex,destDFColIndex)
logger.info("*****Validating sum value for " + extractColumnName(sumDataFrameSource.schema.fieldNames(sourceDFColIndex)))
logger.info("*****sumValueSource:: " + sumValueSourceAsDecimal + " ::*****sumValueDestination:: " + sumValueDestinationAsDecimal + "Comparison result : " + (sumValueSourceAsDecimal == sumValueDestinationAsDecimal))
envelope.setAttribute(step + ".dataIntegrity.datasetSum." + envelope.getAttribute(step + ".name") + "." + extractColumnName(sumDataFrameDestination.schema.fieldNames(destDFColIndex)), sumValueDestinationAsDecimal.toString())
envelope.setAttribute(step + ".dataIntegrity.querySum." + envelope.getAttribute(step + ".name") + "." + extractColumnName(sumDataFrameSource.schema.fieldNames(sourceDFColIndex)), sumValueSourceAsDecimal.toString())
if (sumValueSourceAsDecimal == sumValueDestinationAsDecimal){
envelope.setAttribute(step + ".dataIntegrity.sumCheckStatus", Status.PASS.toString())
}else{
envelope.setAttribute(step + ".dataIntegrity.sumCheckStatus", Status.FAIL.toString())
throw new Exception("Sum Check failed for : " + extractColumnName(sumDataFrameSource.schema.fieldNames(sourceDFColIndex)) + ":: Source Value : " + sumValueSourceAsDecimal + " :: Target Value : " + sumValueDestinationAsDecimal)
}
})
}
}