Skip to content

Commit

Permalink
Use result set metadata types for REPL caching (finos#3116)
Browse files Browse the repository at this point in the history
* Use result set metadata types for REPL caching

* Update TestDuckDBCommands.java
  • Loading branch information
gs-ssh16 authored Sep 22, 2024
1 parent 3cbe194 commit 3f59899
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.finos.legend.engine.plan.execution.result.serialization.TemporaryFile;
import org.finos.legend.engine.plan.execution.stores.StoreType;
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.DatabaseManager;
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.commands.Column;
import org.finos.legend.engine.plan.execution.stores.relational.plugin.RelationalStoreState;
import org.finos.legend.engine.plan.execution.stores.relational.result.RelationalResult;
import org.finos.legend.engine.plan.execution.stores.relational.serialization.RelationalResultToCSVSerializerWithTransformersApplied;
Expand All @@ -38,6 +39,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

import static org.finos.legend.engine.repl.relational.schema.MetadataReader.getTables;
import static org.finos.legend.engine.repl.shared.ExecutionHelper.REPL_RUN_FUNCTION_SIGNATURE;
Expand Down Expand Up @@ -101,6 +103,7 @@ public boolean process(String line) throws Exception
if (res instanceof RelationalResult)
{
RelationalResult relationalResult = (RelationalResult) res;
List<Column> relationalResultColumns = relationalResult.getResultSetColumns();
String tempDir = ((RelationalStoreState) this.client.getPlanExecutor().getExecutorsOfType(StoreType.Relational).getOnly().getStoreState()).getRelationalExecutor().getRelationalExecutionConfiguration().tempPath;
try (TemporaryFile tempFile = new TemporaryFile(tempDir))
{
Expand All @@ -119,7 +122,7 @@ public boolean process(String line) throws Exception
String tableName = specifiedTableName != null ? specifiedTableName : "test" + (getTables(connection).size() + 1);
try (Statement statement = connection.createStatement())
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile()));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile(), relationalResultColumns));
this.client.println("Cached into table: '" + tableName + "'. Launching DataCube...");

String functionBodyCode = "#>{" + DataCube.getLocalDatabasePath() + "." + tableName + "}#->select()->from(" + DataCube.getLocalRuntimePath() + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.finos.legend.engine.plan.execution.result.serialization.TemporaryFile;
import org.finos.legend.engine.plan.execution.stores.StoreType;
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.DatabaseManager;
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.commands.Column;
import org.finos.legend.engine.plan.execution.stores.relational.plugin.RelationalStoreState;
import org.finos.legend.engine.plan.execution.stores.relational.result.RelationalResult;
import org.finos.legend.engine.plan.execution.stores.relational.serialization.RelationalResultToCSVSerializerWithTransformersApplied;
Expand All @@ -41,6 +42,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

import static org.finos.legend.engine.repl.relational.schema.MetadataReader.getTables;
import static org.finos.legend.engine.repl.shared.ExecutionHelper.executeCode;
Expand Down Expand Up @@ -94,6 +96,7 @@ public boolean process(String line) throws Exception
if (res instanceof RelationalResult)
{
RelationalResult relationalResult = (RelationalResult) res;
List<Column> relationalResultColumns = relationalResult.getResultSetColumns();
String tempDir = ((RelationalStoreState) this.client.getPlanExecutor().getExecutorsOfType(StoreType.Relational).getOnly().getStoreState()).getRelationalExecutor().getRelationalExecutionConfiguration().tempPath;
try (TemporaryFile tempFile = new TemporaryFile(tempDir))
{
Expand All @@ -112,7 +115,7 @@ public boolean process(String line) throws Exception
String tableName = specifiedTableName != null ? specifiedTableName : "test" + (getTables(connection).size() + 1);
try (Statement statement = connection.createStatement())
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile()));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile(), relationalResultColumns));
this.client.println("Cached into table: '" + tableName + "'");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,33 @@
<artifactId>eclipse-collections</artifactId>
</dependency>
<!-- ECLIPSE COLLECTIONS -->

<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-executionPlan-execution</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-protocol-pure</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-identity-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Test -->
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.commands.RelationalDatabaseCommandsVisitor;

import java.util.List;
import java.util.stream.Collectors;

public class DuckDBCommands extends RelationalDatabaseCommands
{
Expand Down Expand Up @@ -47,6 +48,13 @@ public String load(String tableName, String location)
return "CREATE TABLE " + tableName + " AS SELECT * FROM read_csv('" + location + "', header=true);";
}

/**
 * Builds the DuckDB statement that creates {@code tableName} from a CSV file, using the supplied
 * column names and types instead of letting {@code read_csv} infer them.
 * <p>
 * The columns are rendered as the struct literal passed to the {@code columns} argument of
 * {@code read_csv}, e.g. <code>{'id': 'INT', 'firstName': 'VARCHAR'}</code>, in list order.
 * Column names, column types and the file location are emitted as single-quoted SQL string
 * literals, so any single quote they contain is doubled; otherwise a column alias such as
 * {@code Person's Age}, or a path containing an apostrophe, would terminate the literal early and
 * yield a malformed statement.
 *
 * @param tableName name of the table to create; emitted verbatim as an identifier
 * @param location  path of the CSV file to read; the file is read with {@code header = true}
 * @param columns   columns of the CSV file, each providing the {@code name} and {@code type} to use
 * @return the {@code CREATE TABLE ... AS SELECT * FROM read_csv(...)} statement
 */
@Override
public String load(String tableName, String location, List<Column> columns)
{
    // String.valueOf keeps the previous rendering of a null value ("null") instead of failing on it.
    String columnTypesString = columns.stream()
            .map(c -> String.format("'%s': '%s'", String.valueOf(c.name).replace("'", "''"), String.valueOf(c.type).replace("'", "''")))
            .collect(Collectors.joining(", ", "{", "}"));
    String escapedLocation = String.valueOf(location).replace("'", "''");
    return "CREATE TABLE " + tableName + " AS SELECT * FROM read_csv('" + escapedLocation + "', header = true, columns = " + columnTypesString + ");";
}

@Override
public String dropTable(String tableName, String location)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.plan.execution.stores.relational.connection.driver.vendors.duckdb;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.eclipse.collections.impl.factory.Lists;
import org.finos.legend.engine.plan.execution.result.serialization.TemporaryFile;
import org.finos.legend.engine.plan.execution.stores.relational.activity.RelationalExecutionActivity;
import org.finos.legend.engine.plan.execution.stores.relational.config.TemporaryTestDbConfiguration;
import org.finos.legend.engine.plan.execution.stores.relational.connection.driver.commands.Column;
import org.finos.legend.engine.plan.execution.stores.relational.connection.ds.state.ConnectionStateManager;
import org.finos.legend.engine.plan.execution.stores.relational.connection.manager.ConnectionManagerSelector;
import org.finos.legend.engine.plan.execution.stores.relational.result.RelationalResult;
import org.finos.legend.engine.plan.execution.stores.relational.result.SQLExecutionResult;
import org.finos.legend.engine.plan.execution.stores.relational.serialization.RelationalResultToCSVSerializer;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.RelationalTdsInstantiationExecutionNode;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.SQLExecutionNode;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.result.DataTypeResultType;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.connection.DatabaseType;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.connection.RelationalDatabaseConnection;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.connection.authentication.TestDatabaseAuthenticationStrategy;
import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.connection.specification.DuckDBDatasourceSpecification;
import org.finos.legend.engine.shared.core.identity.Identity;
import org.junit.*;
import org.junit.rules.TemporaryFolder;

import javax.security.auth.Subject;
import java.io.IOException;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TestDuckDBCommands
{
@Rule
public final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private static final DuckDBCommands DUCK_DB_COMMANDS = new DuckDBCommands();
private static final ConnectionManagerSelector CONNECTION_MANAGER_SELECTOR = new ConnectionManagerSelector(new TemporaryTestDbConfiguration(-1), Collections.emptyList(), Optional.empty());

@AfterClass
public static void tearDown() throws IOException
{
ConnectionStateManager.getInstance().close();
}

@Test
public void testLoadCommand() throws Exception
{
try (
Connection connection = CONNECTION_MANAGER_SELECTOR.getDatabaseConnection((Subject) null, this.testDuckDBConnection());
Statement statement = connection.createStatement()
)
{
String loadSql = DUCK_DB_COMMANDS.load("load_command_test_table", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile());
statement.execute(loadSql);
try (ResultSet rs = statement.executeQuery("select * from load_command_test_table"))
{
assertOnColumnCountAndColumnTypes(rs.getMetaData(), 4, "(id:BIGINT)|(firstName:VARCHAR)|(lastName:VARCHAR)|(age:BIGINT)");
assertOnResultSetCSV(
rs,
"1,Peter,Smith,23\r\n" +
"2,John,Johnson,22\r\n" +
"3,John,Hill,12\r\n" +
"4,Anthony,Allen,22\r\n" +
"5,Fabrice,Roberts,34\r\n" +
"6,Oliver,Hill,32\r\n" +
"7,David,Harris,35\r\n"
);
}
}
}

@Test
public void testLoadCommandWithProvidedTypes() throws Exception
{
try (
Connection connection = CONNECTION_MANAGER_SELECTOR.getDatabaseConnection((Subject) null, this.testDuckDBConnection());
Statement statement = connection.createStatement()
)
{
List<Column> columns = Arrays.asList(
new Column("id", "INT"),
new Column("firstName", "VARCHAR"),
new Column("lastName", "VARCHAR(32)"),
new Column("age", "DOUBLE")
);
String loadSql = DUCK_DB_COMMANDS.load("load_command_with_types_test_table", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile(), columns);
statement.execute(loadSql);
try (ResultSet rs = statement.executeQuery("select * from load_command_with_types_test_table"))
{
assertOnColumnCountAndColumnTypes(rs.getMetaData(), 4, "(id:INTEGER)|(firstName:VARCHAR)|(lastName:VARCHAR)|(age:DOUBLE)");
assertOnResultSetCSV(
rs,
"1,Peter,Smith,23.0\r\n" +
"2,John,Johnson,22.0\r\n" +
"3,John,Hill,12.0\r\n" +
"4,Anthony,Allen,22.0\r\n" +
"5,Fabrice,Roberts,34.0\r\n" +
"6,Oliver,Hill,32.0\r\n" +
"7,David,Harris,35.0\r\n"
);
}
}
}

@Test
public void testLoadCommandWithProvidedTypesFromRelationalResult() throws Exception
{
RelationalResult relationalResult;
try (TemporaryFile tempFile = new TemporaryFile(TEMPORARY_FOLDER.getRoot().getAbsolutePath()))
{
RelationalDatabaseConnection duckDbConnection = this.testDuckDBConnection();
try (
Connection connection = CONNECTION_MANAGER_SELECTOR.getDatabaseConnection((Subject) null, duckDbConnection);
Statement statement = connection.createStatement()
)
{
statement.execute(DUCK_DB_COMMANDS.load("load_command_with_types_test_table_1", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile()));
SQLExecutionNode sqlExecutionNode = new SQLExecutionNode();
sqlExecutionNode.isResultColumnsDynamic = true;
sqlExecutionNode.connection = duckDbConnection;
RelationalTdsInstantiationExecutionNode tdsNode = new RelationalTdsInstantiationExecutionNode();
tdsNode.resultType = new DataTypeResultType();
SQLExecutionResult sqlExecutionResult = new SQLExecutionResult(
Lists.mutable.of(new RelationalExecutionActivity("select firstName as \"First Name\", avg(age) as \"Average Age\" from load_command_with_types_test_table_1 group by firstName", "")),
sqlExecutionNode,
"DuckDB",
"UTC",
connection,
Identity.getAnonymousIdentity(),
Lists.mutable.empty(),
null
);
relationalResult = new RelationalResult(sqlExecutionResult, tdsNode);
RelationalResultToCSVSerializer csvSerializer = new RelationalResultToCSVSerializer(relationalResult, true);
tempFile.writeFile(csvSerializer);
relationalResult.close();
}

try (
Connection connection = CONNECTION_MANAGER_SELECTOR.getDatabaseConnection((Subject) null, duckDbConnection);
Statement statement = connection.createStatement()
)
{
String loadSql = DUCK_DB_COMMANDS.load("load_command_with_types_test_table_2", tempFile.path.toString(), relationalResult.getResultSetColumns());
statement.execute(loadSql);
try (ResultSet rs = statement.executeQuery("select * from load_command_with_types_test_table_2 order by \"Average Age\""))
{
assertOnColumnCountAndColumnTypes(rs.getMetaData(), 2, "(First Name:VARCHAR)|(Average Age:DOUBLE)");
assertOnResultSetCSV(
rs,
"John,17.0\r\n" +
"Anthony,22.0\r\n" +
"Peter,23.0\r\n" +
"Oliver,32.0\r\n" +
"Fabrice,34.0\r\n" +
"David,35.0\r\n"
);
}
}
}
}

private static void assertOnColumnCountAndColumnTypes(ResultSetMetaData resultSetMetaData, int expectedColumnCount, String expectedColumnTypes) throws SQLException
{
int columnCount = resultSetMetaData.getColumnCount();
Assert.assertEquals(expectedColumnCount, columnCount);

String columnsAndTypes = IntStream.range(1, expectedColumnCount + 1).mapToObj(i ->
{
try
{
return "(" + resultSetMetaData.getColumnLabel(i) + ":" + resultSetMetaData.getColumnTypeName(i) + ")";
}
catch (SQLException e)
{
throw new RuntimeException(e);
}
}).collect(Collectors.joining("|"));
Assert.assertEquals(expectedColumnTypes, columnsAndTypes);
}

private static void assertOnResultSetCSV(ResultSet resultSet, String expectedResult) throws SQLException, IOException
{
StringBuilder sb = new StringBuilder();
try (CSVPrinter csvPrinter = new CSVPrinter(sb, CSVFormat.DEFAULT))
{
csvPrinter.printRecords(resultSet);
}
Assert.assertEquals(expectedResult, sb.toString());
}

private RelationalDatabaseConnection testDuckDBConnection() throws Exception
{
DuckDBDatasourceSpecification duckDBDatasourceSpecification = new DuckDBDatasourceSpecification();
duckDBDatasourceSpecification.path = TEMPORARY_FOLDER.newFolder("duckDB").getAbsolutePath() + "/duck_db_file";
TestDatabaseAuthenticationStrategy testDatabaseAuthSpec = new TestDatabaseAuthenticationStrategy();
return new RelationalDatabaseConnection(duckDBDatasourceSpecification, testDatabaseAuthSpec, DatabaseType.DuckDB);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
id,firstName,lastName,age
1,Peter,Smith,23
2,John,Johnson,22
3,John,Hill,12
4,Anthony,Allen,22
5,Fabrice,Roberts,34
6,Oliver,Hill,32
7,David,Harris,35
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public String load(String tableName, String location)
throw new RuntimeException("Load not implemented for " + this.getClass().getSimpleName());
}

/**
 * Default implementation of the load command that takes explicit column definitions.
 * <p>
 * This implementation never produces a statement: it always throws, naming the concrete commands
 * class, so a subclass has to override it to provide a typed load.
 *
 * @param tableName name of the table to create (unused here)
 * @param location  location of the file to load (unused here)
 * @param columns   column definitions to apply (unused here)
 * @return never returns normally
 * @throws RuntimeException always
 */
public String load(String tableName, String location, List<Column> columns)
{
    String commandsClassName = this.getClass().getSimpleName();
    throw new RuntimeException("Load not implemented for " + commandsClassName);
}

public String dropTable(String tableName, String location)
{
throw new RuntimeException("Drop table not implemented for " + this.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public String load(String tableName, String location)
return "CREATE TABLE " + tableName + " AS SELECT * FROM CSVREAD('" + location + "');";
}

/**
 * Builds the statement that creates {@code tableName} with an explicit column list and populates it
 * from a CSV file read through {@code CSVREAD}.
 * <p>
 * Each column is rendered as {@code name type}; the definitions are comma-separated, in list order,
 * and wrapped in parentheses directly after the table name. All values are emitted verbatim.
 *
 * @param tableName name of the table to create
 * @param location  location of the CSV file passed to {@code CSVREAD}
 * @param columns   columns providing the {@code name} and {@code type} of each table column
 * @return the {@code CREATE TABLE ... AS SELECT * FROM CSVREAD(...)} statement
 */
@Override
public String load(String tableName, String location, List<Column> columns)
{
    String columnDefinitions = columns.stream()
            .map(column -> column.name + " " + column.type)
            .collect(Collectors.joining(", ", "(", ")"));
    return "CREATE TABLE " + tableName + columnDefinitions + " AS SELECT * FROM CSVREAD('" + location + "');";
}

@Override
public <T> T accept(RelationalDatabaseCommandsVisitor<T> visitor)
{
Expand Down
Loading

0 comments on commit 3f59899

Please sign in to comment.