This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Very bad read performance for pretty small DynamoDB table #65

Open
juliankeppel opened this issue May 25, 2020 · 9 comments

Comments

@juliankeppel

juliankeppel commented May 25, 2020

I want to read a very small DynamoDB table (about 6,500 items, 200 KB in total) into my Spark Structured Streaming job in every micro-batch. I use PySpark with Spark 2.4.4 and spark-dynamodb 1.0.4. The DynamoDB table has a provisioned read capacity of 5.

My code looks as follows:

spark.read.format("dynamodb") \
  .option("region", "eu-central-1") \
  .option("tableName", my_table) \
  .load() \
  .filter(<some-easy-filtering-on-one-boolean-column>) \
  .select(<some-column-selects>)

I am seeing very slow read performance: it takes multiple seconds, sometimes up to a few minutes, to read those few items from DynamoDB:
[screenshot: read durations per micro-batch]

I also noticed that only a small portion of the provisioned read capacity is used for each read:
[screenshot: consumed read capacity]

How much read capacity gets used seems random; sometimes even less is consumed. But either way, even with a read capacity of 1 or so, it should be much faster to read ~6,500 items from such a small DynamoDB table.

I also tried some configurations like:

  • .option("filterPushdown", False)
  • .option("readPartitions", <1, 2, 4, 8, ...>)
  • .option("targetCapacity", 1)

with no effect at all. With readPartitions set to, say, 8, it is a little faster (about 20 seconds), but still not fast enough in my view. And such a small number of items should be readable with a single partition in a reasonable amount of time.
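
Concretely, the attempts looked roughly like this (the option names are the ones listed above; the values are only examples, and the comments paraphrase my reading of the connector's documentation):

df = (
    spark.read.format("dynamodb")
    .option("region", "eu-central-1")
    .option("tableName", my_table)
    .option("filterPushdown", False)   # don't push the boolean filter down to DynamoDB
    .option("readPartitions", 8)       # fixed number of scan segments (tried 1, 2, 4, 8, ...)
    .option("targetCapacity", 1)       # fraction of the provisioned read capacity to use
    .load()
)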

Any ideas what I'm doing wrong? Any advice? Thanks in advance!

@juliankeppel juliankeppel changed the title Very bad read performance Very bad read performance for pretty small DynamoDB table May 25, 2020
@jacobfi
Contributor

jacobfi commented May 25, 2020

Hi
Can you try to do a

.load() \
.cache() \
.count()

on the Dynamo table's dataframe and see if that makes a difference?

Also, I think reading 200 KB with 5 read capacity units should take around 5 seconds.
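
For reference, the back-of-the-envelope math behind that estimate, using DynamoDB's standard capacity units (one RCU is one strongly consistent 4 KB read per second, or two eventually consistent ones):

table_size_kb = 200
provisioned_rcu = 5

kb_per_rcu = 8                                      # eventually consistent scan: 2 x 4 KB per RCU per second
scan_kb_per_second = provisioned_rcu * kb_per_rcu   # 40 KB/s
print(table_size_kb / scan_kb_per_second)           # ~5 seconds for the whole table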

@juliankeppel
Author

Please note that this is a Structured Streaming job, so I need to refresh the DynamoDB dataframe in every micro-batch. I need the latest data from the table, so caching is not feasible for my use case. And in any case, it would only mitigate the original problem.

@jacobfi
Contributor

jacobfi commented May 25, 2020

Yes, sorry for not explaining - I know it's not a fix, a workaround at best.
I was mostly curious because I suspect the library may have an erroneous interaction with the Spark planner, leading to symptoms like this.
Given what you have shown, there seems to be a problem with the throughput/partition calculation and executor utilization. I'm trying to figure out whether it's in the DynamoDB connector layer or the Spark planning layer.
If a cache().count() does cause the library to access the DynamoDB table as expected (5 seconds at throughput 5 on however many partitions), then the problem is in the planning layer.

@juliankeppel
Author

juliankeppel commented May 25, 2020

Ok, I got it!

I tried the following code snippet:

    import datetime

    print(datetime.datetime.now())
    df_count_without_cache = (
        spark.read
        .format("dynamodb")
        .option("region", "eu-central-1")
        .option("tableName", <my_table_name>)
        .load()
        .count()
    )
    print(datetime.datetime.now())
    print(datetime.datetime.now())
    df_count_with_cache = (
        spark.read
        .format("dynamodb")
        .option("region", "eu-central-1")
        .option("tableName", <my_table_name>)
        .load()
        .cache()
        .count()
    )
    print(datetime.datetime.now())

Result was:

2020-05-25 13:00:28.602334
2020-05-25 13:00:42.288196
2020-05-25 13:00:42.288232
2020-05-25 13:00:57.814243

As you can see, both queries took nearly the same amount of time (about 15 seconds). What can we conclude from that? That the problem is not in the Spark planning layer? Any other ideas on how to proceed?

@jacobfi
Contributor

jacobfi commented May 26, 2020

Hi
I ran some tests yesterday and something interesting happened!
Yesterday I had the same problem as you, but this morning the whole table could be read in less than a second.
I think the issue is that the DynamoDB table description did not correctly reflect the contents of the table yesterday, when I had only just populated it with data.
The library relies heavily on this description to calculate the optimal parameters for the scan operation. If those numbers are off, the scan will be suboptimal.
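
As an illustration only (this is not the connector's actual code), a rate budget of roughly the following shape would produce these symptoms: the permitted bytes per second are derived from the table description and the read options, so misleading numbers or a small bytes-per-RCU value can throttle even a tiny table.

# Illustrative sketch, not spark-dynamodb's real implementation.
def permitted_bytes_per_second(provisioned_rcu, target_capacity, bytes_per_rcu, read_partitions):
    """Bytes one scan partition may read per second under the capacity budget."""
    rcu_per_second = provisioned_rcu * target_capacity / read_partitions
    return rcu_per_second * bytes_per_rcu

# 5 RCUs, full target capacity, 4 KB per RCU, one partition -> 20 KB/s,
# i.e. roughly 10 seconds for a 200 KB table.
print(permitted_bytes_per_second(5, 1.0, 4_000, 1))
# Raising bytesPerRCU to 4 MB lifts the budget to 20 MB/s, which is why the
# workaround below makes the scan effectively unthrottled.
print(permitted_bytes_per_second(5, 1.0, 4_000_000, 1))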

If this is indeed the case for you, then a workaround for now would be to set the option bytesPerRCU to something very high, such as 400000 or 4000000.

Let me know if this works. It might also just work today if the table description is up-to-date.
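
Applied to the original PySpark read, the workaround is a single extra option (4000000 is simply the value suggested above):

spark.read.format("dynamodb") \
  .option("region", "eu-central-1") \
  .option("tableName", my_table) \
  .option("bytesPerRCU", 4000000) \
  .load()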

@juliankeppel
Author

Yes, it seems you are right! I also saw rather random behaviour in read performance. Now that I have hard-coded the bytesPerRCU option to 4000000, it's fine.

I will play around a little bit with different values here and use that option for now.

Thank you for providing this workaround!

@sfcoy

sfcoy commented Sep 30, 2020

Hi there,
I too have encountered this problem. In our case we are loading six or so DynamoDB tables and performing a big union over them together with conventional HDFS tables (hosted in S3). These DynamoDB tables vary in size between 12 and roughly 200 rows (i.e. tiny). Everything seemed to be grinding to a halt in the RateLimiter (as revealed by a thread dump).

Changing bytesPerRCU to 400_000 seemed to resolve this problem.

We also had a second problem where we had to set readPartitions to 1 on a couple of the tables, because the connector seemed to lose track of the table schema when running on a five-node cluster. I will raise a separate issue for that.

@cozos

cozos commented Jan 8, 2021

@jacobfi When you say "DynamoDB table description", what are you referring to? Is this referring to the output of the DynamoDB DescribeTable API call?

Is the problem that DynamoDB's DescribeTable API call is returning bad/outdated information (I'm guessing because it's eventually consistent), which messes up the calculation of the optimal scan parameters?
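
For reference, those are the fields in question, visible with a plain boto3 call; AWS documents ItemCount and TableSizeBytes as being updated roughly every six hours, so they can lag well behind the actual table contents:

import boto3

# Inspect the table description the connector relies on (table name is illustrative).
dynamodb = boto3.client("dynamodb", region_name="eu-central-1")
description = dynamodb.describe_table(TableName="my_table")["Table"]

print(description["ItemCount"])        # updated roughly every six hours
print(description["TableSizeBytes"])   # updated roughly every six hours
print(description["ProvisionedThroughput"]["ReadCapacityUnits"])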

@jacobfi
Contributor

jacobfi commented Jan 8, 2021

Yes, the library uses the DescribeTable API, and it is possible that this returns misleading information. But it is also possible that there is simply something wrong with the throughput calculation in the library.
