Skip to content

Commit

Permalink
Route ValueSpecification rather than FunctionExpression
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelbey authored and aormerod-gs committed Nov 20, 2024
1 parent a9292a2 commit c521872
Show file tree
Hide file tree
Showing 33 changed files with 210 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ function meta::pure::graphFetch::executionPlan::nodeIsMerged(node:LocalGraphFetc
function meta::pure::graphFetch::executionPlan::planGraphFetchExecution(sq: StoreQuery[1], ext: RoutedValueSpecification[0..1], mapping: Mapping[1], runtime: Runtime[1], exeCtx: meta::pure::runtime::ExecutionContext[1], extensions: Extension[*], debug: DebugContext[1]): ExecutionNode[1]
{
print(if($debug.debug,|$debug.space+'>Generating Graph Fetch Execution Plan:\n',|''));
let fe = $sq.fe->evaluateAndDeactivate();
let fe = $sq.vs->evaluateAndDeactivate()->cast(@FunctionExpression);
let resultSize = $fe.parametersValues->evaluateAndDeactivate()->at(0)->match([rvs:ExtendedRoutedValueSpecification[1] | $rvs.value.multiplicity->toOne(),
vs: ValueSpecification[1] | $fe.multiplicity->toOne()]);
assert($fe.func->in(graphFetchFunctions()) || $fe->isUnionOnGraphFetch(true));
Expand Down Expand Up @@ -194,7 +194,7 @@ function meta::pure::graphFetch::executionPlan::enrichTargetTreeForFilterPropert

function meta::pure::graphFetch::executionPlan::planStoreUnionGraphFetchExecution(sq: StoreQuery[1], ext: RoutedValueSpecification[0..1], sets: InstanceSetImplementation[*], mapping: Mapping[1], runtime: Runtime[1], exeCtx: meta::pure::runtime::ExecutionContext[1], extensions: meta::pure::extension::Extension[*], debug: DebugContext[1]): ExecutionNode[1]
{
let fe = $sq.fe->evaluateAndDeactivate();
let fe = $sq.vs->evaluateAndDeactivate()->cast(@FunctionExpression);
let params = $fe.parametersValues->evaluateAndDeactivate();
let p1 = $params->at(0);
let p2 = $params->at(1)->cast(@InstanceValue);
Expand All @@ -214,12 +214,12 @@ function meta::pure::graphFetch::executionPlan::planStoreUnionGraphFetchExecutio
)->evaluateAndDeactivate()
}, $rf->eval($sets->at(0)));

planRouterUnionGraphFetchExecution(^$sq(fe = $newFe), $ext, $mapping, $runtime, $exeCtx, $extensions, $debug);
planRouterUnionGraphFetchExecution(^$sq(vs = $newFe), $ext, $mapping, $runtime, $exeCtx, $extensions, $debug);
}

function meta::pure::graphFetch::executionPlan::planRouterUnionGraphFetchExecution(sq: StoreQuery[1], ext: RoutedValueSpecification[0..1], mapping: Mapping[1], runtime: Runtime[1], exeCtx: meta::pure::runtime::ExecutionContext[1], extensions: Extension[*], debug: DebugContext[1]): ExecutionNode[1]
{
let fe = $sq.fe->evaluateAndDeactivate();
let fe = $sq.vs->evaluateAndDeactivate()->cast(@FunctionExpression);
assert($fe->isUnionOnGraphFetch(true), | 'Non union graphFetch function encountered');

let subClusters = $fe.parametersValues->evaluateAndDeactivate()->map({p | $p->meta::pure::router::clustering::cluster($mapping, $runtime, $sq.inScopeVars, $exeCtx, $extensions, $debug).cluster->evaluateAndDeactivate()})->cast(@meta::pure::router::metamodel::clustering::ClusteredValueSpecification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,11 @@ function <<access.private>> meta::pure::lineage::scanProject::scanProjectRecursi
);
),
pair('meta::pure::tds::project_T_MANY__ColumnSpecification_MANY__TabularDataSet_1_',
|let ins = $fe.parametersValues->at(1)->evaluateAndDeactivate();
$ins->match(
[
i:InstanceValue[1] |
let cols = $i.values->evaluateAndDeactivate()->cast(@SimpleFunctionExpression);
let colSpecs = $cols->map(c | $c->reactivate($vars)->cast(@BasicColumnSpecification<Any>));
^Project(projectfuncEntryPoint = $fe,
columns = zip($colSpecs.name, $colSpecs.func->cast(@FunctionDefinition<Any>)));,
a:Any[*]|[]
]
);

),
|let cols = $fe.parametersValues->at(1)->evaluateAndDeactivate();
let columns = $cols->map(c|$c->meta::pure::lineage::scanProject::extractCol($vars));
^Project(projectfuncEntryPoint = $fe,
columns = zip($columns.name, $columns.func->cast(@FunctionDefinition<Any>)));
),
pair('meta::pure::tds::groupBy_K_MANY__Function_MANY__AggregateValue_MANY__String_MANY__TabularDataSet_1_',
| let funcs = $fe.parametersValues->at(1)->match([s:InstanceValue[1]|$s.values, a:Any[*]|^Unknown()]);
let aggregateFuncs = $fe.parametersValues->at(2)->scanAggregateValue();
Expand Down Expand Up @@ -109,6 +101,18 @@ function <<access.private>> meta::pure::lineage::scanProject::scanProjectRecursi
);
}

function <<access.private>> meta::pure::lineage::scanProject::extractCol(vs:ValueSpecification[1],vars:Map<String, List<Any>>[1]):BasicColumnSpecification<Any>[*]
{
$vs->match(
[
i:InstanceValue[1] | let cols = $i.values->evaluateAndDeactivate()->cast(@SimpleFunctionExpression);
let colSpecs = $cols->map(c | $c->reactivate($vars)->cast(@BasicColumnSpecification<Any>));,
s:SimpleFunctionExpression[1] | $s->reactivate($vars)->cast(@BasicColumnSpecification<Any>);,
a:Any[*]| [] ;
]
);
}

function <<access.private>> meta::pure::lineage::scanProject::scanRootGraphFetchTree(root:meta::pure::graphFetch::RootGraphFetchTree<Any>[1]):Pair<String, FunctionDefinition<Any>>[*]
{
let rootGetAllExpression = meta::pure::lineage::scanProject::createGetAllApplicationForRootGraphFetchTree($root);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Class meta::pure::mapping::MappingInstanceData

Class meta::pure::mapping::StoreQuery
{
fe : FunctionExpression[1];
vs : ValueSpecification[1];
inScopeVars : Map<String, List<Any>>[1];
store : Store[1];
//sets : Pair<String, List<Any>>[*]; //inputs to either cross store ( query )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ Class meta::pure::mapping::modelToModel::graphFetch::executionPlan::InMemoryCros

function meta::pure::mapping::modelToModel::graphFetch::executionPlan::planInMemoryGraphFetchExecution(sq: StoreQuery[1], ext: RoutedValueSpecification[0..1], mapping: Mapping[1], runtime: Runtime[1], exeCtx: meta::pure::runtime::ExecutionContext[1], extensions: Extension[*], debug: DebugContext[1]): ExecutionNode[1]
{
let fe = $sq.fe->evaluateAndDeactivate();
let fe = $sq.vs->evaluateAndDeactivate()->cast(@FunctionExpression);
if ($fe->meta::pure::router::utils::isUnionOnGraphFetch(true),
| planRouterUnionGraphFetchExecution($sq, $ext, $mapping->toOne(), $runtime->toOne(), $exeCtx, $extensions, $debug),
| assert($fe.func->in(graphFetchFunctions()), | 'Non graphFetch function encountered');
Expand All @@ -270,7 +270,7 @@ function meta::pure::mapping::modelToModel::graphFetch::executionPlan::planInMem
function meta::pure::mapping::modelToModel::graphFetch::executionPlan::planRootGraphFetchExecutionInMemory(sq: StoreQuery[1], ext: RoutedValueSpecification[0..1], clusteredTree: StoreMappingClusteredGraphFetchTree[1], orderedPaths: String[*], mapping: Mapping[1], runtime: Runtime[1], exeCtx: meta::pure::runtime::ExecutionContext[1], enableConstraints: Boolean[1], checked: Boolean[1], extensions: meta::pure::extension::Extension[*], debug: DebugContext[1]): InMemoryRootGraphFetchExecutionNode[1]
{
let store = $sq.store;
let fe = $sq.fe->evaluateAndDeactivate();
let fe = $sq.vs->evaluateAndDeactivate()->cast(@FunctionExpression);
let rootTree = $clusteredTree->byPassClusteringInfo()->cast(@RoutedRootGraphFetchTree<Any>);
let batchSize = if($fe.func == graphFetch_T_MANY__RootGraphFetchTree_1__Integer_1__T_MANY_ || $fe.func == graphFetchChecked_T_MANY__RootGraphFetchTree_1__Integer_1__Checked_MANY_,
| $fe->instanceValuesAtParameter(2, $sq.inScopeVars)->toOne()->cast(@Integer),
Expand Down Expand Up @@ -582,7 +582,7 @@ function meta::pure::mapping::modelToModel::graphFetch::executionPlan::generateP
function <<access.private>> meta::pure::mapping::modelToModel::graphFetch::executionPlan::planInMemoryStoreMergeGraphFetchExecution(sq: StoreQuery[1], ext: RoutedValueSpecification[0..1], sets: InstanceSetImplementation[*], mapping: Mapping[1], runtime: Runtime[1], exeCtx: meta::pure::runtime::ExecutionContext[1], extensions: meta::pure::extension::Extension[*], debug: DebugContext[1]): ExecutionNode[1]
{
$mapping->meta::pure::mapping::modelToModel::validateMergeMapping();
let fe = $sq.fe->evaluateAndDeactivate();
let fe = $sq.vs->evaluateAndDeactivate()->cast(@FunctionExpression);
let params = $fe.parametersValues->evaluateAndDeactivate();
let p1 = $params->at(0);
let p2 = $params->at(1)->cast(@InstanceValue);
Expand All @@ -603,7 +603,7 @@ function <<access.private>> meta::pure::mapping::modelToModel::graphFetch::execu
values = $sets->map(set | $rf->eval($set))->evaluateAndDeactivate())
)->evaluateAndDeactivate();

planInMemoryRouterMergeGraphFetchExecution(^$sq(fe = $newFe), $ext, $mapping, $runtime, $exeCtx, $extensions, $debug,$sq,$tree);
planInMemoryRouterMergeGraphFetchExecution(^$sq(vs = $newFe), $ext, $mapping, $runtime, $exeCtx, $extensions, $debug,$sq,$tree);
}
function <<access.protected>> meta::pure::mapping::modelToModel::validateMergeMapping(mapping:Mapping[1]):Boolean[1]
{
Expand All @@ -621,7 +621,7 @@ function <<access.protected>> meta::pure::mapping::modelToModel::validateMergeMa

function <<access.private>> meta::pure::mapping::modelToModel::graphFetch::executionPlan::planInMemoryRouterMergeGraphFetchExecution(sq: StoreQuery[1], ext: RoutedValueSpecification[0..1], mapping: Mapping[1], runtime: Runtime[1], exeCtx: meta::pure::runtime::ExecutionContext[1], extensions: Extension[*], debug: DebugContext[1],rootSQ:StoreQuery[1],rootTree:ClusteredGraphFetchTree[1]): ExecutionNode[1]
{
let fe = $sq.fe->evaluateAndDeactivate();
let fe = $sq.vs->evaluateAndDeactivate()->cast(@FunctionExpression);
let subClusters = $fe.parametersValues->at(0)->evaluateAndDeactivate()->cast(@InstanceValue).values->cast(@ValueSpecification)->map({p | $p->cluster($mapping, $runtime, $sq.inScopeVars, $exeCtx, $extensions, $debug).cluster->evaluateAndDeactivate()})->cast(@StoreMappingClusteredValueSpecification);

let firstCluster = $subClusters->at(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ function meta::pure::platform::executionPlan::generation::processValueSpecificat
f:SimpleFunctionExpression[1] | $f->getFunctionProcessor($extensions)->eval($f, $state, $extensions, $debug),
c:ClusteredValueSpecification[1] | $c->plan($state.inScopeVars, $state.exeCtx, $extensions, $debug),
e:ExtendedRoutedValueSpecification[1] | $e.value->processValueSpecification($state, $extensions, $debug),
i:InstanceValue[1] | [],
i:InstanceValue[1] | $i->processInstanceValue($state, $extensions, $debug),
v:VariableExpression[1] | $v->processVariableExpression($state, $extensions, $debug),
v:ValueSpecification[1] | []
]);
}
Expand All @@ -58,7 +59,29 @@ function <<access.private>> meta::pure::platform::executionPlan::generation::def
requiredVariableInputs = $variableExpressionInputs->concatenate($varPlaceHolderInputs),
resultSizeRange = $fe.multiplicity,
executionNodes = $children,
fromCluster = generatePlatformClusterForFunction($fe, $state.inScopeVars)
fromCluster = generatePlatformCluster($fe, $state.inScopeVars)
);
}

function <<access.private>> meta::pure::platform::executionPlan::generation::processVariableExpression(var:VariableExpression[1], state:PlatformPlanGenerationState[1], extensions : Extension[*], debug:DebugContext[1]):ExecutionNode[1]
{
^VariableResolutionExecutionNode
(
varName = $var.name->toOne(),
resultType = ^ResultType(type=if($var.genericType.rawType->isEmpty(),|Any,|$var.genericType.rawType->toOne())),
resultSizeRange = $var.multiplicity,
fromCluster = generatePlatformCluster($var, $state.inScopeVars)
);
}

function <<access.private>> meta::pure::platform::executionPlan::generation::processInstanceValue(iv:InstanceValue[1], state:PlatformPlanGenerationState[1], extensions : Extension[*], debug:DebugContext[1]):ExecutionNode[1]
{
^PureExpressionPlatformExecutionNode
(
expression = $iv,
resultType = ^ResultType(type=if($iv.genericType.rawType->isEmpty(),|Any,|$iv.genericType.rawType->toOne())),
resultSizeRange = $iv.multiplicity,
fromCluster = generatePlatformCluster($iv, $state.inScopeVars)
);
}

Expand All @@ -76,7 +99,7 @@ function <<access.private>> meta::pure::platform::executionPlan::generation::ser
expression = ^$fe(parametersValues=$substitute->concatenate($params->tail())),
resultType = ^ResultType(type=$fe.genericType.rawType->toOne()),
executionNodes = $children,
fromCluster = generatePlatformClusterForFunction($fe, $state.inScopeVars)
fromCluster = generatePlatformCluster($fe, $state.inScopeVars)
);
}

Expand All @@ -92,7 +115,7 @@ function <<access.private>> meta::pure::platform::executionPlan::generation::uni
resultType = ^ResultType(type=$fe.genericType.rawType->toOne()),
isChildrenExecutionParallelizable = true,
executionNodes = $updatedChildren,
fromCluster = generatePlatformClusterForFunction($fe, $state.inScopeVars)
fromCluster = generatePlatformCluster($fe, $state.inScopeVars)
);
}

Expand All @@ -119,13 +142,13 @@ function <<access.private>> meta::pure::platform::executionPlan::generation::let
);
}

function <<access.private>> meta::pure::platform::executionPlan::generation::generatePlatformClusterForFunction(fe:FunctionExpression[1], inScopeVars : Map<String, List<Any>>[1]):PlatformClusteredValueSpecification[1]
function <<access.private>> meta::pure::platform::executionPlan::generation::generatePlatformCluster(vs:ValueSpecification[1], inScopeVars : Map<String, List<Any>>[1]):PlatformClusteredValueSpecification[1]
{
^PlatformClusteredValueSpecification(
val = $fe,
val = $vs,
executable=true,
multiplicity = $fe.multiplicity,
genericType = $fe.genericType,
multiplicity = $vs.multiplicity,
genericType = $vs.genericType,
openVars = $inScopeVars
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,27 @@ import meta::pure::router::utils::*;
import meta::core::runtime::*;
function meta::pure::router::clustering::generateExecutionNodeFromCluster(cluster:ClusteredValueSpecification[1], inScopeVars:Map<String, List<Any>>[1], context:meta::pure::runtime::ExecutionContext[1], extensions:meta::pure::extension::Extension[*], debugContext:DebugContext[1]):ExecutionNode[1]
{
let fe = $cluster.val->byPassValueSpecificationWrapper()->cast(@FunctionExpression);
let vs = $cluster.val->byPassValueSpecificationWrapper();

$cluster->match([
sc:StoreMappingClusteredValueSpecification[1] |
//This is required to populate the setup sql for test connections where csv data has been supplied. It is done here to ensure that the plan generation is always
//the only place where sql is generated for this case.
$sc->buildExecutionNodeForStoreClusteredVS($fe, $sc.mapping, $inScopeVars, $context, $extensions, $debugContext),
$sc->buildExecutionNodeForStoreClusteredVS($vs, $sc.mapping, $inScopeVars, $context, $extensions, $debugContext),
sc:StoreClusteredValueSpecification[1] |
//This is required to populate the setup sql for test connections where csv data has been supplied. It is done here to ensure that the plan generation is always
//the only place where sql is generated for this case.
$sc->buildExecutionNodeForStoreClusteredVS($fe, [], $inScopeVars, $context, $extensions, $debugContext),
$sc->buildExecutionNodeForStoreClusteredVS($vs, [], $inScopeVars, $context, $extensions, $debugContext),
ef:ExternalFormatClusteredValueSpecification[1] |
let state = ^meta::external::format::shared::executionPlan::ExternalFormatPlanGenerationState(inScopeVars = $inScopeVars, exeCtx = $ef.exeCtx->toOne(), binding = $ef.binding);
$fe->meta::external::format::shared::executionPlan::processValueSpecification($state, $extensions, $debugContext)->toOne();,
$vs->meta::external::format::shared::executionPlan::processValueSpecification($state, $extensions, $debugContext)->toOne();,
pl:PlatformClusteredValueSpecification[1] |
let state = ^meta::pure::platform::executionPlan::generation::PlatformPlanGenerationState(inScopeVars = $inScopeVars, exeCtx = $pl.exeCtx->toOne());
$fe->meta::pure::platform::executionPlan::generation::processValueSpecification($state, $extensions, $debugContext)->toOne();
$vs->meta::pure::platform::executionPlan::generation::processValueSpecification($state, $extensions, $debugContext)->toOne();
]);
}

function meta::pure::router::clustering::buildExecutionNodeForStoreClusteredVS(sc:StoreClusteredValueSpecification[1], fe:FunctionExpression[1], mapping:meta::pure::mapping::Mapping[0..1], inScopeVars:Map<String, List<Any>>[1], context:meta::pure::runtime::ExecutionContext[1], extensions:meta::pure::extension::Extension[*], debugContext:DebugContext[1]):ExecutionNode[1]
function meta::pure::router::clustering::buildExecutionNodeForStoreClusteredVS(sc:StoreClusteredValueSpecification[1], vs:ValueSpecification[1], mapping:meta::pure::mapping::Mapping[0..1], inScopeVars:Map<String, List<Any>>[1], context:meta::pure::runtime::ExecutionContext[1], extensions:meta::pure::extension::Extension[*], debugContext:DebugContext[1]):ExecutionNode[1]
{
//This is required to populate the setup sql for test connections where csv data has been supplied. It is done here to ensure that the plan generation is always
//the only place where sql is generated for this case.
Expand All @@ -74,7 +74,7 @@ function meta::pure::router::clustering::buildExecutionNodeForStoreClusteredVS(s
),
r:Runtime[0..1] | $r
]);
let query = ^meta::pure::mapping::StoreQuery(store=$sc.store, fe=$fe, inScopeVars=$inScopeVars);
let query = ^meta::pure::mapping::StoreQuery(store=$sc.store, vs=$vs, inScopeVars=$inScopeVars);
let res = $sc.s.planExecution->toOne()->eval($query, $sc.val->match([r:RoutedValueSpecification[1]|$r, a:Any[*]|[]])->cast(@RoutedValueSpecification), $mapping, $rt, if ($sc.exeCtx->isEmpty(), | $context, | $sc.exeCtx->toOne()), $extensions, $debugContext);
^$res(fromCluster=$sc);
}
Expand Down
Loading

0 comments on commit c521872

Please sign in to comment.