diff --git a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java index 4a5276418d..4a6d4d8222 100644 --- a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java +++ b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java @@ -35,6 +35,7 @@ import org.opensearch.sql.planner.physical.ValuesOperator; import org.opensearch.sql.planner.physical.WindowOperator; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; /** * Default implementor for implementing logical to physical translation. "Default" here means all @@ -129,6 +130,11 @@ public PhysicalPlan visitTableScanBuilder(TableScanBuilder plan, C context) { return plan.build(); } + @Override + public PhysicalPlan visitTableWriteBuilder(TableWriteBuilder plan, C context) { + return plan.build(visitChild(plan, context)); + } + @Override public PhysicalPlan visitRelation(LogicalRelation node, C context) { throw new UnsupportedOperationException("Storage engine is responsible for " diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java index b18e099afa..842f46fb0b 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java @@ -32,6 +32,10 @@ @UtilityClass public class LogicalPlanDSL { + public static LogicalPlan write(LogicalPlan input, Table table) { + return new LogicalWrite(input, table); + } + public static LogicalPlan aggregation( LogicalPlan input, List aggregatorList, List groupByList) { return new LogicalAggregation(input, aggregatorList, groupByList); diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java index f7330b5c3d..9a41072fe7 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java @@ -7,7 +7,7 @@ package org.opensearch.sql.planner.logical; import org.opensearch.sql.storage.read.TableScanBuilder; -import org.opensearch.sql.storage.TableWriteBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; /** * The visitor of {@link LogicalPlan}. @@ -29,6 +29,10 @@ public R visitTableScanBuilder(TableScanBuilder plan, C context) { return visitNode(plan, context); } + public R visitWrite(LogicalWrite plan, C context) { + return visitNode(plan, context); + } + public R visitTableWriteBuilder(TableWriteBuilder plan, C context) { return visitNode(plan, context); } diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalWrite.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalWrite.java new file mode 100644 index 0000000000..5993b2b4ec --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalWrite.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.logical; + +import java.util.Collections; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.storage.Table; + +/** + * Logical operator for insert statement. + */ +@EqualsAndHashCode(callSuper = true) +@ToString +public class LogicalWrite extends LogicalPlan { + + /** Table that handles the write operation. */ + @Getter + private final Table table; + + public LogicalWrite(LogicalPlan child, Table table) { + super(Collections.singletonList(child)); + this.table = table; + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitWrite(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java index f241e76993..70847b869b 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java @@ -17,6 +17,7 @@ import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort; import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder; import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown; +import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder; /** * {@link LogicalPlan} Optimizer. @@ -55,7 +56,8 @@ public static LogicalPlanOptimizer create() { TableScanPushDown.PUSH_DOWN_SORT, TableScanPushDown.PUSH_DOWN_LIMIT, TableScanPushDown.PUSH_DOWN_HIGHLIGHT, - TableScanPushDown.PUSH_DOWN_PROJECT)); + TableScanPushDown.PUSH_DOWN_PROJECT, + new CreateTableWriteBuilder())); } /** diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java index 0ba478594a..b267658013 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java @@ -20,6 +20,7 @@ import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; import org.opensearch.sql.planner.logical.LogicalSort; +import org.opensearch.sql.planner.logical.LogicalWrite; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; @@ -110,4 +111,14 @@ public static Property table() { ? Optional.of(((LogicalRelation) plan).getTable()) : Optional.empty()); } + + /** + * Logical relation with table field. + */ + public static Property writeTable() { + return Property.optionalProperty("table", + plan -> plan instanceof LogicalWrite + ? Optional.of(((LogicalWrite) plan).getTable()) + : Optional.empty()); + } } diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/write/CreateTableWriteBuilder.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/write/CreateTableWriteBuilder.java new file mode 100644 index 0000000000..4fbf676862 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/write/CreateTableWriteBuilder.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.optimizer.rule.write; + +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.writeTable; + +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import lombok.Getter; +import lombok.experimental.Accessors; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalWrite; +import org.opensearch.sql.planner.optimizer.Rule; +import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.write.TableWriteBuilder; + +/** + * Rule that replaces logical write operator with {@link TableWriteBuilder} for later optimization + * and transforming to physical operator. + */ +public class CreateTableWriteBuilder implements Rule { + + /** Capture the table inside matched logical relation operator. */ + private final Capture capture; + + /** Pattern that matches logical relation operator. */ + @Accessors(fluent = true) + @Getter + private final Pattern pattern; + + /** + * Construct create table write builder rule. + */ + public CreateTableWriteBuilder() { + this.capture = Capture.newCapture(); + this.pattern = Pattern.typeOf(LogicalWrite.class) + .with(writeTable().capturedAs(capture)); + } + + @Override + public LogicalPlan apply(LogicalWrite plan, Captures captures) { + return captures.get(capture).createWriteBuilder(plan); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index a5a578b1d4..d4bc4a1ea9 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -7,7 +7,7 @@ package org.opensearch.sql.planner.physical; import org.opensearch.sql.storage.TableScanOperator; -import org.opensearch.sql.storage.TableWriteOperator; +import org.opensearch.sql.storage.write.TableWriteOperator; /** * The visitor of {@link PhysicalPlan}. diff --git a/core/src/main/java/org/opensearch/sql/storage/Table.java b/core/src/main/java/org/opensearch/sql/storage/Table.java index 8196071a68..496281fa8d 100644 --- a/core/src/main/java/org/opensearch/sql/storage/Table.java +++ b/core/src/main/java/org/opensearch/sql/storage/Table.java @@ -10,8 +10,10 @@ import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.executor.streaming.StreamingSource; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalWrite; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; /** * Table. @@ -76,9 +78,10 @@ default TableScanBuilder createScanBuilder() { /* * Create table write builder for logical to physical transformation. * + * @param plan logical write plan * @return table write builder */ - default TableWriteBuilder createWriteBuilder() { + default TableWriteBuilder createWriteBuilder(LogicalWrite plan) { throw new UnsupportedOperationException( "Write operation is not supported on current table"); } diff --git a/core/src/main/java/org/opensearch/sql/storage/TableWriteBuilder.java b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteBuilder.java similarity index 74% rename from core/src/main/java/org/opensearch/sql/storage/TableWriteBuilder.java rename to core/src/main/java/org/opensearch/sql/storage/write/TableWriteBuilder.java index c174b00377..54dfa5d557 100644 --- a/core/src/main/java/org/opensearch/sql/storage/TableWriteBuilder.java +++ b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteBuilder.java @@ -3,17 +3,18 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.storage; +package org.opensearch.sql.storage.write; import java.util.Collections; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; +import org.opensearch.sql.planner.physical.PhysicalPlan; /** * A {@link TableWriteBuilder} represents transition state between logical planning and physical * planning for table write operator. The concrete implementation class gets involved in the logical - * optimization through this abstraction and thus transform to specific {@link TableWriteOperator} in - * a certain data source. + * optimization through this abstraction and thus transform to specific {@link TableWriteOperator} + * in a certain storage engine. */ public abstract class TableWriteBuilder extends LogicalPlan { @@ -25,11 +26,12 @@ public TableWriteBuilder(LogicalPlan child) { } /** - * Build table write operator. + * Build table write operator with given child node. * + * @param child child operator node * @return table write operator */ - public abstract TableWriteOperator build(); + public abstract TableWriteOperator build(PhysicalPlan child); @Override public R accept(LogicalPlanNodeVisitor visitor, C context) { diff --git a/core/src/main/java/org/opensearch/sql/storage/TableWriteOperator.java b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteOperator.java similarity index 96% rename from core/src/main/java/org/opensearch/sql/storage/TableWriteOperator.java rename to core/src/main/java/org/opensearch/sql/storage/write/TableWriteOperator.java index 5f922c51b4..007b725677 100644 --- a/core/src/main/java/org/opensearch/sql/storage/TableWriteOperator.java +++ b/core/src/main/java/org/opensearch/sql/storage/write/TableWriteOperator.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.storage; +package org.opensearch.sql.storage.write; import java.util.Collections; import java.util.List; diff --git a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java index 2322e4684e..017cfb60ea 100644 --- a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java @@ -58,6 +58,8 @@ import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.TableScanOperator; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; +import org.opensearch.sql.storage.write.TableWriteOperator; @ExtendWith(MockitoExtension.class) class DefaultImplementorTest { @@ -212,4 +214,17 @@ public TableScanOperator build() { }; assertEquals(tableScanOperator, tableScanBuilder.accept(implementor, null)); } + + @Test + public void visitTableWriteBuilderShouldBuildTableWriteOperator() { + LogicalPlan child = values(); + TableWriteOperator tableWriteOperator = Mockito.mock(TableWriteOperator.class); + TableWriteBuilder logicalPlan = new TableWriteBuilder(child) { + @Override + public TableWriteOperator build(PhysicalPlan child) { + return tableWriteOperator; + } + }; + assertEquals(tableWriteOperator, logicalPlan.accept(implementor, null)); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index 33c6b02c38..ccb951037a 100644 --- a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -31,9 +31,12 @@ import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.aggregation.Aggregator; import org.opensearch.sql.expression.window.WindowDefinition; +import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.TableScanOperator; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; +import org.opensearch.sql.storage.write.TableWriteOperator; /** * Todo. Temporary added for UT coverage, Will be removed. @@ -83,6 +86,19 @@ public TableScanOperator build() { assertNull(tableScanBuilder.accept(new LogicalPlanNodeVisitor() { }, null)); + LogicalPlan write = LogicalPlanDSL.write(null, table); + assertNull(write.accept(new LogicalPlanNodeVisitor() { + }, null)); + + TableWriteBuilder tableWriteBuilder = new TableWriteBuilder(null) { + @Override + public TableWriteOperator build(PhysicalPlan child) { + return null; + } + }; + assertNull(tableWriteBuilder.accept(new LogicalPlanNodeVisitor() { + }, null)); + LogicalPlan filter = LogicalPlanDSL.filter(relation, expression); assertNull(filter.accept(new LogicalPlanNodeVisitor() { }, null)); diff --git a/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java b/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java index e2510ec464..137a830afd 100644 --- a/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java @@ -7,6 +7,7 @@ package org.opensearch.sql.planner.optimizer; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; import static org.opensearch.sql.data.model.ExprValueUtils.integerValue; @@ -20,6 +21,8 @@ import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.relation; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.sort; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.values; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.write; import com.google.common.collect.ImmutableList; import java.util.Collections; @@ -39,6 +42,7 @@ import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.sql.storage.write.TableWriteBuilder; @ExtendWith(MockitoExtension.class) class LogicalPlanOptimizerTest { @@ -270,6 +274,37 @@ public PhysicalPlan implement(LogicalPlan plan) { ); } + @Test + void table_support_write_builder_should_be_replaced() { + Mockito.reset(table, tableScanBuilder); + TableWriteBuilder writeBuilder = Mockito.mock(TableWriteBuilder.class); + when(table.createWriteBuilder(any())).thenReturn(writeBuilder); + + assertEquals( + writeBuilder, + optimize(write(values(), table)) + ); + } + + @Test + void table_not_support_write_builder_should_report_error() { + Mockito.reset(table, tableScanBuilder); + Table table = new Table() { + @Override + public Map getFieldTypes() { + return null; + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + return null; + } + }; + + assertThrows(UnsupportedOperationException.class, + () -> table.createWriteBuilder(null)); + } + private LogicalPlan optimize(LogicalPlan plan) { final LogicalPlanOptimizer optimizer = LogicalPlanOptimizer.create(); final LogicalPlan optimize = optimizer.optimize(plan); diff --git a/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java b/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java index 61d192362a..9f90fd8d05 100644 --- a/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/optimizer/pattern/PatternsTest.java @@ -35,5 +35,6 @@ void source_is_empty() { void table_is_empty() { plan = Mockito.mock(LogicalFilter.class); assertFalse(Patterns.table().getFunction().apply(plan).isPresent()); + assertFalse(Patterns.writeTable().getFunction().apply(plan).isPresent()); } } diff --git a/core/src/test/java/org/opensearch/sql/storage/write/TableWriteOperatorTest.java b/core/src/test/java/org/opensearch/sql/storage/write/TableWriteOperatorTest.java new file mode 100644 index 0000000000..8780b08276 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/storage/write/TableWriteOperatorTest.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage.write; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; + +@ExtendWith(MockitoExtension.class) +class TableWriteOperatorTest { + + @Mock + private PhysicalPlan child; + + private TableWriteOperator tableWrite; + + @BeforeEach + void setUp() { + tableWrite = new TableWriteOperator(child) { + @Override + public String explain() { + return "explain"; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public ExprValue next() { + return null; + } + }; + } + + @Test + void testAccept() { + Boolean isVisited = tableWrite.accept(new PhysicalPlanNodeVisitor<>() { + @Override + protected Boolean visitNode(PhysicalPlan node, Object context) { + return (node instanceof TableWriteOperator); + } + + @Override + public Boolean visitTableWrite(TableWriteOperator node, Object context) { + return super.visitTableWrite(node, context); + } + }, null); + + assertTrue(isVisited); + } + + @Test + void testGetChild() { + assertEquals(Collections.singletonList(child), tableWrite.getChild()); + } +} \ No newline at end of file