Skip to content

Commit

Permalink
Merge pull request #56 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.2.1
  • Loading branch information
civitaspo authored Jul 16, 2019
2 parents 1a98ee8 + ff30b02 commit 30be2c0
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 19 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
0.2.1 (2019-07-16)
==================

* [Enhancement] Add `format`, `compression` and `field_delimiter` options to `athena.apas>` operator. If not set, these are detected automatically.

0.2.0 (2019-07-16)
==================

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.2.0
- pro.civitaspo:digdag-operator-athena:0.2.1
athena:
auth_method: profile

Expand Down Expand Up @@ -125,6 +125,9 @@ Nothing
- **partition_kv**: key-value pairs for partitioning (string to string map, required)
- **location**: The location of the partition. If not specified, this operator generates like hive automatically. (string, default: auto generated like the below)
- `${table location}/${partition key1}=${partition value1}/${partition key2}=${partition value2}/...`
- **format**: The data format for the query results, such as `"orc"`, `"parquet"`, `"avro"`, `"json"`, or `"textfile"`. (string, default: auto detection)
- **compression**: The compression type to use for `"orc"` or `"parquet"`. (string, default: auto detection)
- **field_delimiter**: The field delimiter for files in CSV, TSV, and text files. This option is applied only when **format** is specific to text-based data storage formats. (string, auto detection)
- **save_mode**: Specify the expected behavior. Available values are `"skip_if_exists"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`)
- `"skip_if_exists"`: Skip processing if the partition or the location exists.
- `"error_if_exists"`: Raise error if the partition or the location exists.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.2.0'
version = '0.2.1'

def digdagVersion = '0.9.37'
def awsSdkVersion = "1.11.587"
Expand Down
2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.2.0
- pro.civitaspo:digdag-operator-athena:0.2.1
athena:
auth_method: profile
value: 5
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package pro.civitaspo.digdag.plugin.athena.apas


import com.amazonaws.services.glue.model.{SerDeInfo, Table}
import com.amazonaws.services.glue.model.Table
import com.google.common.base.Optional
import io.digdag.client.config.{Config, ConfigException}
import io.digdag.spi.{ImmutableTaskResult, OperatorContext, TaskResult, TemplateEngine}
import io.digdag.util.DurationParam
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator

import scala.jdk.CollectionConverters._
import scala.util.Random
import scala.util.{Random, Try}
import scala.util.chaining._


class AthenaApasOperator(operatorName: String,
Expand Down Expand Up @@ -46,9 +47,9 @@ class AthenaApasOperator(operatorName: String,
protected val table: String = params.get("table", classOf[String])
protected val workGroup: Optional[String] = params.getOptional("workgroup", classOf[String])
protected val partitionKv: Map[String, String] = params.getMap("partition_kv", classOf[String], classOf[String]).asScala.toMap
protected val _format: Option[String] = Option(params.getOptional("format", classOf[String]).orNull())
protected val _compression: Option[String] = Option(params.getOptional("compression", classOf[String]).orNull())
protected val _fieldDelimiter: Option[String] = Option(params.getOptional("field_delimiter", classOf[String]).orNull())
protected val format: Option[String] = Option(params.getOptional("format", classOf[String]).orNull())
protected val compression: Option[String] = Option(params.getOptional("compression", classOf[String]).orNull())
protected val fieldDelimiter: Option[String] = Option(params.getOptional("field_delimiter", classOf[String]).orNull())
protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite"))
protected val bucketedBy: Seq[String] = params.getListOrEmpty("bucketed_by", classOf[String]).asScala.toSeq
protected val bucketCount: Optional[Int] = params.getOptional("bucket_count", classOf[Int])
Expand Down Expand Up @@ -109,25 +110,37 @@ class AthenaApasOperator(operatorName: String,
if (!prepareCondition()) return TaskResult.empty(cf)

val t: Table = aws.glue.table.describe(catalogId, database, table)
val format: String = _format.getOrElse(detectFormat(t.getStorageDescriptor.getSerdeInfo.getSerializationLibrary))
val compression: Option[String] = _compression.orElse(detectCompression(t.getParameters.asScala.toMap))
val fieldDelimiter: Option[String] = _fieldDelimiter.orElse(detectFieldDelimiter(t.getStorageDescriptor.getSerdeInfo))
val fmt: String = format.getOrElse {
detectFormat(Try(t.getStorageDescriptor.getSerdeInfo.getSerializationLibrary).getOrElse("")).tap { s =>
logger.info(s"Detect $s as format.")
}
}
val c: Option[String] = compression.orElse {
detectCompression(Try(t.getParameters.asScala.toMap).getOrElse(Map())).tap { opt =>
opt.foreach(s => logger.info(s"Detect $s as compression."))
}
}
val fd: Option[String] = fieldDelimiter.orElse {
detectFieldDelimiter(Try(t.getStorageDescriptor.getSerdeInfo.getParameters.asScala.toMap).getOrElse(Map())).tap { opt =>
opt.foreach(s => logger.info(s"Detect $s as field_delimiter."))
}
}

val dummyTable: String = genDummyTableName()

val subTask: Config = cf.create()
subTask.set("+create-empty-dummy", buildCtasSubTaskConfig(tableName = dummyTable,
format = format,
compression = compression,
fieldDelimiter = fieldDelimiter,
format = fmt,
compression = c,
fieldDelimiter = fd,
tableMode = "empty"))
subTask.set("+diff-schema", buildDiffSchemaInternalSubTaskConfig(comparisonTableName = dummyTable))
subTask.set("+drop-dummy", buildDropTableSubTaskConfig(tableName = dummyTable))
subTask.set("+store-data-by-ctas", buildCtasSubTaskConfig(tableName = dummyTable,
outputLocation = Option(location),
format = format,
compression = compression,
fieldDelimiter = fieldDelimiter,
format = fmt,
compression = c,
fieldDelimiter = fd,
tableMode = "data_only"))
subTask.set("+add-partition", buildAddPartitionSubTaskConfig())

Expand Down Expand Up @@ -162,9 +175,10 @@ class AthenaApasOperator(operatorName: String,
None
}

protected def detectFieldDelimiter(serDeInfo: SerDeInfo): Option[String] =
protected def detectFieldDelimiter(serDeProperties: Map[String, String]): Option[String] =
{
Option(serDeInfo.getParameters.getOrDefault("field.delim", null))
if (serDeProperties.contains("field.delim")) return Option(serDeProperties("field.delim"))
None
}

protected def genDummyTableName(): String =
Expand Down

0 comments on commit 30be2c0

Please sign in to comment.