From 99d0a28f717439436931e74b9be1983744ed8faf Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 12 Sep 2018 09:02:02 +0900 Subject: [PATCH 01/13] Write athena.preview operator usage and skeleton --- README.md | 22 +++++++++++++++++++ .../operator/AthenaPreviewOperator.scala | 8 +++++++ 2 files changed, 30 insertions(+) create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala diff --git a/README.md b/README.md index bb58ce3..13b56e7 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,28 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **athena.last_query.submitted_at**: The unix timestamp that the query was submitted. (integer) - **athena.last_query.completed_at**: The unix timestamp that the query completed. (integer) +## Configuration for `athena.preview>` operator + +### Options + +- **athena.preview>**: The identifier for the query execution that is succeeded. (string, required) +- **max_rows**: The maximum number of rows to preview. 0 ~ 100 is valid. (integer, default: `10`) + +### Output Parameters + +- **athena.last_preview.id**: The identifier for the query execution. (string) +- **athena.last_preview.columns**: The information that describes the column structure and data types of a table of query results. (map of array) + - **case_sensitive**: Indicates whether values in the column are case-sensitive. (boolean) + - **catalog_name**: The catalog to which the query results belong. (string) + - **label**: A column label. (string) + - **name**: The name of the column. (string) + - **nullable**: Indicates the column's nullable status. (one of `NOT_NULL`, `NULLABLE`, `UNKNOWN`) + - **precision**: For `DECIMAL` data types, specifies the total number of digits, up to 38. For performance reasons, we recommend up to 18 digits. (integer) + - **scale**: For `DECIMAL` data types, specifies the total number of digits in the fractional part of the value. Defaults to 0. (integer) + - **database**: The database name to which the query results belong. (string) + - **table**: The table name for the query results. (string) + - **type**: The data type of the column. (string) +- **athena.last_preview.rows**: The rows in the preview results. (array of array) # Development diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala new file mode 100644 index 0000000..ec8d403 --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala @@ -0,0 +1,8 @@ +package pro.civitaspo.digdag.plugin.athena.operator +import io.digdag.client.config.Config +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} + +class AthenaPreviewOperator (operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) + extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { + override def runTask(): TaskResult = ??? +} From 43f04a32749476686ae1bd2a82ad516c1eec1942 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 19 Sep 2018 20:06:36 +0900 Subject: [PATCH 02/13] Change output param key name --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 13b56e7..acca006 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,7 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **athena.last_preview.id**: The identifier for the query execution. (string) - **athena.last_preview.columns**: The information that describes the column structure and data types of a table of query results. (map of array) - **case_sensitive**: Indicates whether values in the column are case-sensitive. (boolean) - - **catalog_name**: The catalog to which the query results belong. (string) + - **catalog**: The catalog to which the query results belong. (string) - **label**: A column label. (string) - **name**: The name of the column. (string) - **nullable**: Indicates the column's nullable status. (one of `NOT_NULL`, `NULLABLE`, `UNKNOWN`) From ee608f56945187432e904b3bc8f197610f48d63a Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 19 Sep 2018 20:18:07 +0900 Subject: [PATCH 03/13] Impl athena.preview> operator --- .../operator/AthenaPreviewOperator.scala | 143 +++++++++++++++++- 1 file changed, 141 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala index ec8d403..71a705a 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala @@ -1,8 +1,147 @@ package pro.civitaspo.digdag.plugin.athena.operator -import io.digdag.client.config.Config +import com.amazonaws.services.athena.model.{GetQueryResultsRequest, GetQueryResultsResult} +import com.google.common.base.Optional +import com.google.common.collect.ImmutableList +import io.digdag.client.config.{Config, ConfigKey} import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} +import scala.collection.JavaConverters._ +import scala.util.Try + class AthenaPreviewOperator (operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { - override def runTask(): TaskResult = ??? + + protected val execId: String = params.get("_command", classOf[String]) + protected val maxRows: Int = params.get("max_rows", classOf[Int], 10) + + protected case class LastPreview( + id: String, + columns: Seq[LastPreviewColumnInfo], + rows: Seq[Seq[String]] // TODO: Support types JSON can express + ) + + protected case class LastPreviewColumnInfo( + caseSensitive: Option[Boolean], + catalog: Option[String], + label: Option[String], + name: String, + nullable: Option[String], + precision: Option[Int], + scale: Option[Int], + database: Option[String], + table: Option[String], + `type`: String + ) + + protected object LastPreview { + + def apply(id: String, r: GetQueryResultsResult): LastPreview = { + new LastPreview( + id = id, + columns = r.getResultSet.getResultSetMetadata.getColumnInfo.asScala.map { ci => + LastPreviewColumnInfo( + caseSensitive = Try(Option(Boolean(ci.getCaseSensitive))).getOrElse(None), + catalog = Try(Option(ci.getCatalogName)).getOrElse(None), + label = Try(Option(ci.getLabel)).getOrElse(None), + name = ci.getName, + nullable = Try(Option(ci.getNullable)).getOrElse(None), + precision = Try(Option(ci.getPrecision.toInt)).getOrElse(None), + scale = Try(Option(ci.getScale.toInt)).getOrElse(None), + database = Try(Option(ci.getSchemaName)).getOrElse(None), + table = Try(Option(ci.getTableName)).getOrElse(None), + `type` = ci.getType + ) + }, + rows = r.getResultSet.getRows.asScala.map { row => row.getData.asScala.map(_.getVarCharValue) + } + ) + } + } + + override def runTask(): TaskResult = { + val lastPreview: LastPreview = preview() + + val table = Tabulator.format(Seq(lastPreview.columns.map(_.name)) ++ lastPreview.rows) + logger.info(s"[${operatorName}] Preview rows.\n$table") + + val p: Config = buildLastPreviewParam(lastPreview) + + val builder = TaskResult.defaultBuilder(request) + builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_preview"))) + builder.storeParams(p) + builder.build() + } + + protected def preview(): LastPreview = { + def requestRecursive(nextToken: Option[String] = None): LastPreview = { + val req: GetQueryResultsRequest = new GetQueryResultsRequest() + .withQueryExecutionId(execId) + .withMaxResults(maxRows) + + if (nextToken.isDefined) req.setNextToken(nextToken.get) + + logger.info(s"[$operatorName] req => $req") + val res = withAthena(_.getQueryResults(req)) + val previewResult = LastPreview(execId, res) + + Option(res.getNextToken) match { + case None => previewResult + case Some(token) => + val next = requestRecursive(Option(token)) + LastPreview(id = execId, columns = previewResult.columns, rows = previewResult.rows ++ next.rows) + } + } + requestRecursive() + } + + protected def buildLastPreviewParam(lastPreview: LastPreview): Config = { + val ret = cf.create() + val lastPreviewParam = ret.getNestedOrSetEmpty("athena").getNestedOrSetEmpty("last_preview") + + lastPreviewParam.set("id", lastPreview.id) + val columns = lastPreview.columns.map { ci => + val cp = cf.create() + cp.set("case_sensitive", ci.caseSensitive.getOrElse(Optional.absent())) + cp.set("catalog", ci.catalog.getOrElse(Optional.absent())) + cp.set("label", ci.label.getOrElse(Optional.absent())) + cp.set("name", ci.name) + cp.set("nullable", ci.nullable.getOrElse(Optional.absent())) + cp.set("precision", ci.precision.getOrElse(Optional.absent())) + cp.set("scale", ci.scale.getOrElse(Optional.absent())) + cp.set("database", ci.database.getOrElse(Optional.absent())) + cp.set("table", ci.table.getOrElse(Optional.absent())) + cp.set("type", ci.`type`) + } + lastPreviewParam.set("columns", columns.asJava) + lastPreviewParam.set("rows", lastPreview.rows.map(_.asJava).asJava) + + ret + } + + protected object Tabulator { + + def format(table: Seq[Seq[Any]]) = table match { + case Seq() => "" + case _ => + val sizes = for (row <- table) yield (for (cell <- row) yield if (cell == null) 0 else cell.toString.length) + val colSizes = for (col <- sizes.transpose) yield col.max + val rows = for (row <- table) yield formatRow(row, colSizes) + formatRows(rowSeparator(colSizes), rows) + } + + def formatRows(rowSeparator: String, rows: Seq[String]): String = + (rowSeparator :: + rows.head :: + rowSeparator :: + rows.tail.toList ::: + rowSeparator :: + List()).mkString("\n") + + def formatRow(row: Seq[Any], colSizes: Seq[Int]) = { + val cells = for ((item, size) <- row.zip(colSizes)) yield if (size == 0) "" else ("%" + size + "s").format(item) + cells.mkString("|", "|", "|") + } + + def rowSeparator(colSizes: Seq[Int]) = colSizes map { "-" * _ } mkString ("+", "+", "+") + } } From d1cabc9f1a31c49517b13089ca668c371a210159 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 19 Sep 2018 20:26:39 +0900 Subject: [PATCH 04/13] Apply athena.preview> to athena.query> --- .../digdag/plugin/athena/AthenaPlugin.scala | 4 ++-- .../athena/operator/AthenaQueryOperator.scala | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala index 7385289..6c4da7e 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala @@ -6,7 +6,7 @@ import java.lang.reflect.Constructor import io.digdag.client.config.Config import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} import javax.inject.Inject -import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaQueryOperator} +import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator} object AthenaPlugin { @@ -16,7 +16,7 @@ object AthenaPlugin { @Inject protected var templateEngine: TemplateEngine = null override def get(): JList[OperatorFactory] = { - JArrays.asList(operatorFactory("athena.query", classOf[AthenaQueryOperator])) + JArrays.asList(operatorFactory("athena.query", classOf[AthenaQueryOperator]), operatorFactory("athena.preview", classOf[AthenaPreviewOperator])) } private def operatorFactory[T <: AbstractAthenaOperator](operatorName: String, klass: Class[T]): OperatorFactory = { diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index 017c075..a1c6aa9 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -94,6 +94,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system protected val keepMetadata: Boolean = params.get("keep_metadata", classOf[Boolean], false) protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite")) protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) + protected val preview: Boolean = params.get("preview", classOf[Boolean], true) protected lazy val query: String = { val t = Try { @@ -146,6 +147,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system val builder = TaskResult.defaultBuilder(request) builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query"))) builder.storeParams(p) + if (preview) builder.subtaskConfig(buildPreviewSubTaskConfig(lastQuery)) builder.build() } @@ -221,4 +223,20 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system ret } + + protected def buildPreviewSubTaskConfig(lastQuery: LastQuery): Config = { + val subTask: Config = cf.create() + subTask.set("_type", "athena.preview") + subTask.set("_command", lastQuery.id) + subTask.set("max_rows", 10) + + subTask.set("auth_method", authMethod) + subTask.set("profile_name", profileName) + if (profileFile.isPresent) subTask.set("profile_file", profileFile.get()) + subTask.set("use_http_proxy", useHttpProxy) + if (region.isPresent) subTask.set("region", region.get()) + if (endpoint.isPresent) subTask.set("endpoint", endpoint.get()) + + subTask + } } From 047587619e2194c8bd6a073e1129ad3d4d2d7eec Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 19 Sep 2018 21:45:36 +0900 Subject: [PATCH 05/13] Fix boolean boxing --- .../digdag/plugin/athena/operator/AthenaPreviewOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala index 71a705a..97564f3 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala @@ -40,7 +40,7 @@ class AthenaPreviewOperator (operatorName: String, context: OperatorContext, sys id = id, columns = r.getResultSet.getResultSetMetadata.getColumnInfo.asScala.map { ci => LastPreviewColumnInfo( - caseSensitive = Try(Option(Boolean(ci.getCaseSensitive))).getOrElse(None), + caseSensitive = Try(Option(Boolean.unbox(ci.getCaseSensitive))).getOrElse(None), catalog = Try(Option(ci.getCatalogName)).getOrElse(None), label = Try(Option(ci.getLabel)).getOrElse(None), name = ci.getName, From 4ff79b194d1d5b73c4d6f336e408e92f41d202b0 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 19 Sep 2018 21:46:48 +0900 Subject: [PATCH 06/13] Apply code instruction --- .../plugin/athena/operator/AthenaPreviewOperator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala index 97564f3..b5feecb 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala @@ -120,10 +120,10 @@ class AthenaPreviewOperator (operatorName: String, context: OperatorContext, sys protected object Tabulator { - def format(table: Seq[Seq[Any]]) = table match { + def format(table: Seq[Seq[Any]]): String = table match { case Seq() => "" case _ => - val sizes = for (row <- table) yield (for (cell <- row) yield if (cell == null) 0 else cell.toString.length) + val sizes = for (row <- table) yield for (cell <- row) yield if (cell == null) 0 else cell.toString.length val colSizes = for (col <- sizes.transpose) yield col.max val rows = for (row <- table) yield formatRow(row, colSizes) formatRows(rowSeparator(colSizes), rows) @@ -137,7 +137,7 @@ class AthenaPreviewOperator (operatorName: String, context: OperatorContext, sys rowSeparator :: List()).mkString("\n") - def formatRow(row: Seq[Any], colSizes: Seq[Int]) = { + def formatRow(row: Seq[Any], colSizes: Seq[Int]): String = { val cells = for ((item, size) <- row.zip(colSizes)) yield if (size == 0) "" else ("%" + size + "s").format(item) cells.mkString("|", "|", "|") } From 0543a7ba6fcb718d6f9af70f6a64e8c61c078eeb Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 19 Sep 2018 21:51:02 +0900 Subject: [PATCH 07/13] Use rows.tail as rows --- .../digdag/plugin/athena/operator/AthenaPreviewOperator.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala index b5feecb..1f32f59 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala @@ -52,8 +52,7 @@ class AthenaPreviewOperator (operatorName: String, context: OperatorContext, sys `type` = ci.getType ) }, - rows = r.getResultSet.getRows.asScala.map { row => row.getData.asScala.map(_.getVarCharValue) - } + rows = r.getResultSet.getRows.asScala.map(_.getData.asScala.map(_.getVarCharValue)).tail // the first row is column names ) } } From 42bb64724616bee5b06527e3ab925dc3348ad66e Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Thu, 20 Sep 2018 09:15:26 +0900 Subject: [PATCH 08/13] Add athena.remove_metadata operator for internal use --- .../digdag/plugin/athena/AthenaPlugin.scala | 8 ++++++-- .../athena/operator/AthenaQueryOperator.scala | 15 ++++++++++++++ .../AthenaRemoveMetadataOperator.scala | 20 +++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala index 6c4da7e..fd74e82 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala @@ -6,7 +6,7 @@ import java.lang.reflect.Constructor import io.digdag.client.config.Config import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} import javax.inject.Inject -import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator} +import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator, AthenaRemoveMetadataOperator} object AthenaPlugin { @@ -16,7 +16,11 @@ object AthenaPlugin { @Inject protected var templateEngine: TemplateEngine = null override def get(): JList[OperatorFactory] = { - JArrays.asList(operatorFactory("athena.query", classOf[AthenaQueryOperator]), operatorFactory("athena.preview", classOf[AthenaPreviewOperator])) + JArrays.asList( + operatorFactory("athena.query", classOf[AthenaQueryOperator]), + operatorFactory("athena.preview", classOf[AthenaPreviewOperator]), + operatorFactory("athena.remove_metadata", classOf[AthenaRemoveMetadataOperator]) + ) } private def operatorFactory[T <: AbstractAthenaOperator](operatorName: String, klass: Class[T]): OperatorFactory = { diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index a1c6aa9..6bfc9fb 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -239,4 +239,19 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system subTask } + + protected def buildRemoveMetadataSubTaskConfig(lastQuery: LastQuery): Config = { + val subTask: Config = cf.create() + subTask.set("_type", "athena.remove_metadata") + subTask.set("_command", lastQuery.outputCsvMetadataUri.toString) + + subTask.set("auth_method", authMethod) + subTask.set("profile_name", profileName) + if (profileFile.isPresent) subTask.set("profile_file", profileFile.get()) + subTask.set("use_http_proxy", useHttpProxy) + if (region.isPresent) subTask.set("region", region.get()) + if (endpoint.isPresent) subTask.set("endpoint", endpoint.get()) + + subTask + } } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala new file mode 100644 index 0000000..88636ce --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala @@ -0,0 +1,20 @@ +package pro.civitaspo.digdag.plugin.athena.operator +import com.amazonaws.services.s3.AmazonS3URI +import io.digdag.client.config.Config +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} + +class AthenaRemoveMetadataOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) + extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { + + object AmazonS3URI { + def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) + } + + protected val metadataUri: AmazonS3URI = AmazonS3URI(params.get("_command", classOf[String])) + + override def runTask(): TaskResult = { + logger.info(s"[$operatorName] Delete ${metadataUri.toString}.") + withS3(_.deleteObject(metadataUri.getBucket, metadataUri.getKey)) + TaskResult.empty(request) + } +} From 9d08ca6480a3bb246a4412df5c50e4af18df65aa Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 23 Sep 2018 03:03:08 +0900 Subject: [PATCH 09/13] Apply subtasks to athena.query operator --- .../athena/operator/AthenaQueryOperator.scala | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index 6bfc9fb..3c11d3a 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -136,10 +136,6 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system withS3(_.deleteObject(summary.getBucketName, summary.getKey)) } } - if (!keepMetadata) { - logger.info(s"[$operatorName] Delete ${lastQuery.outputCsvMetadataUri.toString}.") - withS3(_.deleteObject(lastQuery.outputCsvMetadataUri.getBucket, lastQuery.outputCsvMetadataUri.getKey)) - } logger.info(s"[$operatorName] Created ${lastQuery.outputCsvUri} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)") val p: Config = buildLastQueryParam(lastQuery) @@ -147,7 +143,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system val builder = TaskResult.defaultBuilder(request) builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query"))) builder.storeParams(p) - if (preview) builder.subtaskConfig(buildPreviewSubTaskConfig(lastQuery)) + builder.subtaskConfig(buildSubTaskConfig(lastQuery)) builder.build() } @@ -224,6 +220,23 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system ret } + protected def buildSubTaskConfig(lastQuery: LastQuery): Config = { + val subTask: Config = cf.create() + val export: Config = subTask.getNestedOrSetEmpty("_export").getNestedOrSetEmpty("athena") + + export.set("auth_method", authMethod) + export.set("profile_name", profileName) + if (profileFile.isPresent) export.set("profile_file", profileFile.get()) + export.set("use_http_proxy", useHttpProxy) + if (region.isPresent) export.set("region", region.get()) + if (endpoint.isPresent) export.set("endpoint", endpoint.get()) + + if (preview) subTask.setNested("+run-preview", buildPreviewSubTaskConfig(lastQuery)) + if (!keepMetadata) subTask.setNested("+run-remove-matadata", buildRemoveMetadataSubTaskConfig(lastQuery)) + + subTask + } + protected def buildPreviewSubTaskConfig(lastQuery: LastQuery): Config = { val subTask: Config = cf.create() subTask.set("_type", "athena.preview") From dc447d1061ff8af455abcb0c1acebc741edda653 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 23 Sep 2018 03:03:39 +0900 Subject: [PATCH 10/13] Show ${athena} as output in example --- example/example.dig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/example.dig b/example/example.dig index 3fb4223..75d77e6 100644 --- a/example/example.dig +++ b/example/example.dig @@ -15,5 +15,5 @@ _export: athena.query>: template.sql +step2: - echo>: ${athena.last_query} + echo>: ${athena} From c350d3866eab1d8263bf162a4ebde546ea6a0bed Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 23 Sep 2018 03:09:47 +0900 Subject: [PATCH 11/13] Update README --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index acca006..6ce9f07 100644 --- a/README.md +++ b/README.md @@ -85,12 +85,15 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **database**: The name of the database. (string, optional) - **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required) - **keep_metadata**: Indicate whether to keep the metadata after executing the query. (boolean, default: `false`) + - **NOTE**: If **keep_metadata** is false, `athena.preview>` operator cannot be used except in this time, because athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires metadata. - **save_mode**: Specify the expected behavior of saving the query results. Available values are `"append"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`) - `"append"`: When saving the query results, even if other CSVs already exist, the query results are expected to be saved as another CSV. - `"error_if_exists"`: When saving the query results, if other CSVs already exists, an exception is expected to be thrown. - `"ignore"`: When saving the query results, if other CSVs already exists, the save operation is expected to not save the query results and to not change the existing data. - `"overwrite"`: When saving the query results, if other CSVs already exist, existing data is expected to be overwritten by the query results. This operation is not atomic. - **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`) +- **preview**: Call `athena.preview>` operator after run `athena.query>`. (boolean, default: `true`) + - **NOTE**: If **keep_metadata** is false, `athena.preview>` operator cannot be used except in this time, because athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires metadata. ### Output Parameters From 5559522bef0ae05c745d17f8da737c2b9b606553 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 23 Sep 2018 03:15:53 +0900 Subject: [PATCH 12/13] ./gradlew spotlessapply --- .../digdag/plugin/athena/operator/AthenaPreviewOperator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala index 1f32f59..0d83dba 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala @@ -8,8 +8,8 @@ import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} import scala.collection.JavaConverters._ import scala.util.Try -class AthenaPreviewOperator (operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) - extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { +class AthenaPreviewOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) + extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { protected val execId: String = params.get("_command", classOf[String]) protected val maxRows: Int = params.get("max_rows", classOf[Int], 10) From cf51cc1c1f23cd4065db56ad79ca236a995df24e Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 23 Sep 2018 03:24:40 +0900 Subject: [PATCH 13/13] Ship v0.0.5 --- CHANGELOG.md | 7 +++++++ README.md | 2 +- build.gradle | 2 +- example/example.dig | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d86242..8c8ff53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +0.0.5 (2018-09-23) +================== + +* [New Feature] Add `athena.preview` operator +* [New Feature] Add `preview` option for `athena.query` operator. This option is true, then run `athena.preview` operator after `athena.query` is executed. +* [Enhancement] Add `athena.remove_metadata` operator for internal use. `athena.query` execute this when `keep_metadata` is false after `athena.preview` operator because `athena.preview` requires matadata. + 0.0.4 (2018-08-13) ================== diff --git a/README.md b/README.md index 6ce9f07..051872b 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.0.4 + - pro.civitaspo:digdag-operator-athena:0.0.5 athena: auth_method: profile diff --git a/build.gradle b/build.gradle index 77e49ca..55c1931 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.0.4' +version = '0.0.5' def digdagVersion = '0.9.27' def awsSdkVersion = "1.11.372" diff --git a/example/example.dig b/example/example.dig index 75d77e6..8e702f2 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.0.4 + - pro.civitaspo:digdag-operator-athena:0.0.5 athena: auth_method: profile query: