Skip to content

Commit

Permalink
Fix plan serialization of HiveInsertTableHandle
Browse files Browse the repository at this point in the history
Summary:
Fixing a few bugs and missing member serialization for full
support of HiveInsertTableHandle serialization.
* file format type
* compression kind
* bucket information
* serde parameters

Differential Revision: D63811090
  • Loading branch information
pedroerp authored and facebook-github-bot committed Oct 4, 2024
1 parent fef4915 commit 9733ffb
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 11 deletions.
4 changes: 1 addition & 3 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ std::unique_ptr<folly::io::Codec> compressionKindToCodec(CompressionKind kind);

CompressionKind codecTypeToCompressionKind(folly::io::CodecType type);

/**
* Get the name of the CompressionKind.
*/
/// Get the name of the CompressionKind.
std::string compressionKindToString(CompressionKind kind);

CompressionKind stringToCompressionKind(const std::string& kind);
Expand Down
53 changes: 51 additions & 2 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/ITypedExpr.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/SortingWriter.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/SortBuffer.h"
Expand All @@ -36,8 +37,8 @@
using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::connector::hive {

namespace {

// Returns the type of non-partition data columns.
RowTypePtr getNonPartitionTypes(
const std::vector<column_index_t>& dataCols,
Expand Down Expand Up @@ -953,6 +954,21 @@ folly::dynamic HiveInsertTableHandle::serialize() const {

obj["inputColumns"] = arr;
obj["locationHandle"] = locationHandle_->serialize();
obj["tableStorageFormat"] = dwio::common::toString(tableStorageFormat_);

if (bucketProperty_) {
obj["bucketProperty"] = bucketProperty_->serialize();
}

if (compressionKind_.has_value()) {
obj["compressionKind"] = common::compressionKindToString(*compressionKind_);
}

folly::dynamic params = folly::dynamic::object;
for (const auto& [key, value] : serdeParameters_) {
params[key] = value;
}
obj["serdeParameters"] = params;
return obj;
}

Expand All @@ -962,7 +978,33 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create(
obj["inputColumns"]);
auto locationHandle =
ISerializable::deserialize<LocationHandle>(obj["locationHandle"]);
return std::make_shared<HiveInsertTableHandle>(inputColumns, locationHandle);
auto storageFormat =
dwio::common::toFileFormat(obj["tableStorageFormat"].asString());

std::optional<common::CompressionKind> compressionKind = std::nullopt;
if (obj.count("compressionKind") > 0) {
compressionKind =
common::stringToCompressionKind(obj["compressionKind"].asString());
}

std::shared_ptr<const HiveBucketProperty> bucketProperty;
if (obj.count("bucketProperty") > 0) {
bucketProperty =
ISerializable::deserialize<HiveBucketProperty>(obj["bucketProperty"]);
}

std::unordered_map<std::string, std::string> serdeParameters;
for (const auto& pair : obj["serdeParameters"].items()) {
serdeParameters.emplace(pair.first.asString(), pair.second.asString());
}

return std::make_shared<HiveInsertTableHandle>(
inputColumns,
locationHandle,
storageFormat,
bucketProperty,
compressionKind,
serdeParameters);
}

void HiveInsertTableHandle::registerSerDe() {
Expand All @@ -987,6 +1029,13 @@ std::string HiveInsertTableHandle::toString() const {
if (bucketProperty_) {
out << ", bucketProperty: " << bucketProperty_->toString();
}

if (serdeParameters_.size() > 0) {
out << ", serdeParameters: ";
for (const auto& [key, value] : serdeParameters_) {
out << "[" << key << ", " << value << "] ";
}
}
out << "]";
return out.str();
}
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
std::shared_ptr<const LocationHandle> locationHandle,
dwio::common::FileFormat tableStorageFormat =
dwio::common::FileFormat::DWRF,
std::shared_ptr<HiveBucketProperty> bucketProperty = nullptr,
std::shared_ptr<const HiveBucketProperty> bucketProperty = nullptr,
std::optional<common::CompressionKind> compressionKind = {},
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
Expand Down Expand Up @@ -273,7 +273,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
const std::vector<std::shared_ptr<const HiveColumnHandle>> inputColumns_;
const std::shared_ptr<const LocationHandle> locationHandle_;
const dwio::common::FileFormat tableStorageFormat_;
const std::shared_ptr<HiveBucketProperty> bucketProperty_;
const std::shared_ptr<const HiveBucketProperty> bucketProperty_;
const std::optional<common::CompressionKind> compressionKind_;
const std::unordered_map<std::string, std::string> serdeParameters_;
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_;
Expand Down
35 changes: 32 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/expression/ExprToSubfieldFilter.h"

using namespace facebook::velox;
namespace facebook::velox::connector::hive::test {
namespace {

using namespace facebook::velox::exec;
using namespace facebook::velox::connector::hive;

class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
protected:
Expand All @@ -34,6 +35,8 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
HiveColumnHandle::registerSerDe();
LocationHandle::registerSerDe();
HiveInsertTableHandle::registerSerDe();
HiveBucketProperty::registerSerDe();
HiveSortingColumn::registerSerDe();
}

template <typename T>
Expand Down Expand Up @@ -121,12 +124,38 @@ TEST_F(HiveConnectorSerDeTest, hiveInsertTableHandle) {
tableColumnTypes.emplace_back(rowType);
tableColumnTypes.emplace_back(arrType);
tableColumnTypes.emplace_back(varcharType);

auto locationHandle = exec::test::HiveConnectorTestBase::makeLocationHandle(
"targetDirectory",
std::optional("writeDirectory"),
LocationHandle::TableType::kNew);

auto bucketProperty = std::make_shared<HiveBucketProperty>(
HiveBucketProperty::Kind::kPrestoNative,
1024,
std::vector<std::string>{"id", "row"},
std::vector<TypePtr>{VARCHAR(), BOOLEAN()},
std::vector<std::shared_ptr<const HiveSortingColumn>>{
std::make_shared<HiveSortingColumn>(
"id", core::SortOrder{true, true})});

std::unordered_map<std::string, std::string> serdeParameters = {
{"key1", "value1"},
{"key2", "value2"},
};

auto hiveInsertTableHandle =
exec::test::HiveConnectorTestBase::makeHiveInsertTableHandle(
tableColumnNames, tableColumnTypes, {"loc"}, locationHandle);
tableColumnNames,
tableColumnTypes,
{"loc"},
bucketProperty,
locationHandle,
dwio::common::FileFormat::NIMBLE,
common::CompressionKind::CompressionKind_SNAPPY,
serdeParameters);
testSerde(*hiveInsertTableHandle);
}

} // namespace
} // namespace facebook::velox::connector::hive::test
1 change: 1 addition & 0 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
connector::hive::LocationHandle::TableType::kNew),
fileFormat,
CompressionKind::CompressionKind_ZSTD,
{},
writerOptions);
}

Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/utils/HiveConnectorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
std::move(locationHandle),
tableStorageFormat,
compressionKind,
{},
writerOptions);
}

Expand All @@ -289,6 +290,7 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
std::shared_ptr<connector::hive::LocationHandle> locationHandle,
const dwio::common::FileFormat tableStorageFormat,
const std::optional<common::CompressionKind> compressionKind,
const std::unordered_map<std::string, std::string>& serdeParameters,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions) {
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
columnHandles;
Expand Down Expand Up @@ -345,7 +347,7 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
tableStorageFormat,
bucketProperty,
compressionKind,
std::unordered_map<std::string, std::string>{},
serdeParameters,
writerOptions);
}

Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/utils/HiveConnectorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class HiveConnectorTestBase : public OperatorTestBase {
/// table.
/// @param locationHandle Location handle for the table write.
/// @param compressionKind compression algorithm to use for table write.
/// @param serdeParameters Table writer configuration parameters.
static std::shared_ptr<connector::hive::HiveInsertTableHandle>
makeHiveInsertTableHandle(
const std::vector<std::string>& tableColumnNames,
Expand All @@ -183,6 +184,7 @@ class HiveConnectorTestBase : public OperatorTestBase {
const dwio::common::FileFormat tableStorageFormat =
dwio::common::FileFormat::DWRF,
const std::optional<common::CompressionKind> compressionKind = {},
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr);

Expand Down

0 comments on commit 9733ffb

Please sign in to comment.