Skip to content

Commit

Permalink
datacube: fix persistent query
Browse files Browse the repository at this point in the history
  • Loading branch information
akphi committed Dec 20, 2024
1 parent 7419a9b commit 90971b8
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@

public interface LegendInterface
{
PureModelContextData parse(String txt);
default PureModelContextData parse(String txt)
{
return this.parse(txt, true);
}

PureModelContextData parse(String txt, boolean returnSourceInformation);

String render(PureModelContextData model);

PureModel compile(PureModelContextData model);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

package org.finos.legend.engine.repl.core.legend;

import java.util.concurrent.ForkJoinPool;
import org.eclipse.collections.api.RichIterable;
import org.eclipse.collections.api.tuple.Pair;
import org.finos.legend.engine.language.pure.compiler.Compiler;
import org.finos.legend.engine.language.pure.compiler.toPureGraph.PureModel;
import org.finos.legend.engine.language.pure.compiler.toPureGraph.PureModelProcessParameter;
import org.finos.legend.engine.language.pure.grammar.from.PureGrammarParser;
import org.finos.legend.engine.language.pure.grammar.to.PureGrammarComposer;
import org.finos.legend.engine.language.pure.grammar.to.PureGrammarComposerContext;
import org.finos.legend.engine.plan.generation.PlanGenerator;
import org.finos.legend.engine.plan.platform.PlanPlatform;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData;
Expand All @@ -31,6 +32,7 @@
import org.finos.legend.pure.generated.Root_meta_pure_extension_Extension;

import java.net.URL;
import java.util.concurrent.ForkJoinPool;

import static org.finos.legend.engine.repl.shared.ExecutionHelper.REPL_RUN_FUNCTION_QUALIFIED_PATH;

Expand All @@ -39,7 +41,7 @@ public class LocalLegendInterface implements LegendInterface
private final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

@Override
public PureModelContextData parse(String txt)
public PureModelContextData parse(String txt, boolean returnSourceInformation)
{
// txt = "#>{a::DB.test}#->filter(t|$t.name->startsWith('Dr'))->meta::pure::mapping::from(^meta::core::runtime::Runtime\n" +
// " (\n" +
Expand All @@ -52,7 +54,7 @@ public PureModelContextData parse(String txt)
// " )\n" +
// " )\n" +
// " )";
return PureGrammarParser.newInstance().parseModel(txt);
return PureGrammarParser.newInstance().parseModel(txt, returnSourceInformation);
//
// "" +
// "###Runtime\n" +
Expand All @@ -78,6 +80,12 @@ public PureModelContextData parse(String txt)
// "function a::b::c::d():Any[*]{"+txt+"}");
}

@Override
public String render(PureModelContextData model)
{
return PureGrammarComposer.newInstance(PureGrammarComposerContext.Builder.newInstance().build()).renderPureModelContextData(model);
}

@Override
public PureModel compile(PureModelContextData pureModelContextData)
{
Expand All @@ -87,7 +95,7 @@ public PureModel compile(PureModelContextData pureModelContextData)
@Override
public Root_meta_pure_executionPlan_ExecutionPlan generatePlan(PureModel pureModel, boolean debug)
{
RichIterable<? extends Root_meta_pure_extension_Extension> extensions = PureCoreExtensionLoader.extensions().flatCollect(e -> e.extraPureCoreExtensions(pureModel.getExecutionSupport()));
RichIterable<? extends Root_meta_pure_extension_Extension> extensions = PureCoreExtensionLoader.extensions().flatCollect(e -> e.extraPureCoreExtensions(pureModel.getExecutionSupport()));
Pair<Root_meta_pure_executionPlan_ExecutionPlan, String> res = PlanGenerator.generateExecutionPlanAsPure(pureModel.getConcreteFunctionDefinition_safe(REPL_RUN_FUNCTION_QUALIFIED_PATH), null, pureModel, PlanPlatform.JAVA, "", debug, extensions);
if (debug)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,31 @@
import org.finos.legend.engine.plan.execution.PlanExecutor;
import org.finos.legend.engine.plan.execution.result.builder.tds.TDSBuilder;
import org.finos.legend.engine.plan.execution.stores.relational.result.RelationalResult;
import org.finos.legend.engine.protocol.pure.v1.model.context.EngineErrorType;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.SQLExecutionNode;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.Connection;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.ConnectionPointer;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.PackageableConnection;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.domain.Function;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.PackageableRuntime;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.section.SectionIndex;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Database;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Schema;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Table;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.ValueSpecification;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.application.AppliedFunction;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.ClassInstance;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.packageableElement.PackageableElementPtr;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.classInstance.relation.ColSpec;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.classInstance.relation.ColSpecArray;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.classInstance.relation.RelationStoreAccessor;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.packageableElement.PackageableElementPtr;
import org.finos.legend.engine.repl.client.Client;
import org.finos.legend.engine.repl.core.legend.LegendInterface;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeQuery;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeQueryColumn;
import org.finos.legend.engine.repl.shared.ExecutionHelper;
import org.finos.legend.engine.shared.core.operational.errorManagement.EngineException;
import org.finos.legend.pure.m3.coreinstance.meta.pure.metamodel.relation.RelationType;
import org.finos.legend.pure.m3.navigation.M3Paths;

Expand All @@ -61,11 +70,20 @@
public class REPLServerHelpers
{
public static void handleResponse(HttpExchange exchange, int responseCode, String response, REPLServerState state)
{
handleResponse(exchange, responseCode, response, state, "application/json");
}

public static void handleResponse(HttpExchange exchange, int responseCode, String response, REPLServerState state, String contentType)
{
try
{
OutputStream os = exchange.getResponseBody();
byte[] byteResponse = response != null ? response.getBytes(StandardCharsets.UTF_8) : new byte[0];
if (contentType != null)
{
exchange.getResponseHeaders().add("Content-Type", contentType);
}
exchange.sendResponseHeaders(responseCode, byteResponse.length);
os.write(byteResponse);
os.close();
Expand Down Expand Up @@ -106,8 +124,8 @@ private void initialize(PureModelContextData pureModelContextData, List<DataCube
// remove any usage of multiple from(), only add one to the end
// TODO: we might need to account for other variants of ->from(), such as when mapping is specified
Function function = (Function) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)).getFirst();
String runtime = null;
String mapping = null;
String runtimePath = null;
String mappingPath = null;
Deque<AppliedFunction> fns = new LinkedList<>();
ValueSpecification currentExpression = function.body.get(0);
while (currentExpression instanceof AppliedFunction)
Expand All @@ -119,23 +137,23 @@ private void initialize(PureModelContextData pureModelContextData, List<DataCube
{
// TODO: verify the type of the element (i.e. Runtime)
String newRuntime = ((PackageableElementPtr) fn.parameters.get(1)).fullPath;
if (runtime != null && !runtime.equals(newRuntime))
if (runtimePath != null && !runtimePath.equals(newRuntime))
{
throw new RuntimeException("Can't launch DataCube. Source query contains multiple different ->from(), only one is expected");
}
runtime = newRuntime;
runtimePath = newRuntime;
}
else if (fn.parameters.size() == 3)
{
// TODO: verify the type of the element (i.e. Mapping & Runtime)
String newMapping = ((PackageableElementPtr) fn.parameters.get(1)).fullPath;
String newRuntime = ((PackageableElementPtr) fn.parameters.get(2)).fullPath;
if ((mapping != null && !mapping.equals(newMapping)) || (runtime != null && !runtime.equals(newRuntime)))
if ((mappingPath != null && !mappingPath.equals(newMapping)) || (runtimePath != null && !runtimePath.equals(newRuntime)))
{
throw new RuntimeException("Can't launch DataCube. Source query contains multiple different ->from(), only one is expected");
}
mapping = newMapping;
runtime = newRuntime;
mappingPath = newMapping;
runtimePath = newRuntime;
}
}
else
Expand All @@ -149,24 +167,88 @@ else if (fn.parameters.size() == 3)
fn.parameters.set(0, currentExpression);
currentExpression = fn;
}
Connection connection = null;
if (runtime != null)

// Build the minimal PMCD needed to persist to run the query
// NOTE: the ONLY use case we want to support right now is when user uses a single DB with relation store accessor
// with a single connection in a single runtime, no mapping, no join, etc.
// Those cases would be too complex to handle and result in too big of a PMCD to persist.
PureModelContextData model = null;
PackageableRuntime runtime = null;
if (runtimePath != null)
{
String _runtimePath = runtimePath;
runtime = (PackageableRuntime) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(_runtimePath)).getOnly();
}
Database database = null;
if (currentExpression instanceof ClassInstance && ((ClassInstance) currentExpression).value instanceof RelationStoreAccessor)
{
String _runtime = runtime;
PackageableRuntime rt = (PackageableRuntime) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(_runtime)).getFirst();
RelationStoreAccessor accessor = (RelationStoreAccessor) ((ClassInstance) currentExpression).value;

if (accessor.path.size() <= 1)
{
throw new EngineException("Error in the accessor definition. Please provide a table.", accessor.sourceInformation, EngineErrorType.COMPILATION);
}
String schemaName = (accessor.path.size() == 3) ? accessor.path.get(1) : "default";
String tableName = (accessor.path.size() == 3) ? accessor.path.get(2) : accessor.path.get(1);

// clone the database, only extract the schema and table that we need
Database _database = (Database) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(accessor.path.get(0))).getOnly();
Schema _schema = ListIterate.select(_database.schemas, s -> s.name.equals(schemaName)).getOnly();
Table _table = ListIterate.select(_schema.tables, t -> t.name.equals(tableName)).getOnly();
database = new Database();
database.name = _database.name;
database._package = _database._package;
Schema schema = new Schema();
schema.name = _schema.name;
Table table = new Table();
table.name = _table.name;
table.columns = _table.columns;
schema.tables = Lists.mutable.with(table);
database.schemas = Lists.mutable.with(schema);
}
PackageableConnection connection = null;
if (runtimePath != null)
{
String _runtime = runtimePath;
PackageableRuntime rt = (PackageableRuntime) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(_runtime)).getOnly();
if (rt != null && rt.runtimeValue.connections.size() == 1 && rt.runtimeValue.connections.get(0).storeConnections.size() == 1)
{
connection = rt.runtimeValue.connections.get(0).storeConnections.get(0).connection;
Connection conn = rt.runtimeValue.connections.get(0).storeConnections.get(0).connection;
if (conn instanceof ConnectionPointer)
{
connection = (PackageableConnection) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(((ConnectionPointer) conn).connection)).getOnly();
}
}
}
// The only case we want to support persisting the model is when we have a single DB and connection
if (database != null && connection != null && runtime != null && mappingPath == null)
{
model = PureModelContextData.newBuilder()
.withSerializer(pureModelContextData.serializer)
.withElements(Lists.mutable.with(database, connection, runtime))
.build();
try
{
this.legendInterface.compile(model);
model = this.legendInterface.parse(this.legendInterface.render(model), false);
model = PureModelContextData.newBuilder().withSerializer(model.serializer).withElements(ListIterate.reject(model.getElements(), el -> el instanceof SectionIndex)).build();
}
catch (Exception e)
{
this.client.printDebug("Error while compiling persistent model: " + e.getMessage());
// something was wrong with the assembled model, reset it
model = null;
}
}

Map<String, Object> source = Maps.mutable.empty();
source.put("_type", "repl");
source.put("timestamp", this.startTime);
source.put("query", currentExpression.accept(DEPRECATED_PureGrammarComposerCore.Builder.newInstance().build()));
source.put("runtime", runtime);
source.put("mapping", mapping);
source.put("runtime", runtimePath);
source.put("mapping", mappingPath);
source.put("columns", columns);
source.put("connection", connection);
source.put("model", model);
this.source = source;

// -------------------- QUERY --------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public HttpHandler getHandler(REPLServerState state)
BufferedReader bufferReader = new BufferedReader(inputStreamReader);
String requestBody = bufferReader.lines().collect(Collectors.joining());
DataCubeGetValueSpecificationCodeInput input = state.objectMapper.readValue(requestBody, DataCubeGetValueSpecificationCodeInput.class);
handleResponse(exchange, 200, DataCubeHelpers.getQueryCode(input.value, input.pretty), state);
handleResponse(exchange, 200, DataCubeHelpers.getQueryCode(input.value, input.pretty), state, "text/plain");
}
catch (Exception e)
{
Expand Down

0 comments on commit 90971b8

Please sign in to comment.