diff --git a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java index 4a5276418d..4a6d4d8222 100644 --- a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java +++ b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java @@ -35,6 +35,7 @@ import org.opensearch.sql.planner.physical.ValuesOperator; import org.opensearch.sql.planner.physical.WindowOperator; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; /** * Default implementor for implementing logical to physical translation. "Default" here means all @@ -129,6 +130,11 @@ public PhysicalPlan visitTableScanBuilder(TableScanBuilder plan, C context) { return plan.build(); } + @Override + public PhysicalPlan visitTableWriteBuilder(TableWriteBuilder plan, C context) { + return plan.build(visitChild(plan, context)); + } + @Override public PhysicalPlan visitRelation(LogicalRelation node, C context) { throw new UnsupportedOperationException("Storage engine is responsible for " diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java index b18e099afa..a192966287 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java @@ -32,6 +32,10 @@ @UtilityClass public class LogicalPlanDSL { + public static LogicalPlan write(LogicalPlan input, Table table, List columns) { + return new LogicalWrite(input, table, columns); + } + public static LogicalPlan aggregation( LogicalPlan input, List aggregatorList, List groupByList) { return new LogicalAggregation(input, aggregatorList, groupByList); diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java index 0386eb6e2a..9a41072fe7 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java @@ -7,6 +7,7 @@ package org.opensearch.sql.planner.logical; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; /** * The visitor of {@link LogicalPlan}. @@ -28,6 +29,14 @@ public R visitTableScanBuilder(TableScanBuilder plan, C context) { return visitNode(plan, context); } + public R visitWrite(LogicalWrite plan, C context) { + return visitNode(plan, context); + } + + public R visitTableWriteBuilder(TableWriteBuilder plan, C context) { + return visitNode(plan, context); + } + public R visitFilter(LogicalFilter plan, C context) { return visitNode(plan, context); } diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalWrite.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalWrite.java new file mode 100644 index 0000000000..496e6009e3 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalWrite.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.logical; + +import java.util.Collections; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.storage.Table; + +/** + * Logical operator for insert statement. + */ +@EqualsAndHashCode(callSuper = true) +@Getter +@ToString +public class LogicalWrite extends LogicalPlan { + + /** Table that handles the write operation. */ + private final Table table; + + /** Optional column name list specified in insert statement. */ + private final List columns; + + /** + * Construct a logical write with given child node, table and column name list. + */ + public LogicalWrite(LogicalPlan child, Table table, List columns) { + super(Collections.singletonList(child)); + this.table = table; + this.columns = columns; + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitWrite(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java index f241e76993..70847b869b 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java @@ -17,6 +17,7 @@ import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort; import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder; import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown; +import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder; /** * {@link LogicalPlan} Optimizer. @@ -55,7 +56,8 @@ public static LogicalPlanOptimizer create() { TableScanPushDown.PUSH_DOWN_SORT, TableScanPushDown.PUSH_DOWN_LIMIT, TableScanPushDown.PUSH_DOWN_HIGHLIGHT, - TableScanPushDown.PUSH_DOWN_PROJECT)); + TableScanPushDown.PUSH_DOWN_PROJECT, + new CreateTableWriteBuilder())); } /** diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java index 0ba478594a..856d8df7ea 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java @@ -20,6 +20,7 @@ import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; import org.opensearch.sql.planner.logical.LogicalSort; +import org.opensearch.sql.planner.logical.LogicalWrite; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; @@ -110,4 +111,14 @@ public static Property table() { ? Optional.of(((LogicalRelation) plan).getTable()) : Optional.empty()); } + + /** + * Logical write with table field. + */ + public static Property writeTable() { + return Property.optionalProperty("table", + plan -> plan instanceof LogicalWrite + ? Optional.of(((LogicalWrite) plan).getTable()) + : Optional.empty()); + } } diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/write/CreateTableWriteBuilder.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/write/CreateTableWriteBuilder.java new file mode 100644 index 0000000000..4fbf676862 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/write/CreateTableWriteBuilder.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.optimizer.rule.write; + +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.writeTable; + +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import lombok.Getter; +import lombok.experimental.Accessors; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalWrite; +import org.opensearch.sql.planner.optimizer.Rule; +import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.write.TableWriteBuilder; + +/** + * Rule that replaces logical write operator with {@link TableWriteBuilder} for later optimization + * and transforming to physical operator. + */ +public class CreateTableWriteBuilder implements Rule { + + /** Capture the table inside matched logical relation operator. */ + private final Capture capture; + + /** Pattern that matches logical relation operator. */ + @Accessors(fluent = true) + @Getter + private final Pattern pattern; + + /** + * Construct create table write builder rule. + */ + public CreateTableWriteBuilder() { + this.capture = Capture.newCapture(); + this.pattern = Pattern.typeOf(LogicalWrite.class) + .with(writeTable().capturedAs(capture)); + } + + @Override + public LogicalPlan apply(LogicalWrite plan, Captures captures) { + return captures.get(capture).createWriteBuilder(plan); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index 63dd05cc6b..d4bc4a1ea9 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -7,6 +7,7 @@ package org.opensearch.sql.planner.physical; import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.write.TableWriteOperator; /** * The visitor of {@link PhysicalPlan}. @@ -36,6 +37,10 @@ public R visitTableScan(TableScanOperator node, C context) { return visitNode(node, context); } + public R visitTableWrite(TableWriteOperator node, C context) { + return visitNode(node, context); + } + public R visitProject(ProjectOperator node, C context) { return visitNode(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/storage/Table.java b/core/src/main/java/org/opensearch/sql/storage/Table.java index ae0aaaf17b..496281fa8d 100644 --- a/core/src/main/java/org/opensearch/sql/storage/Table.java +++ b/core/src/main/java/org/opensearch/sql/storage/Table.java @@ -10,8 +10,10 @@ import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.executor.streaming.StreamingSource; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalWrite; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; /** * Table. @@ -73,6 +75,17 @@ default TableScanBuilder createScanBuilder() { return null; // TODO: Enforce all subclasses to implement this later } + /* + * Create table write builder for logical to physical transformation. + * + * @param plan logical write plan + * @return table write builder + */ + default TableWriteBuilder createWriteBuilder(LogicalWrite plan) { + throw new UnsupportedOperationException( + "Write operation is not supported on current table"); + } + /** * Translate {@link Table} to {@link StreamingSource} if possible. */ diff --git a/core/src/main/java/org/opensearch/sql/storage/write/TableWriteBuilder.java b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteBuilder.java new file mode 100644 index 0000000000..54dfa5d557 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage.write; + +import java.util.Collections; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * A {@link TableWriteBuilder} represents transition state between logical planning and physical + * planning for table write operator. The concrete implementation class gets involved in the logical + * optimization through this abstraction and thus transform to specific {@link TableWriteOperator} + * in a certain storage engine. + */ +public abstract class TableWriteBuilder extends LogicalPlan { + + /** + * Construct table write builder with child node. + */ + public TableWriteBuilder(LogicalPlan child) { + super(Collections.singletonList(child)); + } + + /** + * Build table write operator with given child node. + * + * @param child child operator node + * @return table write operator + */ + public abstract TableWriteOperator build(PhysicalPlan child); + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitTableWriteBuilder(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/storage/write/TableWriteOperator.java b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteOperator.java new file mode 100644 index 0000000000..92cdc6eb41 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteOperator.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage.write; + +import java.util.Collections; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; + +/** + * {@link TableWriteOperator} is the abstraction for data source to implement different physical + * write operator on a data source. This is also to avoid "polluting" physical plan visitor by + * concrete table scan implementation. + */ +@RequiredArgsConstructor +public abstract class TableWriteOperator extends PhysicalPlan { + + /** Input physical node. */ + protected final PhysicalPlan input; + + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + return visitor.visitTableWrite(this, context); + } + + @Override + public List getChild() { + return Collections.singletonList(input); + } + + /** + * Explain the execution plan. + * + * @return explain output + */ + public abstract String explain(); +} diff --git a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java index 2322e4684e..017cfb60ea 100644 --- a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java @@ -58,6 +58,8 @@ import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.TableScanOperator; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; +import org.opensearch.sql.storage.write.TableWriteOperator; @ExtendWith(MockitoExtension.class) class DefaultImplementorTest { @@ -212,4 +214,17 @@ public TableScanOperator build() { }; assertEquals(tableScanOperator, tableScanBuilder.accept(implementor, null)); } + + @Test + public void visitTableWriteBuilderShouldBuildTableWriteOperator() { + LogicalPlan child = values(); + TableWriteOperator tableWriteOperator = Mockito.mock(TableWriteOperator.class); + TableWriteBuilder logicalPlan = new TableWriteBuilder(child) { + @Override + public TableWriteOperator build(PhysicalPlan child) { + return tableWriteOperator; + } + }; + assertEquals(tableWriteOperator, logicalPlan.accept(implementor, null)); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index 33c6b02c38..341bcbc29e 100644 --- a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -12,6 +12,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -31,9 +32,12 @@ import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.aggregation.Aggregator; import org.opensearch.sql.expression.window.WindowDefinition; +import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.TableScanOperator; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; +import org.opensearch.sql.storage.write.TableWriteOperator; /** * Todo. Temporary added for UT coverage, Will be removed. @@ -83,6 +87,19 @@ public TableScanOperator build() { assertNull(tableScanBuilder.accept(new LogicalPlanNodeVisitor() { }, null)); + LogicalPlan write = LogicalPlanDSL.write(null, table, Collections.emptyList()); + assertNull(write.accept(new LogicalPlanNodeVisitor() { + }, null)); + + TableWriteBuilder tableWriteBuilder = new TableWriteBuilder(null) { + @Override + public TableWriteOperator build(PhysicalPlan child) { + return null; + } + }; + assertNull(tableWriteBuilder.accept(new LogicalPlanNodeVisitor() { + }, null)); + LogicalPlan filter = LogicalPlanDSL.filter(relation, expression); assertNull(filter.accept(new LogicalPlanNodeVisitor() { }, null)); diff --git a/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java b/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java index e2510ec464..7516aa1809 100644 --- a/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java @@ -7,6 +7,7 @@ package org.opensearch.sql.planner.optimizer; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; import static org.opensearch.sql.data.model.ExprValueUtils.integerValue; @@ -20,6 +21,8 @@ import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.relation; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.sort; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.values; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.write; import com.google.common.collect.ImmutableList; import java.util.Collections; @@ -39,6 +42,7 @@ import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; @ExtendWith(MockitoExtension.class) class LogicalPlanOptimizerTest { @@ -270,6 +274,37 @@ public PhysicalPlan implement(LogicalPlan plan) { ); } + @Test + void table_support_write_builder_should_be_replaced() { + Mockito.reset(table, tableScanBuilder); + TableWriteBuilder writeBuilder = Mockito.mock(TableWriteBuilder.class); + when(table.createWriteBuilder(any())).thenReturn(writeBuilder); + + assertEquals( + writeBuilder, + optimize(write(values(), table, Collections.emptyList())) + ); + } + + @Test + void table_not_support_write_builder_should_report_error() { + Mockito.reset(table, tableScanBuilder); + Table table = new Table() { + @Override + public Map getFieldTypes() { + return null; + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + return null; + } + }; + + assertThrows(UnsupportedOperationException.class, + () -> table.createWriteBuilder(null)); + } + private LogicalPlan optimize(LogicalPlan plan) { final LogicalPlanOptimizer optimizer = LogicalPlanOptimizer.create(); final LogicalPlan optimize = optimizer.optimize(plan); diff --git a/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java b/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java index 61d192362a..9f90fd8d05 100644 --- a/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java @@ -35,5 +35,6 @@ void source_is_empty() { void table_is_empty() { plan = Mockito.mock(LogicalFilter.class); assertFalse(Patterns.table().getFunction().apply(plan).isPresent()); + assertFalse(Patterns.writeTable().getFunction().apply(plan).isPresent()); } } diff --git a/core/src/test/java/org/opensearch/sql/storage/write/TableWriteOperatorTest.java b/core/src/test/java/org/opensearch/sql/storage/write/TableWriteOperatorTest.java new file mode 100644 index 0000000000..8780b08276 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/storage/write/TableWriteOperatorTest.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage.write; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; + +@ExtendWith(MockitoExtension.class) +class TableWriteOperatorTest { + + @Mock + private PhysicalPlan child; + + private TableWriteOperator tableWrite; + + @BeforeEach + void setUp() { + tableWrite = new TableWriteOperator(child) { + @Override + public String explain() { + return "explain"; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public ExprValue next() { + return null; + } + }; + } + + @Test + void testAccept() { + Boolean isVisited = tableWrite.accept(new PhysicalPlanNodeVisitor<>() { + @Override + protected Boolean visitNode(PhysicalPlan node, Object context) { + return (node instanceof TableWriteOperator); + } + + @Override + public Boolean visitTableWrite(TableWriteOperator node, Object context) { + return super.visitTableWrite(node, context); + } + }, null); + + assertTrue(isVisited); + } + + @Test + void testGetChild() { + assertEquals(Collections.singletonList(child), tableWrite.getChild()); + } +} \ No newline at end of file