Skip to content

Commit

Permalink
PPL lookup prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
salyh committed Jun 13, 2024
1 parent 00d5c4e commit 998363d
Show file tree
Hide file tree
Showing 24 changed files with 775 additions and 6 deletions.
99 changes: 99 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lukk;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand All @@ -66,6 +67,7 @@
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
Expand All @@ -89,6 +91,7 @@
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalLukk;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
Expand Down Expand Up @@ -507,6 +510,102 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) {
consecutive);
}

/** Build {@link LogicalLukk}. */
@Override
public LogicalPlan visitLukk(Lukk node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
List<Argument> options = node.getOptions();
// Todo, refactor the option.
Boolean appendOnly = (Boolean) options.get(0).getValue().getValue();

Table table =
dataSourceService
.getDataSource(DEFAULT_DATASOURCE_NAME)
.getStorageEngine()
.getTable(null, node.getIndexName());

if (table == null || !table.exists()) {
throw new SemanticCheckException(
String.format("no such lookup index %s", node.getIndexName()));
}

return new LogicalLukk(
child,
node.getIndexName(),
analyzeLukkMatchFields(node.getMatchFieldList(), context),
appendOnly,
analyzeLukkCopyFields(node.getCopyFieldList(), context, table));
}

private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLukkMatchFields(
List<Map> inputMap, AnalysisContext context) {
ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyMapBuilder =
new ImmutableMap.Builder<>();
for (Map resultMap : inputMap) {
Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), context);
if (resultMap.getTarget() instanceof Field) {
ReferenceExpression target =
new ReferenceExpression(
((Field) resultMap.getTarget()).getField().toString(), origin.type());
ReferenceExpression originExpr = DSL.ref(origin.toString(), origin.type());
TypeEnvironment curEnv = context.peek();
curEnv.remove(originExpr);
curEnv.define(target);
copyMapBuilder.put(originExpr, target);
} else {
throw new SemanticCheckException(
String.format("the target expected to be field, but is %s", resultMap.getTarget()));
}
}

return copyMapBuilder.build();
}

private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLukkCopyFields(
List<Map> inputMap, AnalysisContext context, Table table) {

TypeEnvironment curEnv = context.peek();
java.util.Map<String, ExprType> fieldTypes = table.getFieldTypes();

if (inputMap.isEmpty()) {
fieldTypes.forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
return ImmutableMap.<ReferenceExpression, ReferenceExpression>builder().build();
}

ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyMapBuilder =
new ImmutableMap.Builder<>();
for (Map resultMap : inputMap) {
if (resultMap.getOrigin() instanceof Field && resultMap.getTarget() instanceof Field) {
String fieldName = ((Field) resultMap.getOrigin()).getField().toString();
ExprType ex = fieldTypes.get(fieldName);

if (ex == null) {
throw new SemanticCheckException(String.format("no such field %s", fieldName));
}

ReferenceExpression origin = new ReferenceExpression(fieldName, ex);

if (resultMap.getTarget().equals(resultMap.getOrigin())) {

curEnv.define(origin);
copyMapBuilder.put(origin, origin);
} else {
ReferenceExpression target =
new ReferenceExpression(((Field) resultMap.getTarget()).getField().toString(), ex);
curEnv.define(target);
copyMapBuilder.put(origin, target);
}
} else {
throw new SemanticCheckException(
String.format(
"the origin and target expected to be field, but is %s/%s",
resultMap.getOrigin(), resultMap.getTarget()));
}
}

return copyMapBuilder.build();
}

/** Logical head is identical to {@link LogicalLimit}. */
public LogicalPlan visitHead(Head node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lukk;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand Down Expand Up @@ -217,6 +218,10 @@ public T visitDedupe(Dedupe node, C context) {
return visitChildren(node, context);
}

public T visitLukk(Lukk node, C context) {
return visitChildren(node, context);
}

public T visitHead(Head node, C context) {
return visitChildren(node, context);
}
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ast.dsl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lukk;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -441,6 +443,25 @@ public static Dedupe dedupe(UnresolvedPlan input, List<Argument> options, Field.
return new Dedupe(input, options, Arrays.asList(fields));
}

public static Lukk lukk(
UnresolvedPlan input,
String indexName,
List<Map> matchFieldList,
List<Argument> options,
List<Map> copyFieldList) {
return new Lukk(input, indexName, matchFieldList, options, copyFieldList);
}

public static List<Map> fieldMap(String field, String asField, String... more) {
assert more == null || more.length % 2 == 0;
List list = new ArrayList();
list.add(map(field, asField));
for (int i = 0; i < more.length; i = i + 2) {
list.add(map(more[i], more[i + 1]));
}
return list;
}

public static Head head(UnresolvedPlan input, Integer size, Integer from) {
return new Head(input, size, from);
}
Expand Down
49 changes: 49 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Lukk.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Map;

/** AST node represent Lukk operation. */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
public class Lukk extends UnresolvedPlan {
private UnresolvedPlan child;
private final String indexName;
private final List<Map> matchFieldList;
private final List<Argument> options;
private final List<Map> copyFieldList;

@Override
public Lukk attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitLukk(this, context);
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
import org.opensearch.sql.planner.physical.LimitOperator;
import org.opensearch.sql.planner.physical.LukkOperator;
import org.opensearch.sql.planner.physical.NestedOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
Expand Down Expand Up @@ -157,6 +158,20 @@ public ExplainResponseNode visitDedupe(DedupeOperator node, Object context) {
"consecutive", node.getConsecutive())));
}

@Override
public ExplainResponseNode visitLukk(LukkOperator node, Object context) {
return explain(
node,
context,
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"copyfields", node.getCopyFieldMap(),
"matchfields", node.getMatchFieldMap(),
"indexname", node.getIndexName(),
"appendonly", node.getAppendOnly())));
}

@Override
public ExplainResponseNode visitRareTopN(RareTopNOperator node, Object context) {
return explain(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalLukk;
import org.opensearch.sql.planner.logical.LogicalNested;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
Expand All @@ -31,6 +32,7 @@
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
import org.opensearch.sql.planner.physical.LimitOperator;
import org.opensearch.sql.planner.physical.LukkOperator;
import org.opensearch.sql.planner.physical.NestedOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.ProjectOperator;
Expand Down Expand Up @@ -74,6 +76,19 @@ public PhysicalPlan visitDedupe(LogicalDedupe node, C context) {
node.getConsecutive());
}

@Override
public PhysicalPlan visitLukk(LogicalLukk node, C context) {
return new LukkOperator(
visitChild(node, context),
node.getIndexName(),
node.getMatchFieldMap(),
node.getAppendOnly(),
node.getCopyFieldMap(),
(a, b) -> {
throw new RuntimeException("not implemented by DefaultImplementor");
});
}

@Override
public PhysicalPlan visitProject(LogicalProject node, C context) {
return new ProjectOperator(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.Arrays;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.expression.ReferenceExpression;

/** Logical Dedupe Plan. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class LogicalLukk extends LogicalPlan {

private final String indexName;
private final Map<ReferenceExpression, ReferenceExpression> matchFieldMap;
private final Map<ReferenceExpression, ReferenceExpression> copyFieldMap;
private final Boolean appendOnly;

/** Constructor of LogicalDedupe. */
public LogicalLukk(
LogicalPlan child,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean appendOnly,
Map<ReferenceExpression, ReferenceExpression> copyFieldMap) {
super(Arrays.asList(child));
this.indexName = indexName;
this.copyFieldMap = copyFieldMap;
this.matchFieldMap = matchFieldMap;
this.appendOnly = appendOnly;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitLukk(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public R visitDedupe(LogicalDedupe plan, C context) {
return visitNode(plan, context);
}

public R visitLukk(LogicalLukk plan, C context) {
return visitNode(plan, context);
}

public R visitRename(LogicalRename plan, C context) {
return visitNode(plan, context);
}
Expand Down
Loading

0 comments on commit 998363d

Please sign in to comment.