[FEATURE] Avoid S3 throttle when fetching object metadata #607

Open

penghuo opened this issue Aug 29, 2024 · 1 comment

Labels
enhancement New feature or request

penghuo (Collaborator) commented Aug 29, 2024

Is your feature request related to a problem?
Creating an MV (auto_refresh) failed:

24/08/27 22:00:51 ERROR Executor: Exception in task 3522.0 in stage 202.0 (TID 169730)
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Slow Down (Service: Amazon S3; Status Code: 503; Error Code: 503 Slow Down; Request ID: 1EQ9CWXF9MC1S9KD; S3 Extended Request ID: yOVkBHfAt+jWF8t4av9jQAgcwU9aaJpKG03ZE3d2sNLGmH3VgcQTjyAJTnWuRNOznjHt19wHoAQ=; Proxy: null)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5467) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5414) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:26) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:12) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:111) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:138) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:186) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:96) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:43) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.getFileMetadataFromCacheOrS3(Jets3tNativeFileSystemStore.java:636) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:320) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:650) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:633) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:433) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
    at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:258) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$6(HadoopFSUtils.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at scala.collection.immutable.Stream.map(Stream.scala:418) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$4(HadoopFSUtils.scala:128) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]

Root cause: Spark's HadoopFSUtils lists files and directories in parallel when the number of paths (5000 in this test case) exceeds PARALLEL_PARTITION_DISCOVERY_THRESHOLD (default 32). Spark then schedules Math.min(paths.size, PARALLEL_PARTITION_DISCOVERY_PARALLELISM) (default 10000) listing tasks, each making S3 HEAD requests to fetch object metadata. This exceeds Amazon S3's limit of 5,500 GET/HEAD requests per second per partitioned prefix.
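
As a rough sketch of that decision (a simplified paraphrase for illustration, not the actual HadoopFSUtils code):

    // Hypothetical sketch of Spark's listing-parallelism decision: with the
    // defaults, 5000 input paths exceed the threshold of 32, so Spark schedules
    // min(5000, 10000) = 5000 parallel listing tasks against the same prefix.
    object ListingParallelismSketch {
      def numListingTasks(
          numPaths: Int,
          threshold: Int = 32,      // spark.sql.sources.parallelPartitionDiscovery.threshold
          parallelism: Int = 10000  // spark.sql.sources.parallelPartitionDiscovery.parallelism
      ): Int =
        if (numPaths > threshold) math.min(numPaths, parallelism) else 1

      def main(args: Array[String]): Unit = {
        println(numListingTasks(5000))                     // 5000 concurrent listing tasks
        println(numListingTasks(5000, parallelism = 100))  // 100 with the workaround below
      }
    }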

Workaround: Given that the average S3 list task latency is 20ms, each task issues roughly 50 requests per second, and S3's limit is 5,500 requests per second per prefix. Setting spark.sql.sources.parallelPartitionDiscovery.parallelism = 100 would therefore result in about 100 × 50 = 5,000 requests per second, just under the limit.
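
A minimal sketch of applying that workaround at session creation (the property name is the standard Spark SQL config; the app name is just a placeholder):

    import org.apache.spark.sql.SparkSession

    // Sketch: cap parallel partition discovery so the aggregate HEAD/LIST rate
    // stays under S3's ~5,500 requests/sec per-prefix limit (100 tasks * ~50 req/s).
    val spark = SparkSession.builder()
      .appName("mv-auto-refresh")  // placeholder app name
      .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "100")
      .getOrCreate()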

What solution would you like?
A clear and concise description of what you want to happen.

What alternatives have you considered?
n/a

Do you have any additional context?
Add any other context or screenshots about the feature request here.

penghuo (Collaborator, Author) commented Aug 30, 2024

Issue 2 - listing S3 multiple times before the first batch starts

  • Summary
    • Root cause: the cached table partition metadata was evicted because it exceeded the configured maximum of 250 MB. As a result, when Spark creates the logical relation, it rescans the root path.
    • Next steps (see the sketch below):
      • Set spark.sql.hive.filesourcePartitionFileCacheSize = 524288000 // 500 MB
      • Scale up the driver node to 16 vCPU / 64 GB memory.
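
A sketch of the first next step, assuming the setting is applied when the session is created (app name is a placeholder):

    import org.apache.spark.sql.SparkSession

    // Sketch: raise the partition file metadata cache from the 250 MB default
    // (spark.sql.hive.filesourcePartitionFileCacheSize) to 500 MB so cached
    // partition metadata is not evicted and the root path is not rescanned.
    val spark = SparkSession.builder()
      .appName("mv-auto-refresh")  // placeholder app name
      .config("spark.sql.hive.filesourcePartitionFileCacheSize", "524288000")  // 500 MB in bytes
      .getOrCreate()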

dai-chen changed the title from "[FEATURE] Avoid S3 throttle when fetech object metadata" to "[FEATURE] Avoid S3 throttle when fetching object metadata" on Sep 3, 2024
dai-chen removed the untriaged label on Sep 3, 2024