From 8457f2e5705e22331e5695c7b5a12a6dc73c1b37 Mon Sep 17 00:00:00 2001 From: Jack Del Vecchio Date: Wed, 8 Nov 2023 13:12:51 +0000 Subject: [PATCH] HPCC-30653 Improve Parquet partitioning interface --- plugins/parquet/README.md | 18 +- plugins/parquet/examples/create_partition.ecl | 15 +- plugins/parquet/parquet.ecllib | 39 +++-- plugins/parquet/parquetembed.cpp | 157 +++++++++++++----- plugins/parquet/parquetembed.hpp | 8 +- 5 files changed, 171 insertions(+), 66 deletions(-) diff --git a/plugins/parquet/README.md b/plugins/parquet/README.md index 095499a212f..bd21989ca6a 100644 --- a/plugins/parquet/README.md +++ b/plugins/parquet/README.md @@ -38,7 +38,7 @@ dataset := ParquetIO.Read(layout, '/source/directory/data.parquet'); #### 2. Writing Parquet Files -The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. There is an optional argument that sets the overwrite behavior of the plugin. The default value is false meaning it will throw an error if the target file already exists. +The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. There is an optional argument that sets the overwrite behavior of the plugin. The default value is false, meaning it will throw an error if the target file already exists. If overwrite is set to true, the plugin will check for existing files that match the target path and delete them before writing the new files. The Parquet Plugin supports all available Arrow compression types. Specifying the compression when writing is optional and defaults to Uncompressed. The options for compressing your files are Snappy, GZip, Brotli, LZ4, LZ4Frame, LZ4Hadoop, ZSTD, Uncompressed. @@ -51,14 +51,24 @@ ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption, co ### Partitioned Files (Tabular Datasets) +The Parquet plugin supports both Hive Partitioning and Directory Partitioning. Hive partitioning uses a key-value partitioning scheme for selecting directory names. For example, the file under `dataset/year=2017/month=01/data0.parquet` contains only data for which the year equals 2017 and the month equals 01. The second partitioning scheme, Directory Partitioning, is similar, but rather than using key-value pairs, the partition keys are inferred from the file path. For example, instead of `/year=2017/month=01/day=01` the file path would be `/2017/01/01`. + #### 1. Reading Partitioned Files -The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files. +To read partitioned files, pass the target directory to the Read function for the partitioning scheme you are using. For directory partitioning, a list of the field names that make up the partitioning schema is also required, because, unlike Hive partitioning, it is not encoded in the directory structure. ``` -github_dataset := ParquetIO.ReadPartition(layout, '/source/directory/partioned_dataset'); +github_dataset := ParquetIO.HivePartition.Read(layout, '/source/directory/partitioned_dataset'); + +github_dataset := ParquetIO.DirectoryPartition.Read(layout, '/source/directory/partitioned_dataset', 'year;month;day'); ``` #### 2. 
Writing Partitioned Files -For partitioning parquet files all you need to do is run the Write function on Thor rather than hthor and each worker will create its own parquet file. +To select the fields that you wish to partition your data on, pass in a string of semicolon-separated field names. If the fields you select create too many subdirectories, you may need to partition your data on different fields. The rowSize argument defaults to 100000 rows and determines how many rows are written to each part of the output files. Writing a partitioned file to a directory that already contains data will fail unless the overwrite option is set to true. If the overwrite option is set to true and the target directory is not empty, the plugin will first erase the contents of the target directory before writing the new files. + +``` +ParquetIO.HivePartition.Write(outDataset, rowSize, '/source/directory/partitioned_dataset', overwriteOption, 'year;month;day'); + +ParquetIO.DirectoryPartition.Write(outDataset, rowSize, '/source/directory/partitioned_dataset', overwriteOption, 'year;month;day'); +``` \ No newline at end of file diff --git a/plugins/parquet/examples/create_partition.ecl b/plugins/parquet/examples/create_partition.ecl index 00b849c21bc..a6584355dcb 100755 --- a/plugins/parquet/examples/create_partition.ecl +++ b/plugins/parquet/examples/create_partition.ecl @@ -18,12 +18,11 @@ layout := RECORD INTEGER commit_date; END; -#IF(0) -github_dataset := ParquetIO.Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet'); -Write(DISTRIBUTE(github_dataset, SKEW(.05)), '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet'); -#END +csv_data := DATASET('~parquet::large::ghtorrent-2019-02-04.csv', layout, CSV(HEADING(1))); +writeStep := ParquetIO.HivePartition.Write(CHOOSEN(csv_data, 1330), , '/datadrive/dev/test_data/sandbox/test_partition/', TRUE, 'commit_date;repo'); -#IF(1) -github_dataset := ParquetIO.ReadPartition(layout, '/datadrive/dev/test_data/hpcc_gh_partition'); -OUTPUT(CHOOSEN(github_dataset, 10000), NAMED('GITHUB_PARTITION')); -#END \ No newline at end of file +github_dataset := ParquetIO.HivePartition.Read(layout, '/datadrive/dev/test_data/sandbox/test_partition/'); +readStep := OUTPUT(CHOOSEN(github_dataset, 100), NAMED('GITHUB_PARTITION')); +countStep := OUTPUT(COUNT(github_dataset), NAMED('GITHUB_COUNT')); + +SEQUENTIAL(writeStep, PARALLEL(readStep, countStep)); diff --git a/plugins/parquet/parquet.ecllib b/plugins/parquet/parquet.ecllib index 2338ec28245..66529ac50dc 100644 --- a/plugins/parquet/parquet.ecllib +++ b/plugins/parquet/parquet.ecllib @@ -21,29 +21,46 @@ END; EXPORT ParquetIO := MODULE + EXPORT HivePartition := MODULE + EXPORT Write(outDS, outRows = 100000, basePath, overwriteOption = false, partitionFieldList) := FUNCTIONMACRO + LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writehivepartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption), partitionFields(partitionFieldList)) + ENDEMBED; + RETURN _DoParquetWritePartition(outDS); + ENDMACRO; + + EXPORT Read(resultLayout, basePath) := FUNCTIONMACRO + LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readhivepartition'), location(basePath)) + ENDEMBED; + RETURN _DoParquetReadPartition(); + ENDMACRO; + END; + + EXPORT DirectoryPartition := MODULE + EXPORT Write(outDS, outRows = 100000, basePath, overwriteOption = false, partitionFieldList) := FUNCTIONMACRO + LOCAL 
_DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writedirectorypartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption), partitionFields(partitionFieldList)) + ENDEMBED; + RETURN _DoParquetWritePartition(outDS); + ENDMACRO; + + EXPORT Read(resultLayout, basePath, partitionFieldList) := FUNCTIONMACRO + LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readdirectorypartition'), location(basePath), partitionFields(partitionFieldList)) + ENDEMBED; + RETURN _DoParquetReadPartition(); + ENDMACRO; + END; + EXPORT Read(resultLayout, filePath) := FUNCTIONMACRO LOCAL STREAMED DATASET(resultLayout) _DoParquetRead() := EMBED(parquet : activity, option('read'), location(filePath)) ENDEMBED; RETURN _DoParquetRead(); ENDMACRO; - EXPORT ReadPartition(resultLayout, basePath) := FUNCTIONMACRO - LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readpartition'), location(basePath)) - ENDEMBED; - RETURN _DoParquetReadPartition(); - ENDMACRO; - EXPORT Write(outDS, filePath, overwriteOption = false, compressionOption = '\'UNCOMPRESSED\'') := FUNCTIONMACRO LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption), compression(compressionOption)) ENDEMBED; RETURN _doParquetWrite(outDS); ENDMACRO; - EXPORT WritePartition(outDS, outRows = 100000, basePath, overwriteOption = false) := FUNCTIONMACRO - LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption)) - ENDEMBED; - RETURN _DoParquetWritePartition(outDS); - ENDMACRO; END; EXPORT getEmbedContext := ParquetService.getEmbedContext; diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index c605bb522f2..c00e29fe645 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -110,7 +110,7 @@ extern void fail(const char *message) * @param _batchSize The size of the batches when converting parquet columns to rows. 
*/ ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination, int _rowSize, int _batchSize, - bool _overwrite, arrow::Compression::type _compressionOption, const IThorActivityContext *_activityCtx) + bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx) : partOption(option), location(_location), destination(destination) { rowSize = _rowSize; @@ -118,12 +118,23 @@ ParquetHelper::ParquetHelper(const char *option, const char *_location, const ch overwrite = _overwrite; compressionOption = _compressionOption; activityCtx = _activityCtx; - pool = arrow::default_memory_pool(); parquetDoc = std::vector(rowSize); - partition = String(option).endsWith("partition"); + if (activityCtx->querySlave() == 0 && startsWith(option, "write")) + { + reportIfFailure(checkDirContents()); + } + + partition = endsWithIgnoreCase(option, "partition"); + if (partition) + { + std::stringstream ss(_partitionFields); + std::string field; + while (std::getline(ss, field, ';')) + partitionFields.push_back(field); + } } ParquetHelper::~ParquetHelper() @@ -142,6 +153,53 @@ std::shared_ptr ParquetHelper::getSchema() return schema; } +arrow::Status ParquetHelper::checkDirContents() +{ + if (destination.empty()) + { + failx("Missing target location when writing Parquet data."); + } + StringBuffer path; + StringBuffer filename; + StringBuffer ext; + splitFilename(destination.c_str(), nullptr, &path, &filename, &ext, false); + + ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination)); + + Owned itr = createDirectoryIterator(path.str(), filename.appendf("*%s", ext.str())); + ForEach (*itr) + { + IFile &file = itr->query(); + if (file.isFile() == fileBool::foundYes) + { + if(overwrite) + { + if (!file.remove()) + { + failx("Failed to remove file %s", file.queryFilename()); + } + } + else + { + failx("The target file %s already exists. To delete the file set the overwrite option to true.", file.queryFilename()); + } + } + else + { + if (overwrite) + { + reportIfFailure(filesystem->DeleteDirContents(path.str())); + break; + } + else + { + failx("The target directory %s is not empty. To delete the contents of the directory set the overwrite option to true.", path.str()); + } + } + } + return arrow::Status::OK(); +} + /** * @brief Opens the write stream with the schema and destination. T * @@ -154,41 +212,18 @@ arrow::Status ParquetHelper::openWriteFile() if (partition) { ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination)); - reportIfFailure(filesystem->DeleteDirContents(destination)); - auto partition_schema = arrow::schema({schema->field(5)}); - auto format = std::make_shared(); - auto partitioning = std::make_shared(partition_schema); - writeOptions.file_write_options = format->DefaultWriteOptions(); writeOptions.filesystem = filesystem; writeOptions.base_dir = destination; - writeOptions.partitioning = partitioning; - writeOptions.existing_data_behavior = overwrite ? 
arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore : arrow::dataset::ExistingDataBehavior::kError; + writeOptions.partitioning = partitionType; + writeOptions.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore; } else { - StringBuffer filename; - StringBuffer path; - StringBuffer ext; - splitFilename(destination.c_str(), nullptr, &path, &filename, &ext, false); + if(!endsWith(destination.c_str(), ".parquet")) + failx("Error opening file: Invalid file extension for file %s", destination.c_str()); - if(!strieq(ext, ".parquet")) - failx("Error opening file: Invalid file extension %s", ext.str()); - - Owned itr = createDirectoryIterator(path.str(), filename.append("*.parquet")); - - ForEach(*itr) - { - if (overwrite) - { - IFile &file = itr->query(); - if(!file.remove()) - failx("File %s could not be overwritten.", file.queryFilename()); - } - else - failx("Cannot write to file %s because it already exists. To delete it set the overwrite option to true.", destination.c_str()); - } // Currently under the assumption that all channels and workers are given a worker id and no matter // the configuration will show up in activityCtx->numSlaves() if (activityCtx->numSlaves() > 1) @@ -233,8 +268,18 @@ arrow::Status ParquetHelper::openReadFile() std::shared_ptr format = std::make_shared(); arrow::dataset::FileSystemFactoryOptions options; - options.partitioning = arrow::dataset::HivePartitioning::MakeFactory(); // TODO set other partitioning types - + if (endsWithIgnoreCase(partOption.c_str(), "hivepartition")) + { + options.partitioning = arrow::dataset::HivePartitioning::MakeFactory(); + } + else if (endsWithIgnoreCase(partOption.c_str(), "directorypartition")) + { + options.partitioning = arrow::dataset::DirectoryPartitioning::MakeFactory(partitionFields); + } + else + { + failx("Incorrect partitioning type %s.", partOption.c_str()); + } // Create the dataset factory PARQUET_ASSIGN_OR_THROW(auto dataset_factory, arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options)); @@ -421,14 +466,13 @@ arrow::Status ParquetHelper::processReadFile() { // rowsProcessed starts at zero and we read in batches until it is equal to rowsCount rowsProcessed = 0; + totalRowsProcessed = 0; PARQUET_ASSIGN_OR_THROW(rbatchReader, scanner->ToRecordBatchReader()); rbatchItr = arrow::RecordBatchReader::RecordBatchReaderIterator(rbatchReader.get()); // Divide the work among any number of workers - PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr); - PARQUET_ASSIGN_OR_THROW(float total_rows, scanner->CountRows()); - batchSize = batch->num_rows(); - divide_row_groups(activityCtx, std::ceil(total_rows / batchSize), tableCount, startRowGroup); - if (tableCount != 0) + PARQUET_ASSIGN_OR_THROW(auto total_rows, scanner->CountRows()); + divide_row_groups(activityCtx, total_rows, totalRowCount, startRow); + if (totalRowCount != 0) { std::shared_ptr table; PARQUET_ASSIGN_OR_THROW(table, queryRows()); @@ -515,7 +559,10 @@ char ParquetHelper::queryPartOptions() */ bool ParquetHelper::shouldRead() { - return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount); + if (partition) + return !(totalRowsProcessed >= totalRowCount); + else + return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount); } __int64 &ParquetHelper::getRowsProcessed() @@ -557,12 +604,13 @@ arrow::Result> ParquetHelper::queryRows() { if (tablesProcessed == 0) { - __int64 offset = 0; - while (offset < startRowGroup) + __int64 offset = (*rbatchItr)->get()->num_rows(); + while (offset < startRow) { 
rbatchItr++; - offset++; + offset += (*rbatchItr)->get()->num_rows(); } + rowsProcessed = (*rbatchItr)->get()->num_rows() - (offset - startRow); } PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr); rbatchItr++; @@ -594,6 +642,7 @@ std::unordered_map> &ParquetHelper::n chunkTable(table); } } + totalRowsProcessed++; return parquetTable; } @@ -741,6 +790,27 @@ arrow::Status ParquetHelper::fieldsToSchema(const RtlTypeInfo *typeInfo) } schema = std::make_shared<arrow::Schema>(arrow_fields); + + if (partition) + { + arrow::FieldVector partitionSchema; + for (int i = 0; i < partitionFields.size(); i++) + { + auto field = schema->GetFieldByName(partitionFields[i]); + if (field) + partitionSchema.push_back(field); + else + failx("Field %s not found in RECORD definition of Parquet file.", partitionFields[i].c_str()); + } + + if (endsWithIgnoreCase(partOption.c_str(), "hivepartition")) + partitionType = std::make_shared<arrow::dataset::HivePartitioning>(std::make_shared<arrow::Schema>(partitionSchema)); + else if (endsWithIgnoreCase(partOption.c_str(), "directorypartition")) + partitionType = std::make_shared<arrow::dataset::DirectoryPartitioning>(std::make_shared<arrow::Schema>(partitionSchema)); + else + failx("Partitioning method %s is not supported.", partOption.c_str()); + } + + return arrow::Status::OK(); } @@ -1731,6 +1801,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_ const char *option = ""; // Read(read), Read Parition(readpartition), Write(write), Write Partition(writepartition) const char *location = ""; // file name and location of where to write parquet file const char *destination = ""; // file name and location of where to read parquet file from + const char *partitionFields = ""; // semicolon delimited list of fields to partition the files on __int64 rowsize = 40000; // Size of the row groups when writing to parquet files __int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows bool overwrite = false; // If true overwrite file with no error. The default is false and will throw an error if the file already exists. 
@@ -1780,6 +1851,8 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_ else failx("Unsupported compression type: %s", val); } + else if (stricmp(optName, "partitionFields") == 0) + partitionFields = val; else failx("Unknown option %s", optName.str()); } @@ -1790,7 +1863,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_ } else { - m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, compressionOption, activityCtx); + m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, compressionOption, partitionFields, activityCtx); } } diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp index 9027c2e1891..addfbfe03ae 100644 --- a/plugins/parquet/parquetembed.hpp +++ b/plugins/parquet/parquetembed.hpp @@ -785,9 +785,10 @@ class JsonValueConverter class ParquetHelper { public: - ParquetHelper(const char *option, const char *_location, const char *destination, int rowsize, int _batchSize, bool _overwrite, arrow::Compression::type _compressionOption, const IThorActivityContext *_activityCtx); + ParquetHelper(const char *option, const char *_location, const char *destination, int _rowSize, int _batchSize, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx); ~ParquetHelper(); std::shared_ptr<arrow::Schema> getSchema(); + arrow::Status checkDirContents(); arrow::Status openWriteFile(); arrow::Status openReadFile(); arrow::Status processReadFile(); @@ -818,6 +819,9 @@ class ParquetHelper __int64 currentRow = 0; __int64 rowSize = 0; // The maximum size of each parquet row group. __int64 tablesProcessed = 0; // Current RowGroup that has been read from the input file. + __int64 totalRowsProcessed = 0; // Total number of rows processed from a partitioned dataset. Progress is tracked by row because the number of chunks is unknown and their sizes vary. + __int64 totalRowCount = 0; // Total number of rows in a partitioned dataset. + __int64 startRow = 0; // The starting row in a partitioned dataset. __int64 rowsProcessed = 0; // Current Row that has been read from the RowGroup __int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker __int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading. @@ -836,6 +840,8 @@ class ParquetHelper std::shared_ptr<arrow::dataset::Scanner> scanner = nullptr; // Scanner for reading through partitioned files. PARTITION arrow::dataset::FileSystemDatasetWriteOptions writeOptions; // Write options for writing partitioned files. PARTITION arrow::Compression::type compressionOption = arrow::Compression::type::UNCOMPRESSED; + std::shared_ptr<arrow::dataset::Partitioning> partitionType = nullptr; + std::vector<std::string> partitionFields; std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr; arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr; std::vector<__int64> fileTableCounts;
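Usage sketch (not part of the patch): the ECL below shows how the new partitioning interface added above is intended to be called, following the README text and the create_partition.ecl example. The record layout, sample rows, output paths, and partition fields are illustrative assumptions, and the IMPORT line mirrors the existing plugin examples; adjust them for real data.

```
IMPORT Parquet;

layout := RECORD
    STRING commit_hash;
    STRING repo;
    INTEGER commit_date;
END;

inData := DATASET([{'abc123', 'hpcc-systems/HPCC-Platform', 20230108},
                   {'def456', 'hpcc-systems/ecl-ml', 20230109}], layout);

// Hive partitioning writes key=value directories, e.g. /data/gh_hive/commit_date=20230108/repo=.../part0.parquet.
// Arguments: dataset, max rows per output part (default 100000), target directory, overwrite flag,
// and a semicolon-separated list of partition fields.
writeHive := ParquetIO.HivePartition.Write(inData, 100000, '/data/gh_hive', TRUE, 'commit_date;repo');
readHive := OUTPUT(ParquetIO.HivePartition.Read(layout, '/data/gh_hive'), NAMED('HIVE'));

// Directory partitioning omits the key names from the paths (/data/gh_dir/20230108/...), so the
// Read call must also list the partition fields in the order they appear in the directory structure.
writeDir := ParquetIO.DirectoryPartition.Write(inData, 100000, '/data/gh_dir', TRUE, 'commit_date;repo');
readDir := OUTPUT(ParquetIO.DirectoryPartition.Read(layout, '/data/gh_dir', 'commit_date;repo'), NAMED('DIRECTORY'));

// Run the writes before the reads, as in the create_partition.ecl example.
SEQUENTIAL(writeHive, readHive, writeDir, readDir);
```

Since the partition fields are removed from the data files and reconstructed from the directory structure on read, choosing low-cardinality fields keeps the number of subdirectories manageable, as noted in the README changes above.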