Merge pull request #66 from badrinathpatchikolla/spark-2.4
Added GitHub Actions and fixed Deserializer Class
mantovani authored Oct 4, 2022
2 parents e87106b + 34ed741 commit 3bd2a55
Showing 5 changed files with 115 additions and 66 deletions.
41 changes: 41 additions & 0 deletions .github/workflows/almaren-framework.yml
@@ -0,0 +1,41 @@
+name: Almaren Framework
+on: [push, pull_request]
+
+jobs:
+  Build:
+    runs-on: ubuntu-20.04
+    services:
+      postgres:
+        image: postgres:13.4
+        env:
+          POSTGRES_PASSWORD: postgres
+          POSTGRES_HOST_AUTH_METHOD: trust
+        ports:
+          - 5432:5432
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+    steps:
+      - name: Check out repository code
+        uses: actions/checkout@v2
+      - name: Setup JDK
+        uses: actions/setup-java@v3
+        with:
+          distribution: temurin
+          java-version: 8
+          cache: sbt
+      - name: Build and test scala version
+        run: |
+          PGPASSWORD="postgres" psql -c 'create database almaren;' -U postgres -h localhost
+          PGPASSWORD="postgres" psql -c "ALTER USER postgres PASSWORD 'foo' ;" -U postgres -h localhost
+          PGPASSWORD="postgres" psql -c 'create role runner;' -U postgres -h localhost
+          PGPASSWORD="postgres" psql -c 'ALTER ROLE "runner" WITH LOGIN SUPERUSER INHERIT CREATEDB CREATEROLE REPLICATION;' -U postgres -h localhost
+          sbt ++2.11.12 test
+          sbt ++2.12.10 test
+          rm -rf "$HOME/.ivy2/local" || true
+          find $HOME/Library/Caches/Coursier/v1 -name "ivydata-*.properties" -delete || true
+          find $HOME/.ivy2/cache -name "ivydata-*.properties" -delete || true
+          find $HOME/.cache/coursier/v1 -name "ivydata-*.properties" -delete || true
+          find $HOME/.sbt -name "*.lock" -delete || true
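
The workflow stands up a Postgres 13.4 service and prepares an `almaren` database before running the cross-built sbt tests (Scala 2.11.12 and 2.12.10); the trailing `rm`/`find` commands prune stale Ivy/Coursier metadata so the dependency caches stay reusable across runs. As a rough sketch of what the service container is there for, a test could reach it over JDBC like this (hypothetical snippet, not part of this commit; it assumes the `org.postgresql:postgresql` driver is on the classpath and mirrors the credentials set by the `psql` commands above):

```scala
import java.sql.DriverManager

// Hypothetical smoke test against the CI Postgres service container.
object PostgresSmokeTest extends App {
  // The "almaren" database and the postgres user's password "foo"
  // are created by the workflow's psql commands before sbt runs.
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost:5432/almaren", "postgres", "foo")
  try {
    val rs = conn.createStatement().executeQuery("SELECT 1")
    rs.next()
    assert(rs.getInt(1) == 1, "Postgres service not reachable")
  } finally conn.close()
}
```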
35 changes: 0 additions & 35 deletions .travis.yml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
@@ -2,7 +2,7 @@

 The Almaren Framework provides a simplified consistent minimalistic layer over Apache Spark, while still allowing you to take advantage of native Apache Spark features. You can even combine it with standard Spark code.

-[![Build Status](https://travis-ci.com/mantovani/almaren-framework.svg?branch=master)](https://travis-ci.com/mantovani/almaren-framework)
+[![Build Status](https://github.com/music-of-the-ainur/almaren-framework/actions/workflows/almaren-framework.yml/badge.svg)](https://github.com/music-of-the-ainur/almaren-framework/actions/workflows/almaren-framework.yml)
 [![Gitter Community](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/music-of-the-ainur/community)

 ## Table of Contents
@@ -5,14 +5,14 @@ import com.github.music.of.the.ainur.almaren.state.core._
 import com.github.music.of.the.ainur.almaren.{Tree, InvalidDecoder, SchemaRequired, State}

 private[almaren] trait Deserializer extends Core {
-  def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None): Option[Tree] = {
+  def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None,options:Map[String, String] = Map(),autoFlatten:Boolean = true): Option[Tree] = {

     def json(): State =
-      JsonDeserializer(columnName,schemaInfo)
+      JsonDeserializer(columnName,schemaInfo,options,autoFlatten)
     def xml(): State =
-      XMLDeserializer(columnName,schemaInfo)
-    def avro(): State =
-      AvroDeserializer(columnName,schemaInfo.getOrElse(throw SchemaRequired(decoder)))
+      XMLDeserializer(columnName,schemaInfo,options,autoFlatten)
+    def avro: State =
+      AvroDeserializer(columnName, None, options, autoFlatten, schemaInfo.getOrElse(throw SchemaRequired(decoder)))

     decoder.toUpperCase match {
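
The builder-level `deserializer` gains two parameters whose defaults (`options = Map()`, `autoFlatten = true`) keep existing call sites working unchanged; only the internal `avro` helper changes shape, routing the mandatory schema into the new `mandatorySchema` slot. A sketch of a call site with non-default values, assuming the builder DSL documented in the project README (the import path, `sourceSql`, and `.batch` come from there, not from this diff; table and column names are hypothetical):

```scala
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit

// Hypothetical call site; "events" and "payload" are illustrative names.
val almaren = Almaren("deserializer-example")
val df = almaren.builder
  .sourceSql("SELECT payload FROM events")      // payload holds JSON strings
  .deserializer("JSON", "payload",
    options = Map("samplingRatio" -> "0.5"),    // sample half the rows for schema inference
    autoFlatten = false)                        // keep the parsed struct nested
  .batch
```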
@@ -1,61 +1,104 @@
 package com.github.music.of.the.ainur.almaren.state.core

-import com.github.music.of.the.ainur.almaren.State
-import org.apache.spark.sql.DataFrame
+import com.github.music.of.the.ainur.almaren.{Almaren, SchemaRequired, State}
+import org.apache.spark.sql.{DataFrame, DataFrameReader, Dataset}
 import org.apache.spark.sql.types.{DataType, StructType}

 import scala.language.implicitConversions
-import com.github.music.of.the.ainur.almaren.Almaren
-import com.github.music.of.the.ainur.almaren.util.Constants
-import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.functions.col

+import javax.xml.crypto.Data
+
+trait Deserializer extends State {
+
+  def columnName: String
+  def schema: Option[String]
+  def options: Map[String, String]
+  def autoFlatten: Boolean
+
+  override def executor(df: DataFrame): DataFrame = {
+    val newDf = deserializer(df)
+    if (autoFlatten)
+      autoFlatten(newDf, columnName)
+    else
+      newDf
+  }
+
-abstract class Deserializer() extends State {
-  override def executor(df: DataFrame): DataFrame = deserializer(df)
   def deserializer(df: DataFrame): DataFrame
-  implicit def string2Schema(schema: String): DataType =
+
+  implicit def string2Schema(schema: String): StructType =
     StructType.fromDDL(schema)
+
+  def autoFlatten(df: DataFrame, columnName: String): DataFrame =
+    df.
+      select("*", columnName.concat(".*")).
+      drop(columnName)
+
+  def sampleData[T](df: Dataset[T]): Dataset[T] = {
+    df.sample(
+      options.getOrElse("samplingRatio", "1.0").toDouble
+    ).limit(
+      options.getOrElse("samplingMaxLines", "10000").toInt
+    )
+  }
+
+  def getReadWithOptions: DataFrameReader =
+    Almaren.spark.getOrCreate().read.options(options)
+
+  def getDDL(df: DataFrame): String =
+    df.schema.toDDL
 }

-case class AvroDeserializer(columnName: String,schema: String) extends Deserializer {
+case class AvroDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean, mandatorySchema: String) extends Deserializer {

   import org.apache.spark.sql.avro._
   import org.apache.spark.sql.functions._
   import collection.JavaConversions._

+  schema.map(_ => throw SchemaRequired(s"AvroDeserializer, don't use 'schema' it must be None, use 'mandatorySchema' "))
+
   override def deserializer(df: DataFrame): DataFrame = {
-    logger.info(s"columnName:{$columnName}, schema:{$schema}")
-    df.withColumn(columnName,from_avro(col(columnName),schema))
-      .select("*",columnName.concat(".*")).drop(columnName)
+    import df.sparkSession.implicits._
+    logger.info(s"columnName:{$columnName}, schema:{$mandatorySchema}, options:{$options}, autoFlatten:{$autoFlatten}")
+    df.withColumn(columnName, from_avro(col(columnName), mandatorySchema))
   }
 }

-case class JsonDeserializer(columnName: String,schema: Option[String]) extends Deserializer {
+case class JsonDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {

   import org.apache.spark.sql.functions._
   import collection.JavaConversions._

   override def deserializer(df: DataFrame): DataFrame = {
     import df.sparkSession.implicits._
-    logger.info(s"columnName:{$columnName}, schema:{$schema}")
+    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
     df.withColumn(columnName,
-      from_json(col(columnName),
-        schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)]))))
-      .select("*",columnName.concat(".*"))
-      .drop(columnName)
+      from_json(
+        col(columnName),
+        schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)])),
+        options
+      ))
   }

   private def getSchemaDDL(df: Dataset[String]): String =
-    Almaren.spark.getOrCreate().read.json(df.sample(Constants.sampleDeserializer)).schema.toDDL
+    getDDL(getReadWithOptions.json(sampleData(df)))
 }

-case class XMLDeserializer(columnName: String, schema: Option[String]) extends Deserializer {
+case class XMLDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {

   import com.databricks.spark.xml.functions.from_xml
   import com.databricks.spark.xml.schema_of_xml
   import org.apache.spark.sql.functions._

   override def deserializer(df: DataFrame): DataFrame = {
-    logger.info(s"columnName:{$columnName}")
+    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
     import df.sparkSession.implicits._
     val xmlSchema = schema match {
       case Some(s) => StructType.fromDDL(s)
-      case None => schema_of_xml(df.select(columnName).as[String])
+      case None => schema_of_xml(sampleData(df.select(columnName).as[String]), options = options)
     }
     df
-      .withColumn(columnName, from_xml(col(columnName), xmlSchema))
-      .select("*",columnName.concat(".*"))
-      .drop(columnName)
+      .withColumn(columnName, from_xml(col(columnName), xmlSchema, options))
   }
 }
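
Net effect of the rewrite: `Deserializer` becomes a trait whose shared `executor` applies the `autoFlatten` step (the `select("*", column.*)` plus `drop(column)` that each case class previously hard-coded), and schema inference is now sampled and option-driven via `sampleData` and `getReadWithOptions` instead of the fixed `Constants.sampleDeserializer` ratio. A minimal sketch of constructing the reworked state directly (the signature comes from the diff above; the column name and option values are illustrative):

```scala
// Normally reached through the builder DSL rather than built by hand.
val state = JsonDeserializer(
  columnName  = "payload",                 // hypothetical column of JSON strings
  schema      = None,                      // None => infer the DDL from a sample
  options     = Map(
    "samplingRatio"    -> "0.1",           // feed 10% of rows to the sampler...
    "samplingMaxLines" -> "5000"),         // ...capped at 5,000 lines
  autoFlatten = true                       // executor expands payload.* and drops payload
)
```

Note that `AvroDeserializer` alone takes a fifth field, `mandatorySchema`, and throws `SchemaRequired` if `schema` is supplied, since `from_avro` cannot infer a schema from the data.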
