Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence Component: Introduce Postgres Sink #2737

Merged
merged 25 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3e3a32d
Change digest UDF's signature to String[], String[] and add support f…
kumuwu Mar 14, 2024
ef4634e
Add sorting logic based on column names
kumuwu Mar 14, 2024
2f022a7
Add more tests
kumuwu Mar 15, 2024
fbc4522
Fix bigquery end to end tests
kumuwu Mar 18, 2024
a896c5b
Fix comments
kumuwu Mar 19, 2024
52a0866
Clean up
kumuwu Mar 19, 2024
0e29370
Rename method
kumuwu Mar 19, 2024
308e1ec
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Mar 21, 2024
d2b9aa5
Introduce postgres sink
kumuwu Mar 25, 2024
5ca255a
Add unitemporal snapshot end to end tests
kumuwu Mar 25, 2024
1ea9ea7
Fix h2 and snowflake length and scale problem for date/time data types
kumuwu Mar 26, 2024
323c5b5
Fix postgres data type mappings
kumuwu Mar 28, 2024
7959a7c
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Mar 28, 2024
91165a5
Fix pom + clean up PostgresSink
kumuwu Mar 28, 2024
1f3e374
Address comments
kumuwu Apr 1, 2024
23086ae
Add SQL tests for unitemporal snapshot
kumuwu Apr 1, 2024
e4eefa4
Remove extra print line
kumuwu Apr 1, 2024
5bef910
Add append only e2e tests
kumuwu Apr 5, 2024
1271d7b
Add append only SQL tests
kumuwu Apr 8, 2024
80c9ba0
Fix merge and to_json syntax
kumuwu Apr 8, 2024
edba665
Fix test
kumuwu Apr 8, 2024
c93ff97
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Apr 8, 2024
75f2d95
Fix pom
kumuwu Apr 8, 2024
5f45a2e
Improve e2e flow
kumuwu Apr 9, 2024
7166409
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Apr 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public static void logSql(Logger logger, SqlLogging sqlLogging, String sqlBefore
break;
case UNMASKED:
logger.info(sqlAfterReplacingPlaceholders);
System.out.println(sqlAfterReplacingPlaceholders);
kumuwu marked this conversation as resolved.
Show resolved Hide resolved
break;
case DISABLED:
break;
Expand All @@ -68,6 +69,7 @@ public static void logSql(Logger logger, SqlLogging sqlLogging, String sql)
if (!sqlLogging.equals(SqlLogging.DISABLED))
{
logger.info(sql);
System.out.println(sql);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Dataset constructDatasetFromDatabase(Dataset dataset)
@Override
public void setSqlLogging(SqlLogging sqlLogging)
{
this.sqlLogging = sqlLogging;
this.sqlLogging = SqlLogging.UNMASKED;
kumuwu marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMap
String databaseName = dataset.datasetReference().database().orElse(null);
try
{
if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping))
if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping))
{
throw new IllegalStateException("Only JdbcPropertiesToLogicalDataTypeMapping allowed in constructDatasetFromDatabase");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,25 @@ public interface JdbcPropertiesToLogicalDataTypeMapping extends TypeMapping
// Type-name constants; each constant's value is its own name in upper case.

// Integer types.
String SMALLINT = "SMALLINT";
String INTEGER = "INTEGER";
String BIGINT = "BIGINT";
// NOTE(review): INT2/INT4/INT8 look like the PostgreSQL names for
// SMALLINT/INTEGER/BIGINT - confirm against what the JDBC driver reports.
String INT2 = "INT2";
String INT4 = "INT4";
String INT8 = "INT8";
// Exact and floating-point numeric types.
String DECIMAL = "DECIMAL";
String NUMERIC = "NUMERIC";
String DOUBLE = "DOUBLE";
String REAL = "REAL";
// NOTE(review): FLOAT4/FLOAT8 presumably are the PostgreSQL names for
// REAL/DOUBLE PRECISION - confirm.
String FLOAT4 = "FLOAT4";
String FLOAT8 = "FLOAT8";
// Character types.
String CHAR = "CHAR";
String VARCHAR = "VARCHAR";
String TEXT = "TEXT";
// NOTE(review): BPCHAR presumably is PostgreSQL's blank-padded CHAR - confirm.
String BPCHAR = "BPCHAR";
String CLOB = "CLOB";
// Binary types.
String BINARY = "BINARY";
String VARBINARY = "VARBINARY";
String BYTEA = "BYTEA";
// Bit and boolean types.
String BIT = "BIT";
String BOOL = "BOOL";
String BOOLEAN = "BOOLEAN";
// Date and time types.
String DATE = "DATE";
String TIME = "TIME";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.sqldom.schema;

/**
 * Variable-size data type whose type name is {@code TEXT}.
 */
public class Text extends VariableSizeDataType
{
    /** Type name handed to the {@link VariableSizeDataType} constructor. */
    private static final String TYPE_NAME = "TEXT";

    /**
     * Creates the data type with the name {@code TEXT}.
     */
    public Text()
    {
        super(TYPE_NAME);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public DataType getDataType(FieldType type)
case TIMESTAMP:
case DATETIME:
dataType = new Timestamp();
type.scale().ifPresent(dataType::setScale);
type.length().ifPresent(dataType::setLength);
break;
case TIMESTAMP_TZ:
dataType = new TimestampWithTimezone();
type.scale().ifPresent(dataType::setScale);
type.length().ifPresent(dataType::setLength);
break;
case DATE:
dataType = new Date();
Expand All @@ -96,7 +96,7 @@ public DataType getDataType(FieldType type)
break;
case TIME:
dataType = new Time();
type.scale().ifPresent(dataType::setScale);
type.length().ifPresent(dataType::setLength);
break;
case NUMERIC:
dataType = new Numeric();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public FieldType getDataType(String typeName, String dataType, Integer columnSiz
case DATE:
return FieldType.builder().dataType(DataType.DATE).build();
case TIME:
return FieldType.builder().dataType(DataType.TIME).scale(decimalDigits).build();
return FieldType.builder().dataType(DataType.TIME).length(decimalDigits).build();
case TIMESTAMP:
return FieldType.builder().dataType(DataType.TIMESTAMP).scale(decimalDigits).build();
return FieldType.builder().dataType(DataType.TIMESTAMP).length(decimalDigits).build();
case TIMESTAMP_WITH_TIME_ZONE:
return FieldType.builder().dataType(DataType.TIMESTAMP_TZ).scale(decimalDigits).build();
return FieldType.builder().dataType(DataType.TIMESTAMP_TZ).length(decimalDigits).build();
case JSON:
return FieldType.builder().dataType(DataType.JSON).build();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class TestUtils
public static Field nullableIntIncome = Field.builder().name(incomeName).type(FieldType.of(DataType.INTEGER, Optional.empty(), Optional.empty())).fieldAlias(incomeName).build();
public static Field decimalIncome = Field.builder().name(incomeName).type(FieldType.of(DataType.DECIMAL, 10, 2)).fieldAlias(incomeName).build();
public static Field startTime = Field.builder().name(startTimeName).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(startTimeName).build();
public static Field startTimeTimestamp = Field.builder().name(startTimeName).type(FieldType.of(DataType.TIMESTAMP, null, 6)).primaryKey(true).fieldAlias(startTimeName).build();
public static Field startTimeTimestamp = Field.builder().name(startTimeName).type(FieldType.of(DataType.TIMESTAMP, 6, null)).primaryKey(true).fieldAlias(startTimeName).build();
public static Field expiryDate = Field.builder().name(expiryDateName).type(FieldType.of(DataType.DATE, Optional.empty(), Optional.empty())).fieldAlias(expiryDateName).build();
public static Field expiryDatePk = Field.builder().name(expiryDateName).type(FieldType.of(DataType.DATE, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(expiryDateName).build();
public static Field date = Field.builder().name(dateName).type(FieldType.of(DataType.DATE, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(dateName).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2024 Goldman Sachs

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component</artifactId>
<version>4.42.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>legend-engine-xt-persistence-component-relational-postgres</artifactId>
<packaging>jar</packaging>
<name>Legend Engine - XT - Persistence - Component - Relational Postgres</name>

<dependencies>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-logical-plan</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-physical-plan</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-relational-core</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-relational-ansi</artifactId>
</dependency>

<!-- DRIVER -->
<!-- PostgreSQL JDBC driver. Runtime scope: it must be on the classpath when
     a connection is opened, but this module does not compile against it. -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.2</version>
<scope>runtime</scope>
</dependency>
<!-- DRIVER -->

<!-- TEST -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
kumuwu marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
kumuwu marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.postgres;

import org.finos.legend.engine.persistence.components.executor.Executor;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Update;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.LowerCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.UpperCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection;
import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutor;
import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection;
import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcHelper;
import org.finos.legend.engine.persistence.components.relational.postgres.sql.PostgresDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.postgres.sql.PostgresJdbcPropertiesToLogicalDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.postgres.sql.visitor.FieldVisitor;
import org.finos.legend.engine.persistence.components.relational.postgres.sql.visitor.SQLUpdateVisitor;
import org.finos.legend.engine.persistence.components.relational.postgres.sql.visitor.SchemaDefinitionVisitor;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.sqldom.utils.SqlGenUtils;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.util.Capability;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class PostgresSink extends AnsiSqlSink
{
private static final RelationalSink INSTANCE;

private static final Set<Capability> CAPABILITIES;
private static final Map<Class<?>, LogicalPlanVisitor<?>> LOGICAL_PLAN_VISITOR_BY_CLASS;
private static final Map<DataType, Set<DataType>> IMPLICIT_DATA_TYPE_MAPPING;
private static final Map<DataType, Set<DataType>> EXPLICIT_DATA_TYPE_MAPPING;

static
{
Set<Capability> capabilities = new HashSet<>();
capabilities.add(Capability.MERGE);
capabilities.add(Capability.ADD_COLUMN);
capabilities.add(Capability.IMPLICIT_DATA_TYPE_CONVERSION);
capabilities.add(Capability.EXPLICIT_DATA_TYPE_CONVERSION);
capabilities.add(Capability.DATA_TYPE_LENGTH_CHANGE);
capabilities.add(Capability.DATA_TYPE_SCALE_CHANGE);
CAPABILITIES = Collections.unmodifiableSet(capabilities);

Map<Class<?>, LogicalPlanVisitor<?>> logicalPlanVisitorByClass = new HashMap<>();
logicalPlanVisitorByClass.put(SchemaDefinition.class, new SchemaDefinitionVisitor());
logicalPlanVisitorByClass.put(Field.class, new FieldVisitor());
logicalPlanVisitorByClass.put(Update.class, new SQLUpdateVisitor());
LOGICAL_PLAN_VISITOR_BY_CLASS = Collections.unmodifiableMap(logicalPlanVisitorByClass);

// TODO: These two mappings have not been confirmed, to do with schema evolution
Map<DataType, Set<DataType>> implicitDataTypeMapping = new HashMap<>();
implicitDataTypeMapping.put(DataType.DECIMAL, new HashSet<>(Arrays.asList(DataType.TINYINT, DataType.SMALLINT, DataType.INTEGER, DataType.INT, DataType.BIGINT, DataType.FLOAT, DataType.DOUBLE, DataType.REAL, DataType.NUMERIC)));
implicitDataTypeMapping.put(DataType.DOUBLE, new HashSet<>(Arrays.asList(DataType.TINYINT, DataType.SMALLINT, DataType.INTEGER, DataType.INT, DataType.FLOAT, DataType.REAL)));
implicitDataTypeMapping.put(DataType.REAL, new HashSet<>(Arrays.asList(DataType.TINYINT, DataType.SMALLINT, DataType.INTEGER, DataType.INT, DataType.FLOAT, DataType.DOUBLE)));
implicitDataTypeMapping.put(DataType.BIGINT, new HashSet<>(Arrays.asList(DataType.TINYINT, DataType.SMALLINT, DataType.INTEGER, DataType.INT)));
implicitDataTypeMapping.put(DataType.INTEGER, new HashSet<>(Arrays.asList(DataType.INT, DataType.TINYINT, DataType.SMALLINT)));
implicitDataTypeMapping.put(DataType.SMALLINT, Collections.singleton(DataType.TINYINT));
implicitDataTypeMapping.put(DataType.VARCHAR, new HashSet<>(Arrays.asList(DataType.CHAR, DataType.STRING)));
implicitDataTypeMapping.put(DataType.TIMESTAMP, Collections.singleton(DataType.DATETIME));
IMPLICIT_DATA_TYPE_MAPPING = Collections.unmodifiableMap(implicitDataTypeMapping);

Map<DataType, Set<DataType>> explicitDataTypeMapping = new HashMap<>();
explicitDataTypeMapping.put(DataType.TINYINT, new HashSet<>(Arrays.asList(DataType.SMALLINT, DataType.INTEGER, DataType.INT, DataType.BIGINT, DataType.FLOAT, DataType.DOUBLE, DataType.DECIMAL, DataType.NUMERIC)));
explicitDataTypeMapping.put(DataType.SMALLINT, new HashSet<>(Arrays.asList(DataType.INTEGER, DataType.INT, DataType.BIGINT, DataType.FLOAT, DataType.DOUBLE, DataType.DECIMAL, DataType.NUMERIC)));
explicitDataTypeMapping.put(DataType.INTEGER, new HashSet<>(Arrays.asList(DataType.BIGINT, DataType.FLOAT, DataType.DOUBLE, DataType.DECIMAL, DataType.NUMERIC)));
explicitDataTypeMapping.put(DataType.BIGINT, new HashSet<>(Arrays.asList(DataType.DECIMAL, DataType.NUMERIC)));
explicitDataTypeMapping.put(DataType.REAL, new HashSet<>(Arrays.asList(DataType.DOUBLE, DataType.DECIMAL, DataType.NUMERIC)));
explicitDataTypeMapping.put(DataType.DOUBLE, new HashSet<>(Arrays.asList(DataType.DECIMAL, DataType.NUMERIC)));
explicitDataTypeMapping.put(DataType.CHAR, new HashSet<>(Arrays.asList(DataType.VARCHAR, DataType.LONGTEXT, DataType.STRING)));
explicitDataTypeMapping.put(DataType.VARCHAR, Collections.singleton(DataType.LONGTEXT));
EXPLICIT_DATA_TYPE_MAPPING = Collections.unmodifiableMap(explicitDataTypeMapping);

INSTANCE = new PostgresSink();
}

public static RelationalSink get()
{
return INSTANCE;
}

public static Connection createConnection(String user, String pwd, String jdbcUrl)
{
try
{
return DriverManager.getConnection(jdbcUrl, user, pwd);
}
catch (SQLException e)
{
throw new IllegalArgumentException(e);
}
}

private PostgresSink()
{
super(
CAPABILITIES,
IMPLICIT_DATA_TYPE_MAPPING,
EXPLICIT_DATA_TYPE_MAPPING,
SqlGenUtils.QUOTE_IDENTIFIER,
LOGICAL_PLAN_VISITOR_BY_CLASS,
(executor, sink, dataset) -> sink.doesTableExist(dataset),
(executor, sink, dataset) -> sink.validateDatasetSchema(dataset, new PostgresDataTypeMapping()),
(executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new PostgresJdbcPropertiesToLogicalDataTypeMapping()));
}

@Override
public Executor<SqlGen, TabularData, SqlPlan> getRelationalExecutor(RelationalConnection relationalConnection)
{
if (relationalConnection instanceof JdbcConnection)
{
JdbcConnection jdbcConnection = (JdbcConnection) relationalConnection;
return new RelationalExecutor(this, JdbcHelper.of(jdbcConnection.connection()));
}
else
{
throw new UnsupportedOperationException("Only JdbcConnection is supported for Postgres Sink");
}
}

@Override
public Optional<Optimizer> optimizerForCaseConversion(CaseConversion caseConversion)
kumuwu marked this conversation as resolved.
Show resolved Hide resolved
{
switch (caseConversion)
{
case TO_LOWER:
return Optional.of(new LowerCaseOptimizer());
case TO_UPPER:
return Optional.of(new UpperCaseOptimizer());
case NONE:
return Optional.empty();
default:
throw new IllegalArgumentException("Unrecognized case conversion: " + caseConversion);
}
}
}
Loading
Loading