diff --git a/CHANGELOG.md b/CHANGELOG.md index 788a2f5..35c5f23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +0.2.1 (2019-07-16) +================== + +* [Enhancement] Add `format`, `compression` and `field_delimiter` options to `athena.apas>` operator. If not set, these are detected automatically. + 0.2.0 (2019-07-16) ================== diff --git a/README.md b/README.md index e6d5e3f..1e3c71e 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.2.0 + - pro.civitaspo:digdag-operator-athena:0.2.1 athena: auth_method: profile @@ -125,6 +125,9 @@ Nothing - **partition_kv**: key-value pairs for partitioning (string to string map, required) - **location**: The location of the partition. If not specified, this operator generates like hive automatically. (string, default: auto generated like the below) - `${table location}/${partition key1}=${partition value1}/${partition key2}=${partition value2}/...` +- **format**: The data format for the query results, such as `"orc"`, `"parquet"`, `"avro"`, `"json"`, or `"textfile"`. (string, default: auto detection) +- **compression**: The compression type to use for `"orc"` or `"parquet"`. (string, default: auto detection) +- **field_delimiter**: The field delimiter for files in CSV, TSV, and text files. This option is applied only when **format** is a text-based data storage format. (string, default: auto detection) - **save_mode**: Specify the expected behavior. Available values are `"skip_if_exists"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`) - `"skip_if_exists"`: Skip processing if the partition or the location exists. - `"error_if_exists"`: Raise error if the partition or the location exists. 
diff --git a/build.gradle b/build.gradle index a0d9477..60bfb09 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.2.0' +version = '0.2.1' def digdagVersion = '0.9.37' def awsSdkVersion = "1.11.587" diff --git a/example/example.dig b/example/example.dig index 3a52173..118542e 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.2.0 + - pro.civitaspo:digdag-operator-athena:0.2.1 athena: auth_method: profile value: 5 diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala index 6881328..6607952 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala @@ -1,7 +1,7 @@ package pro.civitaspo.digdag.plugin.athena.apas -import com.amazonaws.services.glue.model.{SerDeInfo, Table} +import com.amazonaws.services.glue.model.Table import com.google.common.base.Optional import io.digdag.client.config.{Config, ConfigException} import io.digdag.spi.{ImmutableTaskResult, OperatorContext, TaskResult, TemplateEngine} @@ -9,7 +9,8 @@ import io.digdag.util.DurationParam import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator import scala.jdk.CollectionConverters._ -import scala.util.Random +import scala.util.{Random, Try} +import scala.util.chaining._ class AthenaApasOperator(operatorName: String, @@ -46,9 +47,9 @@ class AthenaApasOperator(operatorName: String, protected val table: String = params.get("table", classOf[String]) protected val workGroup: Optional[String] = params.getOptional("workgroup", classOf[String]) protected val partitionKv: Map[String, String] = params.getMap("partition_kv", classOf[String], classOf[String]).asScala.toMap - protected 
val _format: Option[String] = Option(params.getOptional("format", classOf[String]).orNull()) - protected val _compression: Option[String] = Option(params.getOptional("compression", classOf[String]).orNull()) - protected val _fieldDelimiter: Option[String] = Option(params.getOptional("field_delimiter", classOf[String]).orNull()) + protected val format: Option[String] = Option(params.getOptional("format", classOf[String]).orNull()) + protected val compression: Option[String] = Option(params.getOptional("compression", classOf[String]).orNull()) + protected val fieldDelimiter: Option[String] = Option(params.getOptional("field_delimiter", classOf[String]).orNull()) protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite")) protected val bucketedBy: Seq[String] = params.getListOrEmpty("bucketed_by", classOf[String]).asScala.toSeq protected val bucketCount: Optional[Int] = params.getOptional("bucket_count", classOf[Int]) @@ -109,25 +110,37 @@ class AthenaApasOperator(operatorName: String, if (!prepareCondition()) return TaskResult.empty(cf) val t: Table = aws.glue.table.describe(catalogId, database, table) - val format: String = _format.getOrElse(detectFormat(t.getStorageDescriptor.getSerdeInfo.getSerializationLibrary)) - val compression: Option[String] = _compression.orElse(detectCompression(t.getParameters.asScala.toMap)) - val fieldDelimiter: Option[String] = _fieldDelimiter.orElse(detectFieldDelimiter(t.getStorageDescriptor.getSerdeInfo)) + val fmt: String = format.getOrElse { + detectFormat(Try(t.getStorageDescriptor.getSerdeInfo.getSerializationLibrary).getOrElse("")).tap { s => + logger.info(s"Detect $s as format.") + } + } + val c: Option[String] = compression.orElse { + detectCompression(Try(t.getParameters.asScala.toMap).getOrElse(Map())).tap { opt => + opt.foreach(s => logger.info(s"Detect $s as compression.")) + } + } + val fd: Option[String] = fieldDelimiter.orElse { + 
detectFieldDelimiter(Try(t.getStorageDescriptor.getSerdeInfo.getParameters.asScala.toMap).getOrElse(Map())).tap { opt => + opt.foreach(s => logger.info(s"Detect $s as field_delimiter.")) + } + } val dummyTable: String = genDummyTableName() val subTask: Config = cf.create() subTask.set("+create-empty-dummy", buildCtasSubTaskConfig(tableName = dummyTable, - format = format, - compression = compression, - fieldDelimiter = fieldDelimiter, + format = fmt, + compression = c, + fieldDelimiter = fd, tableMode = "empty")) subTask.set("+diff-schema", buildDiffSchemaInternalSubTaskConfig(comparisonTableName = dummyTable)) subTask.set("+drop-dummy", buildDropTableSubTaskConfig(tableName = dummyTable)) subTask.set("+store-data-by-ctas", buildCtasSubTaskConfig(tableName = dummyTable, outputLocation = Option(location), - format = format, - compression = compression, - fieldDelimiter = fieldDelimiter, + format = fmt, + compression = c, + fieldDelimiter = fd, tableMode = "data_only")) subTask.set("+add-partition", buildAddPartitionSubTaskConfig()) @@ -162,9 +175,10 @@ class AthenaApasOperator(operatorName: String, None } - protected def detectFieldDelimiter(serDeInfo: SerDeInfo): Option[String] = + protected def detectFieldDelimiter(serDeProperties: Map[String, String]): Option[String] = { - Option(serDeInfo.getParameters.getOrDefault("field.delim", null)) + if (serDeProperties.contains("field.delim")) return Option(serDeProperties("field.delim")) + None } protected def genDummyTableName(): String =