
Long delay after writing edge snapshots due to single-threaded renaming of output files by the Spark driver #324

Open
arvindshmicrosoft opened this issue Aug 23, 2021 · 6 comments

Comments

@arvindshmicrosoft
Contributor

arvindshmicrosoft commented Aug 23, 2021

I generated the SF30000 dataset directly to ADLS Gen 2. What I observed is that after the major snapshots are written, there are significant delays due to single-threaded renaming of the output files. There is a simple, serialized loop doing this, and all of the action is on the driver. When working with remote cloud storage the overall delays are substantial, taking up ~67% of the total datagen time:

| stage | delay (HH:MM) after stage is "done" | # of Spark tasks |
|---|---|---|
| write Comment snapshot | 04:28 | 94499/94499 |
| write Person -[Likes]-> Comment snapshot | 03:31 | 79943/79943 |
| write Forum -[HasMember]-> Person snapshot | 03:10 | 74566/74566 |
| write Comment -[HasTag]-> Tag snapshot | 02:24 | 55907/55907 |
| write Person -[Likes]-> Post snapshot | 01:17 | 34600/34600 |
| write Post snapshot | 00:57 | 23379/23379 |
| write Post -[HasTag]-> Tag snapshot | 00:39 | 15006/15006 |

The subsequent stages (inserts and deletes) for each of these edges do not show such a pronounced "serialized" behavior, as they seem to write far fewer files than the initial snapshot. Is there any way to control the number of files produced by the initial snapshot writing?
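(For illustration, this is roughly what I mean by controlling the output file count in plain Spark; whether Datagen exposes such a knob is exactly my question. The `writeSnapshot` helper and its `targetFiles` parameter below are hypothetical, not existing Datagen options.)

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical sketch: collapse a snapshot DataFrame into a target number of
// output files before writing, so the driver-side job commit has far fewer
// files to rename.
def writeSnapshot(df: DataFrame, path: String, targetFiles: Int): Unit =
  df.coalesce(targetFiles)          // fewer partitions => fewer part-* files
    .write
    .mode(SaveMode.Overwrite)
    .option("header", "true")
    .csv(path)
```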

A sample call stack of the "slow behavior" for the snapshots is below.

"main" #1 prio=5 os_prio=0 tid=0x00007f8e6c05f800 nid=0x4690 runnable [0x00007f8e73805000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at org.wildfly.openssl.OpenSSLSocket.read(OpenSSLSocket.java:423)
        at org.wildfly.openssl.OpenSSLInputStream.read(OpenSSLInputStream.java:41)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        - locked <0x00000000e2cead58> (a java.io.BufferedInputStream)
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
        - locked <0x00000000e2ce1328> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
        - locked <0x00000000e2ce1328> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
        at org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation.processResponse(AbfsHttpOperation.java:330)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:274)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:205)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:181)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation$$Lambda$257/1878700101.apply(Unknown Source)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:454)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:179)
        at org.apache.hadoop.fs.azurebfs.services.AbfsClient.renamePath(AbfsClient.java:500)
        at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.rename(AzureBlobFileSystemStore.java:787)
        at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.rename(AzureBlobFileSystem.java:355)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:476)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:490)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:405)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
        - locked <0x00000000a754ea00> (a org.apache.spark.sql.execution.command.DataWritingCommandExec)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan$$Lambda$3184/1809758959.apply(Unknown Source)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.sql.execution.SparkPlan$$Lambda$3185/128469338.apply(Unknown Source)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
        - locked <0x00000000a75409c8> (a org.apache.spark.sql.execution.QueryExecution)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter$$Lambda$2963/729861519.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2968/904215674.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2964/495353859.apply(Unknown Source)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
        at ldbc.snb.datagen.io.dataframes$DataFrameWriter$.write(dataframes.scala:57)
        at ldbc.snb.datagen.io.dataframes$DataFrameWriter$.write(dataframes.scala:49)
        at ldbc.snb.datagen.io.Writer$WriterOps.write(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$WriterOps.write$(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$ops$$anon$1.write(Writer.scala:26)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$4(graphs.scala:118)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter$$Lambda$2947/1416238194.apply$mcV$sp(Unknown Source)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at ldbc.snb.datagen.util.SparkUI$.job(SparkUI.scala:11)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$3(graphs.scala:120)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$3$adapted(graphs.scala:113)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter$$Lambda$2946/34405587.apply(Unknown Source)
        at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:103)
        at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:102)
        at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:102)
        at scala.collection.immutable.RedBlackTree$.foreach(RedBlackTree.scala:99)
        at scala.collection.immutable.TreeMap.foreach(TreeMap.scala:205)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.write(graphs.scala:113)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.write(graphs.scala:103)
        at ldbc.snb.datagen.io.Writer$WriterOps.write(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$WriterOps.write$(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$ops$$anon$1.write(Writer.scala:26)
        at ldbc.snb.datagen.transformation.TransformationStage$write$1$.$anonfun$caseBatched$1(TransformationStage.scala:36)
        at ldbc.snb.datagen.transformation.TransformationStage$write$1$.$anonfun$caseBatched$1$adapted(TransformationStage.scala:35)
        at ldbc.snb.datagen.transformation.TransformationStage$write$1$$$Lambda$2939/533945153.apply(Unknown Source)
        at shapeless.Poly1$CaseBuilder$$anon$1.$anonfun$value$1(polyntraits.scala:36)
        at shapeless.Poly1$CaseBuilder$$anon$1$$Lambda$2938/488509759.apply(Unknown Source)
        at shapeless.PolyDefns$Case.apply(poly.scala:39)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:330)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:331)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:331)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
        at shapeless.syntax.CoproductOps$.map$extension(coproduct.scala:160)
        at ldbc.snb.datagen.transformation.TransformationStage$.run(TransformationStage.scala:57)
        at ldbc.snb.datagen.spark.LdbcDatagen$.run(LdbcDatagen.scala:146)
        at ldbc.snb.datagen.spark.LdbcDatagen$.main(LdbcDatagen.scala:106)
        at ldbc.snb.datagen.spark.LdbcDatagen.main(LdbcDatagen.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
@arvindshmicrosoft
Contributor Author

I edited the issue description: previously I had mistakenly assumed that the number of output files produced in the insert / delete stages was the same as the number of tasks, which is not the case. The number of output files produced for inserts / deletes is significantly smaller than the number of output files in the initial snapshot, which explains the difference in behavior between these two sets of stages. So the question reduces to: is there any way to control the number of files produced by the initial snapshot writing?

@szarnyasg
Member

@arvindshmicrosoft thanks for reporting this, and apologies for the delay in responding. @dszakallas has started to look into this. We have one question: which Spark version are you using? If it's 2.x, can you upgrade to 3.x without much hassle?
Thanks,
Gabor

@arvindshmicrosoft
Contributor Author

arvindshmicrosoft commented Sep 13, 2021

Thanks! My cluster config is:

  • Spark 3.1.2
  • Hadoop 3.3.1

So I am on the current latest for both, with OpenJDK 8.

@dszakallas
Member

Taking a look at the hadoop-azure documentation:

> Rename and Delete blob operations on directories with large number of files and sub directories currently is very slow as these operations are done one blob at a time serially. These files and sub folders can be deleted or renamed parallel. Following configurations can be used to enable threads to do parallel processing.

Note that I found this in the WASB connector's documentation, and I don't know whether this configuration works with ABFSS as well.
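(For reference, the settings that section of the hadoop-azure documentation refers to are, as far as I remember, fs.azure.rename.threads and fs.azure.delete.threads. An untested sketch of setting them from Spark, with no guarantee that the ABFSS driver honours them:)

```scala
import org.apache.spark.sql.SparkSession

// Untested sketch: the parallel rename/delete thread settings documented for
// the WASB connector, applied via Spark's Hadoop configuration. It is unclear
// whether the ABFSS driver reads these at all.
val spark = SparkSession.builder().appName("ldbc-datagen").getOrCreate()
spark.sparkContext.hadoopConfiguration.setInt("fs.azure.rename.threads", 10)
spark.sparkContext.hadoopConfiguration.setInt("fs.azure.delete.threads", 10)
```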

Also, I found this piece of performance info on file operations regarding ABFSS: https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html#Performance_and_Scalability

It states that for stores with hierarchical namespaces, directory renames/deletes should be atomic. So if you are using a hierarchical-namespace store, the likely reason for the slowness is that individual file renames are happening instead of a single directory rename. I wonder if there is a configuration to achieve a single directory rename.

@szarnyasg we should also think about recalibrating the size of our partitions/files in our EMR workload, since they seem very small (on the scale of a couple of MBs). Outputting larger chunks would mitigate issues like this; see the rough sketch below.
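(A rough, hypothetical way to pick the partition count from a target file size rather than from the task count; the function and the numbers are illustrative only, not actual Datagen code.)

```scala
// Hypothetical sketch: derive the partition count for an entity from its
// estimated on-disk size and a target output file size, instead of from the
// number of tasks.
def numPartitions(estimatedBytes: Long, targetFileBytes: Long = 128L << 20): Int =
  math.max(1, math.ceil(estimatedBytes.toDouble / targetFileBytes).toInt)

// e.g. a ~200 GB Comment snapshot with 128 MB target files => ~1600 output
// files instead of tens of thousands.
numPartitions(200L << 30)
```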

@arvindshmicrosoft
Contributor Author

Thanks @dszakallas - there is no obvious configuration within ABFSS to group these. ABFSS is just a victim here; the looping is triggered by these lines:

        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:476)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:490)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:405)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)

In turn, such looping is due to the large number of partitions. The issue would be mitigated if we could configure the partition sizes in Datagen to increase the "chunk size". Is there such a configuration in Datagen itself?
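(As a side note, not something confirmed in this thread: the serial mergePaths loop belongs to FileOutputCommitter's v1 algorithm, which moves every task's output on the driver at job-commit time. The v2 algorithm moves files during task commit, in parallel across executors, at the cost of weaker job-commit guarantees. A sketch of enabling it, purely as something to experiment with:)

```scala
import org.apache.spark.sql.SparkSession

// Experimental sketch only: switch the Hadoop output committer to the v2
// algorithm, so renames happen at task-commit time across executors instead
// of serially on the driver during commitJob. This weakens the job-commit
// atomicity guarantees, so validate before relying on it.
val spark = SparkSession.builder()
  .appName("ldbc-datagen")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```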

@dszakallas
Member

You can configure --numThreads, which directly affects the partition size for Persons. The others are multiples of that, by the following factors:

// static
TAG("Tag", 1.0),
TAG_HASTYPE_TAGCLASS("Tag_hasType_TagClass", 1.0),
TAGCLASS("TagClass", 1.0),
TAGCLASS_ISSUBCLASSOF_TAGCLASS("TagClass_isSubclassOf_TagClass", 1.0),
PLACE("Place", 1.0),
PLACE_ISPARTOF_PLACE("Place_isPartOf_Place", 1.0),
ORGANISATION("Organisation", 1.0),
ORGANISATION_ISLOCATEDIN_PLACE("Organisation_isLocatedIn_Place", 1.0),
// dynamic activity
FORUM("Forum", 5.13),
FORUM_HASMEMBER_PERSON("Forum_hasMember_Person", 384.06),
FORUM_HASTAG_TAG("Forum_hasTag_Tag", 11.10),
PERSON_LIKES_POST("Person_likes_Post", 141.12),
PERSON_LIKES_COMMENT("Person_likes_Comment", 325.31),
POST("Post", 138.61),
POST_HASTAG_TAG("Post_hasTag_Tag", 77.34),
COMMENT("Comment", 503.70),
COMMENT_HASTAG_TAG("Comment_hasTag_Tag", 295.20),
// dynamic person
PERSON("Person", 1.0),
PERSON_HASINTEREST_TAG("Person_hasInterest_Tag", 7.89),
PERSON_WORKAT_COMPANY("Person_workAt_Company", 0.77),
PERSON_STUDYAT_UNIVERSITY("Person_studyAt_University", 0.28),
PERSON_KNOWS_PERSON("Person_knows_Person", 26.11),

You can try reducing that. Furthermore, I created a pull request (#334) that exposes an internal configuration parameter, called the oversize factor, on the CLI. The multipliers are divided by this value, so a larger oversizeFactor will result in fewer, larger files.
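(For illustration only, a rough sketch of how such multipliers and the oversize factor could translate into a partition count; the function name and exact formula are assumptions, not the actual Datagen implementation.)

```scala
// Hypothetical sketch, not actual Datagen code: a larger oversizeFactor
// divides the per-entity multiplier, yielding fewer (and therefore larger)
// output partitions/files.
def partitionsFor(numThreads: Int, sizeFactor: Double, oversizeFactor: Double): Int =
  math.max(1, math.ceil(numThreads * sizeFactor / oversizeFactor).toInt)

// e.g. with --numThreads 100, the Comment multiplier 503.70 and an
// oversizeFactor of 10: ceil(100 * 503.70 / 10) = 5037 partitions
// instead of 50370.
partitionsFor(100, 503.70, 10.0)
```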

@szarnyasg szarnyasg added this to the Milestone 4 milestone Jun 25, 2022
@szarnyasg szarnyasg removed this from the Milestone 4 milestone Oct 10, 2022