diff --git a/plugins/parquet/README.md b/plugins/parquet/README.md
index 241d89d7a51..39e4358ab69 100644
--- a/plugins/parquet/README.md
+++ b/plugins/parquet/README.md
@@ -38,17 +38,17 @@ dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');
 
 #### 2. Writing Parquet Files
 
-The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data.
+The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. An optional third argument sets the overwrite behavior of the plugin. The default value is false, meaning the plugin will throw an error if the target file already exists.
 
 ```
-ParquetIO.Write(inDataset, '/output/directory/data.parquet');
+ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption);
 ```
 
 ### Partitioned Files (Tabular Datasets)
 
 #### 1. Reading Partitioned Files
 
-The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files. 
+The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files.
 
 ```
 github_dataset := ParquetIO.ReadPartition(layout, '/source/directory/partioned_dataset');
diff --git a/plugins/parquet/examples/decimal_test.ecl b/plugins/parquet/examples/decimal_test.ecl
index 39a7cb0cf84..f7dc79e20b8 100644
--- a/plugins/parquet/examples/decimal_test.ecl
+++ b/plugins/parquet/examples/decimal_test.ecl
@@ -4,14 +4,11 @@ IMPORT PARQUET;
 
 layout := RECORD
     DECIMAL5_2 height;
-END; 
+END;
 
 decimal_data := DATASET([{152.25}, {125.56}], layout);
 
-#IF(1)
-ParquetIO.Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet');
-#END
+overwriteOption := TRUE;
+ParquetIO.Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet', overwriteOption);
 
-#IF(1)
 ParquetIO.Read(layout, '/datadrive/dev/test_data/decimal.parquet');
-#END
diff --git a/plugins/parquet/parquet.ecllib b/plugins/parquet/parquet.ecllib
index 07529024153..7db75888133 100644
--- a/plugins/parquet/parquet.ecllib
+++ b/plugins/parquet/parquet.ecllib
@@ -33,14 +33,14 @@ EXPORT ParquetIO := MODULE
         RETURN _DoParquetReadPartition();
     ENDMACRO;
 
-    EXPORT Write(outDS, filePath) := MACRO
-        LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath))
+    EXPORT Write(outDS, filePath, overwriteOption = false) := MACRO
+        LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption))
         ENDEMBED;
         _doParquetWrite(outDS);
     ENDMACRO;
 
-    EXPORT WritePartition(outDS, outRows = 100000, basePath) := MACRO
-        LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows))
+    EXPORT WritePartition(outDS, outRows = 100000, basePath, overwriteOption = false) := MACRO
+        LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption))
         ENDEMBED;
         _DoParquetWritePartition(outDS)
     ENDMACRO;
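For context, a minimal sketch of how the updated Write macro is meant to be called from ECL; the record layout and paths below are illustrative, not part of the change:

```
IMPORT PARQUET;

layout := RECORD
    STRING name;
END;

ds := DATASET([{'Alice'}, {'Bob'}], layout);

// Omitting the new argument keeps the default (false): writing to an
// existing file now raises an error instead of silently overwriting it.
ParquetIO.Write(ds, '/tmp/names.parquet');

// Passing TRUE restores the previous overwrite-in-place behavior.
ParquetIO.Write(ds, '/tmp/overwrite_me.parquet', TRUE);
```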
diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp
index 366b76279c3..934ae3dcf43 100644
--- a/plugins/parquet/parquetembed.cpp
+++ b/plugins/parquet/parquetembed.cpp
@@ -110,11 +110,12 @@ extern void fail(const char *message)
  * @param _batchSize The size of the batches when converting parquet columns to rows.
  */
 ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination,
-                             int _rowSize, int _batchSize, const IThorActivityContext *_activityCtx)
+                             int _rowSize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx)
     : partOption(option), location(_location), destination(destination)
 {
     rowSize = _rowSize;
     batchSize = _batchSize;
+    overwrite = _overwrite;
     activityCtx = _activityCtx;
 
     pool = arrow::default_memory_pool();
@@ -162,7 +163,7 @@ arrow::Status ParquetHelper::openWriteFile()
         writeOptions.filesystem = filesystem;
         writeOptions.base_dir = destination;
         writeOptions.partitioning = partitioning;
-        writeOptions.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;
+        writeOptions.existing_data_behavior = overwrite ? arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore : arrow::dataset::ExistingDataBehavior::kError;
     }
     else
     {
@@ -173,6 +174,17 @@ arrow::Status ParquetHelper::openWriteFile()
             destination.insert(destination.find(".parquet"), std::to_string(activityCtx->querySlave()));
         }
 
+        if (!overwrite)
+        {
+            StringBuffer filename;
+            StringBuffer path;
+            splitFilename(destination.c_str(), nullptr, &path, &filename, &filename, false);
+            Owned<IDirectoryIterator> itr = createDirectoryIterator(path.str(), filename.str());
+
+            if (itr)
+                failx("Error writing to file %s in directory %s because file already exists. Set the overwrite option to true to overwrite existing files.", filename.str(), path.str());
+        }
+
         std::shared_ptr<arrow::io::FileOutputStream> outfile;
 
         PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination));
@@ -1633,6 +1645,8 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
     const char *destination = ""; // file name and location of where to read parquet file from
     __int64 rowsize = 40000; // Size of the row groups when writing to parquet files
     __int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows
+    bool overwrite = false; // If true, overwrite the file with no error. The default is false, which throws an error if the file already exists.
+
     // Iterate through user options and save them
     StringArray inputOptions;
     inputOptions.appendList(options, ",");
@@ -1654,6 +1668,8 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
             rowsize = atoi(val);
         else if (stricmp(optName, "BatchSize") == 0)
             batchSize = atoi(val);
+        else if (stricmp(optName, "overwriteOpt") == 0)
+            overwrite = clipStrToBool(val);
         else
             failx("Unknown option %s", optName.str());
     }
@@ -1664,7 +1680,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
     }
     else
     {
-        m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, activityCtx);
+        m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, activityCtx);
     }
 }
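To trace the plumbing above: the overwriteOpt(overwriteOption) argument added in the ecllib macros reaches ParquetEmbedFunctionContext as part of the comma-separated option string, where the new stricmp branch converts its value with clipStrToBool and forwards it to the ParquetHelper constructor. A hedged sketch of the raw embed a caller could hand-write to the same effect, with an illustrative layout and path:

```
IMPORT PARQUET;

layout := RECORD
    STRING name;
END;

// Hand-written equivalent of what ParquetIO.Write(ds, path, TRUE) expands to.
// overwriteOpt is matched case-insensitively on the C++ side and ends up as
// the _overwrite constructor argument of ParquetHelper.
writeNames(STREAMED DATASET(layout) _ds) := EMBED(parquet : activity, option('write'), destination('/tmp/names.parquet'), overwriteOpt(TRUE))
ENDEMBED;

ds := DATASET([{'Alice'}, {'Bob'}], layout);
writeNames(ds);
```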
diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp
index 8da6377116b..18bb1b363c4 100644
--- a/plugins/parquet/parquetembed.hpp
+++ b/plugins/parquet/parquetembed.hpp
@@ -708,7 +708,7 @@ class JsonValueConverter
 class ParquetHelper
 {
 public:
-    ParquetHelper(const char *option, const char *location, const char *destination, int rowsize, int _batchSize, const IThorActivityContext *_activityCtx);
+    ParquetHelper(const char *option, const char *_location, const char *destination, int rowsize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx);
     ~ParquetHelper();
     std::shared_ptr<arrow::Schema> getSchema();
     arrow::Status openWriteFile();
@@ -747,6 +747,7 @@ class ParquetHelper
     __int64 rowsCount = 0; // The number of result rows in a given RowGroup read from the parquet file.
     size_t batchSize = 0; // batchSize for converting Parquet Columns to ECL rows. It is more efficient to break the data into small batches for converting to rows than to convert all at once.
     bool partition; // Boolean variable to track whether we are writing partitioned files or not.
+    bool overwrite = false; // Overwrite option specified by the user. If true, the plugin will overwrite files that already exist. Default is false.
     std::string partOption; // Read, r, Write, w, option for specifying parquet operation.
     std::string location; // Location to read parquet file from.
     std::string destination; // Destination to write parquet file to.
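Note that openWriteFile has two code paths: partitioned writes map the flag onto Arrow's ExistingDataBehavior (kError vs. kOverwriteOrIgnore), while single-file writes perform an explicit directory check before opening the output stream. A short sketch of the partitioned case, again with illustrative data and paths:

```
IMPORT PARQUET;

layout := RECORD
    STRING name;
END;

ds := DATASET([{'Alice'}, {'Bob'}], layout);

// Partitioned writes go through Arrow's dataset writer: with the flag
// unset, existing_data_behavior is kError, so writing into a non-empty
// partition directory should fail rather than replace data in place.
ParquetIO.WritePartition(ds, 100000, '/tmp/names_partitioned');

// With TRUE, kOverwriteOrIgnore replaces any existing partition files.
ParquetIO.WritePartition(ds, 100000, '/tmp/names_partitioned', TRUE);
```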