From b4db534a8a1afc45b7e2afd58225bfaf376b4f8c Mon Sep 17 00:00:00 2001 From: Jack Del Vecchio Date: Thu, 22 Feb 2024 18:02:58 +0000 Subject: [PATCH] HPCC-30784 Support getCursor() and setCursor() in ParquetRowReader --- common/thorhelper/thorread.cpp | 13 +----- plugins/parquet/parquetembed.cpp | 74 ++++++++++++++++++++++++++++++-- plugins/parquet/parquetembed.hpp | 20 ++++++--- 3 files changed, 88 insertions(+), 19 deletions(-) diff --git a/common/thorhelper/thorread.cpp b/common/thorhelper/thorread.cpp index 1f92b7ea31b..7ffbe409d96 100644 --- a/common/thorhelper/thorread.cpp +++ b/common/thorhelper/thorread.cpp @@ -1312,8 +1312,8 @@ class ParquetDiskRowReader : public CInterfaceOf, implements IDi virtual const void * nextRow() override; virtual const void * nextRow(size32_t & resultSize) override; virtual const void * nextRow(MemoryBufferBuilder & builder) override; - virtual bool getCursor(MemoryBuffer & cursor) override; - virtual void setCursor(MemoryBuffer & cursor) override; + virtual bool getCursor(MemoryBuffer & cursor) override { return parquetFileReader->getCursor(cursor); } + virtual void setCursor(MemoryBuffer & cursor) override { parquetFileReader->setCursor(cursor); } virtual void stop() override; virtual void clearInput() override; @@ -1420,15 +1420,6 @@ const void * ParquetDiskRowReader::nextRow(MemoryBufferBuilder & builder) return nullptr; } -bool ParquetDiskRowReader::getCursor(MemoryBuffer & cursor) -{ - return false; -} - -void ParquetDiskRowReader::setCursor(MemoryBuffer & cursor) -{ -} - void ParquetDiskRowReader::stop() { } diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index c2ee5dd1f9e..a3bd7cf5aeb 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -330,6 +330,7 @@ arrow::Status ParquetReader::processReadFile() divide_row_groups(activityCtx, totalTables, tableCount, startRowGroup); } + tablesProcessed = 0; totalRowsProcessed = 0; rowsProcessed = 0; rowsCount = 0; @@ -385,18 +386,21 @@ arrow::Result> ParquetReader::queryRows() */ __int64 ParquetReader::next(TableColumns *&nextTable) { - if (rowsProcessed == rowsCount) + if (rowsProcessed == rowsCount || restoredCursor) { + if (restoredCursor) + restoredCursor = false; + else + rowsProcessed = 0; std::shared_ptr table; if (endsWithIgnoreCase(partOption.c_str(), "partition")) { - PARQUET_ASSIGN_OR_THROW(table, queryRows()); + PARQUET_ASSIGN_OR_THROW(table, queryRows()); // Sets rowsProcessed to current row in table corresponding to startRow } else { reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table)); } - rowsProcessed = 0; tablesProcessed++; rowsCount = table->num_rows(); splitTable(table); @@ -406,6 +410,70 @@ __int64 ParquetReader::next(TableColumns *&nextTable) return rowsProcessed++; } +/** + * @brief Gets the information about the next row to be read from a Parquet file. A boolean is stored at the + * beginning of the memory buffer for the partition status. If the file is a partitioned dataset, the next row + * and the number of remaining rows are stored in the memory buffer. If the file is a regular Parquet file, the + * current and remaining tables are stored in the memory buffer. Additionally, the number of rows processed within + * the current table is stored in the memory buffer. + * + * @param cursor MemoryBuffer where file processing information is stored + * @return true If building the buffer succeeds + */ +bool ParquetReader::getCursor(MemoryBuffer & cursor) +{ + bool partition = endsWithIgnoreCase(partOption.c_str(), "partition"); + cursor.append(partition); + + // Adjust starting positions to current read position and remove + // already processed rows from the total count for the workers + if (partition) + { + cursor.append(startRow + totalRowsProcessed); + cursor.append(totalRowCount - totalRowsProcessed); + } + else + { + cursor.append(startRowGroup + tablesProcessed); + cursor.append(tableCount - tablesProcessed); + cursor.append(rowsProcessed); + } + + return true; +} + +/** + * @brief Resets the current access row in the Parquet file based on the information stored in a memory buffer + * created by getCursor. Sets restoredCursor to true for reading from the middle of the table in non-partitioned + * datasets. Resets the file read process trackers and reads the partition flag from the beginning of the buffer. + * If the file is a partitioned dataset then only the starting and remaining rows are read. Otherwise the + * starting and remaining tables are read as well as the current row within the table. + * + * @param cursor MemoryBuffer where file processing information is stored + */ +void ParquetReader::setCursor(MemoryBuffer & cursor) +{ + restoredCursor = true; + tablesProcessed = 0; + totalRowsProcessed = 0; + rowsProcessed = 0; + rowsCount = 0; + + bool partition; + cursor.read(partition); + if (partition) + { + cursor.read(startRow); + cursor.read(totalRowCount); + } + else + { + cursor.read(startRowGroup); + cursor.read(tableCount); + cursor.read(rowsProcessed); + } +} + /** * @brief Constructs a ParquetWriter for the target destination and checks for existing data. * diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp index 2c65e660371..d455a71347f 100644 --- a/plugins/parquet/parquetembed.hpp +++ b/plugins/parquet/parquetembed.hpp @@ -769,15 +769,25 @@ class PARQUETEMBED_PLUGIN_API ParquetReader std::shared_ptr queryCurrentTable(__int64 currTable); arrow::Result> queryRows(); + bool getCursor(MemoryBuffer & cursor); + void setCursor(MemoryBuffer & cursor); + private: + // Count of processed rows and tables for both partitioned and regular files. __int64 tablesProcessed = 0; // The number of tables processed when reading parquet files. - __int64 totalRowsProcessed = 0; // Total number of rows processed of partitioned dataset. We cannot get the total number of chunks and they are variable sizes. - __int64 totalRowCount = 0; // Total number of rows in a partition dataset. - __int64 startRow = 0; // The starting row in a partitioned dataset. + __int64 totalRowsProcessed = 0; // Total number of rows processed. __int64 rowsProcessed = 0; // Current Row that has been read from the RowGroup. - __int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker. - __int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading. __int64 rowsCount = 0; // The number of result rows in a given RowGroup read from the parquet file. + + // Partitioned file read location and size in rows. + __int64 totalRowCount = 0; // Total number of rows in a partition dataset to be read by the worker. + __int64 startRow = 0; // The starting row in a partitioned dataset. + + // Regular file read location and size in tables. + __int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading. + __int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker. + + bool restoredCursor = false; // True if reading from a restored file location. Skips check rowsProcessed == rowsCount to open the table at the current row location. size_t maxRowCountInTable = 0; // Max table size set by user. std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'. std::string location; // Full path to location for reading parquet files. Can be a filename or directory.