From 332b6b339f578872220557d91b87ed55f12247a4 Mon Sep 17 00:00:00 2001
From: Mythreyi
Date: Tue, 5 Mar 2024 17:23:50 +0800
Subject: [PATCH] Adding the APIs and logic for sink clean-up

---
 .../components/RelationalSinkCleanerTest.java | 39 ++++++-----
 .../api/RelationalSinkCleanerAbstract.java    | 41 ++++--------
 .../components/RelationalSinkCleanerTest.java | 67 ++++++++++++++-----
 3 files changed, 84 insertions(+), 63 deletions(-)

diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java
index fb37ecb5ee6..fcb15a3b802 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java
@@ -16,11 +16,11 @@ package org.finos.legend.engine.persistence.components;
 
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;
-import org.finos.legend.engine.persistence.components.relational.CaseConversion;
 import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink;
 import org.finos.legend.engine.persistence.components.relational.api.RelationalSinkCleaner;
 import org.finos.legend.engine.persistence.components.relational.api.SinkCleanupGeneratorResult;
 import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
+import org.finos.legend.engine.persistence.components.util.MetadataDataset;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -30,13 +30,19 @@ public class RelationalSinkCleanerTest extends IngestModeTest
 {
     private static DatasetDefinition mainTable;
-    private static LockInfoDataset lockTable = LockInfoDataset.builder().name("lock_info").build();
+    private final static LockInfoDataset lockTable = LockInfoDataset.builder().name("lock_info").build();
     protected SchemaDefinition mainTableSchema = SchemaDefinition.builder()
             .addFields(id)
             .addFields(name)
             .addFields(batchIdIn)
             .addFields(batchIdOut)
             .build();
+    private final MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
+    private final String auditTableCreationQuery = "CREATE TABLE IF NOT EXISTS sink_cleanup_audit(\"table_name\" VARCHAR(255),\"batch_start_ts_utc\" DATETIME,\"batch_end_ts_utc\" DATETIME,\"batch_status\" VARCHAR(32),\"requested_by\" VARCHAR(32))";
+    private final String dropMainTableQuery = "DROP TABLE IF EXISTS \"mydb\".\"main\" CASCADE";
+    private final String deleteFromMetadataTableQuery = "DELETE FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'";
+    private final String insertToAuditTableQuery = "INSERT INTO sink_cleanup_audit (\"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"requested_by\") (SELECT 'main','2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'SUCCEEDED','lh_dev')";
+
 
     @BeforeEach
     void initializeTables()
@@ -53,26 +59,24 @@ void testGenerateOperationsForSinkCleanup()
         RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
                 .relationalSink(AnsiSqlSink.get())
                 .mainDataset(mainTable)
+                .metadataDataset(metadata)
                 .executionTimestampClock(fixedClock_2000_01_01)
                 .requestedBy("lh_dev")
                 .build();
         SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();
 
         List<String> preActionsSql = result.preActionsSql();
-        String creationQuery = "CREATE TABLE IF NOT EXISTS sink_cleanup_audit(\"table_name\" VARCHAR(255),\"batch_start_ts_utc\" DATETIME,\"batch_end_ts_utc\" DATETIME,\"batch_status\" VARCHAR(32),\"requested_by\" VARCHAR(32))";
-        Assertions.assertEquals(creationQuery, preActionsSql.get(0));
-        String dropMainTable = "DROP TABLE IF EXISTS \"mydb\".\"main\" CASCADE";
-        String deleteFromMetadataTable = "DELETE FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'";
-        String insertToAuditTable = "INSERT INTO sink_cleanup_audit (\"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"requested_by\") (SELECT 'main','2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE','lh_dev')";
+        Assertions.assertEquals(auditTableCreationQuery, preActionsSql.get(0));
+
         List<String> cleanupSql = result.cleanupSql();
-        Assertions.assertEquals(dropMainTable, cleanupSql.get(0));
-        Assertions.assertEquals(deleteFromMetadataTable, cleanupSql.get(1));
-        Assertions.assertEquals(insertToAuditTable, cleanupSql.get(2));
+        Assertions.assertEquals(dropMainTableQuery, cleanupSql.get(0));
+        Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(1));
+        Assertions.assertEquals(insertToAuditTableQuery, cleanupSql.get(2));
     }
 
     @Test
-    void testGenerateOperationsForSinkCleanupWithConcurrencyFlagAndUpperCase()
+    void testGenerateOperationsForSinkCleanupWithConcurrencyFlag()
     {
         RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
                 .relationalSink(AnsiSqlSink.get())
@@ -80,26 +84,21 @@ void testGenerateOperationsForSinkCleanupWithConcurrencyFlagAndUpperCase()
                 .executionTimestampClock(fixedClock_2000_01_01)
                 .enableConcurrentSafety(true)
                 .lockInfoDataset(lockTable)
-                .caseConversion(CaseConversion.TO_UPPER)
+                .metadataDataset(metadata)
                 .requestedBy("lh_dev")
                 .build();
         SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();
 
         List<String> preActionsSql = result.preActionsSql();
-        String auditTableCreationQuery = "CREATE TABLE IF NOT EXISTS SINK_CLEANUP_AUDIT(\"TABLE_NAME\" VARCHAR(255),\"BATCH_START_TS_UTC\" DATETIME,\"BATCH_END_TS_UTC\" DATETIME,\"BATCH_STATUS\" VARCHAR(32),\"REQUESTED_BY\" VARCHAR(32))";
-        String lockTableCreationQuery = "CREATE TABLE IF NOT EXISTS LOCK_INFO(\"INSERT_TS_UTC\" DATETIME,\"LAST_USED_TS_UTC\" DATETIME,\"TABLE_NAME\" VARCHAR UNIQUE)";
+        String lockTableCreationQuery = "CREATE TABLE IF NOT EXISTS lock_info(\"insert_ts_utc\" DATETIME,\"last_used_ts_utc\" DATETIME,\"table_name\" VARCHAR UNIQUE)";
         Assertions.assertEquals(auditTableCreationQuery, preActionsSql.get(0));
         Assertions.assertEquals(lockTableCreationQuery, preActionsSql.get(1));
-        //todo : table name (main) in small ?
-        String initializeLockQuery = "INSERT INTO LOCK_INFO (\"INSERT_TS_UTC\", \"TABLE_NAME\") (SELECT '2000-01-01 00:00:00.000000','main' WHERE NOT (EXISTS (SELECT * FROM LOCK_INFO as lock_info)))";
-        String acquireLockQuery = "UPDATE LOCK_INFO as lock_info SET lock_info.\"LAST_USED_TS_UTC\" = '2000-01-01 00:00:00.000000'";
+        String initializeLockQuery = "INSERT INTO lock_info (\"insert_ts_utc\", \"table_name\") (SELECT '2000-01-01 00:00:00.000000','main' WHERE NOT (EXISTS (SELECT * FROM lock_info as lock_info)))";
+        String acquireLockQuery = "UPDATE lock_info as lock_info SET lock_info.\"last_used_ts_utc\" = '2000-01-01 00:00:00.000000'";
         Assertions.assertEquals(initializeLockQuery, result.initializeLockSql().get(0));
         Assertions.assertEquals(acquireLockQuery, result.acquireLockSql().get(0));
-        String dropMainTableQuery = "DROP TABLE IF EXISTS \"MYDB\".\"MAIN\" CASCADE";
-        String deleteFromMetadataTableQuery = "DELETE FROM BATCH_METADATA as batch_metadata WHERE UPPER(batch_metadata.\"TABLE_NAME\") = 'MAIN'";
-        String insertToAuditTableQuery = "INSERT INTO SINK_CLEANUP_AUDIT (\"TABLE_NAME\", \"BATCH_START_TS_UTC\", \"BATCH_END_TS_UTC\", \"BATCH_STATUS\", \"REQUESTED_BY\") (SELECT 'main','2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE','lh_dev')";
         List<String> cleanupSql = result.cleanupSql();
         Assertions.assertEquals(dropMainTableQuery, cleanupSql.get(0));
         Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(1));
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java
index a5466fe5549..0683bd045d2 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSinkCleanerAbstract.java
@@ -23,7 +23,6 @@
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
 import org.finos.legend.engine.persistence.components.logicalplan.operations.*;
 import org.finos.legend.engine.persistence.components.logicalplan.values.*;
-import org.finos.legend.engine.persistence.components.relational.CaseConversion;
 import org.finos.legend.engine.persistence.components.relational.RelationalSink;
 import org.finos.legend.engine.persistence.components.relational.SqlPlan;
 import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
@@ -31,7 +30,10 @@
 import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
 import org.finos.legend.engine.persistence.components.transformer.TransformOptions;
 import org.finos.legend.engine.persistence.components.transformer.Transformer;
-import org.finos.legend.engine.persistence.components.util.*;
+import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
+import org.finos.legend.engine.persistence.components.util.LockInfoUtils;
+import org.finos.legend.engine.persistence.components.util.MetadataDataset;
+import org.finos.legend.engine.persistence.components.util.SinkCleanupAuditDataset;
 import org.immutables.value.Value;
 import org.immutables.value.Value.Default;
 import org.slf4j.Logger;
@@ -70,11 +72,7 @@ public SinkCleanupAuditDataset auditDataset()
         return SinkCleanupAuditDataset.builder().build();
     }
 
-    @Default
-    public MetadataDataset metadataDataset()
-    {
-        return MetadataDataset.builder().build();
-    }
+    public abstract MetadataDataset metadataDataset();
 
     @Default
     public Clock executionTimestampClock()
@@ -82,12 +80,6 @@ public Clock executionTimestampClock()
         return Clock.systemUTC();
     }
 
-    @Default
-    public CaseConversion caseConversion()
-    {
-        return CaseConversion.NONE;
-    }
-
     @Default
     public boolean enableConcurrentSafety()
     {
@@ -97,12 +89,7 @@ public boolean enableConcurrentSafety()
     @Value.Derived
     protected TransformOptions transformOptions()
     {
-        TransformOptions.Builder builder = TransformOptions.builder()
-                .executionTimestampClock(executionTimestampClock());
-
-        relationalSink().optimizerForCaseConversion(caseConversion()).ifPresent(builder::addOptimizers);
-
-        return builder.build();
+        return TransformOptions.builder().executionTimestampClock(executionTimestampClock()).build();
     }
 
     // ---------- Private Fields ----------
@@ -154,7 +141,6 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne
         initExecutor(connection);
 
         // 2. Generate sink cleanup Operations
-        // todo : need to enrich datastes? (enrichedDatasets = ApiUtils.enrichAndApplyCase(datasets, caseConversion());)
         LOGGER.info("Generating SQL's for sink cleanup");
         SinkCleanupGeneratorResult result = generateOperationsForSinkCleanup();
@@ -181,11 +167,13 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne
             executor.commit();
             ingestorResult = SinkCleanupIngestorResult.builder().status(IngestStatus.SUCCEEDED).build();
         }
-        //todo : throw exception vs return failure status?
         catch (Exception e)
         {
             executor.revert();
-            throw e;
+            ingestorResult = SinkCleanupIngestorResult.builder()
+                    .status(IngestStatus.FAILED)
+                    .message(e.toString())
+                    .build();
         }
         finally
         {
@@ -195,7 +183,6 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne
     }
 
     // ---------- UTILITY METHODS ----------
-
     private Executor initExecutor(RelationalConnection connection)
     {
         LOGGER.info("Invoked initExecutor method, will initialize the executor");
@@ -225,7 +212,7 @@ private LogicalPlan buildLogicalPlanForCleanupActions()
 
     private Operation buildDeleteCondition()
     {
-        StringValue mainTableName = getMainTableName();
+        StringValue mainTableName = getMainTable();
         FieldValue tableNameFieldValue = FieldValue.builder().datasetRef(metadataDataset().get().datasetReference()).fieldName(metadataDataset().tableNameField()).build();
         FunctionImpl tableNameInUpperCase = FunctionImpl.builder().functionName(FunctionName.UPPER).addValue(tableNameFieldValue).build();
         StringValue mainTableNameInUpperCase = StringValue.builder().value(mainTableName.value().map(field -> field.toUpperCase()))
@@ -251,16 +238,16 @@ private Operation buildInsertCondition()
         fieldsToInsert.add(requestedBy);
 
         List<Value> selectFields = new ArrayList<>();
-        selectFields.add(getMainTableName());
+        selectFields.add(getMainTable());
         selectFields.add(BatchStartTimestamp.INSTANCE);
         selectFields.add(BatchEndTimestamp.INSTANCE);
-        selectFields.add(StringValue.of(MetadataUtils.MetaTableStatus.DONE.toString()));
+        selectFields.add(StringValue.of(IngestStatus.SUCCEEDED.name()));
         selectFields.add(StringValue.of(requestedBy()));
 
         return Insert.of(auditDataset().get(), Selection.builder().addAllFields(selectFields).build(), fieldsToInsert);
     }
 
-    private StringValue getMainTableName()
+    private StringValue getMainTable()
     {
         return StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
     }
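The catch-block change above means executeOperationsForSinkCleanup now reverts the transaction and reports failures through SinkCleanupIngestorResult instead of rethrowing, so callers inspect the returned status. A minimal caller-side sketch, assuming an existing JDBC connection and main-table definition (both illustrative placeholders, not code from this patch):

    // Illustrative caller, not part of this patch: assumes a pre-built DatasetDefinition "mainTable"
    // and an open java.sql.Connection "connection", as in the H2 tests below.
    RelationalSinkCleaner cleaner = RelationalSinkCleaner.builder()
            .relationalSink(H2Sink.get())
            .mainDataset(mainTable)
            .metadataDataset(MetadataDataset.builder().metadataDatasetName("batch_metadata").build())
            .requestedBy("lh_dev")
            .build();
    SinkCleanupIngestorResult result = cleaner.executeOperationsForSinkCleanup(JdbcConnection.of(connection));
    if (result.status() == IngestStatus.FAILED)
    {
        // The failure now comes back on the result rather than as a thrown exception;
        // message() carries the underlying error text.
        System.err.println("Sink cleanup failed: " + result.message().orElse("unknown"));
    }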
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java
index 125e4f81346..9df5ec6eab1 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSinkCleanerTest.java
@@ -22,6 +22,7 @@
 import org.finos.legend.engine.persistence.components.relational.h2.H2Sink;
 import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection;
 import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
+import org.finos.legend.engine.persistence.components.util.MetadataDataset;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -34,14 +35,16 @@ public class RelationalSinkCleanerTest extends BaseTest
     @Test
     void testExecuteSinkCleanup()
     {
+        MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
         DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
-        createSampleMainTableWithData();
-        createBatchMetadataTableWithData();
+        createSampleMainTableWithData(mainTable.name());
+        createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name());
         RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
                 .relationalSink(H2Sink.get())
                 .mainDataset(mainTable)
                 .executionTimestampClock(fixedClock_2000_01_01)
                 .requestedBy("lh_dev")
+                .metadataDataset(metadata)
                 .build();
 
         //Table counts before sink cleanup
@@ -61,10 +64,11 @@ void testExecuteSinkCleanup()
     @Test
     void testExecuteSinkCleanupWithConcurrency()
     {
+        MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
         DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
         LockInfoDataset lockTable = LockInfoDataset.builder().name("lock_info").build();
-        createSampleMainTableWithData();
-        createBatchMetadataTableWithData();
+        createSampleMainTableWithData(mainTable.name());
+        createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name());
         RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
                 .relationalSink(H2Sink.get())
                 .mainDataset(mainTable)
@@ -72,6 +76,7 @@ void testExecuteSinkCleanupWithConcurrency()
                 .executionTimestampClock(fixedClock_2000_01_01)
                 .enableConcurrentSafety(true)
                 .lockInfoDataset(lockTable)
                 .requestedBy("lh_dev")
+                .metadataDataset(metadata)
                 .build();
 
         //Table counts before sink cleanup
@@ -88,29 +93,59 @@ void testExecuteSinkCleanupWithConcurrency()
         Assertions.assertEquals(tableAfterSinkCleanup.get(0).get("batch_metadata_count"), 0L);
     }
 
-    private void createBatchMetadataTableWithData()
+    @Test
+    void testExecuteSinkCleanupWithFailureStatus()
     {
-        List<String> list = new ArrayList<>();
-        list.add("CREATE TABLE IF NOT EXISTS batch_metadata" +
-            "(`table_name` VARCHAR(255)," +
+        MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
+        DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
+        createSampleMainTableWithData(mainTable.name());
+        createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name());
+        RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
+                .relationalSink(H2Sink.get())
+                .mainDataset(mainTable)
+                .executionTimestampClock(fixedClock_2000_01_01)
+                .requestedBy("lh_dev")
+                .metadataDataset(metadata.withMetadataDatasetName("batch_metadata2"))
+                .build();
+
+        SinkCleanupIngestorResult result = sinkCleaner.executeOperationsForSinkCleanup(JdbcConnection.of(h2Sink.connection()));
+        Assertions.assertEquals(result.status(), IngestStatus.FAILED);
+        System.out.println(result.message().get());
+    }
+
+    private void createBatchMetadataTableWithData(String metaTableName, String mainTableName)
+    {
+        String createMetaTable = "CREATE TABLE IF NOT EXISTS " + metaTableName +
+            " (`table_name` VARCHAR(255)," +
             "`batch_start_ts_utc` DATETIME," +
             "`batch_end_ts_utc` DATETIME," +
             "`batch_status` VARCHAR(32)," +
             "`table_batch_id` INTEGER," +
             "`batch_source_info` JSON," +
-            "`ADDITIONAL_METADATA` JSON)");
-        list.add("INSERT INTO batch_metadata " +
-            "(\"table_name\", \"table_batch_id\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\")" +
-            " (SELECT 'main',1,'2000-01-01 00:00:00.000000','2000-01-01 00:00:00.000000','DONE')");
+            "`ADDITIONAL_METADATA` JSON)";
+        String insertData = "INSERT INTO " + metaTableName +
+            " (\"table_name\", \"table_batch_id\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\")" +
+            " (SELECT '" + mainTableName + "',1,'2000-01-01 00:00:00.000000','2000-01-01 00:00:00.000000','DONE')";
+        List<String> list = new ArrayList<>();
+        if (metaTableName.toUpperCase().equals(metaTableName))
+        {
+            list.add(createMetaTable.toUpperCase());
+            list.add(insertData.toUpperCase());
+        }
+        else
+        {
+            list.add(createMetaTable);
+            list.add(insertData);
+        }
         h2Sink.executeStatements(list);
     }
 
-    private void createSampleMainTableWithData()
+    private void createSampleMainTableWithData(String tableName)
     {
         List<String> list = new ArrayList<>();
-        list.add("CREATE TABLE main(ID INT PRIMARY KEY, NAME VARCHAR(255), BIRTH DATETIME)");
-        list.add("INSERT INTO main VALUES (1, 'A', '2020-01-01 00:00:00')");
-        list.add("INSERT INTO main VALUES (2, 'B', '2021-01-01 00:00:00')");
+        list.add("CREATE TABLE " + tableName + " (ID INT PRIMARY KEY, NAME VARCHAR(255), BIRTH DATETIME)");
+        list.add("INSERT INTO " + tableName + " VALUES (1, 'A', '2020-01-01 00:00:00')");
+        list.add("INSERT INTO " + tableName + " VALUES (2, 'B', '2021-01-01 00:00:00')");
         h2Sink.executeStatements(list);
     }
 }
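Taken together, the three files give the shape of the new API: the builder now requires a MetadataDataset (metadataDataset() is abstract, with no default), concurrent safety is opt-in via enableConcurrentSafety and lockInfoDataset, and SQL generation is separate from execution. A rough generate-only sketch against this API (the dataset definitions below, including the schema variable, are illustrative placeholders rather than code from the patch):

    // Illustrative standalone usage; "schema" stands in for a real SchemaDefinition.
    DatasetDefinition mainTable = DatasetDefinition.builder().database("mydb").name("main").schema(schema).build();
    MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
    LockInfoDataset lockTable = LockInfoDataset.builder().name("lock_info").build();

    RelationalSinkCleaner cleaner = RelationalSinkCleaner.builder()
            .relationalSink(AnsiSqlSink.get())
            .mainDataset(mainTable)
            .metadataDataset(metadata)               // mandatory now that metadataDataset() is abstract
            .enableConcurrentSafety(true)
            .lockInfoDataset(lockTable)
            .requestedBy("lh_dev")
            .build();

    SinkCleanupGeneratorResult result = cleaner.generateOperationsForSinkCleanup();
    result.preActionsSql().forEach(System.out::println);      // audit-table and lock-table DDL
    result.initializeLockSql().forEach(System.out::println);  // lock initialization
    result.cleanupSql().forEach(System.out::println);         // drop main table, delete metadata row, audit insert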