This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Added an option for choosing absolute (cumulative) read/write throughput; handled an issue with parallelism while writing. #66

Open
wants to merge 7 commits into master

Conversation


@LalithSrinivas commented Jun 11, 2020

What?
Add support for DynamoDB batch writes using an absolute RCU and WCU limit set by the Spark job, in addition to the existing target-capacity-based batch writes. We also add an option to increase or decrease the parallelism while writing data to DynamoDB by setting the number of partitions. A usage sketch is shown below.
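The following is a minimal, illustrative usage sketch, assuming the generic Spark data source read/write path that spark-dynamodb exposes (format("dynamodb") with a tableName option). The table name, input path and numeric values are placeholders; the option names absRead, absWrite, maxRetries and numInputDFPartitions come from this PR.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("dynamo-abs-throughput-example").getOrCreate()
val df: DataFrame = spark.read.parquet("s3://some-bucket/some-data") // placeholder input

// Write with an absolute (cumulative) WCU limit, a bounded retry count
// and an explicit partition count for the input DataFrame.
df.repartition(8)
  .write
  .format("dynamodb")
  .option("tableName", "SomeTable")       // placeholder table name
  .option("absWrite", "400")              // cumulative WCU limit for the whole job (this PR)
  .option("maxRetries", "5")              // cap on retries of unprocessed batch items (this PR)
  .option("numInputDFPartitions", "8")    // number of partitions of the input DataFrame (this PR)
  .save()

// Read with an absolute (cumulative) RCU limit.
val dynamoDf = spark.read
  .format("dynamodb")
  .option("tableName", "SomeTable")
  .option("absRead", "200")               // cumulative RCU limit for the scan (this PR)
  .load()
```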
Why?
Adding support for absolute RCU and WCU gives the user more flexibility when running the job. It also removes the manual work of calculating the target-capacity percentage from the provisioned RCU and WCU.
Other changes
We also noticed that retries for unprocessed DynamoDB write items can loop indefinitely, with no hard limit. We add an option for the user to cap the retries using the maxRetries parameter.
How

  1. By passing the "absWrite" and "absRead" options, the user can define cumulative write and read throughput.
    This is achieved by reading the (lowercased) "absread" and "abswrite" keys from the parameters map. If the map contains these keys, their values are used as the cumulative throughput limits. Otherwise they default to -1 and, as before, the throughput is fetched from the table properties (lines from 69, TableConnector.scala). See the first sketch after this list.

  2. If a user-defined data frame is distributed over more (or fewer) partitions than the value of the defaultParallelism parameter, the Write Capacity Units will be overshot (or undershot). To handle this, the parallelism factor of the user-defined data frame must be identified, which is the number of running tasks: (number of stages) * (number of partitions of the DF). Since the write runs as a single stage, the number of tasks equals the number of partitions of the DF, hence the new numInputDFPartitions parameter (lines 57, 85, TableConnector.scala). See the first sketch after this list.

  3. The infinite recursion case in the handleBatchWriteResponse method is handled by adding a maximum-retries constraint, which the user can set via the maxRetries parameter. See the second sketch after this list.
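First, a simplified sketch of the throughput resolution described in points 1 and 2. This is not the actual TableConnector.scala code; the method names and defaults are illustrative assumptions, while the parameter names ("absread", "abswrite", numInputDFPartitions, defaultParallelism, targetCapacity) mirror the PR description (Spark lowercases option keys, hence the lowercase lookups).

```scala
// Illustrative sketch only -- not the actual TableConnector.scala implementation.

// Point 1: prefer the absolute limits when the user supplied them, otherwise fall
// back to the table's provisioned throughput scaled by the target capacity.
def resolveThroughput(parameters: Map[String, String],
                      provisionedRead: Double,
                      provisionedWrite: Double,
                      targetCapacity: Double): (Double, Double) = {
  val absRead = parameters.get("absread").map(_.toDouble).getOrElse(-1.0)
  val absWrite = parameters.get("abswrite").map(_.toDouble).getOrElse(-1.0)

  val readThroughput = if (absRead > 0) absRead else provisionedRead * targetCapacity
  val writeThroughput = if (absWrite > 0) absWrite else provisionedWrite * targetCapacity
  (readThroughput, writeThroughput)
}

// Point 2: a write of a user-defined DataFrame runs as a single stage with one task
// per partition, so the per-task rate is the cumulative limit divided by the number
// of input partitions rather than by defaultParallelism.
def perTaskWriteLimit(cumulativeWriteThroughput: Double,
                      numInputDFPartitions: Int,
                      defaultParallelism: Int): Double = {
  val writeParallelism = if (numInputDFPartitions > 0) numInputDFPartitions else defaultParallelism
  cumulativeWriteThroughput / writeParallelism
}
```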
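Second, a sketch of the bounded retry described in point 3, assuming the AWS SDK v1 DynamoDB document API that spark-dynamodb builds on. The method signature is simplified and the rate limiting that the real handleBatchWriteResponse applies before each retry is omitted.

```scala
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import com.amazonaws.services.dynamodbv2.document.{BatchWriteItemOutcome, DynamoDB}

// Illustrative sketch only -- the real method also applies a rate limiter per retry.
@tailrec
def handleBatchWriteResponse(client: DynamoDB, maxRetries: Int)
                            (response: BatchWriteItemOutcome): Unit = {
  val unprocessed = response.getUnprocessedItems
  if (unprocessed != null && !unprocessed.isEmpty) {
    if (maxRetries <= 0) {
      // Out of retries: report how many items were left unprocessed
      // (the real code logs this at info level).
      val remaining = unprocessed.values.asScala.map(_.size).sum
      println(s"Max retries reached; $remaining items left unprocessed")
    } else {
      // Re-submit only the unprocessed items and recurse with one retry fewer.
      val retryResponse = client.batchWriteItemUnprocessed(unprocessed)
      handleBatchWriteResponse(client, maxRetries - 1)(retryResponse)
    }
  }
}
```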

LalithSrinivas added 7 commits June 11, 2020 20:39
1. Added an option to use a user-defined write/read limit by passing the "absRead" and "absWrite" parameters as keys and the required limits as values.
2. Resolved a case of an infinite loop in the "handleBatchWriteResponse" method by adding a limit on the maximum retries of unprocessed data. It can be passed as a parameter by the user under the name "maxRetries".
3. Fixed an issue with writeLimit when the data frame is user-defined (in which case the parallelism should not be used as the number of parallel tasks; the number of tasks itself should). Added the "numInputDFPartitions" parameter for this, since number of tasks = (number of stages) * (number of partitions of the DF), and here the number of stages is 1.
Added a logger info message to let the user know the number of unprocessed items when the maximum retries is reached.
@LalithSrinivas LalithSrinivas marked this pull request as ready for review June 15, 2020 04:59
@cosmincatalin cosmincatalin requested a review from jacobfi July 10, 2020 11:18