From 4360b0241348d4f7a841b56ed373f31583fb9f56 Mon Sep 17 00:00:00 2001 From: Rafael Bey <24432403+rafaelbey@users.noreply.github.com> Date: Tue, 30 Apr 2024 17:38:22 -0400 Subject: [PATCH] Extend implicit handling of runtime vs pkg runtime to the handlers of from functions (#2818) * update from handlers to support PackageableRuntime * update from handlers to support PackageableRuntime --------- Co-authored-by: sameer saini --- .../toPureGraph/handlers/Handlers.java | 10 ++-- ...chExecutionPlanFromGrammarIntegration.java | 53 ++++++++++++++++++- .../grammarForPlanIntegrationTesting.pure | 27 ++++++++++ 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/legend-engine-core/legend-engine-core-base/legend-engine-core-language-pure/legend-engine-language-pure-compiler/src/main/java/org/finos/legend/engine/language/pure/compiler/toPureGraph/handlers/Handlers.java b/legend-engine-core/legend-engine-core-base/legend-engine-core-language-pure/legend-engine-language-pure-compiler/src/main/java/org/finos/legend/engine/language/pure/compiler/toPureGraph/handlers/Handlers.java index 643659566fa..218356c7bb0 100644 --- a/legend-engine-core/legend-engine-core-base/legend-engine-core-language-pure/legend-engine-language-pure-compiler/src/main/java/org/finos/legend/engine/language/pure/compiler/toPureGraph/handlers/Handlers.java +++ b/legend-engine-core/legend-engine-core-base/legend-engine-core-language-pure/legend-engine-language-pure-compiler/src/main/java/org/finos/legend/engine/language/pure/compiler/toPureGraph/handlers/Handlers.java @@ -1163,7 +1163,7 @@ private void registerTDS() register( m( - m(h("meta::pure::mapping::from_T_m__Runtime_1__T_m_", false, ps -> res(ps.get(0)._genericType(), ps.get(0)._multiplicity()), ps -> ps.size() == 2 && typeOne(ps.get(1), Sets.immutable.with("Runtime", "EngineRuntime")))), + m(h("meta::pure::mapping::from_T_m__Runtime_1__T_m_", false, ps -> res(ps.get(0)._genericType(), ps.get(0)._multiplicity()), ps -> ps.size() == 2 && typeOne(ps.get(1), Sets.immutable.with("Runtime", "EngineRuntime", "PackageableRuntime")))), m(h("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__TabularDataSet_1_", false, ps -> res("meta::pure::tds::TabularDataSet", "one"), ps -> ps.size() == 3 && "TabularDataSet".equals(ps.get(0)._genericType()._rawType()._name())), h("meta::pure::mapping::from_T_m__Mapping_1__Runtime_1__T_m_", false, ps -> res(ps.get(0)._genericType(), ps.get(0)._multiplicity()), ps -> ps.size() == 3) ), @@ -2521,9 +2521,9 @@ private Map buildDispatch() map.put("meta::pure::graphFetch::execution::serialize_Checked_MANY__RootGraphFetchTree_1__String_1_", (List ps) -> ps.size() == 2 && ("Nil".equals(ps.get(0)._genericType()._rawType()._name()) || "Checked".equals(ps.get(0)._genericType()._rawType()._name())) && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "RootGraphFetchTree", "ExtendedRootGraphFetchTree", "RoutedRootGraphFetchTree", "SerializeTopRootGraphFetchTree").contains(ps.get(1)._genericType()._rawType()._name())); map.put("meta::pure::graphFetch::execution::serialize_T_MANY__RootGraphFetchTree_1__AlloySerializationConfig_1__String_1_", (List ps) -> ps.size() == 3 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "RootGraphFetchTree", "ExtendedRootGraphFetchTree", "RoutedRootGraphFetchTree", "SerializeTopRootGraphFetchTree").contains(ps.get(1)._genericType()._rawType()._name()) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "AlloySerializationConfig".equals(ps.get(2)._genericType()._rawType()._name()))); map.put("meta::pure::graphFetch::execution::serialize_T_MANY__RootGraphFetchTree_1__String_1_", (List ps) -> ps.size() == 2 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "RootGraphFetchTree", "ExtendedRootGraphFetchTree", "RoutedRootGraphFetchTree", "SerializeTopRootGraphFetchTree").contains(ps.get(1)._genericType()._rawType()._name())); - map.put("meta::pure::mapping::from_T_m__Mapping_1__Runtime_1__T_m_", (List ps) -> ps.size() == 3 && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && typeOne(ps.get(2), Sets.immutable.with("Runtime", "EngineRuntime"))); - map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__ExecutionContext_1__TabularDataSet_1_", (List ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "Runtime".equals(ps.get(2)._genericType()._rawType()._name())) && isOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "ExecutionContext", "ExtendedExecutionContext", "RelationalExecutionContext", "AuthenticationContext", "AnalyticsExecutionContext", "BatchQueryContext", "LatestBatchQueryContext", "DirtyLatestBatchQueryContext", "VectorBatchQueryContext", "WatermarkExecutionContext", "SpecificWatermarkExecutionContext", "RefinerWatermarkExecutionContext").contains(ps.get(3)._genericType()._rawType()._name())); - map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__TabularDataSet_1_", (List ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "Runtime".equals(ps.get(2)._genericType()._rawType()._name()))); + map.put("meta::pure::mapping::from_T_m__Mapping_1__Runtime_1__T_m_", (List ps) -> ps.size() == 3 && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && typeOne(ps.get(2), Sets.immutable.with("Runtime", "EngineRuntime", "PackageableRuntime"))); + map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__ExecutionContext_1__TabularDataSet_1_", (List ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "PackageableRuntime").contains(ps.get(2)._genericType()._rawType()._name()) && isOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "ExecutionContext", "ExtendedExecutionContext", "RelationalExecutionContext", "AuthenticationContext", "AnalyticsExecutionContext", "BatchQueryContext", "LatestBatchQueryContext", "DirtyLatestBatchQueryContext", "VectorBatchQueryContext", "WatermarkExecutionContext", "SpecificWatermarkExecutionContext", "RefinerWatermarkExecutionContext").contains(ps.get(3)._genericType()._rawType()._name())); + map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__TabularDataSet_1_", (List ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "PackageableRuntime").contains(ps.get(2)._genericType()._rawType()._name())); map.put("meta::core::runtime::mergeRuntimes_Any_$1_MANY$__Runtime_1_", (List ps) -> ps.size() == 1 && matchOneMany(ps.get(0)._multiplicity())); map.put("meta::core::runtime::getRuntimeWithModelConnection_Class_1__Any_MANY__Runtime_1_", (List ps) -> ps.size() == 2 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Class", "MappingClass", "ClassProjection").contains(ps.get(0)._genericType()._rawType()._name())); map.put("meta::core::runtime::getRuntimeWithModelQueryConnection_Class_1__Binding_1__Byte_MANY__Runtime_1_", (List ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Class", "ClassProjection", "MappingClass").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Binding".equals(ps.get(1)._genericType()._rawType()._name())) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "Byte".equals(ps.get(2)._genericType()._rawType()._name()))); @@ -2612,7 +2612,7 @@ private Map buildDispatch() map.put("meta::pure::functions::date::calendar::wtd_Date_1__String_1__Date_1__Number_$0_1$", (List ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "String".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(2)._genericType()._rawType()._name()) && matchZeroOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "Number", "Integer", "Float", "Decimal").contains(ps.get(3)._genericType()._rawType()._name())); map.put("meta::pure::functions::date::calendar::ytd_Date_1__String_1__Date_1__Number_$0_1$", (List ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "String".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(2)._genericType()._rawType()._name()) && matchZeroOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "Number", "Integer", "Float", "Decimal").contains(ps.get(3)._genericType()._rawType()._name())); - map.put("meta::pure::mapping::from_T_m__Runtime_1__T_m_", (List ps) -> ps.size() == 2 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "EngineRuntime").contains(ps.get(1)._genericType()._rawType()._name())); + map.put("meta::pure::mapping::from_T_m__Runtime_1__T_m_", (List ps) -> ps.size() == 2 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "EngineRuntime", "PackageableRuntime").contains(ps.get(1)._genericType()._rawType()._name())); map.put("meta::pure::functions::relation::filter_Relation_1__Function_1__Relation_1_", (List ps) -> ps.size() == 2 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || check(funcType(ps.get(1)._genericType()), (FunctionType ft) -> isOne(ft._returnMultiplicity()) && ("Nil".equals(ft._returnType()._rawType()._name()) || "Boolean".equals(ft._returnType()._rawType()._name())) && check(ft._parameters().toList(), (List nps) -> nps.size() == 1 && isOne(nps.get(0)._multiplicity()))))); map.put("meta::pure::functions::relation::concatenate_Relation_1__Relation_1__Relation_1_", (List ps) -> ps.size() == 2 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(1)._genericType()._rawType()._name())); map.put("meta::pure::functions::relation::rename_Relation_1__ColSpec_1__ColSpec_1__Relation_1_", (List ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "ColSpec".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "ColSpec".equals(ps.get(2)._genericType()._rawType()._name()))); diff --git a/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/java/org/finos/legend/engine/plan/execution/stores/elasticsearch/test/TestElasticsearchExecutionPlanFromGrammarIntegration.java b/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/java/org/finos/legend/engine/plan/execution/stores/elasticsearch/test/TestElasticsearchExecutionPlanFromGrammarIntegration.java index 66f89a5d380..0e907cdf80b 100644 --- a/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/java/org/finos/legend/engine/plan/execution/stores/elasticsearch/test/TestElasticsearchExecutionPlanFromGrammarIntegration.java +++ b/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/java/org/finos/legend/engine/plan/execution/stores/elasticsearch/test/TestElasticsearchExecutionPlanFromGrammarIntegration.java @@ -47,6 +47,7 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.testcontainers.DockerClientFactory; @@ -142,9 +143,59 @@ public void testElasticPlanExecution() throws IOException String resultPlanJson = objectmapper.writeValueAsString(originalPlan); - //assert that streaming results does not modify the plan + //assert that streaming results does not modify the plan Assert.assertEquals(initialPlanJson, resultPlanJson); } } + + @Test + public void testElasticPlanForFunctionWithMappingAndRuntimePropertyExecution() throws IOException + { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ) + { + SingleExecutionPlan originalPlan = getPlanFromFunctionGrammar("abc::abc::functionWithMappingAndRuntimeProperty__TabularDataSet_1_"); + + //create a deep copy of the original plan that we pass into the execute call + ObjectMapper objectmapper = ObjectMapperFactory.getNewStandardObjectMapperWithPureProtocolExtensionSupports(); + String initialPlanJson = objectmapper.writeValueAsString(originalPlan); + + + //execute plan + TDSResult result = (TDSResult) PlanExecutor.newPlanExecutorBuilder().withAvailableStoreExecutors().build().execute(originalPlan); + + result.stream(outputStream, SerializationFormat.DEFAULT); //this should trigger tryAdvance -- streaming results in batches + + String resultPlanJson = objectmapper.writeValueAsString(originalPlan); + + //assert that streaming results does not modify the plan + Assert.assertEquals(initialPlanJson, resultPlanJson); + } + } + + @Test + public void testElasticPlanForFunctionWithRuntimePropertyExecution() throws IOException + { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ) + { + SingleExecutionPlan originalPlan = getPlanFromFunctionGrammar("abc::abc::functionWithRuntimeProperty__TabularDataSet_1_"); + + //create a deep copy of the original plan that we pass into the execute call + ObjectMapper objectmapper = ObjectMapperFactory.getNewStandardObjectMapperWithPureProtocolExtensionSupports(); + String initialPlanJson = objectmapper.writeValueAsString(originalPlan); + + + //execute plan + TDSResult result = (TDSResult) PlanExecutor.newPlanExecutorBuilder().withAvailableStoreExecutors().build().execute(originalPlan); + + result.stream(outputStream, SerializationFormat.DEFAULT); //this should trigger tryAdvance -- streaming results in batches + + String resultPlanJson = objectmapper.writeValueAsString(originalPlan); + + //assert that streaming results does not modify the plan + Assert.assertEquals(initialPlanJson, resultPlanJson); + } + } } diff --git a/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/resources/grammarForPlanIntegrationTesting.pure b/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/resources/grammarForPlanIntegrationTesting.pure index 3c2b7697bf8..463a4e419f2 100644 --- a/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/resources/grammarForPlanIntegrationTesting.pure +++ b/legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test/src/test/resources/grammarForPlanIntegrationTesting.pure @@ -55,3 +55,30 @@ function abc::abc::indexToTDSGroupByFunction(): TabularDataSet[1] { indexToTDS(abc::abc::Store, 'index1')->from(abc::abc::EmptyMapping, abc::abc::Runtime)->groupBy(['prop1', '_id'], agg('count', r | $r.getString('prop1'), agg | $agg->count())); } + +// Check to see if the fn compiles +function abc::abc::functionWithMappingAndRuntimeParam(mapping: meta::pure::mapping::Mapping[1], runtime: meta::pure::runtime::PackageableRuntime[1]): TabularDataSet[1] +{ + indexToTDS(abc::abc::Store, 'index1')->from( + $mapping, + $runtime + ); +} + +function abc::abc::functionWithMappingAndRuntimeProperty(): TabularDataSet[1] +{ + let mapping = abc::abc::EmptyMapping; + let runtime = abc::abc::Runtime; + indexToTDS(abc::abc::Store, 'index1')->from( + $mapping, + $runtime + ); +} + +function abc::abc::functionWithRuntimeProperty(): TabularDataSet[1] +{ + let runtime = abc::abc::Runtime; + indexToTDS(abc::abc::Store, 'index1')->from( + $runtime + ); +} \ No newline at end of file