Issue originally reported in Spark: https://issues.apache.org/jira/browse/SPARK-44003
We have a pyspark job that writes to a partitioned parquet dataset via:
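Something along these lines (a sketch, not the actual job: the job itself is PySpark, while the snippet below uses Spark's Java DataFrame API, and the paths, the year partition column, and the repartition(200) are assumptions based on the numbers described next):
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PartitionedParquetWrite {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("partitioned-parquet-write")
        .getOrCreate();

    // Hypothetical input path; in the real job this dataframe is computed by Spark.
    Dataset<Row> df = spark.read().parquet("/data/input");

    df.repartition(200)             // ~200 tasks, each holding rows for many years
        .write()
        .mode(SaveMode.Overwrite)
        .partitionBy("year")        // 28 distinct years -> 28 partition directories
        .parquet("/data/output");   // up to 28 x 200 = 5.6K part files in total
  }
}
{code}
With a layout like this, each task ends up opening and closing a Parquet writer per partition value it holds, which is what produces the large number of writers discussed below.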
In this specific production case we partition by 28 distinct years, so 28 directories, each with 200 part files, 5.6K files in total. This particular job runs on a single dedicated, ephemeral VM. We have noticed that most of the time the VM is far from saturated and the job is very slow; it is neither IO nor CPU bound. Here's an annotated VM utilization graph. The blue line is CPU and the turquoise line is memory. The graph doesn't show IO, but we have monitored that as well, and it was not saturated either. The labels:
BQ: can be ignored
SPARK~1: Spark computes some data
SPARK~2: the 1st slow period
SPARK~3: the 2nd slow period
We took two 10-minute JFR profiles, marked P-1 and P-2 in the graph above. P-1 falls entirely within SPARK~2, while P-2 is partially in SPARK~2 but mostly in SPARK~3. Here's the P-1 profile, and here's the P-2 profile.
The picture becomes clearer when we look at the locks; here's the report. The threads were blocked on locks for a total of 20.5 hours, mostly on a single lock, the global org.apache.parquet.hadoop.MemoryManager, which has two synchronized methods: addWriter and removeWriter. From the parquet-mr GitHub sources:
{code:java}
/**
 * Add a new writer and its memory allocation to the memory manager.
 * @param writer the new created writer
 * @param allocation the requested buffer size
 */
synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
  Long oldValue = writerList.get(writer);
  if (oldValue == null) {
    writerList.put(writer, allocation);
  } else {
    throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
        "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
        "the writer: " + writer);
  }
  updateAllocation();
}

/**
 * Remove the given writer from the memory manager.
 * @param writer the writer that has been closed
 */
synchronized void removeWriter(InternalParquetRecordWriter<?> writer) {
  writerList.remove(writer);
  if (!writerList.isEmpty()) {
    updateAllocation();
  }
}
{code}
During the 10-minute profiling session, all worker threads were mostly waiting on this lock.
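For illustration only, here is a minimal, self-contained stand-in for that pattern (this is not parquet-mr or Spark code; the Registry class and all names below are invented): many writer threads repeatedly open and close writers through one synchronized, process-wide registry, so every open and close serializes on a single monitor.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WriterRegistryContention {

  // Stand-in for the single, process-wide MemoryManager shared by all Parquet writers.
  static final class Registry {
    private final Map<Object, Long> writers = new HashMap<>();

    synchronized void addWriter(Object writer, long allocation) {
      writers.put(writer, allocation);
      rebalance();
    }

    synchronized void removeWriter(Object writer) {
      writers.remove(writer);
      if (!writers.isEmpty()) {
        rebalance();
      }
    }

    // Stand-in for updateAllocation(): walks every registered writer under the lock.
    private void rebalance() {
      long total = 0;
      for (long allocation : writers.values()) {
        total += allocation;  // value intentionally unused in this toy example
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Registry registry = new Registry();
    int threads = 16;
    ExecutorService pool = Executors.newFixedThreadPool(threads);

    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        // Each task mimics a writer switching output partitions over and over:
        // close the current file (removeWriter), open the next one (addWriter).
        for (int i = 0; i < 100_000; i++) {
          Object writer = new Object();
          registry.addWriter(writer, 128L << 20);
          registry.removeWriter(writer);
        }
      });
    }

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.MINUTES);
    // With enough threads, most of the wall-clock time is spent contending for the
    // Registry monitor rather than doing useful work, which mirrors what the JFR lock
    // report shows for org.apache.parquet.hadoop.MemoryManager in this job.
  }
}
{code}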
It appears that the combination of the large number of writers created via Spark's DynamicPartitionDataSingleWriter and the MemoryManager synchronization bottleneck drastically reduces performance by starving the writer threads.

Reporter: Rafal Wojdyla
Note: This issue was originally created as PARQUET-2412. Please see the migration documentation for further details.