Base feature #1

Merged · 16 commits · Dec 13, 2021
Changes from 2 commits
21 changes: 21 additions & 0 deletions .editorconfig
@@ -0,0 +1,21 @@
# top-most EditorConfig file
root = true

[*]
charset = utf-8
end_of_line = lf
trim_trailing_whitespace = true

[*.xml]
indent_size = 4
indent_style = space
insert_final_newline = true

[*.{java,scala,js,json,css}]
indent_size = 2
indent_style = space
insert_final_newline = true
max_line_length = 120

[*.md]
trim_trailing_whitespace = false
53 changes: 53 additions & 0 deletions build.sbt
@@ -0,0 +1,53 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


ThisBuild / name := "Standardization"
ThisBuild / organization := "za.co.absa"
ThisBuild / version := "0.0.1-SNAPSHOT"
ThisBuild / scalaVersion := "2.11.12"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % "2.4.7" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.4.7" % "provided",
  "za.co.absa" %% "spark-hats" % "0.2.2",
  "za.co.absa" %% "spark-hofs" % "0.4.0",
  "org.scalatest" %% "scalatest" % "3.2.2" % Test,
  "com.typesafe" % "config" % "1.4.1"
)

Test / parallelExecution := false
assembly / mainClass := Some("za.co.absa.SparkApp")

assembly / assemblyMergeStrategy := {
  case PathList("org", "aopalliance", _@_*) => MergeStrategy.last
  case PathList("javax", "inject", _@_*) => MergeStrategy.last
  case PathList("javax", "servlet", _@_*) => MergeStrategy.last
  case PathList("javax", "activation", _@_*) => MergeStrategy.last
  case PathList("org", "apache", _@_*) => MergeStrategy.last
  case PathList("com", "google", _@_*) => MergeStrategy.last
  case PathList("com", "esotericsoftware", _@_*) => MergeStrategy.last
  case PathList("com", "codahale", _@_*) => MergeStrategy.last
  case PathList("com", "yammer", _@_*) => MergeStrategy.last
  case "about.html" => MergeStrategy.rename
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
  case "META-INF/mailcap" => MergeStrategy.last
  case "META-INF/mimetypes.default" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case x =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}
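
For context on the merge rules above: sbt-assembly splits each duplicate classpath entry on '/' and matches the segments, so a pattern like PathList("org", "apache", _@_*) catches every entry under org/apache/. Below is a toy re-implementation of the extractor to illustrate the dispatch; it is not sbt-assembly's actual code.

// Toy stand-in for sbt-assembly's PathList extractor (illustration only).
object PathList {
  def unapplySeq(path: String): Option[Seq[String]] = Some(path.split("/").toSeq)
}

// "org/apache/log4j/Logger.class" splits into Seq("org", "apache", "log4j", "Logger.class"),
// so the first pattern matches and the duplicate from the last JAR on the classpath wins.
val strategy = "org/apache/log4j/Logger.class" match {
  case PathList("org", "apache", _@_*) => "MergeStrategy.last"
  case _                               => "default strategy"
}
println(strategy) // prints "MergeStrategy.last"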
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=1.5.5
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
30 changes: 30 additions & 0 deletions src/main/resources/reference.conf
@@ -0,0 +1,30 @@
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configuration added here is considered the application default and will be used
# for keys that are not specified in the provided 'application.conf' or in system properties.
# The precedence of configuration sources is (higher entries win):
# 1. System properties (e.g. passed as '-Dkey=value')
# 2. application.conf (e.g. provided via '-Dconfig.file=...')
# 3. reference.conf

# An 'enceladus_record_id' column can be added, containing either true UUIDs or stable
# row-hash-based IDs; alternatively, the column is not added at all.
# Allowed values: "uuid", "stableHashId", "none"
standardization.recordId.generation.strategy="uuid"

# system-wide time zone
timezone="UTC"

standardization.testUtils.sparkTestBaseMaster="local[4]"

standardization.failOnInputNotPerSchema=false
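
To make the precedence comment above concrete, here is a minimal sketch using Typesafe Config directly; the override value is illustrative, while the key comes from this reference.conf.

import com.typesafe.config.ConfigFactory

// A system property wins over both application.conf and reference.conf.
// On the command line this would be passed as
//   -Dstandardization.recordId.generation.strategy=none
System.setProperty("standardization.recordId.generation.strategy", "none")
ConfigFactory.invalidateCaches() // make load() pick up the fresh system property
val conf = ConfigFactory.load()
println(conf.getString("standardization.recordId.generation.strategy")) // "none", not the "uuid" default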
96 changes: 96 additions & 0 deletions src/main/scala/za/co/absa/standardization/ArrayTransformations.scala
@@ -0,0 +1,96 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.standardization

import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.{callUDF, col, struct}
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
import org.slf4j.LoggerFactory
import za.co.absa.standardization.schema.SchemaUtils

object ArrayTransformations {
  private val logger = LoggerFactory.getLogger(this.getClass)

  def flattenArrays(df: Dataset[Row], colName: String)(implicit spark: SparkSession): Dataset[Row] = {
    val typ = SchemaUtils.getFieldType(colName, df.schema)
      .getOrElse(throw new Error(s"Field $colName does not exist in ${df.schema.treeString}"))
    if (!typ.isInstanceOf[ArrayType]) {
      logger.info(s"Field $colName is not an ArrayType, returning the original dataset!")
      df
    } else {
      val arrType = typ.asInstanceOf[ArrayType]
      if (!arrType.elementType.isInstanceOf[ArrayType]) {
        logger.info(s"Field $colName is not a nested array, returning the original dataset!")
        df
      } else {
        val udfName = colName.replace('.', '_') + System.currentTimeMillis()

        spark.udf.register(udfName, new UDF1[Seq[Seq[Row]], Seq[Row]] {
          def call(t1: Seq[Seq[Row]]): Seq[Row] =
            if (t1 == null) null.asInstanceOf[Seq[Row]] else t1.filter(_ != null).flatten // scalastyle:ignore null
        }, arrType.elementType)

        nestedWithColumn(df)(colName, callUDF(udfName, col(colName)))
      }
    }
  }

  def nestedWithColumn(ds: Dataset[Row])(columnName: String, column: Column): Dataset[Row] = {
    val toks = columnName.split("\\.").toList

    def helper(tokens: List[String], pathAcc: Seq[String]): Column = {
      val currPath = (pathAcc :+ tokens.head).mkString(".")
      val topType = SchemaUtils.getFieldType(currPath, ds.schema)

      if (currPath == columnName) {
        // got a match
        column as tokens.head
      } else if (!columnName.startsWith(currPath)) {
        // some other attribute
        arrCol(currPath)
      } else if (topType.isEmpty) {
        // partial match, keep going
        struct(helper(tokens.tail, pathAcc ++ List(tokens.head))) as tokens.head
      } else {
        topType.get match {
          case s: StructType =>
            val cols = s.fields.map(_.name)
            val fields = if (tokens.size > 1 && !cols.contains(tokens(1))) {
              cols :+ tokens(1)
            } else {
              cols
            }
            struct(fields.map(field =>
              helper((List(field) ++ tokens.tail).distinct, pathAcc :+ tokens.head) as field
            ): _*) as tokens.head
          case _: ArrayType => throw new IllegalStateException("Cannot reconstruct array columns. Please use this within arrayTransform.")
          case _: DataType => arrCol(currPath) as tokens.head
        }
      }
    }

    ds.withColumn(toks.head, helper(toks, Seq()))
  }

  def arrCol(any: String): Column = {
    val toks = any.replaceAll("\\[(\\d+)\\]", "\\.$1").split("\\.")
    toks.tail.foldLeft(col(toks.head)) {
      case (acc, tok) =>
        if (tok.matches("\\d+")) {
          acc(tok.toInt)
        } else {
          acc(tok)
        }
    }
  }
}
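
A hypothetical usage sketch of flattenArrays follows; the data and column names are invented for this example, not taken from the PR.

import org.apache.spark.sql.SparkSession
import za.co.absa.standardization.ArrayTransformations

implicit val spark: SparkSession = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// "nested" is an array<array<int>> column, the case flattenArrays targets.
val df = Seq((1, Seq(Seq(10, 11), null, Seq(12)))).toDF("id", "nested")

// The generated UDF drops the null inner array and concatenates the rest:
// [[10, 11], null, [12]] becomes [10, 11, 12].
ArrayTransformations.flattenArrays(df, "nested").show(false)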
179 changes: 179 additions & 0 deletions src/main/scala/za/co/absa/standardization/ConfigReader.scala
@@ -0,0 +1,179 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.standardization

import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._

object ConfigReader {
  type ConfigExceptionBadValue = ConfigException.BadValue

  val redactedReplacement: String = "*****"
  private val defaultConfig: ConfigReader = new ConfigReader(ConfigFactory.load())

  def apply(): ConfigReader = defaultConfig
  def apply(config: Config): ConfigReader = new ConfigReader(config)
  def apply(configMap: Map[String, String]): ConfigReader = {
    val config = ConfigFactory.parseMap(configMap.asJava)
    apply(config)
  }

  def parseString(configLine: String): ConfigReader = {
    val config = ConfigFactory.parseString(configLine)
    apply(config)
  }
}

class ConfigReader(val config: Config = ConfigFactory.load()) {
  import ConfigReader._

  def hasPath(path: String): Boolean = {
    config.hasPath(path)
  }

  def getString(path: String): String = {
    config.getString(path)
  }

  def getInt(path: String): Int = {
    config.getInt(path)
  }

  def getBoolean(path: String): Boolean = {
    config.getBoolean(path)
  }

  /**
   * Inspects the config for the presence of the `path` and returns an optional result.
   *
   * @param path path to look for, e.g. "group1.subgroup2.value3"
   * @return None if the path is not found, otherwise a defined Option[String]
   */
  def getStringOption(path: String): Option[String] = {
    getIfExists(path)(getString)
  }

  def getIntOption(path: String): Option[Int] = {
    getIfExists(path)(getInt)
  }

  /**
   * Inspects the config for the presence of the `path` and returns an optional result.
   *
   * @param path path to look for, e.g. "group1.subgroup2.value3"
   * @return None if the path is not found, otherwise a defined Option[Boolean]
   */
  def getBooleanOption(path: String): Option[Boolean] = {
    getIfExists(path)(getBoolean)
  }

  /** Handy shorthand for the frequent `config.withValue(key, ConfigValueFactory.fromAnyRef(value))`. */
  def withAnyRefValue(key: String, value: AnyRef): ConfigReader = {
    ConfigReader(config.withValue(key, ConfigValueFactory.fromAnyRef(value)))
  }

  /**
   * Returns a new configuration which has all sensitive keys redacted.
   *
   * @param keysToRedact A set of keys to be redacted.
   */
  def getRedactedConfig(keysToRedact: Set[String]): ConfigReader = {
    def withAddedKey(accumulatedConfig: Config, key: String): Config = {
      if (config.hasPath(key)) {
        accumulatedConfig.withValue(key, ConfigValueFactory.fromAnyRef(redactedReplacement))
      } else {
        accumulatedConfig
      }
    }

    val redactingConfig = keysToRedact.foldLeft(ConfigFactory.empty)(withAddedKey)

    ConfigReader(redactingConfig.withFallback(config))
  }

  def getLong(path: String): Long = {
    config.getLong(path)
  }

  def getLongOption(path: String): Option[Long] = {
    getIfExists(path)(getLong)
  }

  /**
   * Flattens the Typesafe config tree and returns the effective configuration
   * while redacting sensitive keys.
   *
   * @param keysToRedact A set of keys whose values should be redacted.
   * @return the effective configuration as a map
   */
  def getFlatConfig(keysToRedact: Set[String] = Set()): Map[String, AnyRef] = {
    getRedactedConfig(keysToRedact).config.entrySet().asScala.map({ entry =>
      entry.getKey -> entry.getValue.unwrapped()
    }).toMap
  }

  /**
   * Logs the effective configuration in HOCON format while redacting
   * sensitive keys.
   *
   * @param keysToRedact A set of keys for which values shouldn't be logged.
   */
  def logEffectiveConfigHocon(keysToRedact: Set[String] = Set(), log: Logger = LoggerFactory.getLogger(this.getClass)): Unit = {
    val redactedConfig = getRedactedConfig(keysToRedact)

    val renderOptions = ConfigRenderOptions.defaults()
      .setComments(false)
      .setOriginComments(false)
      .setJson(false)

    val rendered = redactedConfig.config.root().render(renderOptions)

    log.info(s"Effective configuration:\n$rendered")
  }

  /**
   * Logs the effective configuration in Properties format while redacting
   * sensitive keys.
   *
   * @param keysToRedact A set of keys for which values shouldn't be logged.
   */
  def logEffectiveConfigProps(keysToRedact: Set[String] = Set(), log: Logger = LoggerFactory.getLogger(this.getClass)): Unit = {
    val redactedConfig = getFlatConfig(keysToRedact)

    val rendered = redactedConfig.map {
      case (k, v) => s"$k = $v"
    }.toArray
      .sortBy(identity)
      .mkString("\n")

    log.info(s"Effective configuration:\n$rendered")
  }

  private def getIfExists[T](path: String)(readFnc: String => T): Option[T] = {
    if (config.hasPathOrNull(path)) {
      if (config.getIsNull(path)) {
        None
      } else {
        Option(readFnc(path))
      }
    } else {
      None
    }
  }
}
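
A short, hypothetical usage sketch of the reader (the key names are invented for illustration):

import za.co.absa.standardization.ConfigReader

val reader = ConfigReader(Map(
  "db.user" -> "svc_account",
  "db.password" -> "s3cr3t",
  "db.port" -> "5432"
))

reader.getIntOption("db.port")       // Some(5432) - Typesafe Config coerces numeric strings
reader.getStringOption("db.timeout") // None - the path is absent
// Logs all keys in HOCON format, rendering db.password as "*****":
reader.logEffectiveConfigHocon(keysToRedact = Set("db.password"))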