Skip to content

Commit

Permalink
Merge pull request #18295 from jackdelv/HPCC-30775-support-cursor
Browse files Browse the repository at this point in the history
HPCC-30784 Support getCursor() and setCursor() in ParquetRowReader

Reviewed-By: Dan S. Camper <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Feb 26, 2024
2 parents 4edfd23 + b4db534 commit 5a76ec3
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 19 deletions.
13 changes: 2 additions & 11 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1312,8 +1312,8 @@ class ParquetDiskRowReader : public CInterfaceOf<IDiskRowStream>, implements IDi
virtual const void * nextRow() override;
virtual const void * nextRow(size32_t & resultSize) override;
virtual const void * nextRow(MemoryBufferBuilder & builder) override;
virtual bool getCursor(MemoryBuffer & cursor) override;
virtual void setCursor(MemoryBuffer & cursor) override;
virtual bool getCursor(MemoryBuffer & cursor) override { return parquetFileReader->getCursor(cursor); }
virtual void setCursor(MemoryBuffer & cursor) override { parquetFileReader->setCursor(cursor); }
virtual void stop() override;

virtual void clearInput() override;
Expand Down Expand Up @@ -1420,15 +1420,6 @@ const void * ParquetDiskRowReader::nextRow(MemoryBufferBuilder & builder)
return nullptr;
}

bool ParquetDiskRowReader::getCursor(MemoryBuffer & cursor)
{
return false;
}

void ParquetDiskRowReader::setCursor(MemoryBuffer & cursor)
{
}

void ParquetDiskRowReader::stop()
{
}
Expand Down
74 changes: 71 additions & 3 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ arrow::Status ParquetReader::processReadFile()

divide_row_groups(activityCtx, totalTables, tableCount, startRowGroup);
}
tablesProcessed = 0;
totalRowsProcessed = 0;
rowsProcessed = 0;
rowsCount = 0;
Expand Down Expand Up @@ -385,18 +386,21 @@ arrow::Result<std::shared_ptr<arrow::Table>> ParquetReader::queryRows()
*/
__int64 ParquetReader::next(TableColumns *&nextTable)
{
if (rowsProcessed == rowsCount)
if (rowsProcessed == rowsCount || restoredCursor)
{
if (restoredCursor)
restoredCursor = false;
else
rowsProcessed = 0;
std::shared_ptr<arrow::Table> table;
if (endsWithIgnoreCase(partOption.c_str(), "partition"))
{
PARQUET_ASSIGN_OR_THROW(table, queryRows());
PARQUET_ASSIGN_OR_THROW(table, queryRows()); // Sets rowsProcessed to current row in table corresponding to startRow
}
else
{
reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table));
}
rowsProcessed = 0;
tablesProcessed++;
rowsCount = table->num_rows();
splitTable(table);
Expand All @@ -406,6 +410,70 @@ __int64 ParquetReader::next(TableColumns *&nextTable)
return rowsProcessed++;
}

/**
* @brief Gets the information about the next row to be read from a Parquet file. A boolean is stored at the
* beginning of the memory buffer for the partition status. If the file is a partitioned dataset, the next row
* and the number of remaining rows are stored in the memory buffer. If the file is a regular Parquet file, the
* current and remaining tables are stored in the memory buffer. Additionally, the number of rows processed within
* the current table is stored in the memory buffer.
*
* @param cursor MemoryBuffer where file processing information is stored
* @return true If building the buffer succeeds
*/
bool ParquetReader::getCursor(MemoryBuffer & cursor)
{
bool partition = endsWithIgnoreCase(partOption.c_str(), "partition");
cursor.append(partition);

// Adjust starting positions to current read position and remove
// already processed rows from the total count for the workers
if (partition)
{
cursor.append(startRow + totalRowsProcessed);
cursor.append(totalRowCount - totalRowsProcessed);
}
else
{
cursor.append(startRowGroup + tablesProcessed);
cursor.append(tableCount - tablesProcessed);
cursor.append(rowsProcessed);
}

return true;
}

/**
* @brief Resets the current access row in the Parquet file based on the information stored in a memory buffer
* created by getCursor. Sets restoredCursor to true for reading from the middle of the table in non-partitioned
* datasets. Resets the file read process trackers and reads the partition flag from the beginning of the buffer.
* If the file is a partitioned dataset then only the starting and remaining rows are read. Otherwise the
* starting and remaining tables are read as well as the current row within the table.
*
* @param cursor MemoryBuffer where file processing information is stored
*/
void ParquetReader::setCursor(MemoryBuffer & cursor)
{
restoredCursor = true;
tablesProcessed = 0;
totalRowsProcessed = 0;
rowsProcessed = 0;
rowsCount = 0;

bool partition;
cursor.read(partition);
if (partition)
{
cursor.read(startRow);
cursor.read(totalRowCount);
}
else
{
cursor.read(startRowGroup);
cursor.read(tableCount);
cursor.read(rowsProcessed);
}
}

/**
* @brief Constructs a ParquetWriter for the target destination and checks for existing data.
*
Expand Down
20 changes: 15 additions & 5 deletions plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,15 +769,25 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
std::shared_ptr<parquet::arrow::RowGroupReader> queryCurrentTable(__int64 currTable);
arrow::Result<std::shared_ptr<arrow::Table>> queryRows();

bool getCursor(MemoryBuffer & cursor);
void setCursor(MemoryBuffer & cursor);

private:
// Count of processed rows and tables for both partitioned and regular files.
__int64 tablesProcessed = 0; // The number of tables processed when reading parquet files.
__int64 totalRowsProcessed = 0; // Total number of rows processed of partitioned dataset. We cannot get the total number of chunks and they are variable sizes.
__int64 totalRowCount = 0; // Total number of rows in a partition dataset.
__int64 startRow = 0; // The starting row in a partitioned dataset.
__int64 totalRowsProcessed = 0; // Total number of rows processed.
__int64 rowsProcessed = 0; // Current Row that has been read from the RowGroup.
__int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker.
__int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading.
__int64 rowsCount = 0; // The number of result rows in a given RowGroup read from the parquet file.

// Partitioned file read location and size in rows.
__int64 totalRowCount = 0; // Total number of rows in a partition dataset to be read by the worker.
__int64 startRow = 0; // The starting row in a partitioned dataset.

// Regular file read location and size in tables.
__int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading.
__int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker.

bool restoredCursor = false; // True if reading from a restored file location. Skips check rowsProcessed == rowsCount to open the table at the current row location.
size_t maxRowCountInTable = 0; // Max table size set by user.
std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'.
std::string location; // Full path to location for reading parquet files. Can be a filename or directory.
Expand Down

0 comments on commit 5a76ec3

Please sign in to comment.