Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add table write operator and builder #1094

Merged
merged 5 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* Default implementor for implementing logical to physical translation. "Default" here means all
Expand Down Expand Up @@ -129,6 +130,11 @@ public PhysicalPlan visitTableScanBuilder(TableScanBuilder plan, C context) {
return plan.build();
}

@Override
public PhysicalPlan visitTableWriteBuilder(TableWriteBuilder plan, C context) {
return plan.build(visitChild(plan, context));
}

@Override
public PhysicalPlan visitRelation(LogicalRelation node, C context) {
throw new UnsupportedOperationException("Storage engine is responsible for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
@UtilityClass
public class LogicalPlanDSL {

public static LogicalPlan write(LogicalPlan input, Table table, List<String> columns) {
return new LogicalWrite(input, table, columns);
}

public static LogicalPlan aggregation(
LogicalPlan input, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
return new LogicalAggregation(input, aggregatorList, groupByList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.opensearch.sql.planner.logical;

import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* The visitor of {@link LogicalPlan}.
Expand All @@ -28,6 +29,14 @@ public R visitTableScanBuilder(TableScanBuilder plan, C context) {
return visitNode(plan, context);
}

public R visitWrite(LogicalWrite plan, C context) {
return visitNode(plan, context);
}

public R visitTableWriteBuilder(TableWriteBuilder plan, C context) {
return visitNode(plan, context);
}

public R visitFilter(LogicalFilter plan, C context) {
return visitNode(plan, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.storage.Table;

/**
* Logical operator for insert statement.
*/
@EqualsAndHashCode(callSuper = true)
@Getter
@ToString
public class LogicalWrite extends LogicalPlan {

/** Table that handles the write operation. */
private final Table table;

/** Optional column name list specified in insert statement. */
private final List<String> columns;

/**
* Construct a logical write with given child node, table and column name list.
*/
public LogicalWrite(LogicalPlan child, Table table, List<String> columns) {
super(Collections.singletonList(child));
this.table = table;
this.columns = columns;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitWrite(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;
import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder;

/**
* {@link LogicalPlan} Optimizer.
Expand Down Expand Up @@ -55,7 +56,8 @@ public static LogicalPlanOptimizer create() {
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT));
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalWrite;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

Expand Down Expand Up @@ -110,4 +111,14 @@ public static Property<LogicalPlan, Table> table() {
? Optional.of(((LogicalRelation) plan).getTable())
: Optional.empty());
}

/**
* Logical write with table field.
*/
public static Property<LogicalPlan, Table> writeTable() {
return Property.optionalProperty("table",
plan -> plan instanceof LogicalWrite
? Optional.of(((LogicalWrite) plan).getTable())
: Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule.write;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.writeTable;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalWrite;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* Rule that replaces logical write operator with {@link TableWriteBuilder} for later optimization
* and transforming to physical operator.
*/
public class CreateTableWriteBuilder implements Rule<LogicalWrite> {

/** Capture the table inside matched logical relation operator. */
private final Capture<Table> capture;

/** Pattern that matches logical relation operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalWrite> pattern;

/**
* Construct create table write builder rule.
*/
public CreateTableWriteBuilder() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalWrite.class)
.with(writeTable().capturedAs(capture));
}

@Override
public LogicalPlan apply(LogicalWrite plan, Captures captures) {
return captures.get(capture).createWriteBuilder(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.opensearch.sql.planner.physical;

import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.write.TableWriteOperator;

/**
* The visitor of {@link PhysicalPlan}.
Expand Down Expand Up @@ -36,6 +37,10 @@ public R visitTableScan(TableScanOperator node, C context) {
return visitNode(node, context);
}

public R visitTableWrite(TableWriteOperator node, C context) {
return visitNode(node, context);
}

public R visitProject(ProjectOperator node, C context) {
return visitNode(node, context);
}
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalWrite;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* Table.
Expand Down Expand Up @@ -73,6 +75,17 @@ default TableScanBuilder createScanBuilder() {
return null; // TODO: Enforce all subclasses to implement this later
}

/*
* Create table write builder for logical to physical transformation.
*
* @param plan logical write plan
* @return table write builder
*/
default TableWriteBuilder createWriteBuilder(LogicalWrite plan) {
throw new UnsupportedOperationException(
"Write operation is not supported on current table");
}

/**
* Translate {@link Table} to {@link StreamingSource} if possible.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage.write;

import java.util.Collections;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* A {@link TableWriteBuilder} represents transition state between logical planning and physical
* planning for table write operator. The concrete implementation class gets involved in the logical
* optimization through this abstraction and thus transform to specific {@link TableWriteOperator}
* in a certain storage engine.
*/
public abstract class TableWriteBuilder extends LogicalPlan {

/**
* Construct table write builder with child node.
*/
public TableWriteBuilder(LogicalPlan child) {
super(Collections.singletonList(child));
}

/**
* Build table write operator with given child node.
*
* @param child child operator node
* @return table write operator
*/
public abstract TableWriteOperator build(PhysicalPlan child);

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableWriteBuilder(this, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage.write;

import java.util.Collections;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;

/**
* {@link TableWriteOperator} is the abstraction for data source to implement different physical
* write operator on a data source. This is also to avoid "polluting" physical plan visitor by
* concrete table scan implementation.
*/
@RequiredArgsConstructor
public abstract class TableWriteOperator extends PhysicalPlan {

/** Input physical node. */
protected final PhysicalPlan input;

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableWrite(this, context);
}

@Override
public List<PhysicalPlan> getChild() {
return Collections.singletonList(input);
}

/**
* Explain the execution plan.
*
* @return explain output
*/
public abstract String explain();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;
import org.opensearch.sql.storage.write.TableWriteOperator;

@ExtendWith(MockitoExtension.class)
class DefaultImplementorTest {
Expand Down Expand Up @@ -212,4 +214,17 @@ public TableScanOperator build() {
};
assertEquals(tableScanOperator, tableScanBuilder.accept(implementor, null));
}

@Test
public void visitTableWriteBuilderShouldBuildTableWriteOperator() {
LogicalPlan child = values();
TableWriteOperator tableWriteOperator = Mockito.mock(TableWriteOperator.class);
TableWriteBuilder logicalPlan = new TableWriteBuilder(child) {
@Override
public TableWriteOperator build(PhysicalPlan child) {
return tableWriteOperator;
}
};
assertEquals(tableWriteOperator, logicalPlan.accept(implementor, null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -31,9 +32,12 @@
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.aggregation.Aggregator;
import org.opensearch.sql.expression.window.WindowDefinition;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;
import org.opensearch.sql.storage.write.TableWriteOperator;

/**
* Todo. Temporary added for UT coverage, Will be removed.
Expand Down Expand Up @@ -83,6 +87,19 @@ public TableScanOperator build() {
assertNull(tableScanBuilder.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));

LogicalPlan write = LogicalPlanDSL.write(null, table, Collections.emptyList());
assertNull(write.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));

TableWriteBuilder tableWriteBuilder = new TableWriteBuilder(null) {
@Override
public TableWriteOperator build(PhysicalPlan child) {
return null;
}
};
assertNull(tableWriteBuilder.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));

LogicalPlan filter = LogicalPlanDSL.filter(relation, expression);
assertNull(filter.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));
Expand Down
Loading