Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence Component: Support ingestion within existing transaction and multi table ingestion #12

Open
wants to merge 6 commits into
base: bulk_load_support
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors;
import org.finos.legend.engine.persistence.components.ingestmode.validitymilestoning.derivation.SourceSpecifiesFromDateTime;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

import java.util.Optional;

/**
 * Ingest-mode visitor that enriches a {@link Datasets} with the temp datasets required by the
 * given ingest mode. Only bitemporal delta with {@link SourceSpecifiesFromDateTime} validity
 * derivation needs temp datasets; every other mode returns the input {@code Datasets} unchanged.
 */
public class TempDatasetsEnricher implements IngestModeVisitor<Datasets>
{
    // Immutable input; each visit method returns either this value or an enriched copy.
    private final Datasets datasets;

    /**
     * @param datasets the datasets to enrich; never mutated by this visitor
     */
    public TempDatasetsEnricher(Datasets datasets)
    {
        this.datasets = datasets;
    }

    @Override
    public Datasets visitAppendOnly(AppendOnlyAbstract appendOnly)
    {
        // Append-only ingestion needs no temp datasets.
        return datasets;
    }

    @Override
    public Datasets visitNontemporalSnapshot(NontemporalSnapshotAbstract nontemporalSnapshot)
    {
        return datasets;
    }

    @Override
    public Datasets visitNontemporalDelta(NontemporalDeltaAbstract nontemporalDelta)
    {
        return datasets;
    }

    @Override
    public Datasets visitUnitemporalSnapshot(UnitemporalSnapshotAbstract unitemporalSnapshot)
    {
        return datasets;
    }

    @Override
    public Datasets visitUnitemporalDelta(UnitemporalDeltaAbstract unitemporalDelta)
    {
        return datasets;
    }

    @Override
    public Datasets visitBitemporalSnapshot(BitemporalSnapshotAbstract bitemporalSnapshot)
    {
        return datasets;
    }

    @Override
    public Datasets visitBitemporalDelta(BitemporalDeltaAbstract bitemporalDelta)
    {
        Datasets enrichedDatasets = datasets;
        // Temp datasets are only needed when validity is derived from a source-specified "from" time.
        if (bitemporalDelta.validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime)
        {
            Optional<String> deleteIndicatorField = bitemporalDelta.mergeStrategy().accept(MergeStrategyVisitors.EXTRACT_DELETE_FIELD);
            enrichedDatasets = enrichedDatasets.withTempDataset(LogicalPlanUtils.getTempDataset(enrichedDatasets));
            // A delete-indicator merge strategy additionally needs a temp dataset carrying the indicator column.
            if (deleteIndicatorField.isPresent())
            {
                enrichedDatasets = enrichedDatasets.withTempDatasetWithDeleteIndicator(LogicalPlanUtils.getTempDatasetWithDeleteIndicator(enrichedDatasets, deleteIndicatorField.get()));
            }
        }

        return enrichedDatasets;
    }

    @Override
    public Datasets visitBulkLoad(BulkLoadAbstract bulkLoad)
    {
        return datasets;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,12 @@
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Not;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.NotEquals;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Or;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Join;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.JoinOperation;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Create;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Drop;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Operation;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete;
Expand Down Expand Up @@ -72,15 +68,14 @@
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_UPDATED;
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_TERMINATED;
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_INSERTED;
import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.UNDERSCORE;

class BitemporalDeltaPlanner extends BitemporalPlanner
{
private static final String VALID_DATE_TIME_FROM_NAME = "legend_persistence_start_date";
private static final String VALID_DATE_TIME_THRU_NAME = "legend_persistence_end_date";
private static final String LEFT_DATASET_IN_JOIN_ALIAS = "legend_persistence_x";
private static final String RIGHT_DATASET_IN_JOIN_ALIAS = "legend_persistence_y";
private static final String TEMP_DATASET_BASE_NAME = "legend_persistence_temp";
private static final String TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME = "legend_persistence_tempWithDeleteIndicator";
private static final String STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME = "legend_persistence_stageWithoutDuplicates";

private final Optional<String> deleteIndicatorField;
Expand Down Expand Up @@ -177,57 +172,26 @@ class BitemporalDeltaPlanner extends BitemporalPlanner

if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime)
{
this.tempDataset = getTempDataset(datasets);
this.tempDataset = LogicalPlanUtils.getTempDataset(datasets);
if (deleteIndicatorField.isPresent())
{
this.tempDatasetWithDeleteIndicator = getTempDatasetWithDeleteIndicator(datasets);
this.tempDatasetWithDeleteIndicator = LogicalPlanUtils.getTempDatasetWithDeleteIndicator(datasets, deleteIndicatorField.get());
}
}
}

private Dataset getStagingDatasetWithoutDuplicates(Datasets datasets)
{
String tableName = stagingDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
return datasets.stagingDatasetWithoutDuplicates().orElse(DatasetDefinition.builder()
.schema(stagingDataset().schema())
.database(stagingDataset().datasetReference().database())
.group(stagingDataset().datasetReference().group())
.name(LogicalPlanUtils.generateTableNameWithSuffix(stagingDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME))
.name(tableName + UNDERSCORE + STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME)
.alias(STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME)
.build());
}

private Dataset getTempDataset(Datasets datasets)
{
return datasets.tempDataset().orElse(DatasetDefinition.builder()
.schema(mainDataset().schema())
.database(mainDataset().datasetReference().database())
.group(mainDataset().datasetReference().group())
.name(LogicalPlanUtils.generateTableNameWithSuffix(mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), TEMP_DATASET_BASE_NAME))
.alias(TEMP_DATASET_BASE_NAME)
.build());
}

private Dataset getTempDatasetWithDeleteIndicator(Datasets datasets)
{
if (datasets.tempDatasetWithDeleteIndicator().isPresent())
{
return datasets.tempDatasetWithDeleteIndicator().get();
}
else
{
Field deleteIndicator = Field.builder().name(deleteIndicatorField.orElseThrow((IllegalStateException::new))).type(FieldType.of(DataType.BOOLEAN, Optional.empty(), Optional.empty())).build();
List<Field> mainFieldsPlusDeleteIndicator = new ArrayList<>(mainDataset().schema().fields());
mainFieldsPlusDeleteIndicator.add(deleteIndicator);
return DatasetDefinition.builder()
.schema(mainDataset().schema().withFields(mainFieldsPlusDeleteIndicator))
.database(mainDataset().datasetReference().database())
.group(mainDataset().datasetReference().group())
.name(LogicalPlanUtils.generateTableNameWithSuffix(mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME))
.alias(TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME)
.build();
}
}

@Override
protected BitemporalDelta ingestMode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.UUID;

import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.And;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
Expand All @@ -31,6 +32,8 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.All;
import org.finos.legend.engine.persistence.components.logicalplan.values.Array;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
Expand Down Expand Up @@ -73,7 +76,9 @@ public class LogicalPlanUtils
public static final String DEFAULT_META_TABLE = "batch_metadata";
public static final String DATA_SPLIT_LOWER_BOUND_PLACEHOLDER = "{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}";
public static final String DATA_SPLIT_UPPER_BOUND_PLACEHOLDER = "{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}";
private static final String UNDERSCORE = "_";
public static final String UNDERSCORE = "_";
public static final String TEMP_DATASET_BASE_NAME = "legend_persistence_temp";
public static final String TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME = "legend_persistence_tempWithDeleteIndicator";

private LogicalPlanUtils()
{
Expand Down Expand Up @@ -397,6 +402,38 @@ public static List<Value> extractStagedFilesFieldValues(Dataset dataset)
return stagedFilesFields;
}

/**
 * Returns the temp dataset to use for ingestion: the one already configured on
 * {@code datasets}, or a default definition derived from the main dataset
 * (same schema, name suffixed with {@code TEMP_DATASET_BASE_NAME}).
 *
 * @param datasets datasets holding the main dataset and, optionally, a pre-configured temp dataset
 * @return the configured or derived temp dataset
 * @throws IllegalStateException if no temp dataset is configured and the main dataset has no name
 */
public static Dataset getTempDataset(Datasets datasets)
{
    // orElseGet defers building the default: with orElse, the main dataset's name would be
    // resolved (and could throw) even when a temp dataset is already configured.
    return datasets.tempDataset().orElseGet(() -> DatasetDefinition.builder()
        .schema(datasets.mainDataset().schema())
        .database(datasets.mainDataset().datasetReference().database())
        .group(datasets.mainDataset().datasetReference().group())
        .name(LogicalPlanUtils.generateTableNameWithSuffix(datasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new), TEMP_DATASET_BASE_NAME))
        .alias(TEMP_DATASET_BASE_NAME)
        .build());
}

/**
 * Returns the delete-indicator temp dataset: the one already configured on {@code datasets},
 * or a default definition whose schema is the main dataset's fields plus a BOOLEAN
 * delete-indicator field, with the name suffixed by
 * {@code TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME}.
 *
 * @param datasets             datasets holding the main dataset and, optionally, a pre-configured
 *                             temp dataset with delete indicator
 * @param deleteIndicatorField name of the delete-indicator column to append to the main schema
 * @return the configured or derived temp dataset with delete indicator
 * @throws IllegalStateException if no temp dataset is configured and the main dataset has no name
 */
public static Dataset getTempDatasetWithDeleteIndicator(Datasets datasets, String deleteIndicatorField)
{
    // orElseGet keeps the derivation lazy and mirrors getTempDataset above.
    return datasets.tempDatasetWithDeleteIndicator().orElseGet(() ->
    {
        Field deleteIndicator = Field.builder().name(deleteIndicatorField).type(FieldType.of(DataType.BOOLEAN, Optional.empty(), Optional.empty())).build();
        List<Field> mainFieldsPlusDeleteIndicator = new ArrayList<>(datasets.mainDataset().schema().fields());
        mainFieldsPlusDeleteIndicator.add(deleteIndicator);
        return DatasetDefinition.builder()
            .schema(datasets.mainDataset().schema().withFields(mainFieldsPlusDeleteIndicator))
            .database(datasets.mainDataset().datasetReference().database())
            .group(datasets.mainDataset().datasetReference().group())
            .name(LogicalPlanUtils.generateTableNameWithSuffix(datasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new), TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME))
            .alias(TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME)
            .build();
    });
}

public static Set<DataType> SUPPORTED_DATA_TYPES_FOR_OPTIMIZATION_COLUMNS =
new HashSet<>(Arrays.asList(INT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ public interface Executor<C extends PhysicalPlanNode, R extends ResultData, P ex
void revert();

void close();

RelationalExecutionHelper getRelationalExecutionHelper();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.executor;
package org.finos.legend.engine.persistence.components.executor;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.relational.sql.DataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.sql.JdbcPropertiesToLogicalDataTypeMapping;

import java.util.List;
import java.util.Map;
Expand All @@ -36,9 +34,9 @@ public interface RelationalExecutionHelper

boolean doesTableExist(Dataset dataset);

void validateDatasetSchema(Dataset dataset, DataTypeMapping datatypeMapping);
void validateDatasetSchema(Dataset dataset, TypeMapping datatypeMapping);

Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, JdbcPropertiesToLogicalDataTypeMapping mapping);
Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, TypeMapping mapping);

void executeStatement(String sql);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.executor;

/**
 * Marker interface for SQL type mappings passed through {@code RelationalExecutionHelper}.
 * Implementations are narrowed via {@code instanceof} at the call sites — e.g.
 * {@code validateDatasetSchema} requires a {@code DataTypeMapping} and
 * {@code constructDatasetFromDatabase} requires a
 * {@code JdbcPropertiesToLogicalDataTypeMapping}.
 */
public interface TypeMapping
{

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.relational.bigquery.executor;

import org.finos.legend.engine.persistence.components.executor.Executor;
import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink;
Expand Down Expand Up @@ -133,6 +134,12 @@ public void close()
bigQueryHelper.close();
}

/**
 * Exposes the underlying BigQuery execution helper so callers can perform
 * helper-level operations (e.g. schema validation, dataset construction)
 * through the generic {@code RelationalExecutionHelper} contract.
 */
@Override
public RelationalExecutionHelper getRelationalExecutionHelper()
{
    return this.bigQueryHelper;
}

private String getEnrichedSql(Map<String, String> placeholderKeyValues, String sql)
{
String enrichedSql = sql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.TableId;
import org.finos.legend.engine.persistence.components.executor.TypeMapping;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.And;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
Expand All @@ -32,7 +33,7 @@
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink;
import org.finos.legend.engine.persistence.components.relational.bigquery.sqldom.constraints.columns.PKColumnConstraint;
import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutionHelper;
import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper;
import org.finos.legend.engine.persistence.components.relational.sql.DataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.sql.JdbcPropertiesToLogicalDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.column.ColumnConstraint;
Expand Down Expand Up @@ -156,8 +157,13 @@ public boolean doesTableExist(Dataset dataset)
return tableExists;
}

public void validateDatasetSchema(Dataset dataset, DataTypeMapping datatypeMapping)
public void validateDatasetSchema(Dataset dataset, TypeMapping typeMapping)
{
if (!(typeMapping instanceof DataTypeMapping))
{
throw new IllegalStateException("Only DataTypeMapping allowed in validateDatasetSchema");
}
DataTypeMapping datatypeMapping = (DataTypeMapping) typeMapping;
String name = dataset.datasetReference().name().orElseThrow(IllegalStateException::new);
String schema = dataset.datasetReference().group().orElse(null);

Expand Down Expand Up @@ -201,8 +207,13 @@ public void validateDatasetSchema(Dataset dataset, DataTypeMapping datatypeMappi
validateColumns(userColumns, dbColumns);
}

public Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, JdbcPropertiesToLogicalDataTypeMapping mapping)
public Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, TypeMapping typeMapping)
{
if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping))
{
throw new IllegalStateException("Only JdbcPropertiesToLogicalDataTypeMapping allowed in constructDatasetFromDatabase");
}
JdbcPropertiesToLogicalDataTypeMapping mapping = (JdbcPropertiesToLogicalDataTypeMapping) typeMapping;
List<String> primaryKeysInDb = this.fetchPrimaryKeys(tableName, schemaName, databaseName);
com.google.cloud.bigquery.Table table = this.bigQuery.getTable(TableId.of(schemaName, tableName));

Expand Down
Loading