diff --git a/.github/workflows/almaren-framework.yml b/.github/workflows/almaren-framework.yml
new file mode 100644
index 00000000..a3f7a7be
--- /dev/null
+++ b/.github/workflows/almaren-framework.yml
@@ -0,0 +1,41 @@
+name: Almaren Framework
+on: [push, pull_request]
+
+jobs:
+  Build:
+    runs-on: ubuntu-20.04
+    services:
+      postgres:
+        image: postgres:13.4
+        env:
+          POSTGRES_PASSWORD: postgres
+          POSTGRES_HOST_AUTH_METHOD: trust
+        ports:
+          - 5432:5432
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+    steps:
+      - name: Check out repository code
+        uses: actions/checkout@v2
+      - name: Setup JDK
+        uses: actions/setup-java@v3
+        with:
+          distribution: temurin
+          java-version: 8
+          cache: sbt
+      - name: Build and test scala version
+        run: |
+          PGPASSWORD="postgres" psql -c 'create database almaren;' -U postgres -h localhost
+          PGPASSWORD="postgres" psql -c "ALTER USER postgres PASSWORD 'foo' ;" -U postgres -h localhost
+          PGPASSWORD="postgres" psql -c 'create role runner;' -U postgres -h localhost
+          PGPASSWORD="postgres" psql -c 'ALTER ROLE "runner" WITH LOGIN SUPERUSER INHERIT CREATEDB CREATEROLE REPLICATION;' -U postgres -h localhost
+          sbt ++2.11.12 test
+          sbt ++2.12.10 test
+          rm -rf "$HOME/.ivy2/local" || true
+          find $HOME/Library/Caches/Coursier/v1 -name "ivydata-*.properties" -delete || true
+          find $HOME/.ivy2/cache -name "ivydata-*.properties" -delete || true
+          find $HOME/.cache/coursier/v1 -name "ivydata-*.properties" -delete || true
+          find $HOME/.sbt -name "*.lock" -delete || true
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 3f5480d9..00000000
--- a/.travis.yml
+++ /dev/null
@@ -1,35 +0,0 @@
-language: scala
-
-jdk: openjdk8
-
-script:
-  - sbt +test
-
-install:
-  - |
-    # update this only when sbt-the-bash-script needs to be updated
-    export SBT_LAUNCHER=1.4.8
-    export SBT_OPTS="-Dfile.encoding=UTF-8"
-    curl -L --silent "https://github.com/sbt/sbt/releases/download/v$SBT_LAUNCHER/sbt-$SBT_LAUNCHER.tgz" > $HOME/sbt.tgz
-    tar zxf $HOME/sbt.tgz -C $HOME
-    sudo rm /usr/local/bin/sbt
-    sudo ln -s $HOME/sbt/bin/sbt /usr/local/bin/sbt
-
-cache:
-  directories:
-    - $HOME/.cache/coursier
-    - $HOME/.ivy2/cache
-    - $HOME/.sbt
-
-before_cache:
-  - rm -fv $HOME/.ivy2/.sbt.ivy.lock
-  - find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
-  - find $HOME/.sbt -name "*.lock" -print -delete
-
-services:
-  - postgresql
-
-before_script:
-  - psql -c 'create database almaren;' -U postgres
-  - psql -c "ALTER USER postgres PASSWORD 'foo' ;" -U postgres
-
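Note: the workflow's test step cross-builds with `sbt ++2.11.12 test` and `sbt ++2.12.10 test`. For plain `++` to switch a project between those versions, the build typically declares them as cross versions; a minimal build.sbt sketch (version numbers taken from the workflow above, everything else illustrative):

// build.sbt (sketch): declare the cross versions the CI commands switch between.
// With plain `++`, sbt only switches a project to versions listed here.
crossScalaVersions := Seq("2.11.12", "2.12.10")
scalaVersion := crossScalaVersions.value.head
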
diff --git a/README.md b/README.md
index 4d5800e8..69ba2d59 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 The Almaren Framework provides a simplified consistent minimalistic layer over Apache Spark, while still allowing you to take advantage of native Apache Spark features. You can even combine it with standard Spark code.
 
-[![Build Status](https://travis-ci.com/mantovani/almaren-framework.svg?branch=master)](https://travis-ci.com/mantovani/almaren-framework)
+[![Build Status](https://github.com/music-of-the-ainur/almaren-framework/actions/workflows/almaren-framework.yml/badge.svg)](https://github.com/music-of-the-ainur/almaren-framework/actions/workflows/almaren-framework.yml)
 [![Gitter Community](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/music-of-the-ainur/community)
 
 ## Table of Contents
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala
index ef5816ba..305c921d 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala
@@ -5,14 +5,14 @@ import com.github.music.of.the.ainur.almaren.state.core._
 import com.github.music.of.the.ainur.almaren.{Tree, InvalidDecoder, SchemaRequired, State}
 
 private[almaren] trait Deserializer extends Core {
-  def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None): Option[Tree] = {
+  def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None,options:Map[String, String] = Map(),autoFlatten:Boolean = true): Option[Tree] = {
     def json(): State =
-      JsonDeserializer(columnName,schemaInfo)
+      JsonDeserializer(columnName,schemaInfo,options,autoFlatten)
     def xml(): State =
-      XMLDeserializer(columnName,schemaInfo)
-    def avro(): State =
-      AvroDeserializer(columnName,schemaInfo.getOrElse(throw SchemaRequired(decoder)))
+      XMLDeserializer(columnName,schemaInfo,options,autoFlatten)
+    def avro: State =
+      AvroDeserializer(columnName, None, options, autoFlatten, schemaInfo.getOrElse(throw SchemaRequired(decoder)))
 
     decoder.toUpperCase match {
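Note: the builder-level `deserializer` now threads `options` and `autoFlatten` through to the state classes below. A usage sketch, assuming the builder entry points (`sourceSql`, `batch`) and import paths from the project README; the query and column names are placeholders:

// Sketch only: entry points assumed from the project README.
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit

val almaren = Almaren("deserializer-example")
val df = almaren.builder
  .sourceSql("SELECT payload FROM raw_events")
  .deserializer("JSON", "payload",
    schemaInfo = None,                        // infer the schema from sampled rows
    options = Map("samplingRatio" -> "0.5"),  // forwarded to the Spark reader
    autoFlatten = true)                       // expand the parsed struct column
  .batch

Both new parameters have defaults (`Map()` and `true`), so existing three-argument calls keep compiling and keep the old flattening behaviour.
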
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala
index 8b6faf8a..b4990ba8 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala
@@ -1,61 +1,104 @@
 package com.github.music.of.the.ainur.almaren.state.core
 
-import com.github.music.of.the.ainur.almaren.State
-import org.apache.spark.sql.DataFrame
+import com.github.music.of.the.ainur.almaren.{Almaren, SchemaRequired, State}
+import org.apache.spark.sql.{DataFrame, DataFrameReader, Dataset}
 import org.apache.spark.sql.types.{DataType, StructType}
 
 import scala.language.implicitConversions
-import com.github.music.of.the.ainur.almaren.Almaren
 import com.github.music.of.the.ainur.almaren.util.Constants
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.functions.col
+
+import javax.xml.crypto.Data
+
+trait Deserializer extends State {
+
+  def columnName: String
+  def schema: Option[String]
+  def options: Map[String, String]
+  def autoFlatten: Boolean
+
+  override def executor(df: DataFrame): DataFrame = {
+    val newDf = deserializer(df)
+    if (autoFlatten)
+      autoFlatten(newDf, columnName)
+    else
+      newDf
+  }
 
-abstract class Deserializer() extends State {
-  override def executor(df: DataFrame): DataFrame = deserializer(df)
   def deserializer(df: DataFrame): DataFrame
-  implicit def string2Schema(schema: String): DataType =
+
+  implicit def string2Schema(schema: String): StructType =
     StructType.fromDDL(schema)
+
+  def autoFlatten(df: DataFrame, columnName: String): DataFrame =
+    df.
+      select("*", columnName.concat(".*")).
+      drop(columnName)
+
+  def sampleData[T](df: Dataset[T]): Dataset[T] = {
+    df.sample(
+      options.getOrElse("samplingRatio", "1.0").toDouble
+    ).limit(
+      options.getOrElse("samplingMaxLines", "10000").toInt
+    )
+  }
+
+  def getReadWithOptions: DataFrameReader =
+    Almaren.spark.getOrCreate().read.options(options)
+
+  def getDDL(df: DataFrame): String =
+    df.schema.toDDL
 }
 
-case class AvroDeserializer(columnName: String,schema: String) extends Deserializer {
+case class AvroDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean, mandatorySchema: String) extends Deserializer {
+
   import org.apache.spark.sql.avro._
   import org.apache.spark.sql.functions._
+  import collection.JavaConversions._
+
+  schema.map(_ => throw SchemaRequired(s"AvroDeserializer: 'schema' must be None; use 'mandatorySchema' instead"))
+
   override def deserializer(df: DataFrame): DataFrame = {
-    logger.info(s"columnName:{$columnName}, schema:{$schema}")
-    df.withColumn(columnName,from_avro(col(columnName),schema))
-      .select("*",columnName.concat(".*")).drop(columnName)
+    import df.sparkSession.implicits._
+    logger.info(s"columnName:{$columnName}, schema:{$mandatorySchema}, options:{$options}, autoFlatten:{$autoFlatten}")
+    df.withColumn(columnName, from_avro(col(columnName), mandatorySchema))
   }
 }
 
-case class JsonDeserializer(columnName: String,schema: Option[String]) extends Deserializer {
+case class JsonDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {
+
   import org.apache.spark.sql.functions._
+  import collection.JavaConversions._
+
   override def deserializer(df: DataFrame): DataFrame = {
     import df.sparkSession.implicits._
-    logger.info(s"columnName:{$columnName}, schema:{$schema}")
+    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
     df.withColumn(columnName,
-      from_json(col(columnName),
-        schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)]))))
-      .select("*",columnName.concat(".*"))
-      .drop(columnName)
+      from_json(
+        col(columnName),
+        schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)])),
+        options
+      ))
   }
+
   private def getSchemaDDL(df: Dataset[String]): String =
-    Almaren.spark.getOrCreate().read.json(df.sample(Constants.sampleDeserializer)).schema.toDDL
+    getDDL(getReadWithOptions.json(sampleData(df)))
 }
 
-case class XMLDeserializer(columnName: String, schema: Option[String]) extends Deserializer {
+case class XMLDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {
+
   import com.databricks.spark.xml.functions.from_xml
   import com.databricks.spark.xml.schema_of_xml
   import org.apache.spark.sql.functions._
   override def deserializer(df: DataFrame): DataFrame = {
-    logger.info(s"columnName:{$columnName}")
+    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
     import df.sparkSession.implicits._
     val xmlSchema = schema match {
       case Some(s) => StructType.fromDDL(s)
-      case None => schema_of_xml(df.select(columnName).as[String])
+      case None => schema_of_xml(sampleData(df.select(columnName).as[String]), options = options)
     }
     df
-      .withColumn(columnName, from_xml(col(columnName), xmlSchema))
-      .select("*",columnName.concat(".*"))
-      .drop(columnName)
+      .withColumn(columnName, from_xml(col(columnName), xmlSchema, options))
   }
-}
+}
\ No newline at end of file
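
Note: every deserializer used to flatten unconditionally via `select("*", columnName.concat(".*")).drop(columnName)`; the trait now makes that step an optional `autoFlatten` pass. A plain-Spark sketch of the two behaviours, assuming a local SparkSession; data, schema, and names are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder.master("local[*]").appName("flatten-demo").getOrCreate()
import spark.implicits._

val df = Seq("""{"title":"Ainulindale","pages":22}""").toDF("payload")
val parsed = df.withColumn("payload",
  from_json(col("payload"), StructType.fromDDL("title STRING, pages INT")))

// autoFlatten = false: keep the struct, address fields as payload.title, payload.pages
// autoFlatten = true: expand the struct and drop it, as the trait's helper does
val flattened = parsed.select("*", "payload.*").drop("payload")
// flattened.columns => Array(title, pages)

Setting `autoFlatten = false` is the new capability: callers can keep the parsed struct intact instead of always having it expanded into top-level columns.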