Skip to content

Commit

Permalink
Legend SQL - use engine runtime for relational store provider
Browse files Browse the repository at this point in the history
  • Loading branch information
gs-jp1 committed Oct 31, 2023
1 parent 34189f4 commit 11c1ca5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@

package org.finos.legend.engine.query.sql.providers;

import org.eclipse.collections.impl.list.mutable.FastList;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.ConnectionPointer;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.PackageableConnection;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.LegacyRuntime;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.EngineRuntime;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Database;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Schema;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.Table;
Expand Down Expand Up @@ -67,19 +65,15 @@ protected SQLSource createSource(TableSource source, Database store, Packageable
String tableName = source.getArgumentValueAs(ARG_TABLE, -1, String.class, true);

Lambda lambda = tableToTDS(store, schemaName, tableName);

ConnectionPointer connectionPtr = new ConnectionPointer();
connectionPtr.connection = connection.getPath();

LegacyRuntime runtime = new LegacyRuntime();
runtime.connections = FastList.newListWith(connectionPtr);
EngineRuntime runtime = SQLProviderUtils.createRuntime(connection.getPath(), store.getPath());

Collections.addAll(keys, new SQLSourceArgument(ARG_SCHEMA, null, schemaName), new SQLSourceArgument(ARG_TABLE, null, tableName));

return new SQLSource(TYPE, lambda, null, runtime, null, null, keys);
}

public static Lambda tableToTDS(Database database, String schemaName, String tableName)

protected static Lambda tableToTDS(Database database, String schemaName, String tableName)
{
Schema schema = SQLProviderUtils.extractElement("schema", database.schemas, s -> SQLProviderUtils.equalsEscaped(s.name, schemaName));
Table table = SQLProviderUtils.extractElement("table", schema.tables, t -> SQLProviderUtils.equalsEscaped(t.name, tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextPointer;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.ConnectionPointer;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.LegacyRuntime;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.EngineRuntime;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.Lambda;
import org.finos.legend.engine.query.sql.providers.core.SQLSource;
import org.finos.legend.engine.query.sql.providers.core.SQLSourceArgument;
import org.finos.legend.engine.query.sql.providers.core.SQLSourceProvider;
import org.finos.legend.engine.query.sql.providers.core.SQLSourceResolvedContext;
import org.finos.legend.engine.query.sql.providers.core.TableSource;
import org.finos.legend.engine.query.sql.providers.core.TableSourceArgument;
import org.finos.legend.engine.query.sql.providers.core.*;
import org.finos.legend.engine.query.sql.providers.shared.AbstractTestLegendStoreSQLSourceProvider;
import org.finos.legend.engine.query.sql.providers.shared.SQLSourceProviderTestUtils;
import org.finos.legend.engine.query.sql.providers.shared.project.ProjectCoordinateLoader;
Expand Down Expand Up @@ -213,8 +208,7 @@ private void testSuccess(ProjectCoordinateWrapper projectCoordinateWrapper, Pure
ConnectionPointer connectionPtr = new ConnectionPointer();
connectionPtr.connection = CONNECTION_NAME;

LegacyRuntime runtime = new LegacyRuntime();
runtime.connections = FastList.newListWith(connectionPtr);
EngineRuntime runtime = SQLProviderUtils.createRuntime(CONNECTION_NAME, databaseName);

SQLSource expected = new SQLSource("relationalStore", lambda, null, runtime, null, null, keys);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.impl.list.mutable.FastList;
import org.eclipse.collections.impl.utility.ListIterate;
import org.finos.legend.engine.protocol.pure.v1.model.context.PackageableElementPointer;
import org.finos.legend.engine.protocol.pure.v1.model.context.PackageableElementType;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.PackageableElement;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.connection.ConnectionPointer;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.EngineRuntime;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.IdentifiedConnection;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.runtime.StoreConnections;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.application.AppliedFunction;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.CString;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.raw.Lambda;
Expand Down Expand Up @@ -80,4 +86,27 @@ public static boolean equalsEscaped(String value, String toMatch)
{
return value.equals(toMatch) || value.equals("\"" + toMatch + "\"");
}

public static EngineRuntime createRuntime(String connection, String store)
{
ConnectionPointer connectionPtr = new ConnectionPointer();
connectionPtr.connection = connection;

PackageableElementPointer storePointer = new PackageableElementPointer();
storePointer.path = store;
storePointer.type = PackageableElementType.STORE;

IdentifiedConnection identifiedConnection = new IdentifiedConnection();
identifiedConnection.id = "connection1";
identifiedConnection.connection = connectionPtr;

StoreConnections storeConnection = new StoreConnections();
storeConnection.store = storePointer;
storeConnection.storeConnections = FastList.newListWith(identifiedConnection);

EngineRuntime runtime = new EngineRuntime();
runtime.connections = FastList.newListWith(storeConnection);

return runtime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class SQLExecutor
private final PlanExecutor planExecutor;
private final Function<PureModel, RichIterable<? extends Root_meta_pure_extension_Extension>> routerExtensions;
private final Iterable<? extends PlanTransformer> transformers;
private final MutableMap<Object, SQLSourceProvider> providers;
private final MutableMap<String, SQLSourceProvider> providers;

public SQLExecutor(ModelManager modelManager,
PlanExecutor planExecutor,
Expand Down Expand Up @@ -216,7 +216,7 @@ private Pair<RichIterable<SQLSource>, PureModelContext> getSourcesAndModel(Query

if (!schemasValid)
{
throw new IllegalArgumentException("Unsupported schema types " + String.join(", ", grouped.keySet().select(k -> !providers.containsKey(k))));
throw new IllegalArgumentException("Unsupported schema types [" + String.join(", ", grouped.keySet().select(k -> !providers.containsKey(k))) + "], supported types: [" + String.join(", ", providers.keySet()) + "]");
}

RichIterable<SQLSourceResolvedContext> resolved = grouped.keySet().collect(k -> resolve(grouped.get(k), context, providers.get(k), profiles));
Expand Down

0 comments on commit 11c1ca5

Please sign in to comment.