Skip to content

Commit

Permalink
HPCC-30653 Improve Parquet partitioning interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jackdelv committed Nov 8, 2023
1 parent 69a5d2a commit 8457f2e
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 66 deletions.
18 changes: 14 additions & 4 deletions plugins/parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');

#### 2. Writing Parquet Files

The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. There is an optional argument that sets the overwrite behavior of the plugin. The default value is false meaning it will throw an error if the target file already exists.
The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. There is an optional argument that sets the overwrite behavior of the plugin. The default value is false meaning it will throw an error if the target file already exists. If overwrite is set to true the plugin will check for files that match the target path passed in and delete them first before writing new files.

The Parquet Plugin supports all available Arrow compression types. Specifying the compression when writing is optional and defaults to Uncompressed. The options for compressing your files are Snappy, GZip, Brotli, LZ4, LZ4Frame, LZ4Hadoop, ZSTD, Uncompressed.

Expand All @@ -51,14 +51,24 @@ ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption, co

### Partitioned Files (Tabular Datasets)

The Parquet plugin supports both Hive Partitioning and Directory Partitioning. Hive partitioning uses a key-value partitioning scheme for selecting directory names. For example, the file under `dataset/year=2017/month=01/data0.parquet` contains only data for which the year equals 2017 and the month equals 01. The second partitioning scheme, Directory Partitioning, is similar, but rather than having key-value pairs the partition keys are inferred in the file path. For example, instead of having `/year=2017/month=01/day=01` the file path would be `/2017/01/01`.

#### 1. Reading Partitioned Files

The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files.
For reading partitioned files, pass in the target directory to the read function of the type of partition you are using. For directory partitioning, a list of the field names that make up the partitioning schema is required because it is not included in the directory structure like hive partitioning.

```
github_dataset := ParquetIO.ReadPartition(layout, '/source/directory/partioned_dataset');
github_dataset := ParquetIO.HivePartition.Read(layout, '/source/directory/partitioned_dataset');
github_dataset := ParquetIO.DirectoryPartition.Read(layout, 'source/directory/partitioned_dataset', 'year;month;day')
```

#### 2. Writing Partitioned Files

For partitioning parquet files all you need to do is run the Write function on Thor rather than hthor and each worker will create its own parquet file.
To select the fields that you wish to partition your data on pass in a string of semicolon seperated field names. If the fields you select create too many subdirectories you may need to partition your data on different fields. The rowSize field defaults to 100000 rows and determines how many rows to put in each part of the output files. Writing a partitioned file to a directory that already contains data will fail unless the overwrite option is set to true. If the overwrite option is set to true and the target directory is not empty the plugin will first erase the contents of the target directory before writing the new files.

```
ParquetIO.HivePartition.Write(outDataset, rowSize, '/source/directory/partioned_dataset', overwriteOption, 'year;month;day');
ParquetIO.DirectoryPartition.Read(outDataset, rowSize, '/source/directory/partioned_dataset', overwriteOption, 'year;month;day');
```
15 changes: 7 additions & 8 deletions plugins/parquet/examples/create_partition.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ layout := RECORD
INTEGER commit_date;
END;

#IF(0)
github_dataset := ParquetIO.Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet');
Write(DISTRIBUTE(github_dataset, SKEW(.05)), '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet');
#END
csv_data := DATASET('~parquet::large::ghtorrent-2019-02-04.csv', layout, CSV(HEADING(1)));
writeStep := ParquetIO.HivePartition.Write(CHOOSEN(csv_data, 1330), , '/datadrive/dev/test_data/sandbox/test_partition/', TRUE, 'commit_date;repo');

#IF(1)
github_dataset := ParquetIO.ReadPartition(layout, '/datadrive/dev/test_data/hpcc_gh_partition');
OUTPUT(CHOOSEN(github_dataset, 10000), NAMED('GITHUB_PARTITION'));
#END
github_dataset := ParquetIO.HivePartition.Read(layout, '/datadrive/dev/test_data/sandbox/test_partition/');
readStep := OUTPUT(CHOOSEN(github_dataset, 100), NAMED('GITHUB_PARTITION'));
countStep := OUTPUT(COUNT(github_dataset), NAMED('GITHUB_COUNT'));

SEQUENTIAL(writeStep, PARALLEL(readStep, countStep));
39 changes: 28 additions & 11 deletions plugins/parquet/parquet.ecllib
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,46 @@ END;

EXPORT ParquetIO := MODULE

EXPORT HivePartition := MODULE
EXPORT Write(outDS, outRows = 100000, basePath, overwriteOption = false, partitionFieldList) := FUNCTIONMACRO
LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writehivepartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption), partitionFields(partitionFieldList))
ENDEMBED;
RETURN _DoParquetWritePartition(outDS);
ENDMACRO;

EXPORT Read(resultLayout, basePath) := FUNCTIONMACRO
LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readhivepartition'), location(basePath))
ENDEMBED;
RETURN _DoParquetReadPartition();
ENDMACRO;
END;

EXPORT DirectoryPartition := MODULE
EXPORT Write(outDS, outRows = 100000, basePath, overwriteOption = false, partitionFieldList) := FUNCTIONMACRO
LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writedirectorypartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption), partitionFields(partitionFieldList))
ENDEMBED;
RETURN _DoParquetWritePartition(outDS);
ENDMACRO;

EXPORT Read(resultLayout, basePath, partitionFieldList) := FUNCTIONMACRO
LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readdirectorypartition'), location(basePath)), partitionFields(partitionFieldList)
ENDEMBED;
RETURN _DoParquetReadPartition();
ENDMACRO;
END;

EXPORT Read(resultLayout, filePath) := FUNCTIONMACRO
LOCAL STREAMED DATASET(resultLayout) _DoParquetRead() := EMBED(parquet : activity, option('read'), location(filePath))
ENDEMBED;
RETURN _DoParquetRead();
ENDMACRO;

EXPORT ReadPartition(resultLayout, basePath) := FUNCTIONMACRO
LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readpartition'), location(basePath))
ENDEMBED;
RETURN _DoParquetReadPartition();
ENDMACRO;

EXPORT Write(outDS, filePath, overwriteOption = false, compressionOption = '\'UNCOMPRESSED\'') := FUNCTIONMACRO
LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption), compression(compressionOption))
ENDEMBED;
RETURN _doParquetWrite(outDS);
ENDMACRO;

EXPORT WritePartition(outDS, outRows = 100000, basePath, overwriteOption = false) := FUNCTIONMACRO
LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption))
ENDEMBED;
RETURN _DoParquetWritePartition(outDS);
ENDMACRO;
END;

EXPORT getEmbedContext := ParquetService.getEmbedContext;
Expand Down
157 changes: 115 additions & 42 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,31 @@ extern void fail(const char *message)
* @param _batchSize The size of the batches when converting parquet columns to rows.
*/
ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination, int _rowSize, int _batchSize,
bool _overwrite, arrow::Compression::type _compressionOption, const IThorActivityContext *_activityCtx)
bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx)
: partOption(option), location(_location), destination(destination)
{
rowSize = _rowSize;
batchSize = _batchSize;
overwrite = _overwrite;
compressionOption = _compressionOption;
activityCtx = _activityCtx;

pool = arrow::default_memory_pool();

parquetDoc = std::vector<rapidjson::Document>(rowSize);

partition = String(option).endsWith("partition");
if (activityCtx->querySlave() == 0 && startsWith(option, "write"))
{
reportIfFailure(checkDirContents());
}

partition = endsWithIgnoreCase(option, "partition");
if (partition)
{
std::stringstream ss(_partitionFields);
std::string field;
while (std::getline(ss, field, ';'))
partitionFields.push_back(field);
}
}

ParquetHelper::~ParquetHelper()
Expand All @@ -142,6 +153,53 @@ std::shared_ptr<arrow::Schema> ParquetHelper::getSchema()
return schema;
}

arrow::Status ParquetHelper::checkDirContents()
{
if (destination.empty())
{
failx("Missing target location when writing Parquet data.");
}
StringBuffer path;
StringBuffer filename;
StringBuffer ext;
splitFilename(destination.c_str(), nullptr, &path, &filename, &ext, false);

ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination));

Owned<IDirectoryIterator> itr = createDirectoryIterator(path.str(), filename.appendf("*%s", ext.str()));
ForEach (*itr)
{
IFile &file = itr->query();
if (file.isFile() == fileBool::foundYes)
{
if(overwrite)
{
if (!file.remove())
{
failx("Failed to remove file %s", file.queryFilename());
}
}
else
{
failx("The target file %s already exists. To delete the file set the overwrite option to true.", file.queryFilename());
}
}
else
{
if (overwrite)
{
reportIfFailure(filesystem->DeleteDirContents(path.str()));
break;
}
else
{
failx("The target directory %s is not empty. To delete the contents of the directory set the overwrite option to true.", path.str());
}
}
}
return arrow::Status::OK();
}

/**
* @brief Opens the write stream with the schema and destination. T
*
Expand All @@ -154,41 +212,18 @@ arrow::Status ParquetHelper::openWriteFile()
if (partition)
{
ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination));
reportIfFailure(filesystem->DeleteDirContents(destination));
auto partition_schema = arrow::schema({schema->field(5)});

auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
auto partitioning = std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);

writeOptions.file_write_options = format->DefaultWriteOptions();
writeOptions.filesystem = filesystem;
writeOptions.base_dir = destination;
writeOptions.partitioning = partitioning;
writeOptions.existing_data_behavior = overwrite ? arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore : arrow::dataset::ExistingDataBehavior::kError;
writeOptions.partitioning = partitionType;
writeOptions.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;
}
else
{
StringBuffer filename;
StringBuffer path;
StringBuffer ext;
splitFilename(destination.c_str(), nullptr, &path, &filename, &ext, false);
if(!endsWith(destination.c_str(), ".parquet"))
failx("Error opening file: Invalid file extension for file %s", destination.c_str());

if(!strieq(ext, ".parquet"))
failx("Error opening file: Invalid file extension %s", ext.str());

Owned<IDirectoryIterator> itr = createDirectoryIterator(path.str(), filename.append("*.parquet"));

ForEach(*itr)
{
if (overwrite)
{
IFile &file = itr->query();
if(!file.remove())
failx("File %s could not be overwritten.", file.queryFilename());
}
else
failx("Cannot write to file %s because it already exists. To delete it set the overwrite option to true.", destination.c_str());
}
// Currently under the assumption that all channels and workers are given a worker id and no matter
// the configuration will show up in activityCtx->numSlaves()
if (activityCtx->numSlaves() > 1)
Expand Down Expand Up @@ -233,8 +268,18 @@ arrow::Status ParquetHelper::openReadFile()
std::shared_ptr<arrow::dataset::ParquetFileFormat> format = std::make_shared<arrow::dataset::ParquetFileFormat>();

arrow::dataset::FileSystemFactoryOptions options;
options.partitioning = arrow::dataset::HivePartitioning::MakeFactory(); // TODO set other partitioning types

if (endsWithIgnoreCase(partOption.c_str(), "hivepartition"))
{
options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();
}
else if (endsWithIgnoreCase(partOption.c_str(), "directorypartition"))
{
options.partitioning = arrow::dataset::DirectoryPartitioning::MakeFactory(partitionFields);
}
else
{
failx("Incorrect partitioning type %s.", partOption.c_str());
}
// Create the dataset factory
PARQUET_ASSIGN_OR_THROW(auto dataset_factory, arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options));

Expand Down Expand Up @@ -421,14 +466,13 @@ arrow::Status ParquetHelper::processReadFile()
{
// rowsProcessed starts at zero and we read in batches until it is equal to rowsCount
rowsProcessed = 0;
totalRowsProcessed = 0;
PARQUET_ASSIGN_OR_THROW(rbatchReader, scanner->ToRecordBatchReader());
rbatchItr = arrow::RecordBatchReader::RecordBatchReaderIterator(rbatchReader.get());
// Divide the work among any number of workers
PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr);
PARQUET_ASSIGN_OR_THROW(float total_rows, scanner->CountRows());
batchSize = batch->num_rows();
divide_row_groups(activityCtx, std::ceil(total_rows / batchSize), tableCount, startRowGroup);
if (tableCount != 0)
PARQUET_ASSIGN_OR_THROW(auto total_rows, scanner->CountRows());
divide_row_groups(activityCtx, total_rows, totalRowCount, startRow);
if (totalRowCount != 0)
{
std::shared_ptr<arrow::Table> table;
PARQUET_ASSIGN_OR_THROW(table, queryRows());
Expand Down Expand Up @@ -515,7 +559,10 @@ char ParquetHelper::queryPartOptions()
*/
bool ParquetHelper::shouldRead()
{
return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount);
if (partition)
return !(totalRowsProcessed >= totalRowCount);
else
return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount);
}

__int64 &ParquetHelper::getRowsProcessed()
Expand Down Expand Up @@ -557,12 +604,13 @@ arrow::Result<std::shared_ptr<arrow::Table>> ParquetHelper::queryRows()
{
if (tablesProcessed == 0)
{
__int64 offset = 0;
while (offset < startRowGroup)
__int64 offset = (*rbatchItr)->get()->num_rows();
while (offset < startRow)
{
rbatchItr++;
offset++;
offset += (*rbatchItr)->get()->num_rows();
}
rowsProcessed = (*rbatchItr)->get()->num_rows() - (offset - startRow);
}
PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr);
rbatchItr++;
Expand Down Expand Up @@ -594,6 +642,7 @@ std::unordered_map<std::string, std::shared_ptr<arrow::Array>> &ParquetHelper::n
chunkTable(table);
}
}
totalRowsProcessed++;
return parquetTable;
}

Expand Down Expand Up @@ -741,6 +790,27 @@ arrow::Status ParquetHelper::fieldsToSchema(const RtlTypeInfo *typeInfo)
}

schema = std::make_shared<arrow::Schema>(arrow_fields);

if (partition)
{
arrow::FieldVector partitionSchema;
for (int i = 0; i < partitionFields.size(); i++)
{
auto field = schema->GetFieldByName(partitionFields[i]);
if (field)
partitionSchema.push_back(field);
else
failx("Field %s not found in RECORD definition of Parquet file.", partitionFields[i].c_str());
}

if (endsWithIgnoreCase(partOption.c_str(), "hivepartition"))
partitionType = std::make_shared<arrow::dataset::HivePartitioning>(std::make_shared<arrow::Schema>(partitionSchema));
else if (endsWithIgnoreCase(partOption.c_str(), "directorypartition"))
partitionType = std::make_shared<arrow::dataset::DirectoryPartitioning>(std::make_shared<arrow::Schema>(partitionSchema));
else
failx("Partitioning method %s is not supported.", partOption.c_str());
}

return arrow::Status::OK();
}

Expand Down Expand Up @@ -1731,6 +1801,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
const char *option = ""; // Read(read), Read Parition(readpartition), Write(write), Write Partition(writepartition)
const char *location = ""; // file name and location of where to write parquet file
const char *destination = ""; // file name and location of where to read parquet file from
const char *partitionFields = ""; // comma delimited values containing fields to partition files on
__int64 rowsize = 40000; // Size of the row groups when writing to parquet files
__int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows
bool overwrite = false; // If true overwrite file with no error. The default is false and will throw an error if the file already exists.
Expand Down Expand Up @@ -1780,6 +1851,8 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
else
failx("Unsupported compression type: %s", val);
}
else if (stricmp(optName, "partitionFields") == 0)
partitionFields = val;
else
failx("Unknown option %s", optName.str());
}
Expand All @@ -1790,7 +1863,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
}
else
{
m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, compressionOption, activityCtx);
m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, compressionOption, partitionFields, activityCtx);
}
}

Expand Down
Loading

0 comments on commit 8457f2e

Please sign in to comment.