From 348a2b722a4a6841831452d35923177c375e3ac0 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 14 May 2024 14:15:43 +0800 Subject: [PATCH] fix(interactive): support a general `Intersect` implementation in GIE Runtime (#3689) ## What do these changes do? As titled. This PR mainly includes: 1. Refactor the conversion logic of `Intersect` in `GraphRelToProtoConverter` in Compiler 2. Refactor the parsing and processing logic of `Intersect` in assembly in Runtime 3. Support a more general `Intersect` that can preserve the edges during intersection, in Runtime 4. Add a micro-benchmark framework, and tests comparing general_intersection vs. optimized_intersection. The results show that, on the LDBC dataset (sf=1), when finding matches for a triangle (a, b, c all know each other), general_intersection takes about 1.7× the time of optimized_intersection, since general_intersection additionally preserves all matched edges during the intersection computation. ## Related issue number Fixes #3685 #3745 --------- Co-authored-by: xiao.zl Co-authored-by: Longbin Lai --- flex/codegen/src/hqps_generator.h | 8 +- .../proto/GraphRelToProtoConverter.java | 59 +- .../suite/pattern/PatternQueryTest.java | 121 ++++ .../ir/runtime/GraphRelToProtoTest.java | 85 +++ .../test/resources/proto/intersect_test.json | 7 - .../resources/proto/intersect_test_2.json | 439 ++++++++++++ .../proto/partitioned_intersect_test.json | 277 ++++++++ .../proto/partitioned_intersect_test_2.json | 463 +++++++++++++ .../executor/ir/common/src/utils.rs | 22 + .../executor/ir/core/src/plan/physical.rs | 156 +---- .../ir/integrated/benches/bench_quries.rs | 213 ++++ .../ir/integrated/benches/common/mod.rs | 326 +++++++++ .../ir/integrated/tests/expand_test.rs | 451 ++++++++++++- .../executor/ir/runtime/Cargo.toml | 1 + .../executor/ir/runtime/src/assembly.rs | 240 +++++-- .../executor/ir/runtime/src/process/entry.rs | 41 +- .../src/process/operator/flatmap/unfold.rs | 41 +- .../process/operator/map/expand_intersect.rs | 635 +++++++++++++++++- .../runtime/src/process/operator/map/mod.rs | 2 +- .../runtime/src/process/operator/sink/sink.rs | 44 +- 20 files changed, 3315 insertions(+), 316 deletions(-) create mode 100644 interactive_engine/compiler/src/test/resources/proto/intersect_test_2.json create mode 100644 interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test.json create mode 100644 interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test_2.json create mode 100644 interactive_engine/executor/ir/integrated/benches/bench_quries.rs create mode 100644 interactive_engine/executor/ir/integrated/benches/common/mod.rs diff --git a/flex/codegen/src/hqps_generator.h b/flex/codegen/src/hqps_generator.h index ab4d409dae2c..c11ada82b5b8 100644 --- a/flex/codegen/src/hqps_generator.h +++ b/flex/codegen/src/hqps_generator.h @@ -518,18 +518,12 @@ class QueryGenerator { case physical::PhysicalOpr::Operator::kIntersect: { LOG(INFO) << "Found a intersect operator"; - // a intersect op must be followed by a unfold op - CHECK(i + 1 < size) << " intersect op must be followed by a unfold op"; - auto& next_op = plan_.plan(i + 1).opr(); - CHECK(next_op.op_kind_case() == - physical::PhysicalOpr::Operator::kUnfold) - << "intersect op must be followed by a unfold op"; + // Note that the intersect operator is no longer followed by an unfold operator.
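To make point 3 of the description concrete, the sketch below is illustrative only and not part of this patch: `VertexId`, `EdgeId` and the plain std collections are assumptions for the example, while the runtime's actual `Intersect` operator works on records and `DynEntry` values. It contrasts an id-only neighbor intersection with a general intersection that also keeps the matched edges from both branches; the extra edge bookkeeping is what accounts for the roughly 1.7× overhead reported above.

```rust
// Illustrative sketch only (not part of this patch). `VertexId`/`EdgeId`
// are hypothetical aliases introduced just for this example.
use std::collections::{HashMap, HashSet};

type VertexId = u64;
type EdgeId = u64;

/// Optimized intersection: keep only the neighbor ids common to both branches.
fn intersect_ids(a: &HashSet<VertexId>, b: &HashSet<VertexId>) -> HashSet<VertexId> {
    a.intersection(b).copied().collect()
}

/// General intersection: keep the common neighbor ids together with the
/// matched edges from each branch, so edge aliases (e.g. `d`, `e`, `f` in the
/// pattern tests) remain available after the intersection.
fn intersect_with_edges(
    a: &HashMap<VertexId, Vec<EdgeId>>,
    b: &HashMap<VertexId, Vec<EdgeId>>,
) -> HashMap<VertexId, (Vec<EdgeId>, Vec<EdgeId>)> {
    a.iter()
        .filter_map(|(v, edges_a)| {
            b.get(v)
                .map(|edges_b| (*v, (edges_a.clone(), edges_b.clone())))
        })
        .collect()
}
```

The general variant does strictly more work and carries more state per common neighbor, which is consistent with the triangle benchmark result quoted in the description.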
auto& intersect_op = opr.intersect(); auto intersect_opt_code = BuildIntersectOp(ctx_, intersect_op); for (auto& line : intersect_opt_code) { ss << line << std::endl; } - i += 1; // skip unfold break; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java index 27f3bfccf724..b98667944f1f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java @@ -841,10 +841,6 @@ public RelNode visit(MultiJoin multiJoin) { GraphAlgebraPhysical.PhysicalOpr.newBuilder(); GraphAlgebraPhysical.Intersect.Builder intersectBuilder = GraphAlgebraPhysical.Intersect.newBuilder(); - GraphAlgebraPhysical.PhysicalOpr.Builder unfoldOprBuilder = - GraphAlgebraPhysical.PhysicalOpr.newBuilder(); - GraphAlgebraPhysical.Unfold.Builder unfoldBuilder = - GraphAlgebraPhysical.Unfold.newBuilder(); List conditions = RelOptUtil.conjunctions(multiJoin.getJoinFilter()); int intersectKey = -1; @@ -867,66 +863,19 @@ public RelNode visit(MultiJoin multiJoin) { Preconditions.checkArgument(intersectKey != -1, "intersect key should be set"); intersectBuilder.setKey(intersectKey); - // then, process operators in the intersect branches; - // currently, there are some cases: - // case 1: PhysicalExpand; - // case 2: PhysicalExpand + PhysicalGetV(filter); - // case 3: EdgeExpand + GetV; (not supported yet) - // case 4: PathExpand + GetV; - // TODO(bingqing): This should be refactored. Directly add these cases as subplans in the - // intersect. - // Currently, we process these cases in a consistent way with the previous ir-core - // implementation. - GraphPhysicalGetV auxiliaFilter = null; + // then, build subplans for intersect for (RelNode input : multiJoin.getInputs()) { GraphAlgebraPhysical.PhysicalPlan.Builder subPlanBuilder = GraphAlgebraPhysical.PhysicalPlan.newBuilder(); - // specifically, if it is PhysicalGetV(filter), we build an auxilia node after - // the intersect. - if (input instanceof GraphPhysicalGetV - && !ObjectUtils.isEmpty(((GraphPhysicalGetV) input).getFilters())) { - auxiliaFilter = (GraphPhysicalGetV) input; - auxiliaFilter - .getInput() - .accept( - new GraphRelToProtoConverter( - isColumnId, - graphConfig, - subPlanBuilder, - this.relToCommons, - depth + 1)); - } else if (input instanceof GraphLogicalGetV) { - throw new UnsupportedOperationException( - "Unsupported of LogicalEdgeEdge + LogicalGetV in Intersect yet"); - } else { - input.accept( - new GraphRelToProtoConverter( - isColumnId, - graphConfig, - subPlanBuilder, - this.relToCommons, - depth + 1)); - } + input.accept( + new GraphRelToProtoConverter( + isColumnId, graphConfig, subPlanBuilder, this.relToCommons, depth + 1)); intersectBuilder.addSubPlans(subPlanBuilder); } intersectOprBuilder.setOpr( GraphAlgebraPhysical.PhysicalOpr.Operator.newBuilder() .setIntersect(intersectBuilder)); physicalBuilder.addPlan(intersectOprBuilder.build()); - - // after intersect, we need to unfold the result. 
- unfoldBuilder.setTag(Utils.asAliasId(intersectKey)); - unfoldBuilder.setAlias(Utils.asAliasId(intersectKey)); - unfoldOprBuilder.setOpr( - GraphAlgebraPhysical.PhysicalOpr.Operator.newBuilder().setUnfold(unfoldBuilder)); - - physicalBuilder.addPlan(unfoldOprBuilder.build()); - - // if have filters, we need to add a auxilia node after intersect. - if (auxiliaFilter != null) { - addAuxilia(physicalBuilder, auxiliaFilter); - } - return multiJoin; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java index f4e5f1c5bab8..d681f6cd9263 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java @@ -36,8 +36,16 @@ public abstract class PatternQueryTest extends AbstractGremlinProcessTest { public abstract Traversal get_pattern_4_test(); + public abstract Traversal get_pattern_4b_test(); + + public abstract Traversal get_pattern_4c_test(); + public abstract Traversal get_pattern_5_test(); + public abstract Traversal get_pattern_5b_test(); + + public abstract Traversal get_pattern_5c_test(); + public abstract Traversal get_pattern_6_test(); public abstract Traversal get_pattern_7_test(); @@ -46,12 +54,16 @@ public abstract class PatternQueryTest extends AbstractGremlinProcessTest { public abstract Traversal get_pattern_9_test(); + public abstract Traversal get_pattern_9b_test(); + public abstract Traversal get_pattern_10_test(); public abstract Traversal get_pattern_11_test(); public abstract Traversal get_pattern_12_test(); + public abstract Traversal get_pattern_12b_test(); + public abstract Traversal get_pattern_13_test(); public abstract Traversal get_pattern_14_test(); @@ -94,6 +106,20 @@ public void run_pattern_4_test() { Assert.assertEquals(23286L, traversal.next().longValue()); } + @Test + public void run_pattern_4b_test() { + Traversal traversal = this.get_pattern_4b_test(); + this.printTraversalForm(traversal); + Assert.assertEquals(23286L, traversal.next().longValue()); + } + + @Test + public void run_pattern_4c_test() { + Traversal traversal = this.get_pattern_4c_test(); + this.printTraversalForm(traversal); + Assert.assertEquals(23286L, traversal.next().longValue()); + } + @Test public void run_pattern_5_test() { Traversal traversal = this.get_pattern_5_test(); @@ -101,6 +127,20 @@ public void run_pattern_5_test() { Assert.assertEquals(5596L, traversal.next().longValue()); } + @Test + public void run_pattern_5b_test() { + Traversal traversal = this.get_pattern_5b_test(); + this.printTraversalForm(traversal); + Assert.assertEquals(5596L, traversal.next().longValue()); + } + + @Test + public void run_pattern_5c_test() { + Traversal traversal = this.get_pattern_5c_test(); + this.printTraversalForm(traversal); + Assert.assertEquals(5596L, traversal.next().longValue()); + } + @Test public void run_pattern_6_test() { Traversal traversal = this.get_pattern_6_test(); @@ -129,6 +169,13 @@ public void run_pattern_9_test() { Assert.assertEquals(23286L, traversal.next().longValue()); } + @Test + public void run_pattern_9b_test() { + Traversal traversal = this.get_pattern_9b_test(); + this.printTraversalForm(traversal); + Assert.assertEquals(23286L, traversal.next().longValue()); + } + @Test public 
void run_pattern_10_test() { Traversal traversal = this.get_pattern_10_test(); @@ -150,6 +197,13 @@ public void run_pattern_12_test() { Assert.assertEquals(232854L, traversal.next().longValue()); } + @Test + public void run_pattern_12b_test() { + Traversal traversal = this.get_pattern_12b_test(); + this.printTraversalForm(traversal); + Assert.assertEquals(232854L, traversal.next().longValue()); + } + @Test public void run_pattern_13_test() { Traversal traversal = this.get_pattern_13_test(); @@ -255,6 +309,28 @@ public Traversal get_pattern_4_test() { .count(); } + // PM5 + @Override + public Traversal get_pattern_4b_test() { + return g.V().match( + __.as("a").outE("KNOWS").as("d").inV().as("b"), + __.as("b").out("KNOWS").as("c"), + __.as("a").out("KNOWS").as("c")) + .select("d") + .count(); + } + + // PM5 + @Override + public Traversal get_pattern_4c_test() { + return g.V().match( + __.as("a").outE("KNOWS").as("d").inV().as("b"), + __.as("b").outE("KNOWS").as("e").inV().as("c"), + __.as("a").outE("KNOWS").as("f").inV().as("c")) + .select("d", "e", "f") + .count(); + } + // PM7 @Override public Traversal get_pattern_5_test() { @@ -265,6 +341,28 @@ public Traversal get_pattern_5_test() { .count(); } + // PM7 + @Override + public Traversal get_pattern_5b_test() { + return g.V().match( + __.as("a").has("gender", "male").outE("KNOWS").as("d").inV().as("b"), + __.as("b").has("gender", "female").out("KNOWS").as("c"), + __.as("a").out("KNOWS").as("c")) + .select("d") + .count(); + } + + // PM7 + @Override + public Traversal get_pattern_5c_test() { + return g.V().match( + __.as("a").has("gender", "male").outE("KNOWS").as("d").inV().as("b"), + __.as("b").has("gender", "female").outE("KNOWS").as("e").inV().as("c"), + __.as("a").outE("KNOWS").as("f").inV().as("c")) + .select("d", "e", "f") + .count(); + } + // PM12 @Override public Traversal get_pattern_6_test() { @@ -317,6 +415,18 @@ public Traversal get_pattern_9_test() { .count(); } + // PM5-path + @Override + public Traversal get_pattern_9b_test() { + return g.V().match( + ((IrCustomizedTraversal) __.as("a").out("2..3", "KNOWS")) + .endV() + .as("c"), + __.as("a").outE("KNOWS").as("d").inV().as("c")) + .select("d") + .count(); + } + // PM11-path @Override public Traversal get_pattern_10_test() { @@ -351,6 +461,17 @@ public Traversal get_pattern_12_test() { .count(); } + // fuzzy pattern + @Override + public Traversal get_pattern_12b_test() { + return g.V().match( + __.as("a").outE("KNOWS", "LIKES").as("d").inV().as("b"), + __.as("b").outE("KNOWS", "LIKES").as("e").inV().as("c"), + __.as("a").outE("KNOWS", "LIKES").as("f").inV().as("c")) + .select("d", "e", "f") + .count(); + } + // support both @Override public Traversal get_pattern_13_test() { diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java index ca7bdf4e69a3..3fe7ce8e4e40 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java @@ -1123,6 +1123,75 @@ public void intersect_test() throws Exception { FileUtils.readJsonFromResource("proto/intersect_test.json"), plan.explain().trim()); } + + try (PhysicalBuilder protoBuilder = + new GraphRelProtoPhysicalBuilder( + getMockPartitionedCBOConfig(), getMockCBOMeta(), new LogicalPlan(after))) { + 
PhysicalPlan plan = protoBuilder.build(); + Assert.assertEquals( + FileUtils.readJsonFromResource("proto/partitioned_intersect_test.json"), + plan.explain().trim()); + } + } + + @Test + public void intersect_test_02() throws Exception { + GraphRelOptimizer optimizer = getMockCBO(); + IrMeta irMeta = getMockCBOMeta(); + GraphBuilder builder = Utils.mockGraphBuilder(optimizer, irMeta); + RelNode before = + com.alibaba.graphscope.cypher.antlr4.Utils.eval( + "Match (message:COMMENT|POST)-[e1:HASCREATOR]->(person:PERSON), \n" + + " (message:COMMENT|POST)-[e2:HASTAG]->(tag:TAG), \n" + + " (person:PERSON)-[e3:HASINTEREST]->(tag:TAG)\n" + + "Return count(person);", + builder) + .build(); + RelNode after = optimizer.optimize(before, new GraphIOProcessor(builder, irMeta)); + Assert.assertEquals( + "root:\n" + + "GraphLogicalAggregate(keys=[{variables=[], aliases=[]}]," + + " values=[[{operands=[person], aggFunction=COUNT, alias='$f0'," + + " distinct=false}]])\n" + + " MultiJoin(joinFilter=[=(tag, tag)], isFullOuterJoin=[false]," + + " joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]]," + + " projFields=[[ALL, ALL]])\n" + + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[TAG]}], alias=[tag]," + + " opt=[END])\n" + + " GraphLogicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT, TAG)," + + " EdgeLabel(HASTAG, POST, TAG)]], alias=[e2], startAlias=[message]," + + " opt=[OUT])\n" + + " CommonTableScan(table=[[common#-676410541]])\n" + + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[TAG]}], alias=[tag]," + + " opt=[END])\n" + + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[HASINTEREST]}]," + + " alias=[e3], startAlias=[person], opt=[OUT])\n" + + " CommonTableScan(table=[[common#-676410541]])\n" + + "common#-676410541:\n" + + "GraphLogicalGetV(tableConfig=[{isAll=false, tables=[POST, COMMENT]}]," + + " alias=[message], opt=[START])\n" + + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[HASCREATOR]}]," + + " alias=[e1], startAlias=[person], opt=[IN])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[person], opt=[VERTEX])", + com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); + + try (PhysicalBuilder protoBuilder = + new GraphRelProtoPhysicalBuilder( + getMockCBOConfig(), getMockCBOMeta(), new LogicalPlan(after))) { + PhysicalPlan plan = protoBuilder.build(); + Assert.assertEquals( + FileUtils.readJsonFromResource("proto/intersect_test_2.json"), + plan.explain().trim()); + } + try (PhysicalBuilder protoBuilder = + new GraphRelProtoPhysicalBuilder( + getMockPartitionedCBOConfig(), getMockCBOMeta(), new LogicalPlan(after))) { + PhysicalPlan plan = protoBuilder.build(); + Assert.assertEquals( + FileUtils.readJsonFromResource("proto/partitioned_intersect_test_2.json"), + plan.explain().trim()); + } } private Configs getMockCBOConfig() { @@ -1139,6 +1208,22 @@ private Configs getMockCBOConfig() { "target/test-classes/statistics/ldbc30_hierarchy_statistics.txt")); } + private Configs getMockPartitionedCBOConfig() { + return new Configs( + ImmutableMap.of( + "graph.planner.is.on", + "true", + "graph.planner.opt", + "CBO", + "graph.planner.rules", + "FilterIntoJoinRule, FilterMatchRule, ExtendIntersectRule," + + " ExpandGetVFusionRule", + "graph.planner.cbo.glogue.schema", + "target/test-classes/statistics/ldbc30_hierarchy_statistics.txt", + "pegasus.hosts", + "host1,host2")); + } + private GraphRelOptimizer getMockCBO() { return new GraphRelOptimizer(getMockCBOConfig()); } diff --git 
a/interactive_engine/compiler/src/test/resources/proto/intersect_test.json b/interactive_engine/compiler/src/test/resources/proto/intersect_test.json index bc9d6e24d346..5341c37cc6a0 100644 --- a/interactive_engine/compiler/src/test/resources/proto/intersect_test.json +++ b/interactive_engine/compiler/src/test/resources/proto/intersect_test.json @@ -170,13 +170,6 @@ "key": 2 } } - }, { - "opr": { - "unfold": { - "tag": 2, - "alias": 2 - } - } }, { "opr": { "groupBy": { diff --git a/interactive_engine/compiler/src/test/resources/proto/intersect_test_2.json b/interactive_engine/compiler/src/test/resources/proto/intersect_test_2.json new file mode 100644 index 000000000000..1ca5b9f307de --- /dev/null +++ b/interactive_engine/compiler/src/test/resources/proto/intersect_test_2.json @@ -0,0 +1,439 @@ +{ + "plan": [{ + "opr": { + "scan": { + "alias": 2, + "params": { + "tables": [{ + "id": 1 + }], + "sampleRatio": 1.0 + } + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 1 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "firstName" + }, + "type": "STRING" + }, { + "propId": { + "name": "lastName" + }, + "type": "STRING" + }, { + "propId": { + "name": "gender" + }, + "type": "STRING" + }, { + "propId": { + "name": "birthday" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }] + }] + } + }, + "alias": 2 + }] + }, { + "opr": { + "edge": { + "vTag": 2, + "direction": "IN", + "params": { + "tables": [{ + "id": 0 + }], + "sampleRatio": 1.0 + }, + "alias": 1, + "expandOpt": "EDGE" + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "srcLabel": 2, + "dstLabel": 1 + } + }, { + "label": { + "srcLabel": 3, + "dstLabel": 1 + } + }] + } + }, + "alias": 1 + }] + }, { + "opr": { + "vertex": { + "params": { + "tables": [{ + "id": 2 + }, { + "id": 3 + }], + "sampleRatio": 1.0 + }, + "alias": 0 + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 3 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "imageFile" + }, + "type": "STRING" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }, { + "propId": { + "name": "language" + }, + "type": "STRING" + }, { + "propId": { + "name": "content" + }, + "type": "STRING" + }, { + "propId": { + "name": "length" + }, + "type": "INT32" + }] + }, { + "label": { + "label": 2 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }, { + "propId": { + "name": "content" + }, + "type": "STRING" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "length" + }, + "type": "INT32" + }] + }] + } + } + }] + }, { + "opr": { + "intersect": { + "subPlans": [{ + "plan": [{ + "opr": { + "edge": { + "vTag": 0, + "params": { + "tables": [{ + "id": 1 + }], + "sampleRatio": 1.0 + }, + "alias": 3, + "expandOpt": "EDGE" + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + 
"graphDataType": [{ + "label": { + "label": 1, + "srcLabel": 2, + "dstLabel": 7 + } + }, { + "label": { + "label": 1, + "srcLabel": 3, + "dstLabel": 7 + } + }] + } + }, + "alias": 3 + }] + }, { + "opr": { + "vertex": { + "opt": "END", + "params": { + "tables": [{ + "id": 7 + }], + "sampleRatio": 1.0 + }, + "alias": 4 + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 7 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "name" + }, + "type": "STRING" + }, { + "propId": { + "name": "url" + }, + "type": "STRING" + }] + }] + } + }, + "alias": 4 + }] + }] + }, { + "plan": [{ + "opr": { + "edge": { + "vTag": 2, + "params": { + "tables": [{ + "id": 10 + }], + "sampleRatio": 1.0 + }, + "alias": 5, + "expandOpt": "EDGE" + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "label": 10, + "srcLabel": 1, + "dstLabel": 7 + } + }] + } + }, + "alias": 5 + }] + }, { + "opr": { + "vertex": { + "opt": "END", + "params": { + "tables": [{ + "id": 7 + }], + "sampleRatio": 1.0 + }, + "alias": 4 + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 7 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "name" + }, + "type": "STRING" + }, { + "propId": { + "name": "url" + }, + "type": "STRING" + }] + }] + } + }, + "alias": 4 + }] + }] + }], + "key": 4 + } + } + }, { + "opr": { + "groupBy": { + "functions": [{ + "vars": [{ + "tag": { + "id": 2 + }, + "nodeType": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 1 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "firstName" + }, + "type": "STRING" + }, { + "propId": { + "name": "lastName" + }, + "type": "STRING" + }, { + "propId": { + "name": "gender" + }, + "type": "STRING" + }, { + "propId": { + "name": "birthday" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }] + }] + } + } + }], + "aggregate": "COUNT", + "alias": 6 + }] + } + }, + "metaData": [{ + "type": { + "dataType": "INT64" + }, + "alias": 6 + }] + }, { + "opr": { + "sink": { + "sinkTarget": { + "sinkDefault": { + } + } + } + } + }] +} \ No newline at end of file diff --git a/interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test.json b/interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test.json new file mode 100644 index 000000000000..015570927444 --- /dev/null +++ b/interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test.json @@ -0,0 +1,277 @@ +{ + "plan": [{ + "opr": { + "scan": { + "alias": 1, + "params": { + "tables": [{ + "id": 1 + }], + "sampleRatio": 1.0 + } + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 1 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "firstName" + }, + "type": "STRING" + }, { + "propId": { + "name": "lastName" + }, + "type": "STRING" + }, { + "propId": { + "name": "gender" + }, + "type": "STRING" + }, { + "propId": { + "name": "birthday" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" 
+ }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }] + }] + } + }, + "alias": 1 + }] + }, { + "opr": { + "repartition": { + "toAnother": { + "shuffleKey": 1 + } + } + } + }, { + "opr": { + "edge": { + "vTag": 1, + "direction": "IN", + "params": { + "tables": [{ + "id": 0 + }], + "sampleRatio": 1.0 + }, + "alias": 0 + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "srcLabel": 2, + "dstLabel": 1 + } + }, { + "label": { + "srcLabel": 3, + "dstLabel": 1 + } + }] + } + }, + "alias": -1 + }] + }, { + "opr": { + "intersect": { + "subPlans": [{ + "plan": [{ + "opr": { + "repartition": { + "toAnother": { + "shuffleKey": 0 + } + } + } + }, { + "opr": { + "edge": { + "vTag": 0, + "params": { + "tables": [{ + "id": 1 + }], + "sampleRatio": 1.0 + }, + "alias": 2 + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "label": 1, + "srcLabel": 2, + "dstLabel": 7 + } + }, { + "label": { + "label": 1, + "srcLabel": 3, + "dstLabel": 7 + } + }] + } + }, + "alias": -1 + }] + }] + }, { + "plan": [{ + "opr": { + "repartition": { + "toAnother": { + "shuffleKey": 1 + } + } + } + }, { + "opr": { + "edge": { + "vTag": 1, + "params": { + "tables": [{ + "id": 10 + }], + "sampleRatio": 1.0 + }, + "alias": 2 + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "label": 10, + "srcLabel": 1, + "dstLabel": 7 + } + }] + } + }, + "alias": -1 + }] + }] + }], + "key": 2 + } + } + }, { + "opr": { + "groupBy": { + "functions": [{ + "vars": [{ + "tag": { + "id": 1 + }, + "nodeType": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 1 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "firstName" + }, + "type": "STRING" + }, { + "propId": { + "name": "lastName" + }, + "type": "STRING" + }, { + "propId": { + "name": "gender" + }, + "type": "STRING" + }, { + "propId": { + "name": "birthday" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }] + }] + } + } + }], + "aggregate": "COUNT", + "alias": 3 + }] + } + }, + "metaData": [{ + "type": { + "dataType": "INT64" + }, + "alias": 3 + }] + }, { + "opr": { + "sink": { + "sinkTarget": { + "sinkDefault": { + } + } + } + } + }] +} \ No newline at end of file diff --git a/interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test_2.json b/interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test_2.json new file mode 100644 index 000000000000..50aa41100031 --- /dev/null +++ b/interactive_engine/compiler/src/test/resources/proto/partitioned_intersect_test_2.json @@ -0,0 +1,463 @@ +{ + "plan": [{ + "opr": { + "scan": { + "alias": 2, + "params": { + "tables": [{ + "id": 1 + }], + "sampleRatio": 1.0 + } + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 1 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "firstName" + }, + "type": "STRING" + }, { + "propId": { + "name": "lastName" + }, + "type": "STRING" + }, { + "propId": { + "name": "gender" + }, + "type": "STRING" + }, { + "propId": { + "name": "birthday" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + 
}, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }] + }] + } + }, + "alias": 2 + }] + }, { + "opr": { + "repartition": { + "toAnother": { + "shuffleKey": 2 + } + } + } + }, { + "opr": { + "edge": { + "vTag": 2, + "direction": "IN", + "params": { + "tables": [{ + "id": 0 + }], + "sampleRatio": 1.0 + }, + "alias": 1, + "expandOpt": "EDGE" + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "srcLabel": 2, + "dstLabel": 1 + } + }, { + "label": { + "srcLabel": 3, + "dstLabel": 1 + } + }] + } + }, + "alias": 1 + }] + }, { + "opr": { + "vertex": { + "params": { + "tables": [{ + "id": 2 + }, { + "id": 3 + }], + "sampleRatio": 1.0 + }, + "alias": 0 + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 3 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "imageFile" + }, + "type": "STRING" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }, { + "propId": { + "name": "language" + }, + "type": "STRING" + }, { + "propId": { + "name": "content" + }, + "type": "STRING" + }, { + "propId": { + "name": "length" + }, + "type": "INT32" + }] + }, { + "label": { + "label": 2 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }, { + "propId": { + "name": "content" + }, + "type": "STRING" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "length" + }, + "type": "INT32" + }] + }] + } + } + }] + }, { + "opr": { + "intersect": { + "subPlans": [{ + "plan": [{ + "opr": { + "repartition": { + "toAnother": { + "shuffleKey": 0 + } + } + } + }, { + "opr": { + "edge": { + "vTag": 0, + "params": { + "tables": [{ + "id": 1 + }], + "sampleRatio": 1.0 + }, + "alias": 3, + "expandOpt": "EDGE" + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "label": 1, + "srcLabel": 2, + "dstLabel": 7 + } + }, { + "label": { + "label": 1, + "srcLabel": 3, + "dstLabel": 7 + } + }] + } + }, + "alias": 3 + }] + }, { + "opr": { + "vertex": { + "opt": "END", + "params": { + "tables": [{ + "id": 7 + }], + "sampleRatio": 1.0 + }, + "alias": 4 + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 7 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "name" + }, + "type": "STRING" + }, { + "propId": { + "name": "url" + }, + "type": "STRING" + }] + }] + } + }, + "alias": 4 + }] + }] + }, { + "plan": [{ + "opr": { + "repartition": { + "toAnother": { + "shuffleKey": 2 + } + } + } + }, { + "opr": { + "edge": { + "vTag": 2, + "params": { + "tables": [{ + "id": 10 + }], + "sampleRatio": 1.0 + }, + "alias": 5, + "expandOpt": "EDGE" + } + }, + "metaData": [{ + "type": { + "graphType": { + "elementOpt": "EDGE", + "graphDataType": [{ + "label": { + "label": 10, + "srcLabel": 1, + "dstLabel": 7 + } + }] + } + }, + "alias": 5 + }] + }, { + "opr": { + "vertex": { + "opt": "END", + "params": { + "tables": [{ + "id": 7 + }], + "sampleRatio": 1.0 + }, + "alias": 4 + } + }, + "metaData": [{ + "type": 
{ + "graphType": { + "graphDataType": [{ + "label": { + "label": 7 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "name" + }, + "type": "STRING" + }, { + "propId": { + "name": "url" + }, + "type": "STRING" + }] + }] + } + }, + "alias": 4 + }] + }] + }], + "key": 4 + } + } + }, { + "opr": { + "groupBy": { + "functions": [{ + "vars": [{ + "tag": { + "id": 2 + }, + "nodeType": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 1 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "firstName" + }, + "type": "STRING" + }, { + "propId": { + "name": "lastName" + }, + "type": "STRING" + }, { + "propId": { + "name": "gender" + }, + "type": "STRING" + }, { + "propId": { + "name": "birthday" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }] + }] + } + } + }], + "aggregate": "COUNT", + "alias": 6 + }] + } + }, + "metaData": [{ + "type": { + "dataType": "INT64" + }, + "alias": 6 + }] + }, { + "opr": { + "sink": { + "sinkTarget": { + "sinkDefault": { + } + } + } + } + }] +} \ No newline at end of file diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index 798f05c74beb..6cf0f8ca4f0a 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -841,6 +841,13 @@ impl From for physical_pb::PhysicalOpr { } } +impl From for physical_pb::PhysicalOpr { + fn from(unfold: physical_pb::Unfold) -> Self { + let op_kind = physical_pb::physical_opr::operator::OpKind::Unfold(unfold); + op_kind.into() + } +} + impl From for physical_pb::Project { fn from(project: pb::Project) -> Self { let mappings = project @@ -1012,6 +1019,21 @@ impl common_pb::Logical { } } +impl physical_pb::PhysicalOpr { + pub fn is_repartition(&self) -> bool { + match self { + physical_pb::PhysicalOpr { + opr: + Some(physical_pb::physical_opr::Operator { + op_kind: Some(physical_pb::physical_opr::operator::OpKind::Repartition(_)), + }), + .. + } => true, + _ => false, + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/interactive_engine/executor/ir/core/src/plan/physical.rs b/interactive_engine/executor/ir/core/src/plan/physical.rs index e9dd0f68d042..2987179d2cb7 100644 --- a/interactive_engine/executor/ir/core/src/plan/physical.rs +++ b/interactive_engine/executor/ir/core/src/plan/physical.rs @@ -21,7 +21,6 @@ use std::convert::TryInto; -use ir_common::error::ParsePbError; use ir_common::expr_parse::str_to_expr_pb; use ir_common::generated::algebra as pb; use ir_common::generated::common as common_pb; @@ -31,7 +30,6 @@ use ir_common::KeyId; use ir_physical_client::physical_builder::PlanBuilder; use crate::error::{IrError, IrResult}; -use crate::glogue::combine_get_v_by_query_params; use crate::plan::logical::{LogicalPlan, NodeType}; use crate::plan::meta::PlanMeta; @@ -941,34 +939,17 @@ impl AsPhysical for LogicalPlan { } // Given a->b, we support intersecting their neighbors, e.g., Intersect{{a->c, b->c}, key=c} -// more cases as follows: -// 1. To intersect a->d->c and b->c with key=c, -// if so, translate into two operators, i.e., EdgeExpand{a->d} and Intersect{{d->c, b->c}, key=c} -// 2. 
To intersect a->c->d and b->c with key=c, -// if so, translate into two operators, i.e., Intersect{{a->c, b->c}, key=c} and Expand{c->d} -// 3. To intersect a->c, b->c with key=c, with filters -// we support expanding vertices with filters on edges (i.e., filters on a->c, b->c), e.g., Intersect{{a-filter->c, b-filter->c}, key=c}; -// if expanding vertices with filters on vertices (i.e., filters on c), translate into Intersect{{a->c, b->c}, key=c} + Select {filter on c} -// 4. To intersect a->...->d->c and b->c with key=c, where a->..->d->c is a path from a to c, -// if so, translate into PathExpand{a->d} + Intersect{d->c, b->c, key=c}. - -// Thus, after build intersect, the physical plan looks like: -// 1. the last ops in intersect's sub_plans are the ones to intersect; -// 2. the intersect op includes: -// 1) EdgeExpand with Opt = ExpandV, which is to expand and intersect on id-only vertices; (supported currently) -// 2) EdgeExpand with Opt = ExpandE, which is to expand and intersect on edges (although, not considered in Pattern yet); - +// Currently, subplans in Intersect could be like: +// 1. vec![ExpandE, GetV] for edge expand to intersect; +// 2. vec![PathExpand, GetV] for path expand to intersect fn add_intersect_job_builder( builder: &mut PlanBuilder, plan_meta: &mut PlanMeta, intersect_opr: &pb::Intersect, subplans: &Vec, ) -> IrResult<()> { - use pb::logical_plan::operator::Opr::*; - let intersect_tag = intersect_opr .key .as_ref() .ok_or_else(|| IrError::ParsePbError("Empty tag in `Intersect` opr".into()))?; - let mut auxilia: Option = None; let mut intersect_plans: Vec = vec![]; for subplan in subplans { // subplan would be like: @@ -987,116 +968,12 @@ fn add_intersect_job_builder( let last_opr = subplan.get_last_node().ok_or_else(|| { IrError::InvalidPattern("Last node Missing for Intersection's subplan".to_string()) })?; - if let Some(Vertex(get_v)) = last_opr.borrow().opr.opr.as_ref() { - let mut get_v = get_v.clone(); - if get_v.alias.is_none() || !get_v.alias.as_ref().unwrap().eq(intersect_tag) { - Err(IrError::InvalidPattern("Cannot intersect on different tags".to_string()))? - } - // If the first operator is PathExpand, pick its last expand out from the path - let mut edge_expand = if let Some(Edge(edge_expand)) = first_opr.borrow().opr.opr.as_ref() { - edge_expand.clone() - } else if let Some(Path(path_expand)) = first_opr.borrow().opr.opr.as_ref() { - // Process path_expand as follows: - // 1. If path_expand range from 0, it is unsupported; - // 2. If it is path_expand(1,2), optimized as edge_expand; - // 3. Otherwise, translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand, - // and the last edge_expand is the one to intersect. - // Notice that if we have predicates for vertices in path_expand, or for the last vertex of path_expand, - // do the filtering after intersection. - // TODO: there might be a bug here: - // if path_expand has an alias which indicates that the path would be referred later, it may not as expected. 
- let mut path_expand = path_expand.clone(); - let path_expand_base = path_expand.base.as_ref().ok_or_else(|| { - ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()) - })?; - let path_get_v_opt = path_expand_base.get_v.clone(); - let base_edge_expand = path_expand_base - .edge_expand - .as_ref() - .ok_or_else(|| { - ParsePbError::EmptyFieldError( - "PathExpand::base::edge_expand in Pattern".to_string(), - ) - })?; - // Ensure the base is ExpandV or ExpandE + GetV - if path_get_v_opt == None - && base_edge_expand.expand_opt == pb::edge_expand::ExpandOpt::Edge as i32 - { - Err(IrError::Unsupported( - "Edge Only PathExpand in Intersection's subplan has not been supported yet" - .to_string(), - ))? - } - // Combine the params for the last vertex in path. - // That is, it should satisfy both params in `GetV` in PathExpand's ExpandBase, - // and the params in `EndV` following PathExpand. - if let Some(path_get_v) = path_get_v_opt { - get_v = combine_get_v_by_query_params(get_v, path_get_v); - } - // pick the last edge expand out from the path expand - let mut last_edge_expand = base_edge_expand.clone(); - last_edge_expand.v_tag = None; - let hop_range = path_expand.hop_range.as_mut().ok_or_else(|| { - ParsePbError::EmptyFieldError("pb::PathExpand::hop_range".to_string()) - })?; - if hop_range.lower < 1 { - Err(IrError::Unsupported(format!( - "PathExpand in Intersection with lower range of {:?}", - hop_range.lower - )))? - } - if hop_range.lower == 1 && hop_range.upper == 2 { - // optimized Path(1..2) to as EdgeExpand - last_edge_expand.v_tag = path_expand.start_tag; - } else { - // translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand, - hop_range.lower -= 1; - hop_range.upper -= 1; - let mut end_v = pb::GetV::default(); - end_v.opt = pb::get_v::VOpt::End as i32; - // build the path expansion - path_expand.add_job_builder(builder, plan_meta)?; - end_v.add_job_builder(builder, plan_meta)?; - } - last_edge_expand - } else { - Err(IrError::InvalidPattern( - "First node of Intersection's subplan is neither EdgeExpand or PathExpand".to_string(), - ))? - }; - // build the edge expansion - // the opt should be vertex because now only intersection on vertex is supported - edge_expand.expand_opt = pb::edge_expand::ExpandOpt::Vertex as i32; - edge_expand.alias = get_v.alias.clone(); - edge_expand.add_job_builder(&mut sub_bldr, plan_meta)?; - // vertex parameter after the intersection - if let Some(params) = get_v.params.as_ref() { - // the case that we need to further process getV's filter. 
- if params.has_predicates() || params.has_columns() || params.has_labels() { - get_v.opt = 4; - auxilia = Some(get_v.clone()); - } - } - } + first_opr.add_job_builder(&mut sub_bldr, plan_meta)?; + last_opr.add_job_builder(&mut sub_bldr, plan_meta)?; intersect_plans.push(sub_bldr); } // intersect builder.intersect(intersect_plans, intersect_tag.clone()); - // unfold the intersection - let unfold = pb::Unfold { - tag: Some(intersect_tag.clone()), - alias: Some(intersect_tag.clone()), - meta_data: None, - }; - unfold.add_job_builder(builder, plan_meta)?; - // add vertex filters - if let Some(mut auxilia) = auxilia { - auxilia.tag = Some(intersect_tag.clone()); - if plan_meta.is_partition() { - builder.shuffle(Some(intersect_tag.clone())); - } - builder.get_v(auxilia); - } Ok(()) } @@ -2430,7 +2307,6 @@ mod test { .add_job_builder(&mut builder, &mut plan_meta) .unwrap(); - let unfold_opr = pb::Unfold { tag: Some(2.into()), alias: Some(2.into()), meta_data: None }; // extend 0->1 let fused_expand_ab_opr_vertex = pb::EdgeExpand { v_tag: Some(0.into()), @@ -2450,7 +2326,6 @@ mod test { sub_builder_1.edge_expand(expand_ac_opr_vertex.clone()); sub_builder_2.edge_expand(expand_bc_opr_vertex.clone()); expected_builder.intersect(vec![sub_builder_1, sub_builder_2], 2.into()); - expected_builder.unfold(unfold_opr); assert_eq!(builder, expected_builder); } @@ -2495,10 +2370,6 @@ mod test { is_optional: false, }; - let mut expand_ac_opr_vertex = expand_ac_opr_edge.clone(); - expand_ac_opr_vertex.expand_opt = pb::edge_expand::ExpandOpt::Vertex as i32; - expand_ac_opr_vertex.alias = Some(2.into()); - let expand_bc_opr_edge = pb::EdgeExpand { v_tag: Some(1.into()), direction: 0, @@ -2509,10 +2380,6 @@ mod test { is_optional: false, }; - let mut expand_bc_opr_vertex = expand_bc_opr_edge.clone(); - expand_bc_opr_vertex.expand_opt = pb::edge_expand::ExpandOpt::Vertex as i32; - expand_bc_opr_vertex.alias = Some(2.into()); - let get_c = pb::GetV { tag: None, opt: pb::get_v::VOpt::End as i32, @@ -2521,10 +2388,6 @@ mod test { meta_data: None, }; - let mut get_c_filter = get_c.clone(); - get_c_filter.tag = Some(2.into()); - get_c_filter.opt = 4; - // parents are expand_ac_opr and expand_bc_opr let intersect_opr = pb::Intersect { parents: vec![4, 6], key: Some(2.into()) }; @@ -2556,7 +2419,6 @@ mod test { .add_job_builder(&mut builder, &mut plan_meta) .unwrap(); - let unfold_opr = pb::Unfold { tag: Some(2.into()), alias: Some(2.into()), meta_data: None }; // extend 0->1 let fused_expand_ab_opr_vertex = pb::EdgeExpand { v_tag: Some(0.into()), @@ -2573,11 +2435,11 @@ mod test { let mut sub_builder_1 = PlanBuilder::default(); let mut sub_builder_2 = PlanBuilder::default(); - sub_builder_1.edge_expand(expand_ac_opr_vertex.clone()); - sub_builder_2.edge_expand(expand_bc_opr_vertex.clone()); + sub_builder_1.edge_expand(expand_ac_opr_edge.clone()); + sub_builder_1.get_v(get_c.clone()); + sub_builder_2.edge_expand(expand_bc_opr_edge.clone()); + sub_builder_2.get_v(get_c.clone()); expected_builder.intersect(vec![sub_builder_1, sub_builder_2], 2.into()); - expected_builder.unfold(unfold_opr); - expected_builder.get_v(get_c_filter); assert_eq!(builder, expected_builder); } diff --git a/interactive_engine/executor/ir/integrated/benches/bench_quries.rs b/interactive_engine/executor/ir/integrated/benches/bench_quries.rs new file mode 100644 index 000000000000..868c9308b10e --- /dev/null +++ b/interactive_engine/executor/ir/integrated/benches/bench_quries.rs @@ -0,0 +1,213 @@ +// +//! Copyright 2023 Alibaba Group Holding Limited. 
+//! +//! Licensed under the Apache License, Version 2.0 (the "License"); +//! you may not use this file except in compliance with the License. +//! You may obtain a copy of the License at +//! +//! http://www.apache.org/licenses/LICENSE-2.0 +//! +//! Unless required by applicable law or agreed to in writing, software +//! distributed under the License is distributed on an "AS IS" BASIS, +//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//! See the License for the specific language governing permissions and +//! limitations under the License. +//! + +#![feature(test)] + +mod common; + +extern crate test; + +use ir_common::generated::algebra as algebra_pb; +use ir_common::generated::common as common_pb; +use ir_common::KeyId; +use ir_physical_client::physical_builder::*; +use pegasus_server::JobRequest; +use test::Bencher; + +use crate::common::benchmark::{ + default_sink_target, incr_request_job_id, initialize, parse_result, query_params, submit_query, + KNOWS_LABEL, TAG_A, TAG_B, TAG_C, TAG_D, +}; + +// (A) -knows-> (B); (A) -knows-> (C); (B) <-knows (C) +fn init_intersect_job_request(edge_tag_1: Option, edge_tag_2: Option) -> JobRequest { + // (A) + let source_opr = algebra_pb::Scan { + scan_opt: 0, + alias: Some(TAG_A.into()), + params: None, + idx_predicate: None, + is_count_only: false, + meta_data: None, + }; + + // (A) -> (B); + let expand_opr1 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 0, + alias: Some(TAG_B.into()), + meta_data: None, + is_optional: false, + }; + + let expand_opr2; + let mut get_v_opr1 = None; + + if let Some(edge_tag_1) = edge_tag_1 { + // a seperate expande + getv + // (A) -> (C); with edge tag edge_tag_1 + expand_opr2 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 1, // expand edge + alias: Some(edge_tag_1.into()), + meta_data: None, + is_optional: false, + }; + + get_v_opr1 = Some(algebra_pb::GetV { + tag: None, + opt: 1, // EndV + params: Some(query_params(vec![], vec![], None)), + alias: Some(TAG_C.into()), + meta_data: None, + }); + } else { + // a fused expandv + expand_opr2 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 0, // expand vertex + alias: Some(TAG_C.into()), + meta_data: None, + is_optional: false, + }; + } + + let expand_opr3; + let mut get_v_opr2 = None; + + if let Some(edge_tag_2) = edge_tag_2 { + // a seperate expande + getv + // (B) <- (C); with edge tag edge_tag_2 + expand_opr3 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_B.into()), + direction: 1, // in + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 1, // expand edge + alias: Some(edge_tag_2.into()), + meta_data: None, + is_optional: false, + }; + + get_v_opr2 = Some(algebra_pb::GetV { + tag: None, + opt: 0, // StartV + params: Some(query_params(vec![], vec![], None)), + alias: Some(TAG_C.into()), + meta_data: None, + }); + } else { + // a fused expandv + expand_opr3 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_B.into()), + direction: 1, // in + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 0, // expand vertex + alias: Some(TAG_C.into()), + meta_data: None, + is_optional: false, + }; + } + + let mut sink_tags: Vec<_> = vec![TAG_A, TAG_B, 
TAG_C] + .into_iter() + .map(|i| common_pb::NameOrIdKey { key: Some(i.into()) }) + .collect(); + if let Some(edge_tag_1) = edge_tag_1 { + sink_tags.push(common_pb::NameOrIdKey { key: Some(edge_tag_1.into()) }); + } + if let Some(edge_tag_2) = edge_tag_2 { + sink_tags.push(common_pb::NameOrIdKey { key: Some(edge_tag_2.into()) }); + } + let sink_pb = algebra_pb::Sink { tags: sink_tags, sink_target: default_sink_target() }; + + let mut job_builder = JobBuilder::default(); + job_builder.add_scan_source(source_opr.clone()); + job_builder.shuffle(None); + job_builder.edge_expand(expand_opr1.clone()); + let mut plan_builder_1 = PlanBuilder::new(1); + plan_builder_1.shuffle(None); + plan_builder_1.edge_expand(expand_opr2); + if let Some(get_v_opr1) = get_v_opr1 { + plan_builder_1.get_v(get_v_opr1); + } + let mut plan_builder_2 = PlanBuilder::new(2); + plan_builder_2.shuffle(None); + plan_builder_2.edge_expand(expand_opr3.clone()); + if let Some(get_v_opr2) = get_v_opr2 { + plan_builder_2.get_v(get_v_opr2); + } + job_builder.intersect(vec![plan_builder_1, plan_builder_2], TAG_C.into()); + + job_builder.sink(sink_pb); + job_builder.build().unwrap() +} + +fn bench_request(b: &mut Bencher, pb_request: JobRequest, print_flag: bool) { + initialize(); + b.iter(|| { + let mut job_req = pb_request.clone(); + let job_id = incr_request_job_id(&mut job_req); + let mut results = submit_query(job_req, 32); + let mut res_count: i32 = 0; + while let Some(result) = results.next() { + match result { + Ok(res) => { + res_count += 1; + if print_flag && job_id == 1 { + if let Some(result) = parse_result(res) { + println!("result {:?}", result) + } + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + println!("result count: {:?}", res_count) + }) +} + +#[bench] +fn bench_intersect_01(b: &mut Bencher) { + // no edge tags, i.e., the optimized intersection + let request = init_intersect_job_request(None, None); + bench_request(b, request, true) +} + +#[bench] +fn bench_intersect_02(b: &mut Bencher) { + let request = init_intersect_job_request(Some(TAG_D), None); + bench_request(b, request, true) +} + +#[bench] +fn bench_intersect_03(b: &mut Bencher) { + let request = init_intersect_job_request(None, Some(TAG_D)); + bench_request(b, request, true) +} + +#[bench] +fn bench_intersect_04(b: &mut Bencher) { + let request = init_intersect_job_request(Some(TAG_D), Some(TAG_D)); + bench_request(b, request, true) +} diff --git a/interactive_engine/executor/ir/integrated/benches/common/mod.rs b/interactive_engine/executor/ir/integrated/benches/common/mod.rs new file mode 100644 index 000000000000..9b180a05ae5e --- /dev/null +++ b/interactive_engine/executor/ir/integrated/benches/common/mod.rs @@ -0,0 +1,326 @@ +// +//! Copyright 2023 Alibaba Group Holding Limited. +//! +//! Licensed under the Apache License, Version 2.0 (the "License"); +//! you may not use this file except in compliance with the License. +//! You may obtain a copy of the License at +//! +//! http://www.apache.org/licenses/LICENSE-2.0 +//! +//! Unless required by applicable law or agreed to in writing, software +//! distributed under the License is distributed on an "AS IS" BASIS, +//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//! See the License for the specific language governing permissions and +//! limitations under the License. +//! 
+ +#[cfg(test)] +#[allow(dead_code)] +#[allow(unused_imports)] +pub mod benchmark { + + use std::collections::HashMap; + use std::convert::{TryFrom, TryInto}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Once}; + + use graph_proxy::apis::{register_graph, DynDetails, Edge, PegasusClusterInfo, Vertex, ID}; + use graph_proxy::{create_exp_store, SimplePartition}; + use ir_common::expr_parse::str_to_expr_pb; + use ir_common::generated::algebra as pb; + use ir_common::generated::common as common_pb; + use ir_common::generated::results as result_pb; + use ir_common::{KeyId, LabelId, NameOrId}; + use lazy_static::lazy_static; + use pegasus::result::{ResultSink, ResultStream}; + use pegasus::{run_opt, Configuration, JobConf, StartupError}; + use pegasus_server::job::{JobAssembly, JobDesc}; + use pegasus_server::rpc::RpcSink; + use pegasus_server::JobRequest; + use prost::Message; + use runtime::process::entry::DynEntry; + use runtime::process::record::Record; + use runtime::IRJobAssembly; + + pub const TAG_A: KeyId = 0; + pub const TAG_B: KeyId = 1; + pub const TAG_C: KeyId = 2; + pub const TAG_D: KeyId = 3; + pub const TAG_E: KeyId = 4; + pub const TAG_F: KeyId = 5; + pub const TAG_G: KeyId = 6; + pub const TAG_H: KeyId = 7; + + pub const PERSON_LABEL: LabelId = 1; + pub const KNOWS_LABEL: LabelId = 12; + + static INIT: Once = Once::new(); + pub static JOB_ID: AtomicUsize = AtomicUsize::new(0); + + lazy_static! { + static ref FACTORY: IRJobAssembly = initialize_job_assembly(); + } + + pub fn initialize() { + INIT.call_once(|| { + start_pegasus(); + }); + } + + fn start_pegasus() { + match pegasus::startup(Configuration::singleton()) { + Ok(_) => { + lazy_static::initialize(&FACTORY); + } + Err(err) => match err { + StartupError::AlreadyStarted(_) => {} + _ => panic!("start pegasus failed"), + }, + } + } + + fn initialize_job_assembly() -> IRJobAssembly { + let cluster_info = Arc::new(PegasusClusterInfo::default()); + let exp_store = create_exp_store(cluster_info.clone()); + register_graph(exp_store); + let partition_info = Arc::new(SimplePartition { num_servers: 1 }); + IRJobAssembly::with(partition_info, cluster_info) + } + + pub fn incr_request_job_id(job_req: &mut JobRequest) -> u64 { + let mut conf = job_req.conf.take().expect("no job_conf"); + conf.job_id = JOB_ID.fetch_add(1, Ordering::SeqCst) as u64; + let job_id = conf.job_id; + job_req.conf = Some(conf); + job_id + } + + pub fn submit_query(job_req: JobRequest, num_workers: u32) -> ResultStream> { + let mut conf = JobConf::default(); + conf.workers = num_workers; + let (tx, rx) = crossbeam_channel::unbounded(); + let sink = ResultSink::new(tx); + let cancel_hook = sink.get_cancel_hook().clone(); + let results = ResultStream::new(conf.job_id, cancel_hook, rx); + let service = &FACTORY; + let job = JobDesc { input: job_req.source, plan: job_req.plan, resource: job_req.resource }; + run_opt(conf, sink, move |worker| service.assemble(&job, worker)).expect("submit job failure;"); + results + } + + pub fn parse_result(result: Vec) -> Option { + let result: result_pb::Results = result_pb::Results::decode(result.as_slice()).unwrap(); + if let Some(result_pb::results::Inner::Record(record_pb)) = result.inner { + let mut record = Record::default(); + for column in record_pb.columns { + let tag: Option = if let Some(tag) = column.name_or_id { + match tag.item.unwrap() { + common_pb::name_or_id::Item::Name(name) => Some( + name.parse::() + .unwrap_or(KeyId::max_value()), + ), + common_pb::name_or_id::Item::Id(id) => 
Some(id), + } + } else { + None + }; + let entry = column.entry.unwrap(); + // append entry without moving head + if let Some(tag) = tag { + let columns = record.get_columns_mut(); + columns.insert(tag as usize, DynEntry::try_from(entry).unwrap()); + } else { + record.append(DynEntry::try_from(entry).unwrap(), None); + } + } + Some(record) + } else { + None + } + } + + pub fn query_params( + tables: Vec, columns: Vec, + predicate: Option, + ) -> pb::QueryParams { + pb::QueryParams { + tables, + columns, + is_all_columns: false, + limit: None, + predicate, + sample_ratio: 1.0, + extra: HashMap::new(), + } + } + + pub fn query_params_all_columns( + tables: Vec, columns: Vec, + predicate: Option, + ) -> pb::QueryParams { + pb::QueryParams { + tables, + columns, + is_all_columns: true, + limit: None, + predicate, + sample_ratio: 1.0, + extra: HashMap::new(), + } + } + + pub fn to_var_pb(tag: Option, key: Option) -> common_pb::Variable { + common_pb::Variable { + tag: tag.map(|t| t.into()), + property: key + .map(|k| common_pb::Property { item: Some(common_pb::property::Item::Key(k.into())) }), + node_type: None, + } + } + + pub fn to_expr_var_pb(tag: Option, key: Option) -> common_pb::Expression { + common_pb::Expression { + operators: vec![common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Var(to_var_pb(tag, key))), + }], + } + } + + pub fn to_expr_var_all_prop_pb(tag: Option) -> common_pb::Expression { + common_pb::Expression { + operators: vec![common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Var(common_pb::Variable { + tag: tag.map(|t| t.into()), + property: Some(common_pb::Property { + item: Some(common_pb::property::Item::All(common_pb::AllKey {})), + }), + node_type: None, + })), + }], + } + } + + pub fn to_expr_vars_pb( + tag_keys: Vec<(Option, Option)>, is_map: bool, + ) -> common_pb::Expression { + let vars = tag_keys + .into_iter() + .map(|(tag, key)| to_var_pb(tag, key)) + .collect(); + common_pb::Expression { + operators: vec![common_pb::ExprOpr { + node_type: None, + item: if is_map { + Some(common_pb::expr_opr::Item::VarMap(common_pb::VariableKeys { keys: vars })) + } else { + Some(common_pb::expr_opr::Item::Vars(common_pb::VariableKeys { keys: vars })) + }, + }], + } + } + + pub fn build_scan_with_predicate( + tables: Vec, predicate: String, alias: Option, + ) -> pb::Scan { + pb::Scan { + scan_opt: 0, + alias, + params: Some(query_params(tables, vec![], str_to_expr_pb(predicate).ok())), + idx_predicate: None, + meta_data: None, + is_count_only: false, + } + } + + pub fn build_scan(tables: Vec, alias: Option) -> pb::Scan { + pb::Scan { + scan_opt: 0, + alias, + params: Some(query_params(tables, vec![], None)), + idx_predicate: None, + meta_data: None, + is_count_only: false, + } + } + + pub fn build_expand_v( + direction: i32, edge_labels: Vec, alias: Option, + ) -> pb::EdgeExpand { + pb::EdgeExpand { + v_tag: None, + direction, + params: Some(query_params(edge_labels, vec![], None)), + alias, + expand_opt: 0, + meta_data: None, + is_optional: false, + } + } + + pub fn build_expand_v_from_tag( + tag: Option, direction: i32, edge_labels: Vec, + alias: Option, + ) -> pb::EdgeExpand { + pb::EdgeExpand { + v_tag: tag, + direction, + params: Some(query_params(edge_labels, vec![], None)), + alias, + expand_opt: 0, + meta_data: None, + is_optional: false, + } + } + + pub fn build_as(alias: i32) -> pb::Project { + pb::Project { + mappings: vec![pb::project::ExprAlias { + expr: str_to_expr_pb("@".to_string()).ok(), + alias: 
Some(alias.into()), + }], + is_append: true, + meta_data: vec![], + } + } + + pub fn build_order( + order_pair: Vec<(Option, Option, pb::order_by::ordering_pair::Order)>, + limit: Option, + ) -> pb::OrderBy { + let pairs = order_pair + .into_iter() + .map(|(tag, var, order)| { + let key = to_var_pb(tag, var); + pb::order_by::OrderingPair { key: Some(key), order: order as i32 } + }) + .collect(); + let limit = limit.map(|upper| pb::Range { lower: 0, upper }); + pb::OrderBy { pairs, limit } + } + + pub fn default_count_pb() -> pb::GroupBy { + pb::GroupBy { + mappings: vec![], + functions: vec![pb::group_by::AggFunc { + vars: vec![], + aggregate: 3, // count + alias: None, + }], + meta_data: vec![], + } + } + + pub fn default_sink_pb() -> pb::Sink { + pb::Sink { tags: vec![common_pb::NameOrIdKey { key: None }], sink_target: default_sink_target() } + } + + pub fn default_sink_target() -> Option { + Some(pb::sink::SinkTarget { + inner: Some(pb::sink::sink_target::Inner::SinkDefault(pb::SinkDefault { + id_name_mappings: vec![], + })), + }) + } +} diff --git a/interactive_engine/executor/ir/integrated/tests/expand_test.rs b/interactive_engine/executor/ir/integrated/tests/expand_test.rs index 332787e80e05..78730da90ff0 100644 --- a/interactive_engine/executor/ir/integrated/tests/expand_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/expand_test.rs @@ -27,7 +27,7 @@ mod test { use graph_store::ldbc::LDBCVertexParser; use graph_store::prelude::DefaultId; use ir_common::expr_parse::str_to_expr_pb; - use ir_common::generated::{algebra as algebra_pb, physical as pb}; + use ir_common::generated::{algebra as algebra_pb, common as common_pb, physical as pb}; use ir_common::KeyId; use ir_physical_client::physical_builder::{JobBuilder, PlanBuilder}; use pegasus::api::{Map, Sink}; @@ -1179,8 +1179,6 @@ mod test { meta_data: None, is_optional: false, }; - let unfold_opr = - algebra_pb::Unfold { tag: Some(TAG_C.into()), alias: Some(TAG_C.into()), meta_data: None }; let mut job_builder = JobBuilder::default(); job_builder.add_scan_source(source_opr.clone()); @@ -1194,7 +1192,6 @@ mod test { plan_builder_2.shuffle(None); plan_builder_2.edge_expand(expand_opr3.clone()); job_builder.intersect(vec![plan_builder_1, plan_builder_2], TAG_C.into()); - job_builder.unfold(unfold_opr.clone()); job_builder.sink(default_sink_pb()); job_builder.build().unwrap() } @@ -1286,4 +1283,450 @@ mod test { expected_result_ids.sort(); assert_eq!(result_collection, expected_result_ids); } + + // marko (A) -> lop (B); marko (A) <-> (C); lop (B) <- (C) + fn init_intersect_edges_job_request( + edge_tag_1: Option, edge_tag_2: Option, + ) -> JobRequest { + // marko (A) + let source_opr = algebra_pb::Scan { + scan_opt: 0, + alias: Some(TAG_A.into()), + params: None, + idx_predicate: Some(vec![1].into()), + is_count_only: false, + meta_data: None, + }; + + // marko (A) -> lop (B); + let expand_opr1 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), + expand_opt: 0, + alias: Some(TAG_B.into()), + meta_data: None, + is_optional: false, + }; + + let expand_opr2; + let mut get_v_opr1 = None; + + if let Some(edge_tag_1) = edge_tag_1 { + // a seperate expande + getv + // marko (A) -> (C); with edge tag edge_tag_1 + expand_opr2 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 1, // expand edge + alias: 
Some(edge_tag_1.into()), + meta_data: None, + is_optional: false, + }; + + get_v_opr1 = Some(algebra_pb::GetV { + tag: None, + opt: 1, // EndV + params: Some(query_params(vec![], vec![], None)), + alias: Some(TAG_C.into()), + meta_data: None, + }); + } else { + // a fused expandv + expand_opr2 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 0, // expand vertex + alias: Some(TAG_C.into()), + meta_data: None, + is_optional: false, + }; + } + + let expand_opr3; + let mut get_v_opr2 = None; + + if let Some(edge_tag_2) = edge_tag_2 { + // a seperate expande + getv + // lop (B) <- (C); with edge tag edge_tag_2 + expand_opr3 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_B.into()), + direction: 1, // in + params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), + expand_opt: 1, // expand edge + alias: Some(edge_tag_2.into()), + meta_data: None, + is_optional: false, + }; + + get_v_opr2 = Some(algebra_pb::GetV { + tag: None, + opt: 0, // StartV + params: Some(query_params(vec![], vec![], None)), + alias: Some(TAG_C.into()), + meta_data: None, + }); + } else { + // a fused expandv + expand_opr3 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_B.into()), + direction: 1, // in + params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), + expand_opt: 0, // expand vertex + alias: Some(TAG_C.into()), + meta_data: None, + is_optional: false, + }; + } + + let mut sink_tags: Vec<_> = vec![TAG_A, TAG_B, TAG_C] + .into_iter() + .map(|i| common_pb::NameOrIdKey { key: Some(i.into()) }) + .collect(); + if let Some(edge_tag_1) = edge_tag_1 { + sink_tags.push(common_pb::NameOrIdKey { key: Some(edge_tag_1.into()) }); + } + if let Some(edge_tag_2) = edge_tag_2 { + sink_tags.push(common_pb::NameOrIdKey { key: Some(edge_tag_2.into()) }); + } + let sink_pb = algebra_pb::Sink { tags: sink_tags, sink_target: default_sink_target() }; + + let mut job_builder = JobBuilder::default(); + job_builder.add_scan_source(source_opr.clone()); + job_builder.shuffle(None); + job_builder.edge_expand(expand_opr1.clone()); + let mut plan_builder_1 = PlanBuilder::new(1); + plan_builder_1.shuffle(None); + plan_builder_1.edge_expand(expand_opr2); + if let Some(get_v_opr1) = get_v_opr1 { + plan_builder_1.get_v(get_v_opr1); + } + let mut plan_builder_2 = PlanBuilder::new(2); + plan_builder_2.shuffle(None); + plan_builder_2.edge_expand(expand_opr3.clone()); + if let Some(get_v_opr2) = get_v_opr2 { + plan_builder_2.get_v(get_v_opr2); + } + job_builder.intersect(vec![plan_builder_1, plan_builder_2], TAG_C.into()); + + job_builder.sink(sink_pb); + job_builder.build().unwrap() + } + + // marko (A) -> lop (B); marko (A) -> josh (C); lop (B) <- josh (C) + #[test] + fn general_expand_and_intersection_test_01() { + initialize(); + // no edge tags, i.e., the optimized intersection + let request = init_intersect_edges_job_request(None, None); + + let mut results = submit_query(request, 1); + let mut result_collection = vec![]; + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let mut expected_result_ids = vec![v4]; + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(Some(TAG_C)).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + result_collection.sort(); + expected_result_ids.sort(); + assert_eq!(result_collection, 
expected_result_ids); + } + + // marko (A) -> lop (B); marko (A) -> josh (C) with expanding edge tag (D); lop (B) <- josh (C) + // preserving edges in the intersection phase + #[test] + fn general_expand_and_intersection_test_02() { + initialize(); + // with edge tags, i.e., the general intersection + let request = init_intersect_edges_job_request(Some(TAG_D), None); + + let mut results = submit_query(request, 1); + let mut result_collection = vec![]; + let mut result_edge_collection = vec![]; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let expected_result_ids = vec![v4]; + let expected_preserved_edge = vec![(v1, v4)]; // marko -> josh + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(Some(TAG_C)).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + if let Some(edge) = record.get(Some(TAG_D)).unwrap().as_edge() { + result_edge_collection.push((edge.src_id as DefaultId, edge.dst_id as DefaultId)); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + assert_eq!(result_collection, expected_result_ids); + assert_eq!(result_edge_collection, expected_preserved_edge); + } + + // marko (A) -> lop (B); marko (A) -> josh (C); lop (B) <- josh (C) with expanding edge tag (E); + // preserving edges in the intersection phase + #[test] + fn general_expand_and_intersection_test_03() { + initialize(); + // with edge tags, i.e., the general intersection + let request = init_intersect_edges_job_request(None, Some(TAG_E)); + + let mut results = submit_query(request, 1); + let mut result_collection = vec![]; + let mut result_edge_collection = vec![]; + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let expected_result_ids = vec![v4]; + let expected_preserved_edge = vec![(v4, v3)]; // josh -> lop + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(Some(TAG_C)).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + if let Some(edge) = record.get(Some(TAG_E)).unwrap().as_edge() { + result_edge_collection.push((edge.src_id as DefaultId, edge.dst_id as DefaultId)); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + assert_eq!(result_collection, expected_result_ids); + assert_eq!(result_edge_collection, expected_preserved_edge); + } + + // marko (A) -> lop (B); marko (A) -> josh (C) with expanding edge tag (D); lop (B) <- josh (C) with expanding edge tag (E); + // preserving edges in the intersection phase + #[test] + fn general_expand_and_intersection_test_04() { + initialize(); + // with edge tags, i.e., the general intersection + let request = init_intersect_edges_job_request(Some(TAG_D), Some(TAG_E)); + + let mut results = submit_query(request, 1); + let mut result_collection = vec![]; + let mut result_edge_collection = vec![]; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let expected_result_ids = vec![v4]; + let expected_preserved_edge = vec![((v1, v4), TAG_D), ((v4, v3), TAG_E)]; // marko -> josh, lop -> josh + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let 
Some(vertex) = record.get(Some(TAG_C)).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + if let Some(edge) = record.get(Some(TAG_D)).unwrap().as_edge() { + result_edge_collection + .push(((edge.src_id as DefaultId, edge.dst_id as DefaultId), TAG_D)); + } + if let Some(edge) = record.get(Some(TAG_E)).unwrap().as_edge() { + result_edge_collection + .push(((edge.src_id as DefaultId, edge.dst_id as DefaultId), TAG_E)); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + assert_eq!(result_collection, expected_result_ids); + assert_eq!(result_edge_collection, expected_preserved_edge); + } + + // marko (A) -> lop (B) with optional edge_tag; marko (A) -2..3-> (B); + fn init_intersect_path_edges_job_request(edge_tag: Option) -> JobRequest { + // marko (A) + let source_opr = algebra_pb::Scan { + scan_opt: 0, + alias: Some(TAG_A.into()), + params: None, + idx_predicate: Some(vec![1].into()), + is_count_only: false, + meta_data: None, + }; + + // marko (A) -> lop (B); + let expand_opr1; + let mut get_v_opr1 = None; + + if let Some(edge_tag) = edge_tag { + // a seperate expande + getv + // marko (A) -> (B); with edge tag edge_tag + expand_opr1 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), + expand_opt: 1, + alias: Some(edge_tag.into()), + meta_data: None, + is_optional: false, + }; + + get_v_opr1 = Some(algebra_pb::GetV { + tag: None, + opt: 1, // EndV + params: Some(query_params(vec![], vec![], None)), + alias: Some(TAG_B.into()), + meta_data: None, + }); + } else { + // a fused expandv + expand_opr1 = algebra_pb::EdgeExpand { + v_tag: Some(TAG_A.into()), + direction: 0, // out + params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), + expand_opt: 0, // expand vertex + alias: Some(TAG_B.into()), + meta_data: None, + is_optional: false, + }; + } + + // marko (A) -2..3-> (B): path expand B; + let expand_opr2 = algebra_pb::EdgeExpand { + v_tag: None, + direction: 2, // both + params: Some(query_params(vec![], vec![], None)), + expand_opt: 0, + alias: None, + meta_data: None, + is_optional: false, + }; + let path_opr = algebra_pb::PathExpand { + alias: None, + base: Some(algebra_pb::path_expand::ExpandBase { + edge_expand: Some(expand_opr2.clone()), + get_v: None, + }), + start_tag: Some(TAG_A.into()), + hop_range: Some(algebra_pb::Range { lower: 2, upper: 3 }), + path_opt: 1, // simple + result_opt: 0, // endv + condition: None, + is_optional: false, + }; + + let endv = algebra_pb::GetV { + tag: None, + opt: 1, // EndV + params: Some(query_params(vec![], vec![], None)), + alias: Some(TAG_B.into()), + meta_data: None, + }; + + let mut sink_tags: Vec<_> = vec![TAG_A, TAG_B] + .into_iter() + .map(|i| common_pb::NameOrIdKey { key: Some(i.into()) }) + .collect(); + if let Some(edge_tag) = edge_tag { + sink_tags.push(common_pb::NameOrIdKey { key: Some(edge_tag.into()) }); + } + let sink_pb = algebra_pb::Sink { tags: sink_tags, sink_target: default_sink_target() }; + + let mut job_builder = JobBuilder::default(); + job_builder.add_scan_source(source_opr.clone()); + let mut plan_builder_1 = PlanBuilder::new(1); + plan_builder_1.shuffle(None); + plan_builder_1.edge_expand(expand_opr1); + if let Some(get_v_opr1) = get_v_opr1 { + plan_builder_1.get_v(get_v_opr1); + } + let mut plan_builder_2 = PlanBuilder::new(2); + plan_builder_2.shuffle(None); + plan_builder_2.path_expand(path_opr.clone()); + plan_builder_2.get_v(endv); + 
job_builder.intersect(vec![plan_builder_1, plan_builder_2], TAG_B.into()); + + job_builder.sink(sink_pb); + job_builder.build().unwrap() + } + + #[test] + fn general_expand_multi_hop_path_and_intersect_test_01() { + initialize(); + // no edge tags, i.e., the optimized intersection + let request = init_intersect_path_edges_job_request(None); + + let mut results = submit_query(request, 1); + let mut result_collection = vec![]; + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let mut expected_result_ids = vec![v3]; + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(Some(TAG_B)).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + result_collection.sort(); + expected_result_ids.sort(); + assert_eq!(result_collection, expected_result_ids); + } + + #[test] + fn general_expand_multi_hop_path_and_intersect_test_02() { + initialize(); + // with edge tags, i.e., the general intersection + let request = init_intersect_path_edges_job_request(Some(TAG_C)); + + let mut results = submit_query(request, 1); + let mut result_collection = vec![]; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let mut expected_result_ids = vec![v3]; + while let Some(result) = results.next() { + match result { + Ok(res) => { + let record = parse_result(res).unwrap(); + if let Some(vertex) = record.get(Some(TAG_B)).unwrap().as_vertex() { + result_collection.push(vertex.id() as DefaultId); + } + if let Some(edge) = record.get(Some(TAG_C)).unwrap().as_edge() { + assert_eq!(edge.src_id as DefaultId, v1); + assert_eq!(edge.dst_id as DefaultId, v3); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + + result_collection.sort(); + expected_result_ids.sort(); + assert_eq!(result_collection, expected_result_ids); + } } diff --git a/interactive_engine/executor/ir/runtime/Cargo.toml b/interactive_engine/executor/ir/runtime/Cargo.toml index cb24706366d6..b4cab6c388ee 100644 --- a/interactive_engine/executor/ir/runtime/Cargo.toml +++ b/interactive_engine/executor/ir/runtime/Cargo.toml @@ -17,6 +17,7 @@ prost = "0.11" vec_map = "0.8.2" ahash = ">=0.8.0,<=0.8.7" rand = "0.8.5" +itertools = "0.10" [features] default = [] diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index 834cee1d1278..fa904b445e0d 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -15,6 +15,7 @@ use std::convert::TryInto; use std::sync::Arc; +use std::vec; use graph_proxy::apis::cluster_info::ClusterInfo; use graph_proxy::apis::partitioner::PartitionInfo; @@ -151,6 +152,12 @@ impl FnGenerator { Ok(opr.gen_filter_map()?) } + fn gen_general_edge_expand_collection( + &self, opr: pb::EdgeExpand, opr2: Option, + ) -> FnGenResult { + Ok((opr, opr2).gen_filter_map()?) + } + fn gen_path_start(&self, opr: pb::PathExpand) -> FnGenResult { Ok(opr.gen_filter_map()?) } @@ -451,78 +458,137 @@ impl IRJobAssembly { } } OpKind::Intersect(intersect) => { - // The intersect op can be: - // 1) EdgeExpand with Opt = ExpandV, which is to expand and intersect on id-only vertices; - // 2) EdgeExpand (ExpandE) + GetV(Adj), which is to expand and intersect on vertices. 
+                    // The subplan in intersect can be:
+                    // 1) (repartition) + EdgeExpand (ExpandV), which is to expand and intersect on id-only vertices;
+                    // or (repartition) + EdgeExpand (ExpandV) + (repartition) + GetV(Itself), which is to expand and intersect on vertices.
+                    // In this case, GetV(Itself) usually stands for further filtering on the intersected vertices.
+                    // 2) (repartition) + EdgeExpand (ExpandE) + GetV(Adj), which is to expand and intersect on vertices.
                     // In this case, EdgeExpand and GetV are not fused, usually with alias in EdgeExpand; Not supported yet;
-                    // 3) EdgeExpand with Opt = ExpandE, which is to expand and intersect on edges
-                    // (not supported yet, and it won't happen in current plan);
-                    // 4) PathExpand + GetV(EndV), which is to expand paths and intersect on the end vertices.
-                    // Specifically,
-                    // 1) if we want to expand and intersect on vertices, while there are some further filters on the intersected vertices,
-                    // this would be translated into Intersect(EdgeExpand(V), EdgeExpand(V)) + Unfold + Select in physical plan for now.
-                    // 2) on distributed graph database, the intersect op exists together with the `Repartition` op in subplans.
+                    // or (repartition) + EdgeExpand (ExpandE) + GetV(Adj) + (repartition) + GetV(Itself), which is to expand and intersect on vertices.
+                    // In this case, EdgeExpand and GetV are not fused, usually with an alias in EdgeExpand;
+                    // And GetV(Itself) usually stands for further filtering on the intersected vertices.
+                    // 3) (repartition) + PathExpand + GetV(EndV), which is to expand paths and intersect on the end vertices.
+                    // or (repartition) + PathExpand + GetV(EndV) + (repartition) + GetV(Itself), which is to expand paths and intersect on the end vertices.
+                    // And GetV(Itself) usually stands for further filtering on the intersected vertices.
+                    // Specifically, we slightly modify the plan due to the implementation, as follows:
+                    // 1) basically, we extract the last edge_expand step (ExpandV or ExpandE + GetV) of each subplan for the intersection;
+                    // 2) since the intersect result is a collection, we need to add an Unfold op after the intersect;
+                    // 3) if there are further filters on the intersected vertices on a distributed graph database,
+                    // we need to add a repartition + Auxilia op after Unfold to filter the intersected vertices;
+                    // 4) for the cases of PathExpand, we need to pre-expand the path and the endV, and then intersect on the last edge_expand;
+                    // 5) specifically, if the edge_expands to intersect are all ExpandV, we can apply the optimized intersect implementation,
+                    // i.e., ExpandIntersect, which won't preserve any edges during the intersection.
                     let mut intersected_expands = vec![];
                     let mut pre_expands = vec![];
+                    let mut auxilia: Option<pb::GetV> = None;
+                    let mut auxilia_repartition = None;
                     for mut subplan in intersect.sub_plans {
-                        if subplan.plan.len() > 3 {
-                            Err(FnGenError::unsupported_error(&format!(
-                                "subplan in pb::Intersect::plan {:?}",
-                                subplan,
-                            )))?
-                        }
-                        let last_op = subplan.plan.pop().ok_or_else(|| {
+                        let subplan_clone = subplan.clone();
+                        let mut last_op = subplan.plan.pop().ok_or_else(|| {
                             FnGenError::from(ParsePbError::EmptyFieldError(
                                 "subplan in pb::Intersect::plan".to_string(),
                             ))
                         })?;
+
+                        // if the last opr is Auxilia, move it after the intersect
+                        if let OpKind::Vertex(mut vertex) = to_op_kind(&last_op)?
{ + if vertex.opt == pb::get_v::VOpt::Itself as i32 { + vertex.tag = Some(intersect.key); + auxilia = Some(vertex.clone()); + if subplan + .plan + .last() + .map(|op| op.is_repartition()) + .unwrap_or(false) + { + auxilia_repartition = subplan.plan.pop(); + } + last_op = subplan.plan.pop().ok_or_else(|| { + FnGenError::unsupported_error(&format!( + "subplan with only getV in pb::Intersect::plan {:?}", + vertex, + )) + })?; + } + } + + // then, process subplans after removing the last Auxilia let last_op_kind = to_op_kind(&last_op)?; match last_op_kind { - OpKind::Edge(expand) => { - // the case of expand id-only vertex - let repartition = if let Some(prev) = subplan.plan.last() { - if let OpKind::Repartition(edge_expand_repartition) = to_op_kind(prev)? - { - subplan.plan.pop(); - Some(edge_expand_repartition) + // case 1: EdgeExpandV + OpKind::Edge(mut expand) => { + expand.alias = Some(intersect.key.clone()); + if let Some(opr) = subplan.plan.last() { + if opr.is_repartition() { + intersected_expands.push((subplan.plan.pop(), expand, None)); } else { Err(FnGenError::unsupported_error(&format!( - "subplan in pb::Intersect::plan {:?}", - subplan, + "Subplan in Intersection in EdgeExpandV {:?}", + PhysicalPlanPrinter(&subplan_clone), )))? } } else { - None - }; - intersected_expands.push((repartition, expand)); + intersected_expands.push((None, expand, None)); + } } - OpKind::Vertex(mut end_v) => { - let prev_opr = subplan.plan.pop().ok_or_else(|| { + // case 2/3: PathExpand/EdgeExpand + GetV + OpKind::Vertex(mut get_v) => { + let prev_opr_kind = to_op_kind(&subplan.plan.pop().ok_or_else(|| { FnGenError::unsupported_error(&format!( "subplan with only getV in pb::Intersect::plan {:?}", - end_v, + get_v, )) - })?; - let prev_opr_kind = to_op_kind(&prev_opr)?; + })?)?; match prev_opr_kind { + OpKind::Edge(edge_expand) => { + // case2: ExpandE + GetV(Adj) + if get_v.opt == pb::get_v::VOpt::Itself as i32 { + Err(FnGenError::unsupported_error(&format!( + "Subplan in Intersection in EdgeExpandE+GetV {:?}", + PhysicalPlanPrinter(&subplan_clone), + )))? + } + // note that this get_v won't take filters, as it should be translated to auxilia. + if let Some(params) = &get_v.params { + if params.has_predicates() || params.has_columns() { + Err(FnGenError::unsupported_error(&format!( + "Subplan in Intersection in EdgeExpandE+GetV {:?}", + PhysicalPlanPrinter(&subplan_clone), + )))? + } + } + if let Some(opr) = subplan.plan.last() { + if opr.is_repartition() { + intersected_expands.push(( + subplan.plan.pop(), + edge_expand, + Some(get_v), + )); + } else { + Err(FnGenError::unsupported_error(&format!( + "Subplan in Intersection in EdgeExpandE+GetV {:?}", + PhysicalPlanPrinter(&subplan_clone), + )))? + } + } else { + intersected_expands.push((None, edge_expand, Some(get_v))); + } + } OpKind::Path(mut path_expand) => { - // the case of PathExpand + EndV - if end_v.opt != pb::get_v::VOpt::End as i32 { + // case3: PathExpand + GetV(EndV) + if get_v.opt != pb::get_v::VOpt::End as i32 { Err(FnGenError::unsupported_error(&format!( - "Subplan in Intersection {:?}", - subplan, + "Subplan in Intersection in PathExpand + GetV {:?}", + PhysicalPlanPrinter(&subplan_clone), )))? } - let repartition = if let Some(prev) = subplan.plan.last() { - if let OpKind::Repartition(path_expand_repartition) = - to_op_kind(prev)? 
- { - subplan.plan.pop(); - Some(path_expand_repartition) + let path_repartition = if let Some(opr) = subplan.plan.last() { + if opr.is_repartition() { + subplan.plan.pop() } else { Err(FnGenError::unsupported_error(&format!( - "subplan in pb::Intersect::plan {:?}", - subplan, + "Subplan in Intersection in PathExpand + GetV {:?}", + PhysicalPlanPrinter(&subplan_clone), )))? } } else { @@ -574,34 +640,53 @@ impl IRJobAssembly { hop_range.lower )))? } + let mut edge_expand = base_edge_expand.clone(); + let mut edge_repartition = None; if hop_range.lower == 1 && hop_range.upper == 2 { // optimized Path(1..2) to as EdgeExpand - let mut edge_expand = base_edge_expand.clone(); edge_expand.v_tag = path_expand.start_tag; - edge_expand.alias = end_v.alias; - intersected_expands.push((repartition, edge_expand)); + edge_expand.alias = get_v.alias; + edge_repartition = path_repartition.clone(); } else { // translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand, - let mut edge_expand = base_edge_expand.clone(); edge_expand.v_tag = None; // edge expand should carry endv's alias, which is the intersect key. - edge_expand.alias = end_v.alias.clone(); - end_v.alias.take(); + edge_expand.alias = get_v.alias.clone(); + get_v.alias.take(); hop_range.lower -= 1; hop_range.upper -= 1; // pre expand path_expand(l-1, h-1) - if let Some(repartition) = repartition.clone() { - pre_expands.push(repartition.into()); + if let Some(repartition) = path_repartition.clone() { + pre_expands.push(repartition); } pre_expands.push(path_expand.into()); - pre_expands.push(end_v.into()); - // and then expand and intersect on the last edge_expand - intersected_expands.push((repartition, edge_expand)); + pre_expands.push(get_v.into()); + if path_repartition.is_some() { + edge_repartition = Some( + pb::Repartition { + strategy: Some( + pb::repartition::Strategy::ToAnother( + pb::repartition::Shuffle { + shuffle_key: None, + }, + ), + ), + } + .into(), + ); + } } + // and then expand and intersect on the last edge_expand + intersected_expands.push(( + edge_repartition.clone(), + edge_expand, + None, + )); } + _ => Err(FnGenError::unsupported_error(&format!( "Subplan in Intersection to intersect: {:?}", - subplan + PhysicalPlanPrinter(&subplan), )))?, } } @@ -612,22 +697,49 @@ impl IRJobAssembly { )))?, } } + // pre-expanding for the path_expand case if !pre_expands.is_empty() { stream = self.install(stream, &pre_expands)?; } + // process intersect of edge_expands + let is_optimized = intersected_expands + .iter() + .all(|(_, _, get_v)| get_v.is_none()); + let mut intersect_expand_funcs = Vec::with_capacity(intersected_expands.len()); + for (repartition, expand, get_v) in intersected_expands { + let expand_func = if !is_optimized { + self.udf_gen + .gen_general_edge_expand_collection(expand, get_v)? + } else { + self.udf_gen + .gen_edge_expand_collection(expand)? 
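+                        // Note: the optimized variant intersects on vertex ids only (via IntersectionEntry),
+                        // while the general variant above also keeps the matched edges (via
+                        // GeneralIntersectionEntry), so that their aliases can be restored when the
+                        // intersected collection is unfolded afterwards.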
+ }; + intersect_expand_funcs.push((repartition, expand_func)); + } // intersect of edge_expands - for (repartition, expand_intersect_opr) in intersected_expands { + for (repartition, expand_intersect_func) in intersect_expand_funcs { if let Some(repartition) = repartition { - stream = self.install(stream, &vec![repartition.into()])?; + stream = self.install(stream, &vec![repartition])?; } - let expand_func = self - .udf_gen - .gen_edge_expand_collection(expand_intersect_opr)?; stream = stream.filter_map_with_name("ExpandIntersect", move |input| { - expand_func.exec(input) + expand_intersect_func.exec(input) })?; } + // unfold the intersection + let unfold = + pb::Unfold { tag: Some(intersect.key.into()), alias: Some(intersect.key.into()) }; + stream = self.install(stream, &vec![unfold.into()])?; + + // add vertex filters + if let Some(mut auxilia) = auxilia { + auxilia.tag = Some(intersect.key.into()); + if let Some(auxilia_repartition) = auxilia_repartition { + stream = self.install(stream, &vec![auxilia_repartition, auxilia.into()])?; + } else { + stream = self.install(stream, &vec![auxilia.into()])?; + } + } } OpKind::Vertex(vertex) => { let vertex_opt: algebra_pb::get_v::VOpt = unsafe { std::mem::transmute(vertex.opt) }; diff --git a/interactive_engine/executor/ir/runtime/src/process/entry.rs b/interactive_engine/executor/ir/runtime/src/process/entry.rs index 72ae1a9b9d63..a1dedfb57ab8 100644 --- a/interactive_engine/executor/ir/runtime/src/process/entry.rs +++ b/interactive_engine/executor/ir/runtime/src/process/entry.rs @@ -33,7 +33,7 @@ use pegasus::codec::{Decode, Encode, ReadExt, WriteExt}; use pegasus_common::downcast::*; use pegasus_common::impl_as_any; -use crate::process::operator::map::IntersectionEntry; +use crate::process::operator::map::{GeneralIntersectionEntry, IntersectionEntry}; #[derive(Debug, PartialEq)] pub enum EntryType { @@ -152,12 +152,21 @@ impl Encode for DynEntry { self.as_object().unwrap().write_to(writer)?; } EntryType::Intersection => { - writer.write_u8(5)?; - self.inner + if let Some(intersect) = self .as_any_ref() .downcast_ref::() - .unwrap() - .write_to(writer)?; + { + writer.write_u8(5)?; + intersect.write_to(writer)?; + } else if let Some(intersect) = self + .as_any_ref() + .downcast_ref::() + { + writer.write_u8(8)?; + intersect.write_to(writer)?; + } else { + unreachable!() + } } EntryType::Collection => { writer.write_u8(6)?; @@ -212,6 +221,10 @@ impl Decode for DynEntry { let pair = PairEntry::read_from(reader)?; Ok(DynEntry::new(pair)) } + 8 => { + let general_intersect = GeneralIntersectionEntry::read_from(reader)?; + Ok(DynEntry::new(general_intersect)) + } _ => unreachable!(), } } @@ -421,6 +434,12 @@ impl Entry for IntersectionEntry { } } +impl Entry for GeneralIntersectionEntry { + fn get_type(&self) -> EntryType { + EntryType::Intersection + } +} + impl Entry for GraphPath { fn get_type(&self) -> EntryType { EntryType::Path @@ -636,6 +655,18 @@ impl From> for DynEntry { } } +impl From for DynEntry { + fn from(i: IntersectionEntry) -> Self { + DynEntry::new(i) + } +} + +impl From for DynEntry { + fn from(i: GeneralIntersectionEntry) -> Self { + DynEntry::new(i) + } +} + impl From for DynEntry { fn from(c: CollectionEntry) -> Self { DynEntry::new(c) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs index b7e73c355fb0..400dac9e39aa 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs 
+++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs @@ -22,7 +22,7 @@ use pegasus_common::downcast::AsAny; use crate::error::{FnExecError, FnGenResult}; use crate::process::entry::{CollectionEntry, Entry, EntryType}; use crate::process::operator::flatmap::FlatMapFuncGen; -use crate::process::operator::map::IntersectionEntry; +use crate::process::operator::map::{GeneralIntersectionEntry, IntersectionEntry}; use crate::process::record::Record; #[derive(Debug)] @@ -53,19 +53,38 @@ impl FlatMapFunction for UnfoldOperator { // The reason is that the alias of the collection is system-given, which is used as a hint of intersection, // hence there's no need to preserve the collection anymore. let entry = input.take(self.tag.as_ref()).unwrap(); - let intersection = entry + if let Some(intersection) = entry .as_any_ref() .downcast_ref::() - .ok_or_else(|| { - FnExecError::unexpected_data_error("downcast intersection entry in UnfoldOperator") - })?; - let mut res = Vec::with_capacity(intersection.len()); - for item in intersection.iter().cloned() { - let mut new_entry = input.clone(); - new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias); - res.push(new_entry); + { + let mut res = Vec::with_capacity(intersection.len()); + for item in intersection.iter().cloned() { + let mut new_entry = input.clone(); + new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias); + res.push(new_entry); + } + Ok(Box::new(res.into_iter())) + } else if let Some(general_intersection) = entry + .as_any_ref() + .downcast_ref::() + { + let mut res = Vec::with_capacity(general_intersection.len()); + for (vid, matchings) in general_intersection.matchings_iter() { + for matching in matchings { + let mut new_entry = input.clone(); + for (column, tag) in matching { + new_entry.append(column.clone(), Some(tag)); + } + new_entry.append(Vertex::new(vid, None, DynDetails::default()), self.alias); + res.push(new_entry); + } + } + Ok(Box::new(res.into_iter())) + } else { + Err(FnExecError::unexpected_data_error( + "downcast intersection entry in UnfoldOperator", + ))? } - Ok(Box::new(res.into_iter())) } EntryType::Collection => { let entry = input.get(self.tag).unwrap(); diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs index 67c35c75c3dd..302625c3aabb 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs @@ -16,13 +16,15 @@ use std::any::Any; use std::collections::BTreeMap; use std::convert::TryInto; +use std::ops::RangeBounds; use dyn_type::BorrowObject; use graph_proxy::apis::graph::element::GraphElement; -use graph_proxy::apis::{Direction, Element, QueryParams, Statement, ID}; +use graph_proxy::apis::{Direction, Edge, Element, QueryParams, Statement, ID}; use ir_common::error::ParsePbError; use ir_common::generated::physical as pb; use ir_common::KeyId; +use itertools::Itertools; use pegasus::api::function::{FilterMapFunction, FnResult}; use pegasus::codec::{Decode, Encode, ReadExt, WriteExt}; use pegasus_common::downcast::*; @@ -242,17 +244,469 @@ impl FilterMapFuncGen for pb::EdgeExpand { } } +// EdgeMatching denotes the matching of edges of one EdgeExpand during the intersection, +// e.g., from a previously matched vertex a1, we expand an edge [a1->c1]. 
+// We define `EdgeMatching` (rather than using Edge directly) to support duplicated edge matchings,
+// e.g., we may actually expand two a1->c1 edges (with different edge types).
+#[derive(Debug, Clone, Hash, PartialEq, PartialOrd)]
+struct EdgeMatching {
+    matching: Vec<Edge>,
+}
+
+impl Default for EdgeMatching {
+    fn default() -> Self {
+        Self { matching: Vec::new() }
+    }
+}
+
+impl EdgeMatching {
+    fn new(matching: Vec<Edge>) -> EdgeMatching {
+        EdgeMatching { matching }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.matching.is_empty()
+    }
+}
+
+impl Encode for EdgeMatching {
+    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
+        self.matching.write_to(writer)
+    }
+}
+
+impl Decode for EdgeMatching {
+    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
+        let matching = <Vec<Edge>>::read_from(reader)?;
+        Ok(EdgeMatching { matching })
+    }
+}
+
+// The EdgeMatchings starting from the same vertex during the intersection,
+// e.g., from a previously matched edge [a1->b1], when expanding an `EdgeExpand` from vertex a1,
+// we have EdgeMatchings of [a1->c1, a1->c2, a1->c3].
+#[derive(Debug, Clone, Hash, PartialEq, PartialOrd, Default)]
+struct EdgeMatchings {
+    matchings: Vec<EdgeMatching>,
+}
+
+impl EdgeMatchings {
+    fn with_capacity(capacity: usize) -> EdgeMatchings {
+        EdgeMatchings { matchings: vec![EdgeMatching::default(); capacity] }
+    }
+
+    fn new(matchings: Vec<EdgeMatching>) -> EdgeMatchings {
+        EdgeMatchings { matchings }
+    }
+
+    fn swap(&mut self, a: usize, b: usize) {
+        self.matchings.swap(a, b);
+    }
+
+    fn drain<R>(&mut self, range: R)
+    where
+        R: RangeBounds<usize>,
+    {
+        self.matchings.drain(range);
+    }
+}
+
+impl Encode for EdgeMatchings {
+    fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
+        self.matchings.write_to(writer)
+    }
+}
+
+impl Decode for EdgeMatchings {
+    fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
+        let matchings = <Vec<EdgeMatching>>::read_from(reader)?;
+        Ok(EdgeMatchings { matchings })
+    }
+}
+
+/// A more general entry implementation for intersection, which preserves the matchings of intermediate expanded edges if needed.
+///
+/// For example, given a triangle pattern [A->B->C<-A],
+/// the plan consists of an EdgeExpand{tag: A, alias: B}, and then an Intersection {EdgeExpand{tag: A, alias: C, edge_alias: TagA}, EdgeExpand{tag: B, alias: C, edge_alias: TagB}}.
+/// Then during execution, based on one matching [a1->b1] of pattern [A->B], we apply the IntersectionOpr and save the intermediate results in `GeneralIntersectionEntry`.
+/// Specifically,
+/// 1. To match Expand[A->C], we have `EdgeMatchings` of [a1->c1, a1->c2, a1->c3, a1->c4], saved in `edge_vecs`,
+/// and then the intersected vertices in `vertex_vec` are [c1, c2, c3, c4], i.e., the matchings of vertex C.
+/// A more complicated case is [(a1-knows->c1, a1-family->c1), a1->c2, a1->c3, a1->c4], where the two a1->c1 edges with different types are saved as one `EdgeMatching`;
+/// in this case, the intersected vertices are still [c1, c2, c3, c4], but the count_vec would be marked as [2, 1, 1, 1] (2 for c1, 1 for c2, etc.).
+/// Moreover, we save the edge_alias of this EdgeExpand in `edge_tags`, i.e., [TagA].
+///
+/// 2. Then, to match Expand[B->C], we find matchings [b1->c1, b1->c2, b1->c3]. Thus, we update the GeneralIntersectionEntry as follows:
+/// `vertex_vec` saves the intersected vertices as [c1, c2, c3], where c4 is filtered out, and `count_vec` is updated accordingly;
+/// `edge_vecs` saves the matchings of the expanded edges as
+/// [
+///  [a1->c1, a1->c2, a1->c3]
+///  [b1->c1, b1->c2, b1->c3]
+/// ]
+/// , where the a1->c4 edge is filtered out;
+/// and `edge_tags` is [TagA, TagB].
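+/// (Following the example above, `count_vec` here becomes [2, 1, 1]: each of the two a1->c1 edges
+/// combines with the single b1->c1 edge, while c2 and c3 keep one matching each.)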
+/// +/// 3. Finally, we can apply the `matchings_iter` function, to flatten the GeneralIntersectionEntry into a series of matchings, in a Record-like format. +#[derive(Debug, Clone, Hash, PartialEq, PartialOrd)] +pub struct GeneralIntersectionEntry { + // Preserves the common intersected vertices, e.g., [c1,c2,c3] + vertex_vec: Vec, + // Preserves the EdgeMatchings during the intersection. + // Each entry is one EdgeMatchings, that is, the adjacent edges targeting to each vertex in the vertex_vec (corresponding to the same index). + // e.g., from a previously matched edge [a1->b1], to intersect two `EdgeExpand`s from a1 and b1 respectively, we may have results: + // [ + // [a1->c1, a1->c2, a1->c3] + // [b1->c1, b1->c2, b1->c3] + // ] + edge_vecs: Vec, + // A list of tags, each tags the results of one EdgeExpand during the intersection. + // e.g., for two `EdgeExpands`, we may have tags of [TagA, TagB], + // then, edge matchings a1->c1, a1->c2, a1->c3 has tag of TagA, and b1->c1, b1->c2, b1->c3 has tag of TagB. + edge_tags: Vec, + // the number of matchings for each intersected vertices + count_vec: Vec, +} + +impl_as_any!(GeneralIntersectionEntry); + +impl GeneralIntersectionEntry { + pub fn from_iter>(iter: I) -> GeneralIntersectionEntry { + let mut vertex_count_map = BTreeMap::new(); + for vertex in iter { + let cnt = vertex_count_map.entry(vertex).or_insert(0); + *cnt += 1; + } + let mut vertex_vec = Vec::with_capacity(vertex_count_map.len()); + let mut count_vec = Vec::with_capacity(vertex_count_map.len()); + for (vertex, cnt) in vertex_count_map.into_iter() { + vertex_vec.push(vertex); + count_vec.push(cnt); + } + GeneralIntersectionEntry { + vertex_vec, + edge_vecs: Vec::with_capacity(0), + edge_tags: Vec::with_capacity(0), + count_vec, + } + } + + pub fn from_edge_iter>(iter: I, edge_tag: KeyId) -> GeneralIntersectionEntry { + let mut vertex_edge_map = BTreeMap::new(); + for edge in iter { + let vertex = edge.get_other_id(); + let edges = vertex_edge_map + .entry(vertex) + .or_insert_with(Vec::new); + edges.push(edge); + } + let mut vertex_vec = Vec::with_capacity(vertex_edge_map.len()); + let mut count_vec = Vec::with_capacity(vertex_edge_map.len()); + let mut edge_vec = Vec::with_capacity(vertex_edge_map.len()); + for (vertex, edges) in vertex_edge_map.into_iter() { + vertex_vec.push(vertex); + count_vec.push(edges.len() as u32); + edge_vec.push(EdgeMatching::new(edges)); + } + GeneralIntersectionEntry { + vertex_vec, + edge_vecs: vec![EdgeMatchings::new(edge_vec)], + edge_tags: vec![edge_tag], + count_vec, + } + } + + // intersect ids indicates no edges need to be preserved, just intersect on the target vertex ids + fn intersect>(&mut self, seeker: Iter) { + let len = self.vertex_vec.len(); + let mut s = vec![0; len]; + for vid in seeker { + if let Ok(idx) = self + .vertex_vec + .binary_search_by(|e| e.cmp(&vid)) + { + s[idx] += 1; + } + } + let mut idx = 0; + for (i, cnt) in s.into_iter().enumerate() { + if cnt != 0 { + self.vertex_vec.swap(idx, i); + self.count_vec.swap(idx, i); + for edge_vec in self.edge_vecs.iter_mut() { + edge_vec.swap(idx, i); + } + self.count_vec[idx] *= cnt; + idx += 1; + } + } + self.vertex_vec.drain(idx..); + self.count_vec.drain(idx..); + for edge_vec in self.edge_vecs.iter_mut() { + edge_vec.drain(idx..); + } + } + + // intersect edges indicates to intersect on the target vertex ids, while preserving the expanded edges + fn general_intersect>(&mut self, seeker: Iter, edge_tag: KeyId) { + let len = self.vertex_vec.len(); + let mut e = 
vec![Vec::new(); len]; + for edge in seeker { + let vid = edge.get_other_id(); + if let Ok(idx) = self + .vertex_vec + .binary_search_by(|e| e.cmp(&vid)) + { + e[idx].push(edge); + } + } + let mut idx = 0; + let mut expanded_edge_matchings = EdgeMatchings::with_capacity(len); + for (i, edges) in e.into_iter().enumerate() { + let cnt = edges.len() as u32; + if cnt != 0 { + self.vertex_vec.swap(idx, i); + self.count_vec.swap(idx, i); + for edge_vec in self.edge_vecs.iter_mut() { + edge_vec.swap(idx, i); + } + self.count_vec[idx] *= cnt; + expanded_edge_matchings.matchings[idx] = EdgeMatching::new(edges); + idx += 1; + } + } + self.vertex_vec.drain(idx..); + self.count_vec.drain(idx..); + for edge_vec in self.edge_vecs.iter_mut() { + edge_vec.drain(idx..); + } + expanded_edge_matchings.matchings.drain(idx..); + if !expanded_edge_matchings.matchings.is_empty() { + self.edge_vecs.push(expanded_edge_matchings); + self.edge_tags.push(edge_tag); + } + } + + fn is_empty(&self) -> bool { + self.vertex_vec.is_empty() + } + + fn len(&self) -> usize { + let mut len = 0; + for count in self.count_vec.iter() { + len += *count; + } + len as usize + } + + pub fn iter(&self) -> impl Iterator { + self.vertex_vec + .iter() + .zip(&self.count_vec) + .flat_map(move |(vertex, count)| std::iter::repeat(vertex).take(*count as usize)) + } + + // output the results of matchings in the intersection. + // e.g., edge_vecs preserves + // [ + // [a1->c1, a1->c2, a1->c3] + // [b1->c1, b1->c2, b1->c3] + // ] + // with edge_tags as [TagA, TagB], + // then the output looks like: + // (c1, [(a1->c1, TagA), (b1->c1, TagB)]), (c2, [(a1->c2, TagA), (b1->c2, TagB)]), (c3, [(a1->c3, TagA), (b1->c3, TagB)]) + // Here, each item corresponds to a record (a complete matching). + pub fn matchings_iter(&self) -> impl Iterator>)> { + if self.edge_vecs.is_empty() { + return vec![].into_iter(); + } + let mut result = Vec::with_capacity(self.vertex_vec.len()); + for i in 0..self.edge_vecs[0].matchings.len() { + if self.edge_vecs[0].matchings[i].is_empty() { + warn!( + "The {}-th entry of {:?} is empty in intersection, should be erased", + i, self.edge_vecs[0] + ); + continue; + } + // the target vertex id + let dst = self.edge_vecs[0].matchings[i].matching[0].get_other_id(); + // the records with target dst consists of columns of TagA, TagB, ..., which is a cartesian product of all these tags + let product = (0..self.edge_vecs.len()) + .map(|tag_idx| &self.edge_vecs[tag_idx].matchings[i].matching) + .multi_cartesian_product(); + let records_num: usize = (0..self.edge_vecs.len()) + .map(|tag_index| { + self.edge_vecs[tag_index].matchings[i] + .matching + .len() + }) + .product(); + let mut records = Vec::with_capacity(records_num); + // each combination can be regarded as multiple columns in record (with no alias, so we need to zip it). 
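+            // e.g., if vertex c1 was reached via two edges tagged TagA and one edge tagged TagB,
+            // the product yields two combinations, i.e., two records for c1:
+            // [(a1-e1->c1, TagA), (b1-e3->c1, TagB)] and [(a1-e2->c1, TagA), (b1-e3->c1, TagB)].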
+ for combination in product { + let record: Vec<_> = combination + .into_iter() + .zip(self.edge_tags.iter().cloned()) + .collect(); + records.push(record); + } + result.push((dst, records)); + } + return result.into_iter(); + } +} + +impl Encode for GeneralIntersectionEntry { + fn write_to(&self, writer: &mut W) -> std::io::Result<()> { + self.vertex_vec.write_to(writer)?; + self.edge_vecs.write_to(writer)?; + self.edge_tags.write_to(writer)?; + self.count_vec.write_to(writer)?; + Ok(()) + } +} + +impl Decode for GeneralIntersectionEntry { + fn read_from(reader: &mut R) -> std::io::Result { + let vertex_vec = >::read_from(reader)?; + let edge_vecs = >::read_from(reader)?; + let edge_tags = >::read_from(reader)?; + let count_vec = >::read_from(reader)?; + Ok(GeneralIntersectionEntry { vertex_vec, edge_vecs, edge_tags, count_vec }) + } +} + +impl Element for GeneralIntersectionEntry { + fn len(&self) -> usize { + self.len() + } + + fn as_borrow_object(&self) -> BorrowObject { + BorrowObject::None + } +} + +// a more general version of ExpandOrIntersect operator +// to expand neighbors and intersect with the ones of the same tag found previously (if exists). +// If edge_tag (the alias of expanded edges) is specified, the intermediate expanded edges will also be preserved. +// Notice that do not mix the usage of GeneralExpandOrIntersect and ExpandOrIntersect: +// if during the intersection, all the expanded edges are not needed, use the optimized version of ExpandOrIntersect, +// otherwise, use GeneralExpandOrIntersect. +pub struct GeneralExpandOrIntersect { + start_v_tag: Option, + end_v_tag: KeyId, + edge_tag: Option, + stmt: Box>, +} + +impl FilterMapFunction for GeneralExpandOrIntersect { + fn exec(&self, mut input: Record) -> FnResult> { + let entry = input.get(self.start_v_tag).ok_or_else(|| { + FnExecError::get_tag_error(&format!( + "get start_v_tag {:?} from record in `ExpandOrIntersect` operator, the record is {:?}", + self.start_v_tag, input + )) + })?; + match entry.get_type() { + EntryType::Vertex => { + let id = entry.id(); + let edge_iter = self.stmt.exec(id)?; + if let Some(pre_entry) = input.get_mut(Some(self.end_v_tag)) { + // the case of expansion and intersection + let pre_intersection = pre_entry + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( + "entry is not a intersection in ExpandOrIntersect" + )) + })?; + if let Some(edge_tag) = self.edge_tag { + pre_intersection.general_intersect(edge_iter, edge_tag); + } else { + pre_intersection.intersect(edge_iter.map(|e| e.get_other_id())); + } + if pre_intersection.is_empty() { + Ok(None) + } else { + Ok(Some(input)) + } + } else { + // the case of expansion only + let neighbors_intersection = if let Some(edge_tag) = self.edge_tag { + GeneralIntersectionEntry::from_edge_iter(edge_iter, edge_tag) + } else { + GeneralIntersectionEntry::from_iter(edge_iter.map(|e| e.get_other_id())) + }; + if neighbors_intersection.is_empty() { + Ok(None) + } else { + // append columns without changing head + let columns = input.get_columns_mut(); + columns.insert(self.end_v_tag as usize, DynEntry::new(neighbors_intersection)); + Ok(Some(input)) + } + } + } + _ => Err(FnExecError::unsupported_error(&format!( + "expand or intersect entry {:?} of tag {:?} failed in ExpandOrIntersect", + entry, self.end_v_tag + )))?, + } + } +} + +impl FilterMapFuncGen for (pb::EdgeExpand, Option) { + fn gen_filter_map(self) -> FnGenResult>> { + if self.1.is_none() && self.0.expand_opt != 
pb::edge_expand::ExpandOpt::Vertex as i32 { + return Err(FnGenError::unsupported_error(&format!( + "GeneralExpandOrIntersect with {:?}", + self + ))); + } + let graph = graph_proxy::apis::get_graph().ok_or_else(|| FnGenError::NullGraphError)?; + let start_v_tag = self.0.v_tag; + let edge_tag = self.0.alias; + let end_v_tag = if let Some(getv) = self.1 { getv.alias } else { self.0.alias } + .ok_or_else(|| ParsePbError::from("`GetV::alias` cannot be empty for intersection"))?; + let direction_pb: pb::edge_expand::Direction = unsafe { ::std::mem::transmute(self.0.direction) }; + let direction = Direction::from(direction_pb); + let query_params: QueryParams = self.0.params.try_into()?; + if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 { + debug!( + "Runtime expand collection operator of edge with start_v_tag {:?}, edge_tag {:?}, end_v_tag {:?}, direction {:?}, query_params {:?}", + start_v_tag, edge_tag, end_v_tag, direction, query_params + ); + } + + // Expand edges, since we need to preserve the edge information + let stmt = graph.prepare_explore_edge(direction, &query_params)?; + let edge_expand_operator = GeneralExpandOrIntersect { start_v_tag, edge_tag, end_v_tag, stmt }; + Ok(Box::new(edge_expand_operator)) + } +} + #[cfg(test)] mod tests { - - use graph_proxy::apis::ID; + use graph_proxy::apis::{Edge, ID}; + use ir_common::KeyId; use super::IntersectionEntry; + use crate::process::operator::map::GeneralIntersectionEntry; + + const EDGE_TAG_A: KeyId = 0; + const EDGE_TAG_B: KeyId = 1; fn to_vertex_iter(id_vec: Vec) -> impl Iterator { id_vec.into_iter() } + fn to_edge_iter(eid_vec: Vec<(ID, ID)>) -> impl Iterator { + eid_vec + .into_iter() + .map(|(src, dst)| Edge::new(0, None, src, dst, Default::default())) + } + #[test] fn intersect_test_01() { let mut intersection = IntersectionEntry::from_iter(to_vertex_iter(vec![1, 2, 3])); @@ -324,4 +778,179 @@ mod tests { intersection.intersect(seeker); assert_eq!(intersection.drain().collect::>(), vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]) } + + fn general_intersect_test(iter1: Vec<(ID, ID)>, iter2: Vec<(ID, ID)>) -> Vec> { + let mut intersection = GeneralIntersectionEntry::from_edge_iter(to_edge_iter(iter1), EDGE_TAG_A); + println!("intersection: {:?}", intersection); + let seeker = to_edge_iter(iter2); + intersection.general_intersect(seeker, EDGE_TAG_B); + println!("intersection: {:?}", intersection); + let mut records = vec![]; + let matchings_collect = intersection.matchings_iter(); + for (vid, matchings) in matchings_collect { + for matching in matchings { + let mut record = vec![]; + for (edge, tag) in matching { + assert_eq!(vid, edge.get_other_id() as ID); + record.push((edge.src_id, edge.dst_id, tag)); + } + records.push(record); + } + } + records.sort(); + records + } + + #[test] + fn general_intersect_test_01() { + assert_eq!( + general_intersect_test( + vec![(0, 1), (0, 2), (0, 3)], + vec![(10, 1), (10, 2), (10, 3), (10, 4), (10, 5)] + ), + vec![ + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)] + ] + ); + } + + #[test] + fn general_intersect_test_02() { + assert_eq!( + general_intersect_test( + vec![(10, 1), (10, 2), (10, 3), (10, 4), (10, 5)], + vec![(0, 3), (0, 2), (0, 1)] + ), + vec![ + vec![(10, 1, EDGE_TAG_A), (0, 1, EDGE_TAG_B)], + vec![(10, 2, EDGE_TAG_A), (0, 2, EDGE_TAG_B)], + vec![(10, 3, EDGE_TAG_A), (0, 3, EDGE_TAG_B)] + ] + ); + } + + #[test] + fn general_intersect_test_03() { + assert_eq!( + 
general_intersect_test( + vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5)], + vec![(10, 9), (10, 7), (10, 5), (10, 3), (10, 1)] + ), + vec![ + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)], + vec![(0, 5, EDGE_TAG_A), (10, 5, EDGE_TAG_B)] + ] + ); + } + + #[test] + fn general_intersect_test_04() { + assert_eq!( + general_intersect_test( + vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5)], + vec![(0, 9), (0, 8), (0, 7), (0, 6)] + ) + .len(), + 0 + ); + } + + #[test] + fn general_intersect_test_05() { + assert_eq!( + general_intersect_test( + vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (6, 1)], + vec![(10, 1), (10, 2), (10, 3)] + ), + vec![ + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)], + vec![(6, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + ] + ); + } + + #[test] + fn general_intersect_test_06() { + assert_eq!( + general_intersect_test( + vec![(10, 1), (10, 2), (10, 3)], + vec![(0, 1), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5)] + ), + vec![ + vec![(10, 1, EDGE_TAG_A), (0, 1, EDGE_TAG_B)], + vec![(10, 1, EDGE_TAG_A), (0, 1, EDGE_TAG_B)], + vec![(10, 2, EDGE_TAG_A), (0, 2, EDGE_TAG_B)], + vec![(10, 3, EDGE_TAG_A), (0, 3, EDGE_TAG_B)] + ] + ); + } + + #[test] + fn general_intersect_test_07() { + assert_eq!( + general_intersect_test( + vec![(0, 1), (0, 1), (0, 2), (0, 2), (0, 3), (0, 3), (0, 4), (0, 5)], + vec![(10, 1), (10, 2), (10, 3)] + ), + vec![ + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)] + ] + ); + } + + #[test] + fn general_intersect_test_08() { + assert_eq!( + general_intersect_test( + vec![(0, 1), (0, 2), (0, 3)], + vec![(10, 1), (10, 1), (10, 2), (10, 2), (10, 3), (10, 3), (10, 4), (10, 5)] + ), + vec![ + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)] + ] + ); + let mut intersection = IntersectionEntry::from_iter(to_vertex_iter(vec![1, 2, 3])); + let seeker = to_vertex_iter(vec![1, 1, 2, 2, 3, 3, 4, 5]); + intersection.intersect(seeker); + assert_eq!(intersection.drain().collect::>(), vec![1, 1, 2, 2, 3, 3]) + } + + #[test] + fn general_intersect_test_09() { + assert_eq!( + general_intersect_test( + vec![(0, 1), (0, 1), (0, 2), (0, 2), (0, 3), (0, 3)], + vec![(10, 1), (10, 1), (10, 2), (10, 2), (10, 3), (10, 3), (10, 4), (10, 5)] + ), + vec![ + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 1, EDGE_TAG_A), (10, 1, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 2, EDGE_TAG_A), (10, 2, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)], + vec![(0, 3, EDGE_TAG_A), (10, 3, EDGE_TAG_B)] + ] + ); + } } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/mod.rs 
b/interactive_engine/executor/ir/runtime/src/process/operator/map/mod.rs index 53b2aed83354..01577f9120dd 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/mod.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/mod.rs @@ -19,7 +19,7 @@ mod path_end; mod path_start; mod project; -pub use expand_intersect::IntersectionEntry; +pub use expand_intersect::{GeneralIntersectionEntry, IntersectionEntry}; use pegasus::api::function::{FilterMapFunction, MapFunction}; use crate::error::FnGenResult; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs index f0ae4ff70b30..03e63494c38a 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs @@ -30,7 +30,7 @@ use prost::Message; use crate::error::{FnExecError, FnExecResult, FnGenResult}; use crate::process::entry::{CollectionEntry, DynEntry, Entry, EntryType, PairEntry}; -use crate::process::operator::map::IntersectionEntry; +use crate::process::operator::map::{GeneralIntersectionEntry, IntersectionEntry}; use crate::process::operator::sink::{SinkGen, Sinker}; use crate::process::record::Record; @@ -77,20 +77,40 @@ impl RecordSinkEncoder { } } EntryType::Intersection => { - let intersection = e + if let Some(intersection) = e .as_any_ref() .downcast_ref::() - .unwrap(); - let mut collection_pb = Vec::with_capacity(intersection.len()); - for v in intersection.iter() { - let vertex_pb = self.vid_to_pb(v); - let element_pb = - result_pb::Element { inner: Some(result_pb::element::Inner::Vertex(vertex_pb)) }; - collection_pb.push(element_pb); + { + let mut collection_pb = Vec::with_capacity(intersection.len()); + for v in intersection.iter() { + let vertex_pb = self.vid_to_pb(v); + let element_pb = result_pb::Element { + inner: Some(result_pb::element::Inner::Vertex(vertex_pb)), + }; + collection_pb.push(element_pb); + } + Some(result_pb::entry::Inner::Collection(result_pb::Collection { + collection: collection_pb, + })) + } else if let Some(general_intersection) = e + .as_any_ref() + .downcast_ref::() + { + let mut collection_pb = Vec::with_capacity(general_intersection.len()); + for v in general_intersection.iter() { + let vertex_pb = self.vid_to_pb(v); + let element_pb = result_pb::Element { + inner: Some(result_pb::element::Inner::Vertex(vertex_pb)), + }; + collection_pb.push(element_pb); + } + + Some(result_pb::entry::Inner::Collection(result_pb::Collection { + collection: collection_pb, + })) + } else { + Err(FnExecError::unsupported_error("unsupported intersection entry type"))? } - Some(result_pb::entry::Inner::Collection(result_pb::Collection { - collection: collection_pb, - })) } _ => { if let Some(map_pb) = self.try_map_to_pb(e) {