Merge pull request #22 from civitaspo/develop
v0.0.5
civitaspo authored Sep 22, 2018
2 parents 96a3b96 + 922ecfb commit 3aa993a
Showing 8 changed files with 258 additions and 10 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,10 @@
0.0.5 (2018-09-23)
==================

* [New Feature] Add `athena.preview` operator
* [New Feature] Add `preview` option for `athena.query` operator. If this option is true, the `athena.preview` operator runs after `athena.query` is executed.
* [Enhancement] Add `athena.remove_metadata` operator for internal use. When `keep_metadata` is false, `athena.query` executes it after the `athena.preview` operator, because `athena.preview` requires the metadata.

0.0.4 (2018-08-13)
==================

27 changes: 26 additions & 1 deletion README.md
@@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.0.4
- pro.civitaspo:digdag-operator-athena:0.0.5
athena:
auth_method: profile

@@ -85,12 +85,15 @@ Define the below options on properties (which is indicated by `-c`, `--config`).
- **database**: The name of the database. (string, optional)
- **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required)
- **keep_metadata**: Indicate whether to keep the metadata after executing the query. (boolean, default: `false`)
    - **NOTE**: If **keep_metadata** is false, the `athena.preview>` operator can be used only during this `athena.query>` run, because the Athena [`GetQueryResults` API](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires the metadata.
- **save_mode**: Specify the expected behavior of saving the query results. Available values are `"append"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the explanation of each behavior below. (string, default: `"overwrite"`)
    - `"append"`: When saving the query results, even if other CSVs already exist, the query results are expected to be saved as another CSV.
    - `"error_if_exists"`: When saving the query results, if other CSVs already exist, an exception is expected to be thrown.
    - `"ignore"`: When saving the query results, if other CSVs already exist, the save operation is expected to not save the query results and to not change the existing data.
    - `"overwrite"`: When saving the query results, if other CSVs already exist, existing data is expected to be overwritten by the query results. This operation is not atomic.
- **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`)
- **preview**: Call the `athena.preview>` operator after running `athena.query>`. (boolean, default: `true`)
    - **NOTE**: If **keep_metadata** is false, the `athena.preview>` operator can be used only during this `athena.query>` run, because the Athena [`GetQueryResults` API](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires the metadata.
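
The interaction between **preview** and **keep_metadata** described above can be sketched as a small pure function. This is only an illustrative model: `SubTaskPlan` and `plan` are hypothetical names that do not exist in the plugin, and the returned strings simply reuse the operator names from this README.

```scala
// Hypothetical model of which follow-up operators run after `athena.query>`.
object SubTaskPlan {

  // The preview step comes first because it reads the metadata
  // that `athena.remove_metadata` deletes afterwards.
  def plan(preview: Boolean, keepMetadata: Boolean): Seq[String] = {
    val previewStep: Seq[String] = if (preview) Seq("athena.preview") else Seq.empty
    val removeStep: Seq[String]  = if (!keepMetadata) Seq("athena.remove_metadata") else Seq.empty
    previewStep ++ removeStep
  }
}
```

With the documented defaults (`preview: true`, `keep_metadata: false`), `SubTaskPlan.plan(true, false)` yields the preview step followed by the metadata removal.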

### Output Parameters

@@ -105,6 +108,28 @@ Define the below options on properties (which is indicated by `-c`, `--config`).
- **athena.last_query.submitted_at**: The Unix timestamp at which the query was submitted. (integer)
- **athena.last_query.completed_at**: The Unix timestamp at which the query completed. (integer)

## Configuration for `athena.preview>` operator

### Options

- **athena.preview>**: The identifier of a query execution that has succeeded. (string, required)
- **max_rows**: The maximum number of rows to preview. Valid values are 0 to 100. (integer, default: `10`)

### Output Parameters

- **athena.last_preview.id**: The identifier for the query execution. (string)
- **athena.last_preview.columns**: The information that describes the column structure and data types of a table of query results. (array of map)
    - **case_sensitive**: Indicates whether values in the column are case-sensitive. (boolean)
    - **catalog**: The catalog to which the query results belong. (string)
    - **label**: A column label. (string)
    - **name**: The name of the column. (string)
    - **nullable**: Indicates the column's nullable status. (one of `NOT_NULL`, `NULLABLE`, `UNKNOWN`)
    - **precision**: For `DECIMAL` data types, specifies the total number of digits, up to 38. For performance reasons, we recommend up to 18 digits. (integer)
    - **scale**: For `DECIMAL` data types, specifies the total number of digits in the fractional part of the value. Defaults to 0. (integer)
    - **database**: The database name to which the query results belong. (string)
    - **table**: The table name for the query results. (string)
    - **type**: The data type of the column. (string)
- **athena.last_preview.rows**: The rows in the preview results. (array of array)
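
As a rough illustration of the shape of `athena.last_preview`, the sketch below models it with plain Scala case classes and looks up the previewed values of one column by name. `PreviewShape`, `Column`, `Preview`, and `columnValues` are hypothetical names used only for this example, and only the `name` and `type` column fields are modeled for brevity.

```scala
// Hypothetical, simplified model of the `athena.last_preview` output parameter.
object PreviewShape {

  final case class Column(name: String, `type`: String)

  // `rows` is an array of arrays: one inner sequence per previewed row,
  // with cells in the same order as `columns`.
  final case class Preview(id: String, columns: Seq[Column], rows: Seq[Seq[String]])

  // Returns every previewed value of the named column, or None if no such column exists.
  def columnValues(p: Preview, columnName: String): Option[Seq[String]] = {
    val idx = p.columns.indexWhere(_.name == columnName)
    if (idx < 0) None else Some(p.rows.map(row => row(idx)))
  }
}
```

The positional lookup works because each row lists its cells in column order, which is why `columns` is described as an array rather than a map keyed by name.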

# Development

2 changes: 1 addition & 1 deletion build.gradle
@@ -6,7 +6,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.0.4'
version = '0.0.5'

def digdagVersion = '0.9.27'
def awsSdkVersion = "1.11.372"
4 changes: 2 additions & 2 deletions example/example.dig
@@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.0.4
- pro.civitaspo:digdag-operator-athena:0.0.5
athena:
auth_method: profile
query:
@@ -15,5 +15,5 @@ _export:
athena.query>: template.sql

+step2:
echo>: ${athena.last_query}
echo>: ${athena}

@@ -6,7 +6,7 @@ import java.lang.reflect.Constructor
import io.digdag.client.config.Config
import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine}
import javax.inject.Inject
import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaQueryOperator}
import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator, AthenaRemoveMetadataOperator}

object AthenaPlugin {

@@ -16,7 +16,11 @@ object AthenaPlugin {
@Inject protected var templateEngine: TemplateEngine = null

override def get(): JList[OperatorFactory] = {
JArrays.asList(operatorFactory("athena.query", classOf[AthenaQueryOperator]))
JArrays.asList(
operatorFactory("athena.query", classOf[AthenaQueryOperator]),
operatorFactory("athena.preview", classOf[AthenaPreviewOperator]),
operatorFactory("athena.remove_metadata", classOf[AthenaRemoveMetadataOperator])
)
}

private def operatorFactory[T <: AbstractAthenaOperator](operatorName: String, klass: Class[T]): OperatorFactory = {
@@ -0,0 +1,146 @@
package pro.civitaspo.digdag.plugin.athena.operator
import com.amazonaws.services.athena.model.{GetQueryResultsRequest, GetQueryResultsResult}
import com.google.common.base.Optional
import com.google.common.collect.ImmutableList
import io.digdag.client.config.{Config, ConfigKey}
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}

import scala.collection.JavaConverters._
import scala.util.Try

class AthenaPreviewOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) {

protected val execId: String = params.get("_command", classOf[String])
protected val maxRows: Int = params.get("max_rows", classOf[Int], 10)

protected case class LastPreview(
id: String,
columns: Seq[LastPreviewColumnInfo],
rows: Seq[Seq[String]] // TODO: Support types JSON can express
)

protected case class LastPreviewColumnInfo(
caseSensitive: Option[Boolean],
catalog: Option[String],
label: Option[String],
name: String,
nullable: Option[String],
precision: Option[Int],
scale: Option[Int],
database: Option[String],
table: Option[String],
`type`: String
)

protected object LastPreview {

def apply(id: String, r: GetQueryResultsResult): LastPreview = {
new LastPreview(
id = id,
columns = r.getResultSet.getResultSetMetadata.getColumnInfo.asScala.map { ci =>
LastPreviewColumnInfo(
caseSensitive = Try(Option(Boolean.unbox(ci.getCaseSensitive))).getOrElse(None),
catalog = Try(Option(ci.getCatalogName)).getOrElse(None),
label = Try(Option(ci.getLabel)).getOrElse(None),
name = ci.getName,
nullable = Try(Option(ci.getNullable)).getOrElse(None),
precision = Try(Option(ci.getPrecision.toInt)).getOrElse(None),
scale = Try(Option(ci.getScale.toInt)).getOrElse(None),
database = Try(Option(ci.getSchemaName)).getOrElse(None),
table = Try(Option(ci.getTableName)).getOrElse(None),
`type` = ci.getType
)
},
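// The first row holds the column names, so drop it.
// NOTE: `.tail` is applied to every page; if later pages carry no header row, their first data row is dropped as well.
rows = r.getResultSet.getRows.asScala.map(_.getData.asScala.map(_.getVarCharValue)).tail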
)
}
}

override def runTask(): TaskResult = {
val lastPreview: LastPreview = preview()

val table = Tabulator.format(Seq(lastPreview.columns.map(_.name)) ++ lastPreview.rows)
logger.info(s"[${operatorName}] Preview rows.\n$table")

val p: Config = buildLastPreviewParam(lastPreview)

val builder = TaskResult.defaultBuilder(request)
builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_preview")))
builder.storeParams(p)
builder.build()
}

protected def preview(): LastPreview = {
def requestRecursive(nextToken: Option[String] = None): LastPreview = {
val req: GetQueryResultsRequest = new GetQueryResultsRequest()
.withQueryExecutionId(execId)
.withMaxResults(maxRows)

if (nextToken.isDefined) req.setNextToken(nextToken.get)

logger.info(s"[$operatorName] req => $req")
val res = withAthena(_.getQueryResults(req))
val previewResult = LastPreview(execId, res)

Option(res.getNextToken) match {
case None => previewResult
case Some(token) =>
val next = requestRecursive(Option(token))
LastPreview(id = execId, columns = previewResult.columns, rows = previewResult.rows ++ next.rows)
}
}
requestRecursive()
}

protected def buildLastPreviewParam(lastPreview: LastPreview): Config = {
val ret = cf.create()
val lastPreviewParam = ret.getNestedOrSetEmpty("athena").getNestedOrSetEmpty("last_preview")

lastPreviewParam.set("id", lastPreview.id)
val columns = lastPreview.columns.map { ci =>
val cp = cf.create()
cp.set("case_sensitive", ci.caseSensitive.getOrElse(Optional.absent()))
cp.set("catalog", ci.catalog.getOrElse(Optional.absent()))
cp.set("label", ci.label.getOrElse(Optional.absent()))
cp.set("name", ci.name)
cp.set("nullable", ci.nullable.getOrElse(Optional.absent()))
cp.set("precision", ci.precision.getOrElse(Optional.absent()))
cp.set("scale", ci.scale.getOrElse(Optional.absent()))
cp.set("database", ci.database.getOrElse(Optional.absent()))
cp.set("table", ci.table.getOrElse(Optional.absent()))
cp.set("type", ci.`type`)
}
lastPreviewParam.set("columns", columns.asJava)
lastPreviewParam.set("rows", lastPreview.rows.map(_.asJava).asJava)

ret
}

protected object Tabulator {

def format(table: Seq[Seq[Any]]): String = table match {
case Seq() => ""
case _ =>
val sizes = for (row <- table) yield for (cell <- row) yield if (cell == null) 0 else cell.toString.length
val colSizes = for (col <- sizes.transpose) yield col.max
val rows = for (row <- table) yield formatRow(row, colSizes)
formatRows(rowSeparator(colSizes), rows)
}

def formatRows(rowSeparator: String, rows: Seq[String]): String =
(rowSeparator ::
rows.head ::
rowSeparator ::
rows.tail.toList :::
rowSeparator ::
List()).mkString("\n")

def formatRow(row: Seq[Any], colSizes: Seq[Int]): String = {
val cells = for ((item, size) <- row.zip(colSizes)) yield if (size == 0) "" else ("%" + size + "s").format(item)
cells.mkString("|", "|", "|")
}

def rowSeparator(colSizes: Seq[Int]) = colSizes map { "-" * _ } mkString ("+", "+", "+")
}
}
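
One point worth checking in `preview()` is that the header row is dropped on every page and that the recursion follows `nextToken` until the results are exhausted, so `max_rows` acts as a page size rather than a total. Assuming the header row appears only on the first page, a paging loop could drop it once and stop at the requested row count. The sketch below is not the plugin's code: `PreviewPaging`, `Page`, and the `fetch` function are hypothetical stand-ins for the AWS client so that the logic runs without AWS access.

```scala
import scala.annotation.tailrec

// Hypothetical paging helper; `Page` stands in for one GetQueryResults response.
object PreviewPaging {

  final case class Page(rows: Seq[Seq[String]], nextToken: Option[String])

  // Collects at most `maxRows` data rows, dropping the header row only on the first page.
  def collectRows(fetch: Option[String] => Page, maxRows: Int): Seq[Seq[String]] = {
    @tailrec
    def loop(token: Option[String], first: Boolean, acc: Vector[Seq[String]]): Vector[Seq[String]] = {
      val page = fetch(token)
      val data = if (first) page.rows.drop(1) else page.rows
      val merged = (acc ++ data).take(maxRows)
      page.nextToken match {
        // Keep paging only while more rows are wanted and more pages exist.
        case Some(next) if merged.size < maxRows => loop(Some(next), first = false, merged)
        case _                                   => merged
      }
    }
    if (maxRows <= 0) Vector.empty else loop(None, first = true, Vector.empty)
  }
}
```

Passing the fetcher as a function keeps the loop testable with an in-memory map of pages; in the operator it would presumably wrap the `withAthena(_.getQueryResults(req))` call.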
@@ -94,6 +94,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system
protected val keepMetadata: Boolean = params.get("keep_metadata", classOf[Boolean], false)
protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite"))
protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m"))
protected val preview: Boolean = params.get("preview", classOf[Boolean], true)

protected lazy val query: String = {
val t = Try {
@@ -135,17 +136,14 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system
withS3(_.deleteObject(summary.getBucketName, summary.getKey))
}
}
if (!keepMetadata) {
logger.info(s"[$operatorName] Delete ${lastQuery.outputCsvMetadataUri.toString}.")
withS3(_.deleteObject(lastQuery.outputCsvMetadataUri.getBucket, lastQuery.outputCsvMetadataUri.getKey))
}

logger.info(s"[$operatorName] Created ${lastQuery.outputCsvUri} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)")
val p: Config = buildLastQueryParam(lastQuery)

val builder = TaskResult.defaultBuilder(request)
builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query")))
builder.storeParams(p)
builder.subtaskConfig(buildSubTaskConfig(lastQuery))
builder.build()
}

@@ -221,4 +219,52 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system

ret
}

protected def buildSubTaskConfig(lastQuery: LastQuery): Config = {
val subTask: Config = cf.create()
val export: Config = subTask.getNestedOrSetEmpty("_export").getNestedOrSetEmpty("athena")

export.set("auth_method", authMethod)
export.set("profile_name", profileName)
if (profileFile.isPresent) export.set("profile_file", profileFile.get())
export.set("use_http_proxy", useHttpProxy)
if (region.isPresent) export.set("region", region.get())
if (endpoint.isPresent) export.set("endpoint", endpoint.get())

if (preview) subTask.setNested("+run-preview", buildPreviewSubTaskConfig(lastQuery))
if (!keepMetadata) subTask.setNested("+run-remove-metadata", buildRemoveMetadataSubTaskConfig(lastQuery))

subTask
}

protected def buildPreviewSubTaskConfig(lastQuery: LastQuery): Config = {
val subTask: Config = cf.create()
subTask.set("_type", "athena.preview")
subTask.set("_command", lastQuery.id)
subTask.set("max_rows", 10)

subTask.set("auth_method", authMethod)
subTask.set("profile_name", profileName)
if (profileFile.isPresent) subTask.set("profile_file", profileFile.get())
subTask.set("use_http_proxy", useHttpProxy)
if (region.isPresent) subTask.set("region", region.get())
if (endpoint.isPresent) subTask.set("endpoint", endpoint.get())

subTask
}

protected def buildRemoveMetadataSubTaskConfig(lastQuery: LastQuery): Config = {
val subTask: Config = cf.create()
subTask.set("_type", "athena.remove_metadata")
subTask.set("_command", lastQuery.outputCsvMetadataUri.toString)

subTask.set("auth_method", authMethod)
subTask.set("profile_name", profileName)
if (profileFile.isPresent) subTask.set("profile_file", profileFile.get())
subTask.set("use_http_proxy", useHttpProxy)
if (region.isPresent) subTask.set("region", region.get())
if (endpoint.isPresent) subTask.set("endpoint", endpoint.get())

subTask
}
}
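
The same six auth-related settings are copied in `buildSubTaskConfig`, `buildPreviewSubTaskConfig`, and `buildRemoveMetadataSubTaskConfig`. One possible way to express that copy once is sketched below with plain Scala types. `AuthParams`, `Auth`, and `toParams` are hypothetical names, and a `Map` with `Option` values is used here in place of digdag's `Config` and Guava's `Optional` so the example is self-contained.

```scala
// Hypothetical helper that gathers the auth settings passed to each subtask.
object AuthParams {

  final case class Auth(
    authMethod: String,
    profileName: String,
    profileFile: Option[String],
    useHttpProxy: Boolean,
    region: Option[String],
    endpoint: Option[String]
  )

  // Required settings are always emitted; optional ones only when present,
  // mirroring the `if (x.isPresent) ...set(...)` guards in the operator.
  def toParams(a: Auth): Map[String, Any] = {
    val required: Map[String, Any] = Map(
      "auth_method"    -> a.authMethod,
      "profile_name"   -> a.profileName,
      "use_http_proxy" -> a.useHttpProxy
    )
    val optional: Seq[(String, String)] = Seq(
      "profile_file" -> a.profileFile,
      "region"       -> a.region,
      "endpoint"     -> a.endpoint
    ).collect { case (key, Some(value)) => key -> value }
    required ++ optional
  }
}
```

Each subtask builder could then apply the resulting pairs to its own config, which would keep the three methods from drifting apart if another auth option is added later.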
@@ -0,0 +1,20 @@
package pro.civitaspo.digdag.plugin.athena.operator
import com.amazonaws.services.s3.AmazonS3URI
import io.digdag.client.config.Config
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}

class AthenaRemoveMetadataOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) {

object AmazonS3URI {
def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false)
}

protected val metadataUri: AmazonS3URI = AmazonS3URI(params.get("_command", classOf[String]))

override def runTask(): TaskResult = {
logger.info(s"[$operatorName] Delete ${metadataUri.toString}.")
withS3(_.deleteObject(metadataUri.getBucket, metadataUri.getKey))
TaskResult.empty(request)
}
}
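
`AthenaRemoveMetadataOperator` receives the metadata location as an `s3://` string in `_command` and splits it into a bucket and a key before deleting the object. The sketch below shows that split using only `java.net.URI`. It is a simplified stand-in rather than the AWS SDK's `AmazonS3URI`; `S3UriSketch` and `bucketAndKey` are hypothetical names, and the sample path in the usage note is made up.

```scala
import java.net.URI

// Hypothetical, simplified stand-in for the bucket/key split done by AmazonS3URI.
object S3UriSketch {

  // Splits "s3://bucket/path/to/key" into (bucket, key).
  def bucketAndKey(s3Uri: String): (String, String) = {
    val uri = new URI(s3Uri)
    require(uri.getScheme == "s3", s"not an s3:// URI: $s3Uri")
    // The authority part is the bucket; the path without its leading slash is the key.
    (uri.getAuthority, uri.getPath.stripPrefix("/"))
  }
}
```

For example, `S3UriSketch.bucketAndKey("s3://my-bucket/path/to/result.csv.metadata")` gives the bucket `my-bucket` and the key `path/to/result.csv.metadata`, which are the two arguments `deleteObject` expects.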
