From 893a8841aba428b062a56ea0d791de3743cb401a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nacho=20Cord=C3=B3n?=
Date: Sat, 6 Mar 2021 18:17:44 +0000
Subject: [PATCH] Disambiguates throughput for reads and writes

---
 README.md                                   |  4 ++--
 .../dynamodb/connector/TableConnector.scala | 19 ++++++++++++-------
 .../connector/TableIndexConnector.scala     |  2 +-
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/README.md b/README.md
index a43ad7e..808af0b 100644
--- a/README.md
+++ b/README.md
@@ -93,14 +93,14 @@ The following parameters can be set as options on the Spark reader object before
 - `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
 - `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`
 - `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
-- `throughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
+- `readThroughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
 
 The following parameters can be set as options on the Spark writer object before saving.
 
 - `writeBatchSize` number of items to send per call to DynamoDB BatchWriteItem. Default 25.
 - `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).
 - `update` if true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
-- `throughput` the desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
+- `writeThroughput` the desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
 - `inferSchema` if false will not automatically infer schema - this is useful when writing to a table with many columns
 
 ## System Properties
diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
index 80e2bc4..6ca26c6 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
@@ -22,7 +22,7 @@ package com.audienceproject.spark.dynamodb.connector
 
 import com.amazonaws.services.dynamodbv2.document._
 import com.amazonaws.services.dynamodbv2.document.spec.{BatchWriteItemSpec, ScanSpec, UpdateItemSpec}
-import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity
+import com.amazonaws.services.dynamodbv2.model.{ProvisionedThroughputDescription, ReturnConsumedCapacity}
 import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder
 import com.audienceproject.shaded.google.common.util.concurrent.RateLimiter
 import com.audienceproject.spark.dynamodb.catalyst.JavaConverter
@@ -68,13 +68,18 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         else sizeBased
     })
 
+    def readThroughputParam(paramName: String, fallback: ProvisionedThroughputDescription => Long) =
+        parameters.getOrElse(paramName,
+            // fallback to old throughput name to preserve backwards compatibility
+            parameters.getOrElse("throughput",
+                Option(fallback(desc.getProvisionedThroughput).longValue())
+                    .filter(_ > 0).map(_.toString).getOrElse("100")
+            )
+        ).toLong
+
     // Provisioned or on-demand throughput.
-    val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
-        .filter(_ > 0).map(_.longValue().toString)
-        .getOrElse("100")).toLong
-    val writeThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
-        .filter(_ > 0).map(_.longValue().toString)
-        .getOrElse("100")).toLong
+    val readThroughput = readThroughputParam("readThroughput", _.getReadCapacityUnits)
+    val writeThroughput = readThroughputParam("writeThroughput", _.getWriteCapacityUnits)
 
     // Rate limit calculation.
     val avgItemSize = tableSize.toDouble / itemCount
diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala
index 0d1c213..4caadff 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala
@@ -65,7 +65,7 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String
     })
 
     // Provisioned or on-demand throughput.
-    val readThroughput = parameters.getOrElse("throughput", Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
+    val readThroughput = parameters.getOrElse("readThroughput", Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
         .filter(_ > 0).map(_.longValue().toString)
         .getOrElse("100")).toLong
 
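
For context, a minimal usage sketch of the renamed options. This assumes the implicits-based `dynamodb` reader/writer that the project README documents; the table names and throughput values are illustrative, not taken from this patch.

    import org.apache.spark.sql.SparkSession
    import com.audienceproject.spark.dynamodb.implicits._

    val spark = SparkSession.builder().appName("throughput-options").getOrCreate()

    // Reader: cap scan consumption with the read-specific option (formerly `throughput`).
    val someDf = spark.read
        .option("readThroughput", "200")   // illustrative value, in read capacity units
        .dynamodb("SomeTable")             // illustrative table name

    // Writer: cap write consumption with the write-specific option (formerly `throughput`).
    someDf.write
        .option("writeThroughput", "150")  // illustrative value, in write capacity units
        .dynamodb("SomeOtherTable")        // illustrative table name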
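
With this change, TableConnector resolves throughput in the following order: the read- or write-specific option, then the legacy `throughput` option (kept as a backwards-compatible fallback), then the table's provisioned capacity, and finally the default of 100. TableIndexConnector reads only the new `readThroughput` name.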