Skip to content

Commit

Permalink
Extend implicit handling of runtime vs pkg runtime to the handlers of…
Browse files Browse the repository at this point in the history
… from functions (#2818)

* update from handlers to support PackageableRuntime

* update from handlers to support PackageableRuntime

---------

Co-authored-by: sameer saini <[email protected]>
  • Loading branch information
rafaelbey and sameersaini authored Apr 30, 2024
1 parent 936400d commit 4360b02
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ private void registerTDS()

register(
m(
m(h("meta::pure::mapping::from_T_m__Runtime_1__T_m_", false, ps -> res(ps.get(0)._genericType(), ps.get(0)._multiplicity()), ps -> ps.size() == 2 && typeOne(ps.get(1), Sets.immutable.with("Runtime", "EngineRuntime")))),
m(h("meta::pure::mapping::from_T_m__Runtime_1__T_m_", false, ps -> res(ps.get(0)._genericType(), ps.get(0)._multiplicity()), ps -> ps.size() == 2 && typeOne(ps.get(1), Sets.immutable.with("Runtime", "EngineRuntime", "PackageableRuntime")))),
m(h("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__TabularDataSet_1_", false, ps -> res("meta::pure::tds::TabularDataSet", "one"), ps -> ps.size() == 3 && "TabularDataSet".equals(ps.get(0)._genericType()._rawType()._name())),
h("meta::pure::mapping::from_T_m__Mapping_1__Runtime_1__T_m_", false, ps -> res(ps.get(0)._genericType(), ps.get(0)._multiplicity()), ps -> ps.size() == 3)
),
Expand Down Expand Up @@ -2521,9 +2521,9 @@ private Map<String, Dispatch> buildDispatch()
map.put("meta::pure::graphFetch::execution::serialize_Checked_MANY__RootGraphFetchTree_1__String_1_", (List<ValueSpecification> ps) -> ps.size() == 2 && ("Nil".equals(ps.get(0)._genericType()._rawType()._name()) || "Checked".equals(ps.get(0)._genericType()._rawType()._name())) && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "RootGraphFetchTree", "ExtendedRootGraphFetchTree", "RoutedRootGraphFetchTree", "SerializeTopRootGraphFetchTree").contains(ps.get(1)._genericType()._rawType()._name()));
map.put("meta::pure::graphFetch::execution::serialize_T_MANY__RootGraphFetchTree_1__AlloySerializationConfig_1__String_1_", (List<ValueSpecification> ps) -> ps.size() == 3 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "RootGraphFetchTree", "ExtendedRootGraphFetchTree", "RoutedRootGraphFetchTree", "SerializeTopRootGraphFetchTree").contains(ps.get(1)._genericType()._rawType()._name()) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "AlloySerializationConfig".equals(ps.get(2)._genericType()._rawType()._name())));
map.put("meta::pure::graphFetch::execution::serialize_T_MANY__RootGraphFetchTree_1__String_1_", (List<ValueSpecification> ps) -> ps.size() == 2 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "RootGraphFetchTree", "ExtendedRootGraphFetchTree", "RoutedRootGraphFetchTree", "SerializeTopRootGraphFetchTree").contains(ps.get(1)._genericType()._rawType()._name()));
map.put("meta::pure::mapping::from_T_m__Mapping_1__Runtime_1__T_m_", (List<ValueSpecification> ps) -> ps.size() == 3 && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && typeOne(ps.get(2), Sets.immutable.with("Runtime", "EngineRuntime")));
map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__ExecutionContext_1__TabularDataSet_1_", (List<ValueSpecification> ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "Runtime".equals(ps.get(2)._genericType()._rawType()._name())) && isOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "ExecutionContext", "ExtendedExecutionContext", "RelationalExecutionContext", "AuthenticationContext", "AnalyticsExecutionContext", "BatchQueryContext", "LatestBatchQueryContext", "DirtyLatestBatchQueryContext", "VectorBatchQueryContext", "WatermarkExecutionContext", "SpecificWatermarkExecutionContext", "RefinerWatermarkExecutionContext").contains(ps.get(3)._genericType()._rawType()._name()));
map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__TabularDataSet_1_", (List<ValueSpecification> ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "Runtime".equals(ps.get(2)._genericType()._rawType()._name())));
map.put("meta::pure::mapping::from_T_m__Mapping_1__Runtime_1__T_m_", (List<ValueSpecification> ps) -> ps.size() == 3 && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && typeOne(ps.get(2), Sets.immutable.with("Runtime", "EngineRuntime", "PackageableRuntime")));
map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__ExecutionContext_1__TabularDataSet_1_", (List<ValueSpecification> ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "PackageableRuntime").contains(ps.get(2)._genericType()._rawType()._name()) && isOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "ExecutionContext", "ExtendedExecutionContext", "RelationalExecutionContext", "AuthenticationContext", "AnalyticsExecutionContext", "BatchQueryContext", "LatestBatchQueryContext", "DirtyLatestBatchQueryContext", "VectorBatchQueryContext", "WatermarkExecutionContext", "SpecificWatermarkExecutionContext", "RefinerWatermarkExecutionContext").contains(ps.get(3)._genericType()._rawType()._name()));
map.put("meta::pure::mapping::from_TabularDataSet_1__Mapping_1__Runtime_1__TabularDataSet_1_", (List<ValueSpecification> ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "TabularDataSet", "TabularDataSetImplementation", "TableTDS").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Mapping".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "PackageableRuntime").contains(ps.get(2)._genericType()._rawType()._name()));
map.put("meta::core::runtime::mergeRuntimes_Any_$1_MANY$__Runtime_1_", (List<ValueSpecification> ps) -> ps.size() == 1 && matchOneMany(ps.get(0)._multiplicity()));
map.put("meta::core::runtime::getRuntimeWithModelConnection_Class_1__Any_MANY__Runtime_1_", (List<ValueSpecification> ps) -> ps.size() == 2 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Class", "MappingClass", "ClassProjection").contains(ps.get(0)._genericType()._rawType()._name()));
map.put("meta::core::runtime::getRuntimeWithModelQueryConnection_Class_1__Binding_1__Byte_MANY__Runtime_1_", (List<ValueSpecification> ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Class", "ClassProjection", "MappingClass").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "Binding".equals(ps.get(1)._genericType()._rawType()._name())) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "Byte".equals(ps.get(2)._genericType()._rawType()._name())));
Expand Down Expand Up @@ -2612,7 +2612,7 @@ private Map<String, Dispatch> buildDispatch()
map.put("meta::pure::functions::date::calendar::wtd_Date_1__String_1__Date_1__Number_$0_1$", (List<ValueSpecification> ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "String".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(2)._genericType()._rawType()._name()) && matchZeroOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "Number", "Integer", "Float", "Decimal").contains(ps.get(3)._genericType()._rawType()._name()));
map.put("meta::pure::functions::date::calendar::ytd_Date_1__String_1__Date_1__Number_$0_1$", (List<ValueSpecification> ps) -> ps.size() == 4 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "String".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && Sets.immutable.with("Nil", "Date", "StrictDate", "DateTime", "LatestDate").contains(ps.get(2)._genericType()._rawType()._name()) && matchZeroOne(ps.get(3)._multiplicity()) && Sets.immutable.with("Nil", "Number", "Integer", "Float", "Decimal").contains(ps.get(3)._genericType()._rawType()._name()));

map.put("meta::pure::mapping::from_T_m__Runtime_1__T_m_", (List<ValueSpecification> ps) -> ps.size() == 2 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "EngineRuntime").contains(ps.get(1)._genericType()._rawType()._name()));
map.put("meta::pure::mapping::from_T_m__Runtime_1__T_m_", (List<ValueSpecification> ps) -> ps.size() == 2 && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "Runtime", "EngineRuntime", "PackageableRuntime").contains(ps.get(1)._genericType()._rawType()._name()));
map.put("meta::pure::functions::relation::filter_Relation_1__Function_1__Relation_1_", (List<ValueSpecification> ps) -> ps.size() == 2 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || check(funcType(ps.get(1)._genericType()), (FunctionType ft) -> isOne(ft._returnMultiplicity()) && ("Nil".equals(ft._returnType()._rawType()._name()) || "Boolean".equals(ft._returnType()._rawType()._name())) && check(ft._parameters().toList(), (List<? extends VariableExpression> nps) -> nps.size() == 1 && isOne(nps.get(0)._multiplicity())))));
map.put("meta::pure::functions::relation::concatenate_Relation_1__Relation_1__Relation_1_", (List<ValueSpecification> ps) -> ps.size() == 2 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(1)._genericType()._rawType()._name()));
map.put("meta::pure::functions::relation::rename_Relation_1__ColSpec_1__ColSpec_1__Relation_1_", (List<ValueSpecification> ps) -> ps.size() == 3 && isOne(ps.get(0)._multiplicity()) && Sets.immutable.with("Nil", "Relation", "RelationElementAccessor", "TDS", "RelationStoreAccessor").contains(ps.get(0)._genericType()._rawType()._name()) && isOne(ps.get(1)._multiplicity()) && ("Nil".equals(ps.get(1)._genericType()._rawType()._name()) || "ColSpec".equals(ps.get(1)._genericType()._rawType()._name())) && isOne(ps.get(2)._multiplicity()) && ("Nil".equals(ps.get(2)._genericType()._rawType()._name()) || "ColSpec".equals(ps.get(2)._genericType()._rawType()._name())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.testcontainers.DockerClientFactory;

Expand Down Expand Up @@ -142,9 +143,59 @@ public void testElasticPlanExecution() throws IOException

String resultPlanJson = objectmapper.writeValueAsString(originalPlan);

//assert that streaming results does not modify the plan
//assert that streaming results does not modify the plan
Assert.assertEquals(initialPlanJson, resultPlanJson);

}
}

@Test
public void testElasticPlanForFunctionWithMappingAndRuntimePropertyExecution() throws IOException
{
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
)
{
SingleExecutionPlan originalPlan = getPlanFromFunctionGrammar("abc::abc::functionWithMappingAndRuntimeProperty__TabularDataSet_1_");

//create a deep copy of the original plan that we pass into the execute call
ObjectMapper objectmapper = ObjectMapperFactory.getNewStandardObjectMapperWithPureProtocolExtensionSupports();
String initialPlanJson = objectmapper.writeValueAsString(originalPlan);


//execute plan
TDSResult result = (TDSResult) PlanExecutor.newPlanExecutorBuilder().withAvailableStoreExecutors().build().execute(originalPlan);

result.stream(outputStream, SerializationFormat.DEFAULT); //this should trigger tryAdvance -- streaming results in batches

String resultPlanJson = objectmapper.writeValueAsString(originalPlan);

//assert that streaming results does not modify the plan
Assert.assertEquals(initialPlanJson, resultPlanJson);
}
}

@Test
public void testElasticPlanForFunctionWithRuntimePropertyExecution() throws IOException
{
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
)
{
SingleExecutionPlan originalPlan = getPlanFromFunctionGrammar("abc::abc::functionWithRuntimeProperty__TabularDataSet_1_");

//create a deep copy of the original plan that we pass into the execute call
ObjectMapper objectmapper = ObjectMapperFactory.getNewStandardObjectMapperWithPureProtocolExtensionSupports();
String initialPlanJson = objectmapper.writeValueAsString(originalPlan);


//execute plan
TDSResult result = (TDSResult) PlanExecutor.newPlanExecutorBuilder().withAvailableStoreExecutors().build().execute(originalPlan);

result.stream(outputStream, SerializationFormat.DEFAULT); //this should trigger tryAdvance -- streaming results in batches

String resultPlanJson = objectmapper.writeValueAsString(originalPlan);

//assert that streaming results does not modify the plan
Assert.assertEquals(initialPlanJson, resultPlanJson);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,30 @@ function abc::abc::indexToTDSGroupByFunction(): TabularDataSet[1]
{
indexToTDS(abc::abc::Store, 'index1')->from(abc::abc::EmptyMapping, abc::abc::Runtime)->groupBy(['prop1', '_id'], agg('count', r | $r.getString('prop1'), agg | $agg->count()));
}

// Check to see if the fn compiles
function abc::abc::functionWithMappingAndRuntimeParam(mapping: meta::pure::mapping::Mapping[1], runtime: meta::pure::runtime::PackageableRuntime[1]): TabularDataSet[1]
{
indexToTDS(abc::abc::Store, 'index1')->from(
$mapping,
$runtime
);
}

function abc::abc::functionWithMappingAndRuntimeProperty(): TabularDataSet[1]
{
let mapping = abc::abc::EmptyMapping;
let runtime = abc::abc::Runtime;
indexToTDS(abc::abc::Store, 'index1')->from(
$mapping,
$runtime
);
}

function abc::abc::functionWithRuntimeProperty(): TabularDataSet[1]
{
let runtime = abc::abc::Runtime;
indexToTDS(abc::abc::Store, 'index1')->from(
$runtime
);
}

0 comments on commit 4360b02

Please sign in to comment.