diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/SinkCleanupAuditDatasetAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/SinkCleanupAuditDatasetAbstract.java deleted file mode 100644 index 7c29083ab7e..00000000000 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/SinkCleanupAuditDatasetAbstract.java +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2024 Goldman Sachs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package org.finos.legend.engine.persistence.components.util; - -import org.finos.legend.engine.persistence.components.logicalplan.datasets.*; - -import java.util.Optional; - -import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.DEFAULT_SINK_CLEAN_UP_AUDIT_TABLE; -import static org.immutables.value.Value.*; - -@Immutable -@Style( - typeAbstract = "*Abstract", - typeImmutable = "*", - jdkOnly = true, - optionalAcceptNullable = true, - strictBuilder = true -) -public interface SinkCleanupAuditDatasetAbstract -{ - Optional auditDatasetDatabaseName(); - - Optional auditDatasetGroupName(); - - @Default - default String auditDatasetName() - { - return DEFAULT_SINK_CLEAN_UP_AUDIT_TABLE; - } - - @Default - default String tableNameField() - { - return "table_name"; - } - - @Default - default String requestedBy() - { - return "requested_by"; - } - - @Default - default String executionTimeField() - { - return "execution_ts_utc"; - } - - @Default - default String statusField() - { - return "status"; - } - - - @Derived - default Dataset get() - { - return DatasetDefinition.builder() - .database(auditDatasetDatabaseName()) - .group(auditDatasetGroupName()) - .name(auditDatasetName()) - .schema(SchemaDefinition.builder() - .addFields(Field.builder().name(tableNameField()).type(FieldType.of(DataType.VARCHAR, 255, null)).build()) - .addFields(Field.builder().name(executionTimeField()).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).build()) - .addFields(Field.builder().name(statusField()).type(FieldType.of(DataType.VARCHAR, 32, null)).build()) - .addFields(Field.builder().name(requestedBy()).type(FieldType.of(DataType.VARCHAR, 64, null)).build()) - .build()) - .build(); - } -} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java index 548d80fd5de..ddc79e1c3a9 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java @@ -37,10 +37,9 @@ public class RelationalSinkCleanerTest extends IngestModeTest .addFields(batchIdOut) .build(); private final MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build(); - private final String auditTableCreationQuery = "CREATE TABLE IF NOT EXISTS sink_cleanup_audit(\"table_name\" VARCHAR(255),\"execution_ts_utc\" DATETIME,\"status\" VARCHAR(32),\"requested_by\" VARCHAR(64))"; private final String dropMainTableQuery = "DROP TABLE IF EXISTS \"mydb\".\"main\""; + private final String dropLockTableQuery = "DROP TABLE IF EXISTS \"mydb\".\"main_legend_persistence_lock\""; private final String deleteFromMetadataTableQuery = "DELETE FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'"; - private final String insertToAuditTableQuery = "INSERT INTO sink_cleanup_audit (\"table_name\", \"execution_ts_utc\", \"status\", \"requested_by\") (SELECT 'main','2000-01-01 00:00:00.000000','SUCCEEDED','lh_dev')"; @BeforeEach @@ -64,13 +63,30 @@ void testGenerateOperationsForSinkCleanup() .build(); SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup(); - List preActionsSql = result.preActionsSql(); + List cleanupSql = result.cleanupSql(); + Assertions.assertEquals(2, result.dropSql().size()); + Assertions.assertEquals(dropMainTableQuery, result.dropSql().get(0)); + Assertions.assertEquals(dropLockTableQuery, result.dropSql().get(1)); + Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(0)); + } - Assertions.assertEquals(auditTableCreationQuery, preActionsSql.get(0)); + @Test + void testGenerateOperationsForSinkCleanupWithLockTable() + { + RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder() + .relationalSink(AnsiSqlSink.get()) + .mainDataset(mainTable) + .metadataDataset(metadata) + .executionTimestampClock(fixedClock_2000_01_01) + .requestedBy("lh_dev") + .lockDataset(LockInfoDataset.builder().name("lock_table").group("mydb").build()) + .build(); + SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup(); List cleanupSql = result.cleanupSql(); + Assertions.assertEquals(2, result.dropSql().size()); Assertions.assertEquals(dropMainTableQuery, result.dropSql().get(0)); + Assertions.assertEquals("DROP TABLE IF EXISTS \"mydb\".\"lock_table\"", result.dropSql().get(1)); Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(0)); - Assertions.assertEquals(insertToAuditTableQuery, cleanupSql.get(1)); } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java index b5955b52ccb..fe2419e59de 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java @@ -52,7 +52,7 @@ public class ApiUtils { - private static final String LOCK_INFO_DATASET_SUFFIX = "_legend_persistence_lock"; + public static final String LOCK_INFO_DATASET_SUFFIX = "_legend_persistence_lock"; public static Dataset deriveMainDatasetFromStaging(Datasets datasets, IngestMode ingestMode) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java index da209aea4f5..a49840e58d6 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java @@ -19,8 +19,6 @@ import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; import org.finos.legend.engine.persistence.components.logicalplan.operations.*; import org.finos.legend.engine.persistence.components.logicalplan.values.*; import org.finos.legend.engine.persistence.components.relational.RelationalSink; @@ -30,8 +28,8 @@ import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer; import org.finos.legend.engine.persistence.components.transformer.TransformOptions; import org.finos.legend.engine.persistence.components.transformer.Transformer; +import org.finos.legend.engine.persistence.components.util.LockInfoDataset; import org.finos.legend.engine.persistence.components.util.MetadataDataset; -import org.finos.legend.engine.persistence.components.util.SinkCleanupAuditDataset; import org.immutables.value.Value; import org.immutables.value.Value.Default; import org.slf4j.Logger; @@ -40,6 +38,9 @@ import java.time.Clock; import java.util.ArrayList; import java.util.List; +import java.util.Optional; + +import static org.finos.legend.engine.persistence.components.relational.api.ApiUtils.LOCK_INFO_DATASET_SUFFIX; @Value.Immutable @Value.Style( @@ -60,14 +61,10 @@ public abstract class RelationalSinkCleanerAbstract public abstract String requestedBy(); - @Default - public SinkCleanupAuditDataset auditDataset() - { - return SinkCleanupAuditDataset.builder().build(); - } - public abstract MetadataDataset metadataDataset(); + public abstract Optional lockDataset(); + @Default public Clock executionTimestampClock() { @@ -89,17 +86,12 @@ public SinkCleanupGeneratorResult generateOperationsForSinkCleanup() { Transformer transformer = new RelationalTransformer(relationalSink(), transformOptions()); - // Pre-action SQL - LogicalPlan preActionsLogicalPlan = buildLogicalPlanForPreActions(); - SqlPlan preActionsSqlPlan = transformer.generatePhysicalPlan(preActionsLogicalPlan); - //Sink clean-up SQL's LogicalPlan dropLogicalPlan = buildLogicalPlanForDropActions(); SqlPlan dropSqlPlan = transformer.generatePhysicalPlan(dropLogicalPlan); - LogicalPlan cleanupLogicalPlan = buildLogicalPlanForCleanupAndAuditActions(); + LogicalPlan cleanupLogicalPlan = buildLogicalPlanForMetadataCleanup(); SqlPlan cleanupSqlPlan = transformer.generatePhysicalPlan(cleanupLogicalPlan); return SinkCleanupGeneratorResult.builder() - .preActionsSqlPlan(preActionsSqlPlan) .cleanupSqlPlan(cleanupSqlPlan) .dropSqlPlan(dropSqlPlan) .build(); @@ -114,11 +106,7 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne LOGGER.info("Generating SQL's for sink cleanup"); SinkCleanupGeneratorResult result = generateOperationsForSinkCleanup(); - //3. Create datasets - LOGGER.info("Creating the datasets"); - executor.executePhysicalPlan(result.preActionsSqlPlan()); - - //4. Execute sink cleanup operations + //3. Execute sink cleanup operations return executeSinkCleanup(result); } @@ -145,25 +133,38 @@ public Executor initExecutor(RelationalConnection connection) // ---------- UTILITY METHODS ---------- - private LogicalPlan buildLogicalPlanForPreActions() + private LogicalPlan buildLogicalPlanForDropActions() { List operations = new ArrayList<>(); - operations.add(Create.of(true, auditDataset().get())); + operations.add(Drop.of(true, mainDataset(), false)); + operations.add(buildDropPlanForLockTable()); return LogicalPlan.of(operations); } - private LogicalPlan buildLogicalPlanForDropActions() + private Operation buildDropPlanForLockTable() { - List operations = new ArrayList<>(); - operations.add(Drop.of(true, mainDataset(), false)); - return LogicalPlan.of(operations); + LockInfoDataset lockInfoDataset; + if (lockDataset().isPresent()) + { + lockInfoDataset = lockDataset().get(); + } + else + { + String datasetName = mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new); + String lockDatasetName = datasetName + LOCK_INFO_DATASET_SUFFIX; + lockInfoDataset = LockInfoDataset.builder() + .database(mainDataset().datasetReference().database()) + .group(mainDataset().datasetReference().group()) + .name(lockDatasetName) + .build(); + } + return Drop.of(true, lockInfoDataset.get(), false); } - private LogicalPlan buildLogicalPlanForCleanupAndAuditActions() + private LogicalPlan buildLogicalPlanForMetadataCleanup() { List operations = new ArrayList<>(); operations.add(buildDeleteCondition()); - operations.add(buildInsertCondition(AuditTableStatus.SUCCEEDED)); return LogicalPlan.of(operations); } @@ -179,29 +180,6 @@ private Operation buildDeleteCondition() return Delete.of(metadataDataset().get(), whereCondition); } - private Operation buildInsertCondition(AuditTableStatus status) - { - DatasetReference auditTableRef = this.auditDataset().get().datasetReference(); - FieldValue tableName = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().tableNameField()).build(); - FieldValue executionTs = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().executionTimeField()).build(); - FieldValue auditStatus = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().statusField()).build(); - FieldValue requestedBy = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().requestedBy()).build(); - - List fieldsToInsert = new ArrayList<>(); - fieldsToInsert.add(tableName); - fieldsToInsert.add(executionTs); - fieldsToInsert.add(auditStatus); - fieldsToInsert.add(requestedBy); - - List selectFields = new ArrayList<>(); - selectFields.add(getMainTable()); - selectFields.add(BatchStartTimestamp.INSTANCE); - selectFields.add(StringValue.of(status.name())); - selectFields.add(StringValue.of(requestedBy())); - - return Insert.of(auditDataset().get(), Selection.builder().addAllFields(selectFields).build(), fieldsToInsert); - } - private StringValue getMainTable() { return StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new)); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SinkCleanupGeneratorResultAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SinkCleanupGeneratorResultAbstract.java index bd97898935d..4141b70a52e 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SinkCleanupGeneratorResultAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SinkCleanupGeneratorResultAbstract.java @@ -31,18 +31,11 @@ ) public abstract class SinkCleanupGeneratorResultAbstract { - public abstract SqlPlan preActionsSqlPlan(); public abstract SqlPlan dropSqlPlan(); public abstract SqlPlan cleanupSqlPlan(); - - public List preActionsSql() - { - return preActionsSqlPlan().getSqlList(); - } - public List cleanupSql() { return cleanupSqlPlan().getSqlList(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java index dcf30bc8b21..c832ad1799a 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java @@ -16,11 +16,13 @@ package org.finos.legend.engine.persistence.components; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.relational.api.ApiUtils; import org.finos.legend.engine.persistence.components.relational.api.IngestStatus; import org.finos.legend.engine.persistence.components.relational.api.RelationalSinkCleaner; import org.finos.legend.engine.persistence.components.relational.api.SinkCleanupIngestorResult; import org.finos.legend.engine.persistence.components.relational.h2.H2Sink; import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection; +import org.finos.legend.engine.persistence.components.util.LockInfoDataset; import org.finos.legend.engine.persistence.components.util.MetadataDataset; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -37,6 +39,53 @@ void testExecuteSinkCleanup() MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build(); DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); createSampleMainTableWithData(mainTable.name()); + String lockTable = mainTable.name() + ApiUtils.LOCK_INFO_DATASET_SUFFIX; + createLockTable(lockTable); + LockInfoDataset lockDataset = LockInfoDataset.builder() + .database(mainTable.datasetReference().database()) + .group(mainTable.datasetReference().group()) + .name(lockTable) + .build(); + + createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name()); + RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder() + .relationalSink(H2Sink.get()) + .mainDataset(mainTable) + .executionTimestampClock(fixedClock_2000_01_01) + .requestedBy("lh_dev") + .metadataDataset(metadata) + .build(); + + //Table counts before sink cleanup + List> tableBeforeSinkCleanup = h2Sink.executeQuery("select count(*) as batch_metadata_count from \"batch_metadata\" where table_name = 'main'"); + Assertions.assertEquals(tableBeforeSinkCleanup.get(0).get("batch_metadata_count"), 1L); + Assertions.assertTrue(h2Sink.doesTableExist(mainTable)); + Assertions.assertTrue(h2Sink.doesTableExist(lockDataset.get())); + + SinkCleanupIngestorResult result = sinkCleaner.executeOperationsForSinkCleanup(JdbcConnection.of(h2Sink.connection())); + Assertions.assertEquals(result.status(), IngestStatus.SUCCEEDED); + + List> tableAfterSinkCleanup = h2Sink.executeQuery("select count(*) as batch_metadata_count from \"batch_metadata\" where table_name = 'main'"); + Assertions.assertEquals(tableAfterSinkCleanup.get(0).get("batch_metadata_count"), 0L); + + Assertions.assertFalse(h2Sink.doesTableExist(mainTable)); + Assertions.assertFalse(h2Sink.doesTableExist(lockDataset.get())); + } + + @Test + void testExecuteSinkCleanupWithLockTable() + { + MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build(); + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + createSampleMainTableWithData(mainTable.name()); + String lockTable = "lock_table"; + createLockTable(lockTable); + LockInfoDataset lockDataset = LockInfoDataset.builder() + .database(mainTable.datasetReference().database()) + .group(mainTable.datasetReference().group()) + .name(lockTable) + .build(); + createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name()); RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder() .relationalSink(H2Sink.get()) @@ -44,20 +93,28 @@ void testExecuteSinkCleanup() .executionTimestampClock(fixedClock_2000_01_01) .requestedBy("lh_dev") .metadataDataset(metadata) + .lockDataset(lockDataset) .build(); //Table counts before sink cleanup List> tableBeforeSinkCleanup = h2Sink.executeQuery("select count(*) as batch_metadata_count from \"batch_metadata\" where table_name = 'main'"); Assertions.assertEquals(tableBeforeSinkCleanup.get(0).get("batch_metadata_count"), 1L); + Assertions.assertTrue(h2Sink.doesTableExist(mainTable)); + Assertions.assertTrue(h2Sink.doesTableExist(lockDataset.get())); SinkCleanupIngestorResult result = sinkCleaner.executeOperationsForSinkCleanup(JdbcConnection.of(h2Sink.connection())); Assertions.assertEquals(result.status(), IngestStatus.SUCCEEDED); - //Table counts after sink cleanup - List> auditTableData = h2Sink.executeQuery("select count(*) as audit_table_count from \"sink_cleanup_audit\" where table_name = 'main'"); - Assertions.assertEquals(auditTableData.get(0).get("audit_table_count"), 1L); List> tableAfterSinkCleanup = h2Sink.executeQuery("select count(*) as batch_metadata_count from \"batch_metadata\" where table_name = 'main'"); Assertions.assertEquals(tableAfterSinkCleanup.get(0).get("batch_metadata_count"), 0L); + + Assertions.assertFalse(h2Sink.doesTableExist(mainTable)); + Assertions.assertFalse(h2Sink.doesTableExist(lockDataset.get())); + } + + private void createLockTable(String lockTable) + { + h2Sink.executeStatement("CREATE TABLE TEST." + lockTable + " (ID INT PRIMARY KEY, NAME VARCHAR(255), BIRTH DATETIME)"); } @Test