Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

<DRAFT> Not an actual commit to merge; this is to debug the main_distribution failure #14

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/distribution.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ concurrency:
jobs:
duckdb-stable-build:
name: Build extension binaries
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.1.0
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
with:
duckdb_version: v1.1.0
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
duckdb_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait

duckdb-stable-deploy:
name: Deploy extension binaries
needs: duckdb-stable-build
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.1.0
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main
secrets: inherit
with:
duckdb_version: v1.1.0
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
duckdb_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait
deploy_latest: true
2 changes: 1 addition & 1 deletion .github/workflows/main_distribution.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
with:
duckdb_version: main
ci_tools_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait

2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1863 files
145 changes: 112 additions & 33 deletions src/from_substrait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,6 @@

#include "duckdb/common/types/value.hpp"
#include "duckdb/parser/expression/list.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"

#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/common/exception.hpp"
Expand All @@ -25,7 +16,24 @@
#include "google/protobuf/util/json_util.h"
#include "substrait/plan.pb.h"

#include "duckdb/main/table_description.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/helper.hpp"

#include "duckdb/main/relation.hpp"
#include "duckdb/main/relation/table_relation.hpp"
#include "duckdb/main/relation/table_function_relation.hpp"
#include "duckdb/main/relation/value_relation.hpp"
#include "duckdb/main/relation/view_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"

namespace duckdb {
const std::unordered_map<std::string, std::string> SubstraitToDuckDB::function_names_remap = {
Expand All @@ -40,7 +48,7 @@ const case_insensitive_set_t SubstraitToDuckDB::valid_extract_subfields = {
"quarter", "microsecond", "milliseconds", "second", "minute", "hour"};

string SubstraitToDuckDB::RemapFunctionName(const string &function_name) {
// Lets first drop any extension id
// Let's first drop any extension id
string name;
for (auto &c : function_name) {
if (c == ':') {
Expand All @@ -67,7 +75,9 @@ string SubstraitToDuckDB::RemoveExtension(const string &function_name) {
return name;
}

SubstraitToDuckDB::SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json) : con(con_p) {
SubstraitToDuckDB::SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json,
bool acquire_lock_p)
: context(context_p), acquire_lock(acquire_lock_p) {
if (!json) {
if (!plan.ParseFromString(serialized)) {
throw std::runtime_error("Was not possible to convert binary into Substrait plan");
Expand Down Expand Up @@ -326,15 +336,25 @@ unique_ptr<ParsedExpression> SubstraitToDuckDB::TransformInExpr(const substrait:
return make_uniq<OperatorExpression>(ExpressionType::COMPARE_IN, std::move(values));
}

unique_ptr<ParsedExpression> SubstraitToDuckDB::TransformNested(const substrait::Expression &sexpr) {
unique_ptr<ParsedExpression> SubstraitToDuckDB::TransformNested(const substrait::Expression &sexpr,
RootNameIterator *iterator) {
auto &nested_expression = sexpr.nested();
if (nested_expression.has_struct_()) {
auto &struct_expression = nested_expression.struct_();
vector<unique_ptr<ParsedExpression>> children;
for (auto &child : struct_expression.fields()) {
children.emplace_back(TransformExpr(child));
}
return make_uniq<FunctionExpression>("row", std::move(children));
if (iterator && !iterator->Finished() && iterator->Unique(children.size())) {
for (auto &child : children) {
child->alias = iterator->GetCurrentName();
iterator->Next();
}
return make_uniq<FunctionExpression>("struct_pack", std::move(children));
} else {
return make_uniq<FunctionExpression>("row", std::move(children));
}

} else if (nested_expression.has_list()) {
auto &list_expression = nested_expression.list();
vector<unique_ptr<ParsedExpression>> children;
Expand All @@ -356,7 +376,11 @@ unique_ptr<ParsedExpression> SubstraitToDuckDB::TransformNested(const substrait:
}
}

unique_ptr<ParsedExpression> SubstraitToDuckDB::TransformExpr(const substrait::Expression &sexpr) {
unique_ptr<ParsedExpression> SubstraitToDuckDB::TransformExpr(const substrait::Expression &sexpr,
RootNameIterator *iterator) {
if (iterator) {
iterator->Next();
}
switch (sexpr.rex_type_case()) {
case substrait::Expression::RexTypeCase::kLiteral:
return TransformLiteralExpr(sexpr);
Expand All @@ -371,7 +395,7 @@ unique_ptr<ParsedExpression> SubstraitToDuckDB::TransformExpr(const substrait::E
case substrait::Expression::RexTypeCase::kSingularOrList:
return TransformInExpr(sexpr);
case substrait::Expression::RexTypeCase::kNested:
return TransformNested(sexpr);
return TransformNested(sexpr, iterator);
case substrait::Expression::RexTypeCase::kSubquery:
default:
throw InternalException("Unsupported expression type " + to_string(sexpr.rex_type_case()));
Expand Down Expand Up @@ -453,22 +477,27 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformCrossProductOp(const substrait:
TransformOp(sub_cross.right())->Alias("right"));
}

shared_ptr<Relation> SubstraitToDuckDB::TransformFetchOp(const substrait::Rel &sop) {
shared_ptr<Relation> SubstraitToDuckDB::TransformFetchOp(const substrait::Rel &sop,
const google::protobuf::RepeatedPtrField<std::string> *names) {
auto &slimit = sop.fetch();
idx_t limit = slimit.count() == -1 ? NumericLimits<idx_t>::Maximum() : slimit.count();
idx_t offset = slimit.offset();
return make_shared_ptr<LimitRelation>(TransformOp(slimit.input()), limit, offset);
return make_shared_ptr<LimitRelation>(TransformOp(slimit.input(), names), limit, offset);
}

shared_ptr<Relation> SubstraitToDuckDB::TransformFilterOp(const substrait::Rel &sop) {
auto &sfilter = sop.filter();
return make_shared_ptr<FilterRelation>(TransformOp(sfilter.input()), TransformExpr(sfilter.condition()));
}

shared_ptr<Relation> SubstraitToDuckDB::TransformProjectOp(const substrait::Rel &sop) {
shared_ptr<Relation>
SubstraitToDuckDB::TransformProjectOp(const substrait::Rel &sop,
const google::protobuf::RepeatedPtrField<std::string> *names) {
vector<unique_ptr<ParsedExpression>> expressions;
RootNameIterator iterator(names);

for (auto &sexpr : sop.project().expressions()) {
expressions.push_back(TransformExpr(sexpr));
expressions.push_back(TransformExpr(sexpr, &iterator));
}

vector<string> mock_aliases;
Expand Down Expand Up @@ -510,16 +539,46 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformAggregateOp(const substrait::Re
return make_shared_ptr<AggregateRelation>(TransformOp(sop.aggregate().input()), std::move(expressions),
std::move(groups));
}
// Builds a TableDescription for the table `table_name` in schema `schema_name`.
//
// The table is looked up through the catalog using INVALID_CATALOG as the catalog name and
// OnEntryNotFound::RETURN_NULL, so a failed lookup is reported to the caller as an empty
// (null) pointer instead of being raised from here.
//
// On success the returned description carries the schema name, the table name, and a copy of
// every column produced by iterating the table's `GetColumns().Logical()`.
unique_ptr<TableDescription> TableInfo(ClientContext &context, const string &schema_name, const string &table_name) {
	unique_ptr<TableDescription> description;

	auto catalog_entry = Catalog::GetEntry<TableCatalogEntry>(context, INVALID_CATALOG, schema_name, table_name,
	                                                          OnEntryNotFound::RETURN_NULL);
	if (catalog_entry) {
		// Entry found: mirror its identity and column definitions into the description.
		description = make_uniq<TableDescription>(INVALID_CATALOG, schema_name, table_name);
		for (auto &column_definition : catalog_entry->GetColumns().Logical()) {
			description->columns.emplace_back(column_definition.Copy());
		}
	}

	// Still empty here when the lookup did not yield an entry.
	return description;
}

shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &sop) {
auto &sget = sop.read();
shared_ptr<Relation> scan;
auto context_wrapper = make_shared_ptr<RelationContextWrapper>(context);
if (sget.has_named_table()) {
auto table_name = sget.named_table().names(0);
// If we can't find a table with that name, let's try a view.
try {
scan = con.Table(sget.named_table().names(0));
auto table_info = TableInfo(*context, DEFAULT_SCHEMA, table_name);
if (!table_info) {
throw CatalogException("Table '%s' does not exist!", table_name);
}
if (acquire_lock) {
scan = make_shared_ptr<TableRelation>(context, std::move(table_info));

} else {
scan = make_shared_ptr<TableRelation>(context_wrapper, std::move(table_info));
}
} catch (...) {
scan = con.View(sget.named_table().names(0));
if (acquire_lock) {
scan = make_shared_ptr<ViewRelation>(context, DEFAULT_SCHEMA, table_name);

} else {
scan = make_shared_ptr<ViewRelation>(context_wrapper, DEFAULT_SCHEMA, table_name);
}
}
} else if (sget.has_local_files()) {
vector<Value> parquet_files;
Expand All @@ -540,7 +599,18 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
string name = "parquet_" + StringUtil::GenerateRandomName();
named_parameter_map_t named_parameters({{"binary_as_string", Value::BOOLEAN(false)}});
scan = con.TableFunction("parquet_scan", {Value::LIST(parquet_files)}, named_parameters)->Alias(name);
vector<Value> parameters {Value::LIST(parquet_files)};
shared_ptr<TableFunctionRelation> scan_rel;
if (acquire_lock) {
scan_rel = make_shared_ptr<TableFunctionRelation>(context, "parquet_scan", parameters,
std::move(named_parameters));
} else {
scan_rel = make_shared_ptr<TableFunctionRelation>(context_wrapper, "parquet_scan", parameters,
std::move(named_parameters));
}

auto rel = static_cast<Relation *>(scan_rel.get());
scan = rel->Alias(name);
} else if (sget.has_virtual_table()) {
// We need to handle a virtual table as a LogicalExpressionGet
auto literal_values = sget.virtual_table().values();
Expand All @@ -553,7 +623,13 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
expression_rows.emplace_back(expression_row);
}
scan = con.Values(expression_rows);
vector<string> column_names;
if (acquire_lock) {
scan = make_shared_ptr<ValueRelation>(context, expression_rows, column_names);

} else {
scan = make_shared_ptr<ValueRelation>(context_wrapper, expression_rows, column_names);
}
} else {
throw NotImplementedException("Unsupported type of read operator for substrait");
}
Expand All @@ -578,12 +654,13 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
return scan;
}

shared_ptr<Relation> SubstraitToDuckDB::TransformSortOp(const substrait::Rel &sop) {
shared_ptr<Relation> SubstraitToDuckDB::TransformSortOp(const substrait::Rel &sop,
const google::protobuf::RepeatedPtrField<std::string> *names) {
vector<OrderByNode> order_nodes;
for (auto &sordf : sop.sort().sorts()) {
order_nodes.push_back(TransformOrder(sordf));
}
return make_shared_ptr<OrderRelation>(TransformOp(sop.sort().input()), std::move(order_nodes));
return make_shared_ptr<OrderRelation>(TransformOp(sop.sort().input(), names), std::move(order_nodes));
}

static SetOperationType TransformSetOperationType(substrait::SetRel_SetOp setop) {
Expand All @@ -603,7 +680,8 @@ static SetOperationType TransformSetOperationType(substrait::SetRel_SetOp setop)
}
}

shared_ptr<Relation> SubstraitToDuckDB::TransformSetOp(const substrait::Rel &sop) {
shared_ptr<Relation> SubstraitToDuckDB::TransformSetOp(const substrait::Rel &sop,
const google::protobuf::RepeatedPtrField<std::string> *names) {
D_ASSERT(sop.has_set());
auto &set = sop.set();
auto set_op_type = set.op();
Expand All @@ -615,31 +693,32 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformSetOp(const substrait::Rel &sop
throw NotImplementedException("The amount of inputs (%d) is not supported for this set operation", input_count);
}
auto lhs = TransformOp(inputs[0]);
auto rhs = TransformOp(inputs[1]);
auto rhs = TransformOp(inputs[1], names);

return make_shared_ptr<SetOpRelation>(std::move(lhs), std::move(rhs), type);
}

shared_ptr<Relation> SubstraitToDuckDB::TransformOp(const substrait::Rel &sop) {
shared_ptr<Relation> SubstraitToDuckDB::TransformOp(const substrait::Rel &sop,
const google::protobuf::RepeatedPtrField<std::string> *names) {
switch (sop.rel_type_case()) {
case substrait::Rel::RelTypeCase::kJoin:
return TransformJoinOp(sop);
case substrait::Rel::RelTypeCase::kCross:
return TransformCrossProductOp(sop);
case substrait::Rel::RelTypeCase::kFetch:
return TransformFetchOp(sop);
return TransformFetchOp(sop, names);
case substrait::Rel::RelTypeCase::kFilter:
return TransformFilterOp(sop);
case substrait::Rel::RelTypeCase::kProject:
return TransformProjectOp(sop);
return TransformProjectOp(sop, names);
case substrait::Rel::RelTypeCase::kAggregate:
return TransformAggregateOp(sop);
case substrait::Rel::RelTypeCase::kRead:
return TransformReadOp(sop);
case substrait::Rel::RelTypeCase::kSort:
return TransformSortOp(sop);
return TransformSortOp(sop, names);
case substrait::Rel::RelTypeCase::kSet:
return TransformSetOp(sop);
return TransformSetOp(sop, names);
default:
throw InternalException("Unsupported relation type " + to_string(sop.rel_type_case()));
}
Expand Down Expand Up @@ -681,7 +760,7 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformRootOp(const substrait::RelRoot
const auto &column_names = sop.names();
vector<unique_ptr<ParsedExpression>> expressions;
int id = 1;
auto child = TransformOp(sop.input());
auto child = TransformOp(sop.input(), &column_names);
auto first_projection_or_table = GetProjection(*child);
if (first_projection_or_table) {
vector<ColumnDefinition> *column_definitions = &first_projection_or_table->Cast<ProjectionRelation>().columns;
Expand Down
Loading