Parquet file reading performance improvement
In performance benchmarking of Drill, queries on Parquet [1] files that are disk bound perform less well than expected when compared with other engines that read Parquet files. The slowdown is concentrated in the Parquet reader: in particular, the reader sometimes pulls data in more slowly than the file system can deliver it.
The aim of this improvement is to optimize the Parquet reader so that it is gated only by the speed at which data can be read from the file system. This also addresses the issue highlighted in DRILL-??.
The main requirement of this improvement is to make the performance of Drill's vectorized Parquet reader at least as fast as other applications that read Parquet files. While Drill's Parquet reader is among the fastest implementations in Java, it may lag behind other implementations, e.g. the parquet-cpp reader.
For comparison, the performance of Drill should be benchmarked using the TPC-H data set and queries that read from disk.
There are no end user use cases for this as this is an internal platform improvement.
#3 Background and Research
We ran some baseline tests to establish Drill's Parquet reader performance and a reasonable target for this project.
The tests were run on a ten node cluster with 11 disks per node. There were two runs of a single query that read all the data from all the columns of a set of Parquet files. The file system cache was flushed before the first run so that the first run read all data from disk. The second run was done with a warm cache to see whether differences in performance are a result of the engine's in-memory processing. Care was taken to ensure that all file reads were local. The tests were run using Drill and one other Parquet reader.
In addition, we wrote a similar standalone program (Solo Reader) that reads all the columns from the files that are on a single node. The program reads the column chunks in parallel (matching the parallelization of the two Parquet readers), reads from disk in 16 MB chunks, and discards the data after reading. This program effectively simulates the best speeds we can get from the distributed file system when a process has multiple threads reading from different subsets of a set of Parquet files.
The following table summarizes the results:
SQL Engine | Data Size (MB) | #Nodes | #Disks/Node | Cached | Runtime (ms) | Est MB/sec per node |
---|---|---|---|---|---|---|
Drill 1.7.0 | 72,023 | 10 | 11 | No | 26,859 | 268 |
Drill 1.7.0 | 72,023 | 10 | 11 | Yes | 11,236 | 641 |
Other Parquet Reader | 72,023 | 10 | 11 | No | 18,050 | 399 |
Other Parquet Reader | 72,023 | 10 | 11 | Yes | 11,610 | 620 |
SoloReader-16MB | 24,822 | 1 | 11 | No | 34,300 | 724 |
SoloReader-16MB | 24,822 | 1 | 11 | Yes | 9,526 | 2606 |
We observe that when the cache is cold, Drill achieved a throughput of only 268 MB/sec while the other Parquet reader achieved 399 MB/sec. When the cache is warm, Drill was marginally faster. The solo reader was able to read from disk at nearly three times Drill's speed. We conclude that when reading from disk, Drill is not pulling data as fast as it can be processed. We also see that the file system is not the bottleneck, since the solo reader can read data from disk faster than either Parquet reader is able to process in-memory data.
#4 Design Overview
##4.1 Optimizing disk accesses
A characteristic of the Parquet reader is the following read pattern (a code sketch follows the list):
- Seek to the offset where a column chunk begins.
- Repeat until the column chunk is read:
  - Read byte by byte until the page header is read.
  - Read the page data (by default in Drill, the page size is a maximum of 1 MB).
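A minimal sketch of this pattern, assuming a seekable FSDataInputStream; PageHeader, readPageHeader(), and readPageData() are hypothetical stand-ins for the Parquet page-level calls, since only the I/O shape matters here:

```java
// Schematic only -- not the actual Drill reader code.
void readColumnChunk(FSDataInputStream in, long chunkStart, long chunkEnd) throws IOException {
  in.seek(chunkStart);                                    // seek to where the column chunk begins
  while (in.getPos() < chunkEnd) {                        // repeat until the column chunk is read
    PageHeader header = readPageHeader(in);               // many tiny (often byte-by-byte) reads
    DrillBuf page = readPageData(in, header.pageSize);    // one larger read, at most ~1 MB by default
    // the page is then decompressed, decoded, and copied elsewhere
  }
}
```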
This read pattern is essentially sequential but suboptimal, since many small reads are followed by a larger read. This can lead to the file system not caching data effectively and can also prevent the file system from reading ahead as it would for a purely sequential read. Experimentally, we saw that increasing the page size of the Parquet file gave better read speeds, which indicates that we could get better read performance if the file system did larger sequential reads.
To address this, we can make two changes to the reader that will help the file system both read ahead and cache better.
###4.1.1 Buffering Reader
The common way to reduce the number of disk accesses is to wrap an InputStream in a BufferedInputStream. BufferedInputStream, though, uses byte arrays and does not work with ByteBuffer, so it incurs an additional copy when we copy from the byte array to the ByteBuffer. Additionally, the default buffer size of BufferedInputStream is only 8 KB, far too small for the data set sizes we will encounter with Drill.
This improvement proposes to add a similar buffering capability that supports reading directly into a DrillBuf and also allows a configurable buffer size.
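For contrast, this is the extra copy that a byte-array based BufferedInputStream forces when the destination is a ByteBuffer; the proposed reader avoids it by filling a DrillBuf directly (an illustration only, not Drill code):

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class BufferedCopyExample {
  // BufferedInputStream buffers into a heap byte[], so the bytes are copied twice
  // before they reach the direct buffer the reader actually wants to hand downstream.
  public static int readIntoDirectBuffer(InputStream in, ByteBuffer dest, int len) throws IOException {
    BufferedInputStream bin = new BufferedInputStream(in);  // default internal buffer is only 8 KB
    byte[] tmp = new byte[len];                             // intermediate heap array
    int n = bin.read(tmp, 0, len);                          // copy #1: stream -> byte[]
    if (n > 0) {
      dest.put(tmp, 0, n);                                  // copy #2: byte[] -> ByteBuffer
    }
    return n;
  }
}
```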
###4.1.2 File System Read Ahead
A second strategy is to give the file system a hint that the InputStream is going to read a specific range of data. The POSIX fadvise [2] call is implemented by some underlying implementations of HDFS; when available, it can hint to the file system to read ahead and cache as much as it can. The call has no effect on the semantics of the program itself, but improves performance when available. This improvement intends to set the advice parameter to SEQUENTIAL.
The call will be optional. For underlying implementations that do not support fadvise, no call will be made. The call is optional for another reason as well: future improvements that push filtering down to the page level will skip pages that do not need to be read, and in that case the file system should not read ahead unnecessarily, so no fadvise call will be made.
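One hedged way to surface such a hint through the Hadoop client, assuming the stream supports the read-ahead interface; whether this maps to an actual posix_fadvise call depends on the underlying file system implementation:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

public class ReadAheadHint {
  // Sketch: hint that we are about to read rangeSize bytes sequentially.
  // Implementations that cannot honor the hint throw UnsupportedOperationException,
  // in which case we proceed without it; the hint never changes program semantics.
  public static void tryReadAheadHint(FSDataInputStream in, long rangeSize, boolean enableHints) {
    if (!enableHints) {
      return;                       // e.g. page-level filter pushdown will skip pages
    }
    try {
      in.setReadahead(rangeSize);   // auto-boxed to Long; a read-ahead/fadvise-style hint
    } catch (UnsupportedOperationException | IOException e) {
      // no fadvise/read-ahead support in the underlying implementation; ignore
    }
  }
}
```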
##4.2 Pipelining
The existing ParquetRecordReader reads all the columns for a file in a single thread, reading one page from one column at a time:
- For every column:
  - Read page
  - Decompress
  - Decode
  - Copy to value vector
- Repeat until the value vector is full or all data is read.
This is illustrated in Figure 1 below:
This implementation has a bottleneck: each disk operation (Read Page in Figure 1) is followed by a time consuming Decode (actually a decompress + decode) and Copy operation. This can clearly be improved by parallelizing the Read Page operation for each column. Additionally, by making the Read Page operation asynchronous, we can pipeline the work so that the Decode and Read Page operations run without waiting on each other.
The pipelining can be implemented in two phases.
###4.2.1 Asynchronous Page Reader
The first phase makes the Read Page step asynchronous, as this appears to be the bottleneck (illustrated in Figure 2 below). Note that in this phase the Decode + Copy operation will still run in a single thread and will decode and copy data one column at a time.
###4.2.2 Parallel Column Reader
We split the work into two phases so that we can measure the improvement gained by parallelizing and pipelining just the disk operation. The second phase parallelizes the Decode + Copy operation. This is trickier to implement as the threads need to be synchronized to fill the same number of records in every value vector.
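The essence of phase one can be sketched as a small producer/consumer pipeline; the class and method names below (Page, readNextPage, decodeAndCopy, POISON) are illustrative placeholders, not the actual Drill types:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

public class PipelineSketch {
  static final Page POISON = new Page();            // end-of-column marker (illustrative)

  // Disk reads run ahead of the decoder through a small bounded queue, so the
  // Read Page and Decode + Copy steps overlap instead of strictly alternating.
  void pipelineColumn(ExecutorService scanPool) throws Exception {
    final BlockingQueue<Page> pages = new ArrayBlockingQueue<>(2);   // small bound keeps memory bounded
    scanPool.submit(() -> {
      Page p;
      while ((p = readNextPage()) != null) {        // producer: read pages from disk
        pages.put(p);
      }
      pages.put(POISON);
      return null;
    });
    Page p;
    while ((p = pages.take()) != POISON) {          // consumer: decompress + decode + copy
      decodeAndCopy(p);
    }
  }

  static class Page {}                              // placeholder type
  Page readNextPage() { return null; }              // placeholder for the page read
  void decodeAndCopy(Page p) {}                     // placeholder for decode + copy to value vector
}
```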
##4.3 Thread Pool
To prevent excessive use of threads, the implementation should use a thread pool. This cannot be the same as the thread pool used to execute fragments, as that could lead to excessive waiting by the scan threads, which in turn would cause the downstream operators to wait.
The current proposal is to create a single scan thread pool per Drillbit. A second scan decode thread pool may be required to implement the Parallel Column Reader.
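A sketch of what the per-Drillbit pools could look like; the pool sizes and class name are placeholders rather than proposed configuration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: two bounded pools created once per Drillbit. The scan pool runs
// asynchronous page-read tasks; the separate decode pool (needed only for the
// Parallel Column Reader phase) runs decode/copy tasks, so a blocked decode
// task can never starve the disk-read task it is waiting on.
public class ScanExecutors {
  private final ExecutorService scanPool;
  private final ExecutorService scanDecodePool;

  public ScanExecutors(int scanThreads, int decodeThreads) {
    this.scanPool = Executors.newFixedThreadPool(scanThreads);
    this.scanDecodePool = Executors.newFixedThreadPool(decodeThreads);
  }

  public ExecutorService scanPool() { return scanPool; }
  public ExecutorService scanDecodePool() { return scanDecodePool; }
}
```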
#5 Implementation Details
##5.1 Algorithms (and Data Structures)
###5.1.1 Buffering Reader
We will implement a new BufferedDirectBufInputStream that will wrap the FSDataInputStream currently used by the Parquet reader. This class explicitly does not try to support other InputStream implementations, as testing those would be well beyond the scope of the project. There is, however, nothing specific to FSDataInputStream that is used, except the assumption that seek is implemented.
The buffering reader will also take as input a range (start offset, size) of data. The range is intended to be used to issue an fadvise call as well as to limit the reading of the data (see 5.1.1.1 Limitation).
As a minor optimization, if the range of data to be read is less than the default buffer size, the smaller of the two sizes will be used as the buffer size.
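In code this buffer sizing reduces to something like the following (a sketch; bufSize and totalByteSize are the constructor arguments described in section 5.2):

```java
// Never allocate an internal buffer larger than the range we were asked to read.
int internalBufferSize = (int) Math.min((long) bufSize, totalByteSize);
```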
####5.1.1.1 Limitation
Some older versions of Parquet data files do not have the correct Rowgroup size set in the metadata, which makes it impossible to limit the amount of data read without additional decoding of data. The Buffering Reader will therefore not actually return an EOF if it has read past the range of data indicated at creation time.
###5.1.2 Asynchronous Page Reader
org.apache.drill.exec.store.parquet.columnreaders.PageReader (referred to as PageReader) will be changed to use a BufferedDirectBufInputStream instead of the current ColumnDataReader. As with the ColumnDataReader in the current implementation, the BufferedDirectBufInputStream will be backed by an FSDataInputStream.
PageReader will also become asynchronous. At init time it will submit an asynchronous background task to the underlying thread pool. The background task will be implemented in the AsyncPageReader static nested class of PageReader and will implement the Callable interface. The call method will return a ReadStatus.
public class PageReader {

  // Background read task; submitted to the scan thread pool at init time.
  static class AsyncPageReader implements Callable<ReadStatus> {
    ...
    @Override
    public ReadStatus call() throws Exception {
      ...
    }
  }

  // Result of one asynchronous page read.
  static class ReadStatus {
    public long bytesRead;
    public int returnVal;
    public Exception e;
    public DrillBuf pageData;
  }
}
Roughly:
Parquet Reader Thread | Read Thread |
---|---|
Submit page read task. While not end of data: get page; submit next page read task; decompress; decode. | Read page header. Read page data bytes. Return page header + page data, or an exception. |
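The hand-off sketched above can be expressed roughly as follows; scanPool and readTask stand in for the Drillbit scan pool and the AsyncPageReader callable, and are not the final field names:

```java
// Sketch: submit the next page read, decode the current page while it runs,
// and only block when the next page is actually needed.
Future<ReadStatus> pending = scanPool.submit(readTask);
// ... decompress + decode + copy the current page here, overlapping with the read ...
ReadStatus status = pending.get();     // blocks only if the read has not completed yet
if (status.e != null) {
  throw status.e;                      // surface the read failure on the reader thread
}
DrillBuf nextPageData = status.pageData;
```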
###5.1.3 Thread Pool
The thread pool for the scan will be started at Drillbit startup time and accessed by PageReader through the operator context.
###5.1.4 Parallelized Column Reader
####5.1.4.1 ColumnReader methods
ParquetRecordReader calls various methods from ColumnReader that implement decompressing, decoding, and copying to value vectors. These methods are called sequentially for each column and can easily be parallelized. They are hard to pipeline, though, as the logic is rather intricately tied to maintaining the correct record count across all the columns.
####5.1.4.2 Fixed length fields
Fixed length fields are decoded by calling the ColumnReader.processPages method from the ParquetRecordReader.readAllFixedFields method. The ParquetRecordReader.readAllFixedFields method will be changed to submit ColumnReader.processPages tasks in parallel to a scan decoding thread pool, and will block until all ColumnReader.processPages tasks have completed.
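A sketch of what this fan-out could look like, assuming the decode pool from section 4.3; the processPages signature used here is an assumption for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Sketch of a parallel readAllFixedFields: each column's processPages call runs
// as a task on the decode pool, and invokeAll blocks until every column is done.
void readAllFixedFieldsParallel(List<ColumnReader<?>> columns, long recordsToRead,
                                ExecutorService decodePool) throws Exception {
  List<Callable<Void>> tasks = new ArrayList<>();
  for (final ColumnReader<?> column : columns) {
    tasks.add(() -> {
      column.processPages(recordsToRead);   // decompress + decode + copy for one column
      return null;
    });
  }
  for (Future<Void> f : decodePool.invokeAll(tasks)) {
    f.get();                                // rethrow any failure from a column task
  }
}
```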
####5.1.4.3 Variable Length fields
ParquetRecordReader uses a VarLengthBinaryReader to read variable length columns. The method VarLengthBinaryReader.readFields calls two methods, ColumnReader.determineSize and ColumnReader.readRecords, serially.
The VarLengthBinaryReader.readFields method will be changed to submit ColumnReader.determineSize and ColumnReader.readRecords in parallel to a scan decoding thread pool.
Note that we cannot submit column decoding tasks to the scan thread pool as these methods need to block waiting for data to be read. If the thread pool size is not large enough, the disk read task which is created by the column decoding task will be blocked waiting for a thread to execute it and the system will deadlock.
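A corresponding sketch for the variable-length path, submitting to the decode pool (never the scan pool, for the deadlock reason above); the determineSize/readRecords shapes are assumptions, and coordination of the common record count across columns is elided:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Sketch of a parallel VarLengthBinaryReader.readFields: each column sizes and
// then decodes its records as one task on the decode pool.
void readFieldsParallel(List<ColumnReader<?>> varLengthColumns, long recordsToRead,
                        ExecutorService decodePool) throws Exception {
  List<Callable<Void>> tasks = new ArrayList<>();
  for (final ColumnReader<?> column : varLengthColumns) {
    tasks.add(() -> {
      column.determineSize(recordsToRead);  // pass 1: work out value sizes for this batch
      column.readRecords(recordsToRead);    // pass 2: decode and copy into the value vector
      return null;
    });
  }
  for (Future<Void> f : decodePool.invokeAll(tasks)) {
    f.get();                                // surface any per-column failure
  }
}
```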
##5.2 APIs and Protocols
The BufferedDirectBufInputStream class implements the API of InputStream [3], except for the mark and reset functionality. Additionally, the following APIs are implemented.
Return type | API | Description |
---|---|---|
(constructor) | public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset, long totalByteSize, int bufSize, boolean enableHints) | Creates a buffered input stream that reads the underlying InputStream starting at startOffset, reads up to totalByteSize bytes, and uses an internal buffer of bufSize bytes. enableHints will try to issue an fadvise call if available. |
int | read(DrillBuf b) | Reads some number of bytes from the input stream and stores them into the DrillBuf buffer b. |
int | read(DrillBuf b, int off, int len) | Reads up to len bytes of data from the input stream into the DrillBuf buffer b, starting at offset off. |
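A usage sketch based on the table above; the 8 MB buffer size, the chunk offsets, and the surrounding allocation and file-open calls are illustrative assumptions, not part of the listed API:

```java
// Sketch: open a column chunk range through the buffering stream and fill a DrillBuf.
FSDataInputStream fsStream = fs.open(path);                     // fs and path come from the scan setup
BufferedDirectBufInputStream in = new BufferedDirectBufInputStream(
    fsStream, allocator, "columnChunk-0",                       // id used for logging/allocation
    chunkStartOffset, chunkTotalBytes,                          // the (start offset, size) range
    8 * 1024 * 1024, /* enableHints */ true);                   // 8 MB internal buffer, issue fadvise
DrillBuf pageBuf = allocator.buffer(pageSize);                  // destination DrillBuf
int bytesRead = in.read(pageBuf, 0, pageSize);                  // read directly into the DrillBuf
```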
##5.3 Performance
This entire feature is aimed at improving Parquet reader performance. We expect to get close to the 399 MB/sec achieved by the other reader in the case where the cache is cold. There should be no degradation in performance in the case where the data is already in the file system cache.
Performance benchmarking should be done as part of this improvement.
##5.4 Error and Failure handling
The BufferedDirectBufInputStream will throw the same exceptions that InputStream does. These exceptions will be caught by the AsyncPageReader. Any exception in AsyncPageReader will be caught by PageReader as an IOException and rethrown as a UserException.
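A sketch of that wrapping, assuming Drill's UserException.dataReadError builder; the message text and surrounding names (readStatus, logger) are illustrative:

```java
// Sketch: the PageReader inspects the ReadStatus returned by AsyncPageReader and
// rethrows any captured exception as a UserException so it surfaces to the user.
if (readStatus.e != null) {
  throw UserException.dataReadError(readStatus.e)
      .message("Error reading page data for column chunk")   // illustrative message
      .build(logger);                                         // logger is the PageReader's logger
}
```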
##5.5 Memory management
This feature increases the minimum amount of memory required to read a single column of Parquet data to 8 MB. There is no way around this except by configuring a smaller read buffer size.
##5.6 Scalability Issues
###5.6.1 Resource utilization
The number of threads created as part of a query's execution increases, and with an increased number of concurrent queries this can lead to excessive thread context switches. This will be limited by using a thread pool.
A second consideration is that the buffering input stream implementation will allocate an internal buffer to read and buffer data into. Based on the experiments described in section 3, 8 MB appears to be a good size for this buffer. However, a large number of columns (say 300) can result in a large amount of memory (2.34 GB in this case) being pre-allocated. This could potentially lead to out-of-memory conditions under high concurrency.
##5.7 Options and metrics
###5.7.1 Configurable Options
The following options need to be made configurable for the Parquet Record reader.
- Parquet scan thread pool size - This limits the size of the thread pool used by a single ParquetRecordReader.
- Parquet scan buffering block size - This sets the default size of the read buffer.
###5.7.2 Metrics
- Parquet scan PageReader Wait Time - The total amount of time the PageReader was waiting to get data from an async read
- More TBD
##5.8 Testing implications
Asynchronous reading of page data can have issues if dictionary data is not read fully before data pages are read and decoded. Functional tests should be run for all Parquet data types, especially dictionary encoded types and files with pages that are a mix of dictionary encoded and non-dictionary encoded. Any off-by-one errors in the buffering can cause data corruption. Functional tests should be run on files with data pages that are split on the boundary of the read buffer.
##5.9 Tradeoffs and Limitations
The faster implementation comes at the cost of more memory and more threads. A new pool of scan threads will increase the number of execution threads, though the size of this pool is bounded. Since the implementation will use a buffering input stream, the memory requirement for the input streams can become much larger than it is presently.
The biggest limitation is that in the pipeline, decompressing data has to be done at the level of (Parquet) page data. This can easily become the performance bottleneck and there is no way to overcome that. For every data set, as the page size is changed, there is a point at which the disk is no longer the bottleneck and decompression becomes the slowest stage in the pipeline. There is currently no way to tune this automatically, so the system will have to be tuned manually to determine the best compression type and page size.
#6 Implementation Plan
- Initial experimentation (outlined in the Background and Research section)
- Implement BufferedDirectBufInputStream and use it in PageReader
- Implement AsynchronousPageReader
- Run benchmark
- Implement parallel ColumnReaders
- Run benchmark
- Add unit tests
#7 Open items
The design of the Parallelized Column Reader (section 5.1.4) may need more detail. Any additional metrics that need to be tracked need to be added.
#8 References
[1] https://parquet.apache.org/documentation/latest/
[2] http://linux.die.net/man/2/posix_fadvise
[3] https://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html
#9 Document History
Date | Author | Version | Description |
---|---|---|---|
2016-07-17 | Parth S. Chandra | 0.1 | Initial Draft |