From 90971b8c04215f7ec9aa057126e11dd0e5aaf8e6 Mon Sep 17 00:00:00 2001 From: An Phi Date: Fri, 20 Dec 2024 01:45:00 -0500 Subject: [PATCH] datacube: fix persistent query --- .../repl/core/legend/LegendInterface.java | 9 +- .../core/legend/LocalLegendInterface.java | 16 ++- .../dataCube/server/REPLServerHelpers.java | 114 +++++++++++++++--- .../server/handler/DataCubeQueryBuilder.java | 2 +- 4 files changed, 119 insertions(+), 22 deletions(-) diff --git a/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LegendInterface.java b/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LegendInterface.java index af06c5f5aad..d02cd09c75f 100644 --- a/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LegendInterface.java +++ b/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LegendInterface.java @@ -22,7 +22,14 @@ public interface LegendInterface { - PureModelContextData parse(String txt); + default PureModelContextData parse(String txt) + { + return this.parse(txt, true); + } + + PureModelContextData parse(String txt, boolean returnSourceInformation); + + String render(PureModelContextData model); PureModel compile(PureModelContextData model); diff --git a/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LocalLegendInterface.java b/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LocalLegendInterface.java index 6fcd5667ea8..fcf844e97ce 100644 --- a/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LocalLegendInterface.java +++ b/legend-engine-config/legend-engine-repl/legend-engine-repl-client/src/main/java/org/finos/legend/engine/repl/core/legend/LocalLegendInterface.java @@ -14,13 +14,14 @@ package org.finos.legend.engine.repl.core.legend; -import java.util.concurrent.ForkJoinPool; import org.eclipse.collections.api.RichIterable; import org.eclipse.collections.api.tuple.Pair; import org.finos.legend.engine.language.pure.compiler.Compiler; import org.finos.legend.engine.language.pure.compiler.toPureGraph.PureModel; import org.finos.legend.engine.language.pure.compiler.toPureGraph.PureModelProcessParameter; import org.finos.legend.engine.language.pure.grammar.from.PureGrammarParser; +import org.finos.legend.engine.language.pure.grammar.to.PureGrammarComposer; +import org.finos.legend.engine.language.pure.grammar.to.PureGrammarComposerContext; import org.finos.legend.engine.plan.generation.PlanGenerator; import org.finos.legend.engine.plan.platform.PlanPlatform; import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData; @@ -31,6 +32,7 @@ import org.finos.legend.pure.generated.Root_meta_pure_extension_Extension; import java.net.URL; +import java.util.concurrent.ForkJoinPool; import static org.finos.legend.engine.repl.shared.ExecutionHelper.REPL_RUN_FUNCTION_QUALIFIED_PATH; @@ -39,7 +41,7 @@ public class LocalLegendInterface implements LegendInterface private final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); @Override - public PureModelContextData parse(String txt) + public PureModelContextData parse(String txt, boolean returnSourceInformation) { // txt = "#>{a::DB.test}#->filter(t|$t.name->startsWith('Dr'))->meta::pure::mapping::from(^meta::core::runtime::Runtime\n" + // " (\n" + @@ -52,7 +54,7 @@ public PureModelContextData parse(String txt) // " )\n" + // " )\n" + // " )"; - return PureGrammarParser.newInstance().parseModel(txt); + return PureGrammarParser.newInstance().parseModel(txt, returnSourceInformation); // // "" + // "###Runtime\n" + @@ -78,6 +80,12 @@ public PureModelContextData parse(String txt) // "function a::b::c::d():Any[*]{"+txt+"}"); } + @Override + public String render(PureModelContextData model) + { + return PureGrammarComposer.newInstance(PureGrammarComposerContext.Builder.newInstance().build()).renderPureModelContextData(model); + } + @Override public PureModel compile(PureModelContextData pureModelContextData) { @@ -87,7 +95,7 @@ public PureModel compile(PureModelContextData pureModelContextData) @Override public Root_meta_pure_executionPlan_ExecutionPlan generatePlan(PureModel pureModel, boolean debug) { - RichIterable extensions = PureCoreExtensionLoader.extensions().flatCollect(e -> e.extraPureCoreExtensions(pureModel.getExecutionSupport())); + RichIterable extensions = PureCoreExtensionLoader.extensions().flatCollect(e -> e.extraPureCoreExtensions(pureModel.getExecutionSupport())); Pair res = PlanGenerator.generateExecutionPlanAsPure(pureModel.getConcreteFunctionDefinition_safe(REPL_RUN_FUNCTION_QUALIFIED_PATH), null, pureModel, PlanPlatform.JAVA, "", debug, extensions); if (debug) { diff --git a/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/REPLServerHelpers.java b/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/REPLServerHelpers.java index 7de4ce43707..7ed172a67ac 100644 --- a/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/REPLServerHelpers.java +++ b/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/REPLServerHelpers.java @@ -29,22 +29,31 @@ import org.finos.legend.engine.plan.execution.PlanExecutor; import org.finos.legend.engine.plan.execution.result.builder.tds.TDSBuilder; import org.finos.legend.engine.plan.execution.stores.relational.result.RelationalResult; +import org.finos.legend.engine.protocol.pure.v1.model.context.EngineErrorType; import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData; import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.SQLExecutionNode; import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.Connection; +import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.ConnectionPointer; +import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.PackageableConnection; import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.domain.Function; import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.PackageableRuntime; +import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.section.SectionIndex; +import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Database; +import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Schema; +import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Table; import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.ValueSpecification; import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.application.AppliedFunction; import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.ClassInstance; -import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.packageableElement.PackageableElementPtr; import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.classInstance.relation.ColSpec; import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.classInstance.relation.ColSpecArray; +import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.classInstance.relation.RelationStoreAccessor; +import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.packageableElement.PackageableElementPtr; import org.finos.legend.engine.repl.client.Client; import org.finos.legend.engine.repl.core.legend.LegendInterface; import org.finos.legend.engine.repl.dataCube.server.model.DataCubeQuery; import org.finos.legend.engine.repl.dataCube.server.model.DataCubeQueryColumn; import org.finos.legend.engine.repl.shared.ExecutionHelper; +import org.finos.legend.engine.shared.core.operational.errorManagement.EngineException; import org.finos.legend.pure.m3.coreinstance.meta.pure.metamodel.relation.RelationType; import org.finos.legend.pure.m3.navigation.M3Paths; @@ -61,11 +70,20 @@ public class REPLServerHelpers { public static void handleResponse(HttpExchange exchange, int responseCode, String response, REPLServerState state) + { + handleResponse(exchange, responseCode, response, state, "application/json"); + } + + public static void handleResponse(HttpExchange exchange, int responseCode, String response, REPLServerState state, String contentType) { try { OutputStream os = exchange.getResponseBody(); byte[] byteResponse = response != null ? response.getBytes(StandardCharsets.UTF_8) : new byte[0]; + if (contentType != null) + { + exchange.getResponseHeaders().add("Content-Type", contentType); + } exchange.sendResponseHeaders(responseCode, byteResponse.length); os.write(byteResponse); os.close(); @@ -106,8 +124,8 @@ private void initialize(PureModelContextData pureModelContextData, Listfrom(), such as when mapping is specified Function function = (Function) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)).getFirst(); - String runtime = null; - String mapping = null; + String runtimePath = null; + String mappingPath = null; Deque fns = new LinkedList<>(); ValueSpecification currentExpression = function.body.get(0); while (currentExpression instanceof AppliedFunction) @@ -119,23 +137,23 @@ private void initialize(PureModelContextData pureModelContextData, Listfrom(), only one is expected"); } - runtime = newRuntime; + runtimePath = newRuntime; } else if (fn.parameters.size() == 3) { // TODO: verify the type of the element (i.e. Mapping & Runtime) String newMapping = ((PackageableElementPtr) fn.parameters.get(1)).fullPath; String newRuntime = ((PackageableElementPtr) fn.parameters.get(2)).fullPath; - if ((mapping != null && !mapping.equals(newMapping)) || (runtime != null && !runtime.equals(newRuntime))) + if ((mappingPath != null && !mappingPath.equals(newMapping)) || (runtimePath != null && !runtimePath.equals(newRuntime))) { throw new RuntimeException("Can't launch DataCube. Source query contains multiple different ->from(), only one is expected"); } - mapping = newMapping; - runtime = newRuntime; + mappingPath = newMapping; + runtimePath = newRuntime; } } else @@ -149,24 +167,88 @@ else if (fn.parameters.size() == 3) fn.parameters.set(0, currentExpression); currentExpression = fn; } - Connection connection = null; - if (runtime != null) + + // Build the minimal PMCD needed to persist to run the query + // NOTE: the ONLY use case we want to support right now is when user uses a single DB with relation store accessor + // with a single connection in a single runtime, no mapping, no join, etc. + // Those cases would be too complex to handle and result in too big of a PMCD to persist. + PureModelContextData model = null; + PackageableRuntime runtime = null; + if (runtimePath != null) + { + String _runtimePath = runtimePath; + runtime = (PackageableRuntime) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(_runtimePath)).getOnly(); + } + Database database = null; + if (currentExpression instanceof ClassInstance && ((ClassInstance) currentExpression).value instanceof RelationStoreAccessor) { - String _runtime = runtime; - PackageableRuntime rt = (PackageableRuntime) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(_runtime)).getFirst(); + RelationStoreAccessor accessor = (RelationStoreAccessor) ((ClassInstance) currentExpression).value; + + if (accessor.path.size() <= 1) + { + throw new EngineException("Error in the accessor definition. Please provide a table.", accessor.sourceInformation, EngineErrorType.COMPILATION); + } + String schemaName = (accessor.path.size() == 3) ? accessor.path.get(1) : "default"; + String tableName = (accessor.path.size() == 3) ? accessor.path.get(2) : accessor.path.get(1); + + // clone the database, only extract the schema and table that we need + Database _database = (Database) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(accessor.path.get(0))).getOnly(); + Schema _schema = ListIterate.select(_database.schemas, s -> s.name.equals(schemaName)).getOnly(); + Table _table = ListIterate.select(_schema.tables, t -> t.name.equals(tableName)).getOnly(); + database = new Database(); + database.name = _database.name; + database._package = _database._package; + Schema schema = new Schema(); + schema.name = _schema.name; + Table table = new Table(); + table.name = _table.name; + table.columns = _table.columns; + schema.tables = Lists.mutable.with(table); + database.schemas = Lists.mutable.with(schema); + } + PackageableConnection connection = null; + if (runtimePath != null) + { + String _runtime = runtimePath; + PackageableRuntime rt = (PackageableRuntime) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(_runtime)).getOnly(); if (rt != null && rt.runtimeValue.connections.size() == 1 && rt.runtimeValue.connections.get(0).storeConnections.size() == 1) { - connection = rt.runtimeValue.connections.get(0).storeConnections.get(0).connection; + Connection conn = rt.runtimeValue.connections.get(0).storeConnections.get(0).connection; + if (conn instanceof ConnectionPointer) + { + connection = (PackageableConnection) ListIterate.select(pureModelContextData.getElements(), e -> e.getPath().equals(((ConnectionPointer) conn).connection)).getOnly(); + } + } + } + // The only case we want to support persisting the model is when we have a single DB and connection + if (database != null && connection != null && runtime != null && mappingPath == null) + { + model = PureModelContextData.newBuilder() + .withSerializer(pureModelContextData.serializer) + .withElements(Lists.mutable.with(database, connection, runtime)) + .build(); + try + { + this.legendInterface.compile(model); + model = this.legendInterface.parse(this.legendInterface.render(model), false); + model = PureModelContextData.newBuilder().withSerializer(model.serializer).withElements(ListIterate.reject(model.getElements(), el -> el instanceof SectionIndex)).build(); + } + catch (Exception e) + { + this.client.printDebug("Error while compiling persistent model: " + e.getMessage()); + // something was wrong with the assembled model, reset it + model = null; } } + Map source = Maps.mutable.empty(); source.put("_type", "repl"); source.put("timestamp", this.startTime); source.put("query", currentExpression.accept(DEPRECATED_PureGrammarComposerCore.Builder.newInstance().build())); - source.put("runtime", runtime); - source.put("mapping", mapping); + source.put("runtime", runtimePath); + source.put("mapping", mappingPath); source.put("columns", columns); - source.put("connection", connection); + source.put("model", model); this.source = source; // -------------------- QUERY -------------------- diff --git a/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/handler/DataCubeQueryBuilder.java b/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/handler/DataCubeQueryBuilder.java index 9debbe9347a..e9c6b4dfe25 100644 --- a/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/handler/DataCubeQueryBuilder.java +++ b/legend-engine-config/legend-engine-repl/legend-engine-repl-data-cube/src/main/java/org/finos/legend/engine/repl/dataCube/server/handler/DataCubeQueryBuilder.java @@ -84,7 +84,7 @@ public HttpHandler getHandler(REPLServerState state) BufferedReader bufferReader = new BufferedReader(inputStreamReader); String requestBody = bufferReader.lines().collect(Collectors.joining()); DataCubeGetValueSpecificationCodeInput input = state.objectMapper.readValue(requestBody, DataCubeGetValueSpecificationCodeInput.class); - handleResponse(exchange, 200, DataCubeHelpers.getQueryCode(input.value, input.pretty), state); + handleResponse(exchange, 200, DataCubeHelpers.getQueryCode(input.value, input.pretty), state, "text/plain"); } catch (Exception e) {