This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Disambiguates throughput for reads and writes #96

Open · wants to merge 1 commit into master
README.md (4 changes: 2 additions & 2 deletions)
@@ -93,14 +93,14 @@ The following parameters can be set as options on the Spark reader object before
- `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
- `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`
- `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
- `throughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
- `readThroughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.

The following parameters can be set as options on the Spark writer object before saving.

- `writeBatchSize` number of items to send per call to DynamoDB BatchWriteItem. Default 25.
- `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).
- `update` if true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
- `throughput` the desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
- `writeThroughput` the desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
- `inferSchema` if false will not automatically infer schema - this is useful when writing to a table with many columns

## System Properties
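To make the renamed reader and writer options above concrete, here is a minimal usage sketch, assuming the library's Scala implicits (`spark.read.dynamodb` / `df.write.dynamodb`) as shown in the project README; the SparkSession setup, table names, and throughput values are illustrative, and with this patch the legacy `throughput` key is still honoured as a fallback on tables.

```scala
import com.audienceproject.spark.dynamodb.implicits._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("throughput-options-sketch").getOrCreate()

// Read: pin the scan rate to 500 RCU instead of the connector's own calculation.
val dynamoDf = spark.read
    .option("readThroughput", "500")
    .dynamodb("SomeTableName")

// Write: pin the write rate to 200 WCU (useful for on-demand tables).
dynamoDf.write
    .option("writeThroughput", "200")
    .dynamodb("SomeOtherTableName")
```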
TableConnector.scala
@@ -22,7 +22,7 @@ package com.audienceproject.spark.dynamodb.connector

import com.amazonaws.services.dynamodbv2.document._
import com.amazonaws.services.dynamodbv2.document.spec.{BatchWriteItemSpec, ScanSpec, UpdateItemSpec}
-import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity
+import com.amazonaws.services.dynamodbv2.model.{ProvisionedThroughputDescription, ReturnConsumedCapacity}
import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder
import com.audienceproject.shaded.google.common.util.concurrent.RateLimiter
import com.audienceproject.spark.dynamodb.catalyst.JavaConverter
@@ -68,13 +68,18 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
else sizeBased
})

+def readThroughputParam(paramName: String, fallback: ProvisionedThroughputDescription => Long) =
+    parameters.getOrElse(paramName,
+        // fallback to old throughput name to preserve backwards compatibility
+        parameters.getOrElse("throughput",
+            Option(fallback(desc.getProvisionedThroughput).longValue())
+                .filter(_ > 0).map(_.toString).getOrElse("100")
+        )
+    ).toLong
+
// Provisioned or on-demand throughput.
-val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
-    .filter(_ > 0).map(_.longValue().toString)
-    .getOrElse("100")).toLong
-val writeThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
-    .filter(_ > 0).map(_.longValue().toString)
-    .getOrElse("100")).toLong
+val readThroughput = readThroughputParam("readThroughput", _.getReadCapacityUnits)
+val writeThroughput = readThroughputParam("writeThroughput", _.getWriteCapacityUnits)

// Rate limit calculation.
val avgItemSize = tableSize.toDouble / itemCount
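The lookup order introduced by `readThroughputParam` can be illustrated in isolation. Below is a minimal sketch in plain Scala (no AWS SDK; `resolveThroughput` and the sample values are hypothetical) mirroring the same precedence: the per-direction option wins, then the legacy `throughput` option, then the table's provisioned capacity when positive, then the default of 100.

```scala
// Standalone mirror of the precedence implemented by readThroughputParam above:
// per-direction option -> legacy "throughput" option -> provisioned capacity (if > 0) -> default 100.
def resolveThroughput(parameters: Map[String, String],
                      paramName: String,
                      provisionedCapacity: Option[Long]): Long =
    parameters.get(paramName)
        .orElse(parameters.get("throughput"))
        .map(_.toLong)
        .orElse(provisionedCapacity.filter(_ > 0))
        .getOrElse(100L)

resolveThroughput(Map("readThroughput" -> "500"), "readThroughput", Some(1000L)) // 500: new option wins
resolveThroughput(Map("throughput" -> "250"), "writeThroughput", Some(1000L))    // 250: legacy fallback
resolveThroughput(Map.empty, "readThroughput", Some(0L))                         // 100: on-demand default
```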
TableIndexConnector.scala
@@ -65,7 +65,7 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String
})

// Provisioned or on-demand throughput.
-val readThroughput = parameters.getOrElse("throughput", Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
+val readThroughput = parameters.getOrElse("readThroughput", Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
.filter(_ > 0).map(_.longValue().toString)
.getOrElse("100")).toLong

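For the index connector the option is simply renamed, with no legacy fallback in this diff. A brief reader sketch, reusing the `spark` session from the earlier example and assuming the `indexName` reader option documented in the README (index and table names are placeholders):

```scala
import com.audienceproject.spark.dynamodb.implicits._

// Scan a global secondary index. After this patch the read rate option is "readThroughput";
// unlike the table connector, this diff adds no fallback to the old "throughput" key for indexes.
val indexDf = spark.read
    .option("indexName", "SomeIndexName")
    .option("readThroughput", "300")
    .dynamodb("SomeTableName")
```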