Skip to content

Commit

Permalink
improve(SelectiveMerge):merge using select instead of iterating df. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Doe-Ed authored May 14, 2020
1 parent ef770ef commit 82bc20e
Showing 1 changed file with 58 additions and 13 deletions.
71 changes: 58 additions & 13 deletions src/main/scala/com/yotpo/metorikku/code/steps/SelectiveMerge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package com.yotpo.metorikku.code.steps

import com.yotpo.metorikku.exceptions.MetorikkuException
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

object SelectiveMerge {
private val message = "You need to send 3 parameters with the names of the dataframes to merge and the key(s) to merge on" +
"(merged df1 into df2 favouring values from df2): df1, df2, Seq[String]"
private val log: Logger = LogManager.getLogger(this.getClass)
private val colRenamePrefix = "df2_"
private val colRenamePrefixLen = colRenamePrefix.length
private class InputMatcher[K](ks: K*) {
def unapplySeq[V](m: Map[K, V]): Option[Seq[V]] = if (ks.forall(m.contains)) Some(ks.map(m)) else None
}
Expand Down Expand Up @@ -52,26 +55,68 @@ object SelectiveMerge {

df2.select(
columns.zip(columnsRenamed).map{
case (x: Column, y) => {
case (x: Column, y: String) => {
x.alias(y)
}
}: _*
).join(df1, joinKeys,"outer")
}

def overrideConflictingValues(df1: DataFrame, df2: DataFrame, mergedDf: DataFrame, joinKeys: Seq[String]): DataFrame = {
var mergedDfBuilder = mergedDf
for (col <- df2.schema) {
val colNameDf2 = colRenamePrefix + col.name
if (df1.schema.contains(col) && !joinKeys.contains(col.name)) {
mergedDfBuilder = mergedDfBuilder
.withColumn(colNameDf2,
when(mergedDfBuilder(colNameDf2).isNotNull, mergedDfBuilder(colNameDf2))
.otherwise(df1(col.name)))
.drop(col.name)
}
mergedDfBuilder = mergedDfBuilder.withColumnRenamed(colNameDf2, col.name)
def isMemberOfDf1(schema: StructType, name: String): Boolean = {
val schemaNames = schema.map(f => f.name)

if (name.startsWith(colRenamePrefix)) {
schemaNames.contains(name.substring(colRenamePrefixLen))
}
else {
false
}
}

def getMergedSchema(df1: DataFrame, df2: DataFrame, joinKeys: Seq[String]): Seq[Column] = {
val mergedSchemaNames = (df1.schema.map(f => f.name) ++ df2.schema.map(f => f.name)).distinct

val mergedSchema = mergedSchemaNames.map(s =>
if (df2.columns.contains(s) && !joinKeys.contains(s)) {
col(colRenamePrefix + s)
}
else {
col(s)
}
)

mergedSchema
}


def overrideConflictingValues(df1: DataFrame, df2: DataFrame, mergedDf: DataFrame, joinKeys: Seq[String]): DataFrame = {
val mergedSchema = getMergedSchema(df1, df2, joinKeys)

val mergedDfBuilder = mergedDf.select(
mergedSchema.map{
case (currColumn: Column) => {
val colName = currColumn.expr.asInstanceOf[NamedExpression].name
// Column is a part of df1 and doesn't belong to the join keys.
if (isMemberOfDf1(df1.schema, colName) && !joinKeys.contains(colName)) {
val origColName = colName.substring(colRenamePrefixLen)
when(mergedDf(colName).isNotNull, mergedDf(colName).cast(df1.schema(origColName).dataType))
.otherwise(df1(origColName))
.alias(origColName)
}
// Column doesn't belong to df1 or is join key
else {
// Column doesn't belong to df1
if (colName.startsWith(colRenamePrefix)) {
currColumn.alias(colName.substring(colRenamePrefixLen))
}
// Column is the merge key
else {
currColumn
}
}
}
}: _*
)

mergedDfBuilder
}
Expand Down

0 comments on commit 82bc20e

Please sign in to comment.