diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d86242..8c8ff53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +0.0.5 (2018-09-23) +================== + +* [New Feature] Add `athena.preview` operator +* [New Feature] Add `preview` option for `athena.query` operator. This option is true, then run `athena.preview` operator after `athena.query` is executed. +* [Enhancement] Add `athena.remove_metadata` operator for internal use. `athena.query` execute this when `keep_metadata` is false after `athena.preview` operator because `athena.preview` requires matadata. + 0.0.4 (2018-08-13) ================== diff --git a/README.md b/README.md index bb58ce3..051872b 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.0.4 + - pro.civitaspo:digdag-operator-athena:0.0.5 athena: auth_method: profile @@ -85,12 +85,15 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **database**: The name of the database. (string, optional) - **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required) - **keep_metadata**: Indicate whether to keep the metadata after executing the query. (boolean, default: `false`) + - **NOTE**: If **keep_metadata** is false, `athena.preview>` operator cannot be used except in this time, because athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires metadata. - **save_mode**: Specify the expected behavior of saving the query results. Available values are `"append"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`) - `"append"`: When saving the query results, even if other CSVs already exist, the query results are expected to be saved as another CSV. - `"error_if_exists"`: When saving the query results, if other CSVs already exists, an exception is expected to be thrown. - `"ignore"`: When saving the query results, if other CSVs already exists, the save operation is expected to not save the query results and to not change the existing data. - `"overwrite"`: When saving the query results, if other CSVs already exist, existing data is expected to be overwritten by the query results. This operation is not atomic. - **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`) +- **preview**: Call `athena.preview>` operator after run `athena.query>`. (boolean, default: `true`) + - **NOTE**: If **keep_metadata** is false, `athena.preview>` operator cannot be used except in this time, because athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires metadata. ### Output Parameters @@ -105,6 +108,28 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **athena.last_query.submitted_at**: The unix timestamp that the query was submitted. (integer) - **athena.last_query.completed_at**: The unix timestamp that the query completed. (integer) +## Configuration for `athena.preview>` operator + +### Options + +- **athena.preview>**: The identifier for the query execution that is succeeded. (string, required) +- **max_rows**: The maximum number of rows to preview. 0 ~ 100 is valid. (integer, default: `10`) + +### Output Parameters + +- **athena.last_preview.id**: The identifier for the query execution. (string) +- **athena.last_preview.columns**: The information that describes the column structure and data types of a table of query results. (map of array) + - **case_sensitive**: Indicates whether values in the column are case-sensitive. (boolean) + - **catalog**: The catalog to which the query results belong. (string) + - **label**: A column label. (string) + - **name**: The name of the column. (string) + - **nullable**: Indicates the column's nullable status. (one of `NOT_NULL`, `NULLABLE`, `UNKNOWN`) + - **precision**: For `DECIMAL` data types, specifies the total number of digits, up to 38. For performance reasons, we recommend up to 18 digits. (integer) + - **scale**: For `DECIMAL` data types, specifies the total number of digits in the fractional part of the value. Defaults to 0. (integer) + - **database**: The database name to which the query results belong. (string) + - **table**: The table name for the query results. (string) + - **type**: The data type of the column. (string) +- **athena.last_preview.rows**: The rows in the preview results. (array of array) # Development diff --git a/build.gradle b/build.gradle index 77e49ca..55c1931 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.0.4' +version = '0.0.5' def digdagVersion = '0.9.27' def awsSdkVersion = "1.11.372" diff --git a/example/example.dig b/example/example.dig index 3fb4223..8e702f2 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.0.4 + - pro.civitaspo:digdag-operator-athena:0.0.5 athena: auth_method: profile query: @@ -15,5 +15,5 @@ _export: athena.query>: template.sql +step2: - echo>: ${athena.last_query} + echo>: ${athena} diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala index 7385289..fd74e82 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala @@ -6,7 +6,7 @@ import java.lang.reflect.Constructor import io.digdag.client.config.Config import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} import javax.inject.Inject -import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaQueryOperator} +import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator, AthenaRemoveMetadataOperator} object AthenaPlugin { @@ -16,7 +16,11 @@ object AthenaPlugin { @Inject protected var templateEngine: TemplateEngine = null override def get(): JList[OperatorFactory] = { - JArrays.asList(operatorFactory("athena.query", classOf[AthenaQueryOperator])) + JArrays.asList( + operatorFactory("athena.query", classOf[AthenaQueryOperator]), + operatorFactory("athena.preview", classOf[AthenaPreviewOperator]), + operatorFactory("athena.remove_metadata", classOf[AthenaRemoveMetadataOperator]) + ) } private def operatorFactory[T <: AbstractAthenaOperator](operatorName: String, klass: Class[T]): OperatorFactory = { diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala new file mode 100644 index 0000000..0d83dba --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaPreviewOperator.scala @@ -0,0 +1,146 @@ +package pro.civitaspo.digdag.plugin.athena.operator +import com.amazonaws.services.athena.model.{GetQueryResultsRequest, GetQueryResultsResult} +import com.google.common.base.Optional +import com.google.common.collect.ImmutableList +import io.digdag.client.config.{Config, ConfigKey} +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} + +import scala.collection.JavaConverters._ +import scala.util.Try + +class AthenaPreviewOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) + extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { + + protected val execId: String = params.get("_command", classOf[String]) + protected val maxRows: Int = params.get("max_rows", classOf[Int], 10) + + protected case class LastPreview( + id: String, + columns: Seq[LastPreviewColumnInfo], + rows: Seq[Seq[String]] // TODO: Support types JSON can express + ) + + protected case class LastPreviewColumnInfo( + caseSensitive: Option[Boolean], + catalog: Option[String], + label: Option[String], + name: String, + nullable: Option[String], + precision: Option[Int], + scale: Option[Int], + database: Option[String], + table: Option[String], + `type`: String + ) + + protected object LastPreview { + + def apply(id: String, r: GetQueryResultsResult): LastPreview = { + new LastPreview( + id = id, + columns = r.getResultSet.getResultSetMetadata.getColumnInfo.asScala.map { ci => + LastPreviewColumnInfo( + caseSensitive = Try(Option(Boolean.unbox(ci.getCaseSensitive))).getOrElse(None), + catalog = Try(Option(ci.getCatalogName)).getOrElse(None), + label = Try(Option(ci.getLabel)).getOrElse(None), + name = ci.getName, + nullable = Try(Option(ci.getNullable)).getOrElse(None), + precision = Try(Option(ci.getPrecision.toInt)).getOrElse(None), + scale = Try(Option(ci.getScale.toInt)).getOrElse(None), + database = Try(Option(ci.getSchemaName)).getOrElse(None), + table = Try(Option(ci.getTableName)).getOrElse(None), + `type` = ci.getType + ) + }, + rows = r.getResultSet.getRows.asScala.map(_.getData.asScala.map(_.getVarCharValue)).tail // the first row is column names + ) + } + } + + override def runTask(): TaskResult = { + val lastPreview: LastPreview = preview() + + val table = Tabulator.format(Seq(lastPreview.columns.map(_.name)) ++ lastPreview.rows) + logger.info(s"[${operatorName}] Preview rows.\n$table") + + val p: Config = buildLastPreviewParam(lastPreview) + + val builder = TaskResult.defaultBuilder(request) + builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_preview"))) + builder.storeParams(p) + builder.build() + } + + protected def preview(): LastPreview = { + def requestRecursive(nextToken: Option[String] = None): LastPreview = { + val req: GetQueryResultsRequest = new GetQueryResultsRequest() + .withQueryExecutionId(execId) + .withMaxResults(maxRows) + + if (nextToken.isDefined) req.setNextToken(nextToken.get) + + logger.info(s"[$operatorName] req => $req") + val res = withAthena(_.getQueryResults(req)) + val previewResult = LastPreview(execId, res) + + Option(res.getNextToken) match { + case None => previewResult + case Some(token) => + val next = requestRecursive(Option(token)) + LastPreview(id = execId, columns = previewResult.columns, rows = previewResult.rows ++ next.rows) + } + } + requestRecursive() + } + + protected def buildLastPreviewParam(lastPreview: LastPreview): Config = { + val ret = cf.create() + val lastPreviewParam = ret.getNestedOrSetEmpty("athena").getNestedOrSetEmpty("last_preview") + + lastPreviewParam.set("id", lastPreview.id) + val columns = lastPreview.columns.map { ci => + val cp = cf.create() + cp.set("case_sensitive", ci.caseSensitive.getOrElse(Optional.absent())) + cp.set("catalog", ci.catalog.getOrElse(Optional.absent())) + cp.set("label", ci.label.getOrElse(Optional.absent())) + cp.set("name", ci.name) + cp.set("nullable", ci.nullable.getOrElse(Optional.absent())) + cp.set("precision", ci.precision.getOrElse(Optional.absent())) + cp.set("scale", ci.scale.getOrElse(Optional.absent())) + cp.set("database", ci.database.getOrElse(Optional.absent())) + cp.set("table", ci.table.getOrElse(Optional.absent())) + cp.set("type", ci.`type`) + } + lastPreviewParam.set("columns", columns.asJava) + lastPreviewParam.set("rows", lastPreview.rows.map(_.asJava).asJava) + + ret + } + + protected object Tabulator { + + def format(table: Seq[Seq[Any]]): String = table match { + case Seq() => "" + case _ => + val sizes = for (row <- table) yield for (cell <- row) yield if (cell == null) 0 else cell.toString.length + val colSizes = for (col <- sizes.transpose) yield col.max + val rows = for (row <- table) yield formatRow(row, colSizes) + formatRows(rowSeparator(colSizes), rows) + } + + def formatRows(rowSeparator: String, rows: Seq[String]): String = + (rowSeparator :: + rows.head :: + rowSeparator :: + rows.tail.toList ::: + rowSeparator :: + List()).mkString("\n") + + def formatRow(row: Seq[Any], colSizes: Seq[Int]): String = { + val cells = for ((item, size) <- row.zip(colSizes)) yield if (size == 0) "" else ("%" + size + "s").format(item) + cells.mkString("|", "|", "|") + } + + def rowSeparator(colSizes: Seq[Int]) = colSizes map { "-" * _ } mkString ("+", "+", "+") + } +} diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index 017c075..3c11d3a 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -94,6 +94,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system protected val keepMetadata: Boolean = params.get("keep_metadata", classOf[Boolean], false) protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite")) protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) + protected val preview: Boolean = params.get("preview", classOf[Boolean], true) protected lazy val query: String = { val t = Try { @@ -135,10 +136,6 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system withS3(_.deleteObject(summary.getBucketName, summary.getKey)) } } - if (!keepMetadata) { - logger.info(s"[$operatorName] Delete ${lastQuery.outputCsvMetadataUri.toString}.") - withS3(_.deleteObject(lastQuery.outputCsvMetadataUri.getBucket, lastQuery.outputCsvMetadataUri.getKey)) - } logger.info(s"[$operatorName] Created ${lastQuery.outputCsvUri} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)") val p: Config = buildLastQueryParam(lastQuery) @@ -146,6 +143,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system val builder = TaskResult.defaultBuilder(request) builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query"))) builder.storeParams(p) + builder.subtaskConfig(buildSubTaskConfig(lastQuery)) builder.build() } @@ -221,4 +219,52 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system ret } + + protected def buildSubTaskConfig(lastQuery: LastQuery): Config = { + val subTask: Config = cf.create() + val export: Config = subTask.getNestedOrSetEmpty("_export").getNestedOrSetEmpty("athena") + + export.set("auth_method", authMethod) + export.set("profile_name", profileName) + if (profileFile.isPresent) export.set("profile_file", profileFile.get()) + export.set("use_http_proxy", useHttpProxy) + if (region.isPresent) export.set("region", region.get()) + if (endpoint.isPresent) export.set("endpoint", endpoint.get()) + + if (preview) subTask.setNested("+run-preview", buildPreviewSubTaskConfig(lastQuery)) + if (!keepMetadata) subTask.setNested("+run-remove-matadata", buildRemoveMetadataSubTaskConfig(lastQuery)) + + subTask + } + + protected def buildPreviewSubTaskConfig(lastQuery: LastQuery): Config = { + val subTask: Config = cf.create() + subTask.set("_type", "athena.preview") + subTask.set("_command", lastQuery.id) + subTask.set("max_rows", 10) + + subTask.set("auth_method", authMethod) + subTask.set("profile_name", profileName) + if (profileFile.isPresent) subTask.set("profile_file", profileFile.get()) + subTask.set("use_http_proxy", useHttpProxy) + if (region.isPresent) subTask.set("region", region.get()) + if (endpoint.isPresent) subTask.set("endpoint", endpoint.get()) + + subTask + } + + protected def buildRemoveMetadataSubTaskConfig(lastQuery: LastQuery): Config = { + val subTask: Config = cf.create() + subTask.set("_type", "athena.remove_metadata") + subTask.set("_command", lastQuery.outputCsvMetadataUri.toString) + + subTask.set("auth_method", authMethod) + subTask.set("profile_name", profileName) + if (profileFile.isPresent) subTask.set("profile_file", profileFile.get()) + subTask.set("use_http_proxy", useHttpProxy) + if (region.isPresent) subTask.set("region", region.get()) + if (endpoint.isPresent) subTask.set("endpoint", endpoint.get()) + + subTask + } } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala new file mode 100644 index 0000000..88636ce --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala @@ -0,0 +1,20 @@ +package pro.civitaspo.digdag.plugin.athena.operator +import com.amazonaws.services.s3.AmazonS3URI +import io.digdag.client.config.Config +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} + +class AthenaRemoveMetadataOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) + extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { + + object AmazonS3URI { + def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) + } + + protected val metadataUri: AmazonS3URI = AmazonS3URI(params.get("_command", classOf[String])) + + override def runTask(): TaskResult = { + logger.info(s"[$operatorName] Delete ${metadataUri.toString}.") + withS3(_.deleteObject(metadataUri.getBucket, metadataUri.getKey)) + TaskResult.empty(request) + } +}