Skip to content

Commit

Permalink
Adding the APIs and logic for sink clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
rengam32 committed Mar 5, 2024
1 parent 4761b3e commit 332b6b3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
package org.finos.legend.engine.persistence.components;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink;
import org.finos.legend.engine.persistence.components.relational.api.RelationalSinkCleaner;
import org.finos.legend.engine.persistence.components.relational.api.SinkCleanupGeneratorResult;
import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -30,13 +30,19 @@
public class RelationalSinkCleanerTest extends IngestModeTest
{
private static DatasetDefinition mainTable;
private static LockInfoDataset lockTable = LockInfoDataset.builder().name("lock_info").build();
private final static LockInfoDataset lockTable = LockInfoDataset.builder().name("lock_info").build();
protected SchemaDefinition mainTableSchema = SchemaDefinition.builder()
.addFields(id)
.addFields(name)
.addFields(batchIdIn)
.addFields(batchIdOut)
.build();
private final MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
private final String auditTableCreationQuery = "CREATE TABLE IF NOT EXISTS sink_cleanup_audit(\"table_name\" VARCHAR(255),\"batch_start_ts_utc\" DATETIME,\"batch_end_ts_utc\" DATETIME,\"batch_status\" VARCHAR(32),\"requested_by\" VARCHAR(32))";
private final String dropMainTableQuery = "DROP TABLE IF EXISTS \"mydb\".\"main\" CASCADE";
private final String deleteFromMetadataTableQuery = "DELETE FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'";
private final String insertToAuditTableQuery = "INSERT INTO sink_cleanup_audit (\"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"requested_by\") (SELECT 'main','2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'SUCCEEDED','lh_dev')";


@BeforeEach
void initializeTables()
Expand All @@ -53,53 +59,46 @@ void testGenerateOperationsForSinkCleanup()
RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
.relationalSink(AnsiSqlSink.get())
.mainDataset(mainTable)
.metadataDataset(metadata)
.executionTimestampClock(fixedClock_2000_01_01)
.requestedBy("lh_dev")
.build();
SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();

List<String> preActionsSql = result.preActionsSql();
String creationQuery = "CREATE TABLE IF NOT EXISTS sink_cleanup_audit(\"table_name\" VARCHAR(255),\"batch_start_ts_utc\" DATETIME,\"batch_end_ts_utc\" DATETIME,\"batch_status\" VARCHAR(32),\"requested_by\" VARCHAR(32))";
Assertions.assertEquals(creationQuery, preActionsSql.get(0));

String dropMainTable = "DROP TABLE IF EXISTS \"mydb\".\"main\" CASCADE";
String deleteFromMetadataTable = "DELETE FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'";
String insertToAuditTable = "INSERT INTO sink_cleanup_audit (\"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"requested_by\") (SELECT 'main','2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE','lh_dev')";
Assertions.assertEquals(auditTableCreationQuery, preActionsSql.get(0));

List<String> cleanupSql = result.cleanupSql();
Assertions.assertEquals(dropMainTable, cleanupSql.get(0));
Assertions.assertEquals(deleteFromMetadataTable, cleanupSql.get(1));
Assertions.assertEquals(insertToAuditTable, cleanupSql.get(2));
Assertions.assertEquals(dropMainTableQuery, cleanupSql.get(0));
Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(1));
Assertions.assertEquals(insertToAuditTableQuery, cleanupSql.get(2));
}

@Test
void testGenerateOperationsForSinkCleanupWithConcurrencyFlagAndUpperCase()
void testGenerateOperationsForSinkCleanupWithConcurrencyFlag()
{
RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
.relationalSink(AnsiSqlSink.get())
.mainDataset(mainTable)
.executionTimestampClock(fixedClock_2000_01_01)
.enableConcurrentSafety(true)
.lockInfoDataset(lockTable)
.caseConversion(CaseConversion.TO_UPPER)
.metadataDataset(metadata)
.requestedBy("lh_dev")
.build();
SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();

List<String> preActionsSql = result.preActionsSql();
String auditTableCreationQuery = "CREATE TABLE IF NOT EXISTS SINK_CLEANUP_AUDIT(\"TABLE_NAME\" VARCHAR(255),\"BATCH_START_TS_UTC\" DATETIME,\"BATCH_END_TS_UTC\" DATETIME,\"BATCH_STATUS\" VARCHAR(32),\"REQUESTED_BY\" VARCHAR(32))";
String lockTableCreationQuery = "CREATE TABLE IF NOT EXISTS LOCK_INFO(\"INSERT_TS_UTC\" DATETIME,\"LAST_USED_TS_UTC\" DATETIME,\"TABLE_NAME\" VARCHAR UNIQUE)";
String lockTableCreationQuery = "CREATE TABLE IF NOT EXISTS lock_info(\"insert_ts_utc\" DATETIME,\"last_used_ts_utc\" DATETIME,\"table_name\" VARCHAR UNIQUE)";
Assertions.assertEquals(auditTableCreationQuery, preActionsSql.get(0));
Assertions.assertEquals(lockTableCreationQuery, preActionsSql.get(1));

//todo : confirm whether the table name ('main') should remain lower-case here
String initializeLockQuery = "INSERT INTO LOCK_INFO (\"INSERT_TS_UTC\", \"TABLE_NAME\") (SELECT '2000-01-01 00:00:00.000000','main' WHERE NOT (EXISTS (SELECT * FROM LOCK_INFO as lock_info)))";
String acquireLockQuery = "UPDATE LOCK_INFO as lock_info SET lock_info.\"LAST_USED_TS_UTC\" = '2000-01-01 00:00:00.000000'";
String initializeLockQuery = "INSERT INTO lock_info (\"insert_ts_utc\", \"table_name\") (SELECT '2000-01-01 00:00:00.000000','main' WHERE NOT (EXISTS (SELECT * FROM lock_info as lock_info)))";
String acquireLockQuery = "UPDATE lock_info as lock_info SET lock_info.\"last_used_ts_utc\" = '2000-01-01 00:00:00.000000'";
Assertions.assertEquals(initializeLockQuery, result.initializeLockSql().get(0));
Assertions.assertEquals(acquireLockQuery, result.acquireLockSql().get(0));

String dropMainTableQuery = "DROP TABLE IF EXISTS \"MYDB\".\"MAIN\" CASCADE";
String deleteFromMetadataTableQuery = "DELETE FROM BATCH_METADATA as batch_metadata WHERE UPPER(batch_metadata.\"TABLE_NAME\") = 'MAIN'";
String insertToAuditTableQuery = "INSERT INTO SINK_CLEANUP_AUDIT (\"TABLE_NAME\", \"BATCH_START_TS_UTC\", \"BATCH_END_TS_UTC\", \"BATCH_STATUS\", \"REQUESTED_BY\") (SELECT 'main','2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE','lh_dev')";
List<String> cleanupSql = result.cleanupSql();
Assertions.assertEquals(dropMainTableQuery, cleanupSql.get(0));
Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.*;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.transformer.TransformOptions;
import org.finos.legend.engine.persistence.components.transformer.Transformer;
import org.finos.legend.engine.persistence.components.util.*;
import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
import org.finos.legend.engine.persistence.components.util.LockInfoUtils;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.finos.legend.engine.persistence.components.util.SinkCleanupAuditDataset;
import org.immutables.value.Value;
import org.immutables.value.Value.Default;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,24 +72,14 @@ public SinkCleanupAuditDataset auditDataset()
return SinkCleanupAuditDataset.builder().build();
}

@Default
public MetadataDataset metadataDataset()
{
return MetadataDataset.builder().build();
}
public abstract MetadataDataset metadataDataset();

@Default
public Clock executionTimestampClock()
{
return Clock.systemUTC();
}

@Default
public CaseConversion caseConversion()
{
return CaseConversion.NONE;
}

@Default
public boolean enableConcurrentSafety()
{
Expand All @@ -97,12 +89,7 @@ public boolean enableConcurrentSafety()
@Value.Derived
protected TransformOptions transformOptions()
{
TransformOptions.Builder builder = TransformOptions.builder()
.executionTimestampClock(executionTimestampClock());

relationalSink().optimizerForCaseConversion(caseConversion()).ifPresent(builder::addOptimizers);

return builder.build();
return TransformOptions.builder().executionTimestampClock(executionTimestampClock()).build();
}

// ---------- Private Fields ----------
Expand Down Expand Up @@ -154,7 +141,6 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne
initExecutor(connection);

// 2. Generate sink cleanup Operations
// todo : need to enrich datastes? (enrichedDatasets = ApiUtils.enrichAndApplyCase(datasets, caseConversion());)
LOGGER.info("Generating SQL's for sink cleanup");
SinkCleanupGeneratorResult result = generateOperationsForSinkCleanup();

Expand All @@ -181,11 +167,13 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne
executor.commit();
ingestorResult = SinkCleanupIngestorResult.builder().status(IngestStatus.SUCCEEDED).build();
}
//todo : throw exception vs return failure status?
catch (Exception e)
{
executor.revert();
throw e;
ingestorResult = SinkCleanupIngestorResult.builder()
.status(IngestStatus.FAILED)
.message(e.toString())
.build();
}
finally
{
Expand All @@ -195,7 +183,6 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne
}

// ---------- UTILITY METHODS ----------

private Executor initExecutor(RelationalConnection connection)
{
LOGGER.info("Invoked initExecutor method, will initialize the executor");
Expand Down Expand Up @@ -225,7 +212,7 @@ private LogicalPlan buildLogicalPlanForCleanupActions()

private Operation buildDeleteCondition()
{
StringValue mainTableName = getMainTableName();
StringValue mainTableName = getMainTable();
FieldValue tableNameFieldValue = FieldValue.builder().datasetRef(metadataDataset().get().datasetReference()).fieldName(metadataDataset().tableNameField()).build();
FunctionImpl tableNameInUpperCase = FunctionImpl.builder().functionName(FunctionName.UPPER).addValue(tableNameFieldValue).build();
StringValue mainTableNameInUpperCase = StringValue.builder().value(mainTableName.value().map(field -> field.toUpperCase()))
Expand All @@ -251,16 +238,16 @@ private Operation buildInsertCondition()
fieldsToInsert.add(requestedBy);

List<org.finos.legend.engine.persistence.components.logicalplan.values.Value> selectFields = new ArrayList<>();
selectFields.add(getMainTableName());
selectFields.add(getMainTable());
selectFields.add(BatchStartTimestamp.INSTANCE);
selectFields.add(BatchEndTimestamp.INSTANCE);
selectFields.add(StringValue.of(MetadataUtils.MetaTableStatus.DONE.toString()));
selectFields.add(StringValue.of(IngestStatus.SUCCEEDED.name()));
selectFields.add(StringValue.of(requestedBy()));

return Insert.of(auditDataset().get(), Selection.builder().addAllFields(selectFields).build(), fieldsToInsert);
}

private StringValue getMainTableName()
private StringValue getMainTable()
{
return StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.finos.legend.engine.persistence.components.relational.h2.H2Sink;
import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection;
import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -34,14 +35,16 @@ public class RelationalSinkCleanerTest extends BaseTest
@Test
void testExecuteSinkCleanup()
{
MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
createSampleMainTableWithData();
createBatchMetadataTableWithData();
createSampleMainTableWithData(mainTable.name());
createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name());
RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
.relationalSink(H2Sink.get())
.mainDataset(mainTable)
.executionTimestampClock(fixedClock_2000_01_01)
.requestedBy("lh_dev")
.metadataDataset(metadata)
.build();

//Table counts before sink cleanup
Expand All @@ -61,17 +64,19 @@ void testExecuteSinkCleanup()
@Test
void testExecuteSinkCleanupWithConcurrency()
{
MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
LockInfoDataset lockTable = LockInfoDataset.builder().name("lock_info").build();
createSampleMainTableWithData();
createBatchMetadataTableWithData();
createSampleMainTableWithData(mainTable.name());
createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name());
RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
.relationalSink(H2Sink.get())
.mainDataset(mainTable)
.executionTimestampClock(fixedClock_2000_01_01)
.enableConcurrentSafety(true)
.lockInfoDataset(lockTable)
.requestedBy("lh_dev")
.metadataDataset(metadata)
.build();

//Table counts before sink cleanup
Expand All @@ -88,29 +93,59 @@ void testExecuteSinkCleanupWithConcurrency()
Assertions.assertEquals(tableAfterSinkCleanup.get(0).get("batch_metadata_count"), 0L);
}

private void createBatchMetadataTableWithData()
@Test
void testExecuteSinkCleanupWithFailureStatus()
{
List<String> list = new ArrayList<>();
list.add("CREATE TABLE IF NOT EXISTS batch_metadata" +
"(`table_name` VARCHAR(255)," +
MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
createSampleMainTableWithData(mainTable.name());
createBatchMetadataTableWithData(metadata.metadataDatasetName(), mainTable.name());
RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
.relationalSink(H2Sink.get())
.mainDataset(mainTable)
.executionTimestampClock(fixedClock_2000_01_01)
.requestedBy("lh_dev")
.metadataDataset(metadata.withMetadataDatasetName("batch_metadata2"))
.build();

SinkCleanupIngestorResult result = sinkCleaner.executeOperationsForSinkCleanup(JdbcConnection.of(h2Sink.connection()));
Assertions.assertEquals(result.status(), IngestStatus.FAILED);
System.out.println(result.message().get());
}

private void createBatchMetadataTableWithData(String metaTableName, String mainTableName)
{
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + metaTableName +
" (`table_name` VARCHAR(255)," +
"`batch_start_ts_utc` DATETIME," +
"`batch_end_ts_utc` DATETIME," +
"`batch_status` VARCHAR(32)," +
"`table_batch_id` INTEGER," +
"`batch_source_info` JSON," +
"`ADDITIONAL_METADATA` JSON)");
list.add("INSERT INTO batch_metadata " +
"(\"table_name\", \"table_batch_id\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\")" +
" (SELECT 'main',1,'2000-01-01 00:00:00.000000','2000-01-01 00:00:00.000000','DONE')");
"`ADDITIONAL_METADATA` JSON)";
String insertData = "INSERT INTO " + metaTableName +
" (\"table_name\", \"table_batch_id\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\")" +
" (SELECT '" + mainTableName + "',1,'2000-01-01 00:00:00.000000','2000-01-01 00:00:00.000000','DONE')";
List<String> list = new ArrayList<>();
if (metaTableName.toUpperCase().equals(metaTableName))
{
list.add(createMetaTable.toUpperCase());
list.add(insertData.toUpperCase());
}
else
{
list.add(createMetaTable);
list.add(insertData);
}
h2Sink.executeStatements(list);
}

private void createSampleMainTableWithData()
private void createSampleMainTableWithData(String tableName)
{
List<String> list = new ArrayList<>();
list.add("CREATE TABLE main(ID INT PRIMARY KEY, NAME VARCHAR(255), BIRTH DATETIME)");
list.add("INSERT INTO main VALUES (1, 'A', '2020-01-01 00:00:00')");
list.add("INSERT INTO main VALUES (2, 'B', '2021-01-01 00:00:00')");
list.add("CREATE TABLE " + tableName + " (ID INT PRIMARY KEY, NAME VARCHAR(255), BIRTH DATETIME)");
list.add("INSERT INTO " + tableName + " VALUES (1, 'A', '2020-01-01 00:00:00')");
list.add("INSERT INTO " + tableName + " VALUES (2, 'B', '2021-01-01 00:00:00')");
h2Sink.executeStatements(list);
}
}

0 comments on commit 332b6b3

Please sign in to comment.