Skip to content

Commit

Permalink
HPCC-30456 ParquetIO.Write() overwrites files with no warning or error
Browse files Browse the repository at this point in the history
  • Loading branch information
jackdelv committed Oct 10, 2023
1 parent a799031 commit 3028a75
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 17 deletions.
6 changes: 3 additions & 3 deletions plugins/parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');

#### 2. Writing Parquet Files

The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data.
The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. An optional third argument controls the plugin's overwrite behavior; it defaults to false, in which case the plugin throws an error if the target file already exists.

```
ParquetIO.Write(inDataset, '/output/directory/data.parquet');
ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption);
```

### Partitioned Files (Tabular Datasets)

#### 1. Reading Partitioned Files

The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files.
The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files.

```
github_dataset := ParquetIO.ReadPartition(layout, '/source/directory/partioned_dataset');
Expand Down
9 changes: 3 additions & 6 deletions plugins/parquet/examples/decimal_test.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ IMPORT PARQUET;

layout := RECORD
DECIMAL5_2 height;
END;
END;

decimal_data := DATASET([{152.25}, {125.56}], layout);

#IF(1)
ParquetIO.Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet');
#END
overwriteOption := TRUE;
ParquetIO.Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet', overwriteOption);

#IF(1)
ParquetIO.Read(layout, '/datadrive/dev/test_data/decimal.parquet');
#END
8 changes: 4 additions & 4 deletions plugins/parquet/parquet.ecllib
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ EXPORT ParquetIO := MODULE
RETURN _DoParquetReadPartition();
ENDMACRO;

EXPORT Write(outDS, filePath) := MACRO
LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath))
// Writes an ECL dataset to a Parquet file.
//   outDS           - the dataset to write.
//   filePath        - destination path of the Parquet file.
//   overwriteOption - optional, defaults to FALSE. Passed through to the embed as
//                     overwriteOpt; when FALSE the plugin raises an error if the
//                     target file already exists, when TRUE the file is replaced.
EXPORT Write(outDS, filePath, overwriteOption = false) := MACRO
LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption))
ENDEMBED;
_doParquetWrite(outDS);
ENDMACRO;

EXPORT WritePartition(outDS, outRows = 100000, basePath) := MACRO
LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows))
// Writes an ECL dataset as a partitioned Parquet dataset (tabular dataset).
//   outDS           - the dataset to write.
//   outRows         - maximum rows per partition (MaxRowSize), default 100000.
//   basePath        - base directory for the partitioned output.
//   overwriteOption - optional, defaults to FALSE. Passed through to the embed as
//                     overwriteOpt; when FALSE existing data at the destination
//                     causes an error, when TRUE it is overwritten.
EXPORT WritePartition(outDS, outRows = 100000, basePath, overwriteOption = false) := MACRO
LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows), overwriteOpt(overwriteOption))
ENDEMBED;
_DoParquetWritePartition(outDS)
ENDMACRO;
Expand Down
22 changes: 19 additions & 3 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ extern void fail(const char *message)
* @param _batchSize The size of the batches when converting parquet columns to rows.
*/
ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination,
int _rowSize, int _batchSize, const IThorActivityContext *_activityCtx)
int _rowSize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx)
: partOption(option), location(_location), destination(destination)
{
rowSize = _rowSize;
batchSize = _batchSize;
overwrite = _overwrite;
activityCtx = _activityCtx;

pool = arrow::default_memory_pool();
Expand Down Expand Up @@ -162,7 +163,7 @@ arrow::Status ParquetHelper::openWriteFile()
writeOptions.filesystem = filesystem;
writeOptions.base_dir = destination;
writeOptions.partitioning = partitioning;
writeOptions.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;
writeOptions.existing_data_behavior = overwrite ? arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore : arrow::dataset::ExistingDataBehavior::kError;
}
else
{
Expand All @@ -173,6 +174,17 @@ arrow::Status ParquetHelper::openWriteFile()
destination.insert(destination.find(".parquet"), std::to_string(activityCtx->querySlave()));
}

if (!overwrite)
{
StringBuffer filename;
StringBuffer path;
splitFilename(destination.c_str(), nullptr, &path, &filename, &filename, false);
Owned<IDirectoryIterator> itr = createDirectoryIterator(path.str(), filename.str());

if (itr)
failx("Error writing to file %s in directory %s because file already exists. Set the Overwrite option to True to overwrite existing files.", filename.str(), path.str());
}

std::shared_ptr<arrow::io::FileOutputStream> outfile;

PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination));
Expand Down Expand Up @@ -1633,6 +1645,8 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
const char *destination = ""; // file name and location of where to read parquet file from
__int64 rowsize = 40000; // Size of the row groups when writing to parquet files
__int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows
bool overwrite = false; // If true, overwrite the target file without error. Defaults to false, which throws an error if the file already exists.

// Iterate through user options and save them
StringArray inputOptions;
inputOptions.appendList(options, ",");
Expand All @@ -1654,6 +1668,8 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
rowsize = atoi(val);
else if (stricmp(optName, "BatchSize") == 0)
batchSize = atoi(val);
else if (stricmp(optName, "overwriteOpt") == 0)
overwrite = clipStrToBool(val);
else
failx("Unknown option %s", optName.str());
}
Expand All @@ -1664,7 +1680,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
}
else
{
m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, activityCtx);
m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, activityCtx);
}
}

Expand Down
3 changes: 2 additions & 1 deletion plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ class JsonValueConverter
class ParquetHelper
{
public:
ParquetHelper(const char *option, const char *location, const char *destination, int rowsize, int _batchSize, const IThorActivityContext *_activityCtx);
ParquetHelper(const char *option, const char *_location, const char *destination, int rowsize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx);
~ParquetHelper();
std::shared_ptr<arrow::Schema> getSchema();
arrow::Status openWriteFile();
Expand Down Expand Up @@ -747,6 +747,7 @@ class ParquetHelper
__int64 rowsCount = 0; // The number of result rows in a given RowGroup read from the parquet file.
size_t batchSize = 0; // batchSize for converting Parquet Columns to ECL rows. It is more efficient to break the data into small batches for converting to rows than to convert all at once.
bool partition; // Boolean variable to track whether we are writing partitioned files or not.
bool overwrite = false; // Overwrite option specified by the user. If true the plugin will overwrite existing files. Default is false.
std::string partOption; // Read, r, Write, w, option for specifying parquet operation.
std::string location; // Location to read parquet file from.
std::string destination; // Destination to write parquet file to.
Expand Down

0 comments on commit 3028a75

Please sign in to comment.