Skip to content

Commit

Permalink
Make dwio::common::WriterOptions polymorphic (facebookincubator#10380)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#10380

Make writer options a polymorphic class that can be specialized for
different file formats, enabling Velox users to provide them as part of a
query plan.

Today, there are a few different ways to pass writer parameters (serdeParams and
hive config). They end up being serialized to strings, so they can't be used with
non-serializable configs. Moreover, file-format-specific configurations and code
end up in generic parts of the code, like HiveConnector.

This will allow us to better organize file-format-specific options; for now, this
change only creates the framework to contain those changes.

Reviewed By: kunalkataria, kparichay

Differential Revision: D59302873
  • Loading branch information
pedroerp authored and facebook-github-bot committed Jul 11, 2024
1 parent ba8d27d commit a4e6c23
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 60 deletions.
129 changes: 89 additions & 40 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,57 +677,106 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());
setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());

dwio::common::WriterOptions options;
// Take the one provided by the user as a starting point, or allocate a new
// one.
const auto& writerOptions = insertTableHandle_->writerOptions();
std::shared_ptr<dwio::common::WriterOptions> options = writerOptions
? writerOptions
: std::make_shared<dwio::common::WriterOptions>();

const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();
options.schema = getNonPartitionTypes(dataChannels_, inputType_);

options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
if (canReclaim()) {
options.spillConfig = spillConfig_;
}
options.nonReclaimableSection =
writerInfo_.back()->nonReclaimableSectionHolder.get();
options.maxStripeSize = std::optional(
hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties));
options.maxDictionaryMemory = std::optional(
hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties));
options.orcWriterIntegerDictionaryEncodingEnabled =
hiveConfig_->isOrcWriterIntegerDictionaryEncodingEnabled(
connectorSessionProperties);
options.orcWriterStringDictionaryEncodingEnabled =
hiveConfig_->isOrcWriterStringDictionaryEncodingEnabled(
connectorSessionProperties);
options.parquetWriteTimestampUnit =
hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties);
options.orcMinCompressionSize = std::optional(
hiveConfig_->orcWriterMinCompressionSize(connectorSessionProperties));
options.orcLinearStripeSizeHeuristics =
std::optional(hiveConfig_->orcWriterLinearStripeSizeHeuristics(
connectorSessionProperties));
options.serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());

if (!options->schema) {
options->schema = getNonPartitionTypes(dataChannels_, inputType_);
}

if (!options->memoryPool) {
options->memoryPool = writerInfo_.back()->writerPool.get();
}

if (!options->compressionKind) {
options->compressionKind = insertTableHandle_->compressionKind();
}

if (!options->spillConfig && canReclaim()) {
options->spillConfig = spillConfig_;
}

if (!options->nonReclaimableSection) {
options->nonReclaimableSection =
writerInfo_.back()->nonReclaimableSectionHolder.get();
}

if (!options->maxStripeSize) {
options->maxStripeSize = std::optional(
hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties));
}

if (!options->maxDictionaryMemory) {
options->maxDictionaryMemory = std::optional(
hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties));
}

if (!options->orcWriterIntegerDictionaryEncodingEnabled) {
options->orcWriterIntegerDictionaryEncodingEnabled =
hiveConfig_->isOrcWriterIntegerDictionaryEncodingEnabled(
connectorSessionProperties);
}

if (!options->orcWriterStringDictionaryEncodingEnabled) {
options->orcWriterStringDictionaryEncodingEnabled =
hiveConfig_->isOrcWriterStringDictionaryEncodingEnabled(
connectorSessionProperties);
}

if (!options->parquetWriteTimestampUnit) {
options->parquetWriteTimestampUnit =
hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties);
}

if (!options->orcMinCompressionSize) {
options->orcMinCompressionSize = std::optional(
hiveConfig_->orcWriterMinCompressionSize(connectorSessionProperties));
}

if (!options->orcLinearStripeSizeHeuristics) {
options->orcLinearStripeSizeHeuristics =
std::optional(hiveConfig_->orcWriterLinearStripeSizeHeuristics(
connectorSessionProperties));
}

if (options->serdeParameters.empty()) {
options->serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());
}

auto compressionLevel =
hiveConfig_->orcWriterCompressionLevel(connectorSessionProperties);
options.zlibCompressionLevel =
compressionLevel.value_or(kDefaultZlibCompressionLevel);
options.zstdCompressionLevel =
compressionLevel.value_or(kDefaultZstdCompressionLevel);

if (!options->zlibCompressionLevel) {
options->zlibCompressionLevel =
compressionLevel.value_or(kDefaultZlibCompressionLevel);
}
if (!options->zstdCompressionLevel) {
options->zstdCompressionLevel =
compressionLevel.value_or(kDefaultZstdCompressionLevel);
}

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
auto writer = writerFactory_->createWriter(
dwio::common::FileSink::create(
writePath,
{.bufferWrite = false,
.connectorProperties = hiveConfig_->config(),
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
.pool = writerInfo_.back()->sinkPool.get(),
.metricLogger = dwio::common::MetricsLog::voidLog(),
.stats = ioStats_.back().get()}),
{
.bufferWrite = false,
.connectorProperties = hiveConfig_->config(),
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
.pool = writerInfo_.back()->sinkPool.get(),
.metricLogger = dwio::common::MetricsLog::voidLog(),
.stats = ioStats_.back().get(),
}),
options);
writer = maybeCreateBucketSortWriter(std::move(writer));
writers_.emplace_back(std::move(writer));
Expand Down
16 changes: 11 additions & 5 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ FOLLY_ALWAYS_INLINE std::ostream& operator<<(
class HiveInsertTableHandle;
using HiveInsertTableHandlePtr = std::shared_ptr<HiveInsertTableHandle>;

/**
* Represents a request for Hive write.
*/
/// Represents a request for Hive write.
class HiveInsertTableHandle : public ConnectorInsertTableHandle {
public:
HiveInsertTableHandle(
Expand All @@ -198,13 +196,16 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
dwio::common::FileFormat::DWRF,
std::shared_ptr<HiveBucketProperty> bucketProperty = nullptr,
std::optional<common::CompressionKind> compressionKind = {},
const std::unordered_map<std::string, std::string>& serdeParameters = {})
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr)
: inputColumns_(std::move(inputColumns)),
locationHandle_(std::move(locationHandle)),
tableStorageFormat_(tableStorageFormat),
bucketProperty_(std::move(bucketProperty)),
compressionKind_(compressionKind),
serdeParameters_(serdeParameters) {
serdeParameters_(serdeParameters),
writerOptions_(writerOptions) {
if (compressionKind.has_value()) {
VELOX_CHECK(
compressionKind.value() != common::CompressionKind_MAX,
Expand Down Expand Up @@ -235,6 +236,10 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
return serdeParameters_;
}

/// Returns the writer options passed to the constructor. The constructor
/// defaults this argument to nullptr, so the returned pointer may be null.
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions() const {
return writerOptions_;
}

/// Always returns true for this insert table handle.
bool supportsMultiThreading() const override {
return true;
}
Expand Down Expand Up @@ -262,6 +267,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
const std::shared_ptr<HiveBucketProperty> bucketProperty_;
const std::optional<common::CompressionKind> compressionKind_;
const std::unordered_map<std::string, std::string> serdeParameters_;
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_;
};

/// Parameters for Hive writers.
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,8 @@ struct WriterOptions {
std::optional<uint8_t> parquetWriteTimestampUnit;
std::optional<uint8_t> zlibCompressionLevel;
std::optional<uint8_t> zstdCompressionLevel;

virtual ~WriterOptions() = default;
};

} // namespace facebook::velox::dwio::common
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/WriterFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class WriterFactory {
/// @return writer object
virtual std::unique_ptr<Writer> createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const dwio::common::WriterOptions& options) = 0;
std::shared_ptr<dwio::common::WriterOptions> options) = 0;

private:
const FileFormat format_;
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,8 +855,8 @@ dwrf::WriterOptions getDwrfOptions(const dwio::common::WriterOptions& options) {

std::unique_ptr<dwio::common::Writer> DwrfWriterFactory::createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const dwio::common::WriterOptions& options) {
auto dwrfOptions = getDwrfOptions(options);
std::shared_ptr<dwio::common::WriterOptions> options) {
auto dwrfOptions = getDwrfOptions(*options);
return std::make_unique<Writer>(std::move(sink), dwrfOptions);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class DwrfWriterFactory : public dwio::common::WriterFactory {

std::unique_ptr<dwio::common::Writer> createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const dwio::common::WriterOptions& options) override;
std::shared_ptr<dwio::common::WriterOptions> options) override;
};

} // namespace facebook::velox::dwrf
6 changes: 3 additions & 3 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,10 @@ void Writer::setMemoryReclaimers() {

std::unique_ptr<dwio::common::Writer> ParquetWriterFactory::createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const dwio::common::WriterOptions& options) {
auto parquetOptions = getParquetOptions(options);
std::shared_ptr<dwio::common::WriterOptions> options) {
auto parquetOptions = getParquetOptions(*options);
return std::make_unique<Writer>(
std::move(sink), parquetOptions, asRowType(options.schema));
std::move(sink), parquetOptions, asRowType(options->schema));
}

} // namespace facebook::velox::parquet
2 changes: 1 addition & 1 deletion velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class ParquetWriterFactory : public dwio::common::WriterFactory {

std::unique_ptr<dwio::common::Writer> createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const dwio::common::WriterOptions& options) override;
std::shared_ptr<dwio::common::WriterOptions> options) override;
};

} // namespace facebook::velox::parquet
8 changes: 4 additions & 4 deletions velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
using namespace facebook::velox;

namespace facebook::velox::exec::test {

namespace {

void writeToFile(
Expand All @@ -43,9 +42,9 @@ void writeToFile(
memory::MemoryPool* pool) {
VELOX_CHECK_GT(data.size(), 0);

dwio::common::WriterOptions options;
options.schema = data[0]->type();
options.memoryPool = pool;
auto options = std::make_shared<dwio::common::WriterOptions>();
options->schema = data[0]->type();
options->memoryPool = pool;

auto writeFile = std::make_unique<LocalWriteFile>(path, true, false);
auto sink =
Expand Down Expand Up @@ -159,6 +158,7 @@ class ServerResponse {
private:
folly::dynamic response_;
};

} // namespace

PrestoQueryRunner::PrestoQueryRunner(
Expand Down
1 change: 1 addition & 0 deletions velox/exec/fuzzer/PrestoQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::exec::test {

template <typename T>
T extractSingleValue(const std::vector<RowVectorPtr>& data) {
auto simpleVector = data[0]->childAt(0)->as<SimpleVector<T>>();
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ PlanBuilder& PlanBuilder::tableWrite(
const dwio::common::FileFormat fileFormat,
const std::vector<std::string>& aggregates,
const std::string& connectorId,
const std::unordered_map<std::string, std::string>& serdeParameters) {
const std::unordered_map<std::string, std::string>& serdeParameters,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions) {
VELOX_CHECK_NOT_NULL(planNode_, "TableWrite cannot be the source node");
auto rowType = planNode_->outputType();

Expand Down Expand Up @@ -418,7 +419,8 @@ PlanBuilder& PlanBuilder::tableWrite(
fileFormat,
bucketProperty,
common::CompressionKind_NONE,
serdeParameters);
serdeParameters,
writerOptions);

auto insertHandle =
std::make_shared<core::InsertTableHandle>(connectorId, hiveHandle);
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ class PlanBuilder {
/// @param fileFormat File format to use for the written data.
/// @param aggregates Aggregations for column statistics collection during
/// write.
/// @param connectorId Name used to register the connector.
/// @param serdeParameters Additional parameters passed to the writer.
/// @param writerOptions Option objects passed to the writer.
PlanBuilder& tableWrite(
const std::string& outputDirectoryPath,
const std::vector<std::string>& partitionBy,
Expand All @@ -446,7 +449,9 @@ class PlanBuilder {
dwio::common::FileFormat::DWRF,
const std::vector<std::string>& aggregates = {},
const std::string& connectorId = "test-hive",
const std::unordered_map<std::string, std::string>& serdeParameters = {});
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr);

/// Add a TableWriteMergeNode.
PlanBuilder& tableWriteMerge(
Expand Down

0 comments on commit a4e6c23

Please sign in to comment.