Skip to content

Commit

Permalink
Elasticsearch - support for complex expression on TDS project/extend
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelbey committed Oct 12, 2023
1 parent cc17530 commit 319c13b
Show file tree
Hide file tree
Showing 24 changed files with 1,750 additions and 295 deletions.
29 changes: 27 additions & 2 deletions legend-engine-config/legend-engine-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,31 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-elasticsearch-V7-executionPlan</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-extensions-collection-generation</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-extensions-collection-execution</artifactId>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- ENGINE -->

<!-- Shared Drop Wizard Bundles -->
Expand Down Expand Up @@ -897,12 +922,12 @@
</dependency>
<!-- RUNTIME -->

<!-- TEST -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<scope>runtime</scope>
</dependency>
<!-- TEST -->
<!-- <dependency>-->
<!-- <groupId>org.finos.legend.pure</groupId>-->
<!-- <artifactId>legend-pure</artifactId>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
import org.finos.legend.engine.plan.execution.concurrent.ParallelGraphFetchExecutionExecutorPool;
import org.finos.legend.engine.plan.execution.graphFetch.GraphFetchExecutionConfiguration;
import org.finos.legend.engine.plan.execution.service.api.ServiceModelingApi;
import org.finos.legend.engine.plan.execution.stores.elasticsearch.v7.plugin.ElasticsearchV7StoreExecutor;
import org.finos.legend.engine.plan.execution.stores.elasticsearch.v7.plugin.ElasticsearchV7StoreExecutorBuilder;
import org.finos.legend.engine.plan.execution.stores.inMemory.plugin.InMemory;
import org.finos.legend.engine.plan.execution.stores.mongodb.plugin.MongoDBStoreExecutor;
import org.finos.legend.engine.plan.execution.stores.mongodb.plugin.MongoDBStoreExecutorBuilder;
Expand Down Expand Up @@ -270,12 +272,14 @@ public void run(T serverConfiguration, Environment environment)
MongoDBStoreExecutorConfiguration mongoDBExecutorConfiguration = MongoDBStoreExecutorConfiguration.newInstance().withCredentialProviderProvider(credentialProviderProvider).build();
MongoDBStoreExecutor mongoDBStoreExecutor = (MongoDBStoreExecutor) new MongoDBStoreExecutorBuilder().build(mongoDBExecutorConfiguration);

ElasticsearchV7StoreExecutor elasticsearchV7StoreExecutor = (ElasticsearchV7StoreExecutor) new ElasticsearchV7StoreExecutorBuilder().build();

PlanExecutor planExecutor;
ParallelGraphFetchExecutionExecutorPool parallelGraphFetchExecutionExecutorPool = null;
if (serverConfiguration.graphFetchExecutionConfiguration != null)
{
GraphFetchExecutionConfiguration graphFetchExecutionConfiguration = serverConfiguration.graphFetchExecutionConfiguration;
planExecutor = PlanExecutor.newPlanExecutor(graphFetchExecutionConfiguration, relationalStoreExecutor, serviceStoreExecutor, mongoDBStoreExecutor, InMemory.build());
planExecutor = PlanExecutor.newPlanExecutor(graphFetchExecutionConfiguration, relationalStoreExecutor, elasticsearchV7StoreExecutor, serviceStoreExecutor, mongoDBStoreExecutor, InMemory.build());
if (graphFetchExecutionConfiguration.canExecuteInParallel())
{
parallelGraphFetchExecutionExecutorPool = new ParallelGraphFetchExecutionExecutorPool(graphFetchExecutionConfiguration.getParallelGraphFetchExecutionConfig(), "thread-pool for parallel graphFetch execution");
Expand All @@ -284,7 +288,7 @@ public void run(T serverConfiguration, Environment environment)
}
else
{
planExecutor = PlanExecutor.newPlanExecutor(relationalStoreExecutor, serviceStoreExecutor, mongoDBStoreExecutor, InMemory.build());
planExecutor = PlanExecutor.newPlanExecutor(relationalStoreExecutor, elasticsearchV7StoreExecutor, serviceStoreExecutor, mongoDBStoreExecutor, InMemory.build());
}

// Session Management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
package org.finos.legend.engine.plan.execution.result.serialization;

import com.fasterxml.jackson.core.JsonGenerator;
import org.eclipse.collections.impl.block.procedure.checked.ThrowingProcedure2;
import org.finos.legend.engine.plan.dependencies.domain.date.PureDate;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.result.TDSColumn;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Objects;
import org.eclipse.collections.impl.block.procedure.checked.ThrowingProcedure2;
import org.finos.legend.engine.plan.dependencies.domain.date.PureDate;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.result.TDSColumn;

public class TDSColumnWithSerializer<T>
{
Expand Down Expand Up @@ -51,9 +50,16 @@ public TDSColumnWithSerializer(TDSColumn tdsColumn)
return (ThrowingProcedure2<JsonGenerator, Boolean>) JsonGenerator::writeBoolean;
case "Date":
case "DateTime":
return (ThrowingProcedure2<JsonGenerator, PureDate>) (jg, d) -> jg.writeString(d.toInstant().toString());
case "StrictDate":
return (ThrowingProcedure2<JsonGenerator, PureDate>) (jg, d) -> jg.writeString(d.toLocalDate().toString());
return (ThrowingProcedure2<JsonGenerator, PureDate>) (jg, d) ->
{
String formatted = d.toString();
if (d.hasMinute())
{
formatted += "Z";
}
jg.writeString(formatted);
};
default:
throw new UnsupportedOperationException("TDS type not supported: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,60 +500,19 @@ function meta::pure::router::routing::processColumnSpecification(v:ValueSpecific
function meta::pure::router::routing::processColSpecParams(fe:FunctionExpression[1], state:RoutingState[1], executionContext:ExecutionContext[1], vars:Map<VariableExpression, ValueSpecification>[1], inScopeVars:Map<String, List<Any>>[1], extensions:meta::pure::extension::Extension[*], debug:DebugContext[1]):Any[*]
{
let routedFe = ^$state(value=$fe)->routeValueSpecification($executionContext, $vars, $inScopeVars, $extensions, $debug);
let params = $routedFe.value->evaluateAndDeactivate()->cast(@FunctionExpression)->toOne().parametersValues->map(p|$p->byPassRouterInfo()->match([r:FunctionRoutedValueSpecification[1]|$r.originalFunction;, v:ValueSpecification[1]|$v]))->cast(@ValueSpecification);
let newParams = $params->map(p|if($p->instanceOf(SimpleFunctionExpression),| $p->processColumnSpecification($state, $executionContext, $vars, $inScopeVars, $extensions, $debug).value->cast(@ValueSpecification),| $p));
let nfe = ^$fe(parametersValues=$newParams);

// In plan generation flow, not always all variables will be present in inScopeVars as they could be function params (known only at runtime)
// Hence, reactivating with empty lists in such situations (as the expression of lambda is not expected to be reactivated)
$nfe->reactivateWithDummyValuesForOpenVariables($inScopeVars)->evaluateAndDeactivate();
let parametersValues = $routedFe.value->evaluateAndDeactivate()->cast(@FunctionExpression)->toOne().parametersValues->map(x | $x->byPassRouterInfo()->match([
i: InstanceValue[1] | $i.values->match([
// workaround a compile execution behavior that check for open variables and cause expressions to fail reactivation (does not happen on interpreted)
l: LambdaFunction<Any>[1] | ^$i(values = ^$l(openVariables = [])),
a: Any[*] | $i
]),
a: ValueSpecification[*] | $a
]))->cast(@ValueSpecification);

let nfe = ^$fe(parametersValues = $parametersValues);
let colSpec = $nfe->reactivate($inScopeVars)->evaluateAndDeactivate()->cast(@ColumnSpecification<Any>)->toOne();
}

function meta::pure::router::routing::reactivateWithDummyValuesForOpenVariables(vs:ValueSpecification[1], inScopeVars:Map<String, List<Any>>[1]) : Any[*]
{
let lambdas = $vs->extractLambdasFromVS();
let newInScopeVars = $lambdas->fold( {l, vars | $vars->meta::pure::router::routing::addDummyValuesforOpenVariables($l)}, $inScopeVars) ;
$vs->reactivate($newInScopeVars);
}

function meta::pure::router::routing::addDummyValuesforOpenVariables( inScopeVars : Map<String, List<Any>>[1], l :LambdaFunction<Any>[1] ) : Map<String, List<Any>>[1]
{
$l.openVariables->fold({v, vars| if($vars->get($v)->isEmpty(), | $vars-> meta::pure::functions::collection::put($v, list([])); , | $vars) }, $inScopeVars);
}

function meta::pure::router::routing::extractLambdasFromAny(a:Any[1]): LambdaFunction<Any>[*]
{
$a->match([
f:Function<Any>[1] | $f->extractLambdasFromFunction();,
vs: ValueSpecification[1] | $vs->extractLambdasFromVS();,
other: Any[1] | []
]);
}

function meta::pure::router::routing::extractLambdasFromVS(vs:ValueSpecification[1]):LambdaFunction<Any>[*]
{
$vs->match([
e:ExtendedRoutedValueSpecification[1] | $e.value->evaluateAndDeactivate()->extractLambdasFromVS() ,
f:FunctionRoutedValueSpecification[1] | $f.value->evaluateAndDeactivate()->extractLambdasFromVS(),
r:NoSetRoutedValueSpecification[1] | $r.value->evaluateAndDeactivate()->extractLambdasFromVS(),
fe:FunctionExpression[1] | $fe.func->evaluateAndDeactivate()->extractLambdasFromFunction()->concatenate($fe.parametersValues->evaluateAndDeactivate()->map(p| $p->extractLambdasFromVS())),
i:InstanceValue[1] | $i.values->evaluateAndDeactivate()->map(v|$v->extractLambdasFromAny()),
va:VariableExpression[1] | [],
vs:ValueSpecification[1] | []
]);
}

function meta::pure::router::routing::extractLambdasFromFunction(f:Function<Any>[1]): LambdaFunction<Any>[*]
{
$f->match([
p:Property<Nil,Any|*>[1] | [],
nf:NativeFunction<Any>[1] | [],
l: LambdaFunction<Any>[1] | $l,
qp:QualifiedProperty<Any>[1] | $qp.expressionSequence->evaluateAndDeactivate()->map(vs|$vs->extractLambdasFromVS());,
cfd:ConcreteFunctionDefinition<Any>[1] | $cfd.expressionSequence->evaluateAndDeactivate()->map(vs|$vs->extractLambdasFromVS());,
f:Function<Any>[1] | []
]);
}

function meta::pure::router::routing::processAggregationFunctionExpression(aggFuncExpr:FunctionExpression[1], routed:ExtendedRoutedValueSpecification[*], v:RoutingState[1], executionContext:ExecutionContext[1], vars:Map<VariableExpression, ValueSpecification>[1], inScopeVars:Map<String, List<Any>>[1], extensions:meta::pure::extension::Extension[*], debug:DebugContext[1]):RoutingState[1]
{
Expand Down Expand Up @@ -679,6 +638,9 @@ function meta::pure::router::routing::shouldStopFunctions(extensions:meta::pure:
groupBy_K_MANY__Function_MANY__AggregateValue_MANY__String_MANY__TabularDataSet_1_,
groupBy_TabularDataSet_1__String_MANY__AggregateValue_MANY__TabularDataSet_1_,
distinct_TabularDataSet_1__TabularDataSet_1_,
month_Date_1__Month_1_,
quarter_Date_1__Quarter_1_,
dayOfWeek_Date_1__DayOfWeek_1_,
mostRecentDayOfWeek_DayOfWeek_1__Date_1_,
mostRecentDayOfWeek_Date_1__DayOfWeek_1__Date_1_,
previousDayOfWeek_DayOfWeek_1__Date_1_,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ function <<access.private>> meta::pure::tds::multipleColumnComp(row:TDSRow[1], o
if ($sortInfo->isEmpty(),
| 0,
| let s = $sortInfo->head();
let res = $row.get($s.column->toOne())->compare($other.get($s.column->toOne()));
let res = if($row.isNull($s.column->toOne()), |^TDSNull(), |$row.get($s.column->toOne()))
->compare(if($other.isNull($s.column->toOne()), |^TDSNull(), |$other.get($s.column->toOne())));
if ($res == 0, |$row->multipleColumnComp($other, $sortInfo->tail()), |if ($s.direction == SortDirection.ASC, |$res,|-$res));
);
}
Expand Down Expand Up @@ -541,7 +542,15 @@ function

//todo: remove this by making parent an association
$newTds->mutateAdd('rows', $tds.rows->map(r | ^TDSRow(parent=$newTds,
values=$r.values->concatenate($newColumnFunctions.func->map(f | $f->evaluate(^List<TDSRow>(values=[$r])))))));
values=$r.values->concatenate(
$newColumnFunctions.func->map(f |
let value = $f->evaluate(^List<TDSRow>(values=[$r]));
if($value->isEmpty(), |^TDSNull(), |$value);
)
)
)
)
);

$newTds;
}
Expand All @@ -559,7 +568,13 @@ function

//todo: remove this by making parent an association
$newTds->mutateAdd('rows', $tds.rows->map(r | ^TDSRow(parent=$newTds,
values=$newColumnFunctions.func->map(f | $f->evaluate(^List<TDSRow>(values=[$r]))))));
values=$newColumnFunctions.func->map(f |
let value = $f->evaluate(^List<TDSRow>(values=[$r]));
if($value->isEmpty(), |^TDSNull(), |$value);
)
)
)
);

$newTds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ function meta::external::store::elasticsearch::v7::contract::supports(): Functio
{
let supportedFunctions = meta::external::store::elasticsearch::v7::pureToEs::supportedRoutingFunctions().first;

{f:FunctionExpression[1]| $supportedFunctions->exists(x | $x->eval($f.func))};
{f:FunctionExpression[1]| $supportedFunctions->exists(x | $x->eval($f.func, []))};
}

function meta::external::store::elasticsearch::v7::contract::shouldStopPreeval(): Function<{Any[*]->Boolean[1]}>[1]
Expand Down
Loading

0 comments on commit 319c13b

Please sign in to comment.