Refactoring delete dataset functionality (#2717)
rengam32 authored Mar 22, 2024
1 parent a519add commit bcfc296
Showing 6 changed files with 111 additions and 152 deletions.

This file was deleted.

@@ -37,10 +37,9 @@ public class RelationalSinkCleanerTest extends IngestModeTest
.addFields(batchIdOut)
.build();
private final MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
private final String auditTableCreationQuery = "CREATE TABLE IF NOT EXISTS sink_cleanup_audit(\"table_name\" VARCHAR(255),\"execution_ts_utc\" DATETIME,\"status\" VARCHAR(32),\"requested_by\" VARCHAR(64))";
private final String dropMainTableQuery = "DROP TABLE IF EXISTS \"mydb\".\"main\"";
private final String dropLockTableQuery = "DROP TABLE IF EXISTS \"mydb\".\"main_legend_persistence_lock\"";
private final String deleteFromMetadataTableQuery = "DELETE FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'";
private final String insertToAuditTableQuery = "INSERT INTO sink_cleanup_audit (\"table_name\", \"execution_ts_utc\", \"status\", \"requested_by\") (SELECT 'main','2000-01-01 00:00:00.000000','SUCCEEDED','lh_dev')";


@BeforeEach
@@ -64,13 +63,30 @@ void testGenerateOperationsForSinkCleanup()
.build();
SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();

List<String> preActionsSql = result.preActionsSql();
List<String> cleanupSql = result.cleanupSql();
Assertions.assertEquals(2, result.dropSql().size());
Assertions.assertEquals(dropMainTableQuery, result.dropSql().get(0));
Assertions.assertEquals(dropLockTableQuery, result.dropSql().get(1));
Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(0));
}

Assertions.assertEquals(auditTableCreationQuery, preActionsSql.get(0));
@Test
void testGenerateOperationsForSinkCleanupWithLockTable()
{
RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
.relationalSink(AnsiSqlSink.get())
.mainDataset(mainTable)
.metadataDataset(metadata)
.executionTimestampClock(fixedClock_2000_01_01)
.requestedBy("lh_dev")
.lockDataset(LockInfoDataset.builder().name("lock_table").group("mydb").build())
.build();
SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();

List<String> cleanupSql = result.cleanupSql();
Assertions.assertEquals(2, result.dropSql().size());
Assertions.assertEquals(dropMainTableQuery, result.dropSql().get(0));
Assertions.assertEquals("DROP TABLE IF EXISTS \"mydb\".\"lock_table\"", result.dropSql().get(1));
Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(0));
Assertions.assertEquals(insertToAuditTableQuery, cleanupSql.get(1));
}
}
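For reference, a minimal sketch of driving the refactored cleaner end to end, assuming the builder fields exercised in the tests above and an available RelationalConnection implementation (the helper method and dataset wiring are illustrative, not part of this change):

    // Hypothetical helper: builds the cleaner as in the tests above and executes the cleanup.
    public SinkCleanupIngestorResult runSinkCleanup(Dataset mainTable, MetadataDataset metadata, RelationalConnection connection)
    {
        RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
                .relationalSink(AnsiSqlSink.get())
                .mainDataset(mainTable)
                .metadataDataset(metadata)
                .requestedBy("lh_dev") // value reused from the tests above
                .build();              // lockDataset() is optional; when omitted, the default
                                       // "<main>_legend_persistence_lock" table is dropped instead
        // Drops the main and lock tables, then deletes the table's entries from batch_metadata
        return sinkCleaner.executeOperationsForSinkCleanup(connection);
    }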
@@ -52,7 +52,7 @@

public class ApiUtils
{
private static final String LOCK_INFO_DATASET_SUFFIX = "_legend_persistence_lock";
public static final String LOCK_INFO_DATASET_SUFFIX = "_legend_persistence_lock";

public static Dataset deriveMainDatasetFromStaging(Datasets datasets, IngestMode ingestMode)
{
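The suffix is made public so RelationalSinkCleaner (below) can derive a default lock-table name when no lockDataset is supplied. A rough standalone illustration of that naming rule, using the table name from the tests above (not part of this change):

    // "main" -> "main_legend_persistence_lock"; the cleaner drops this table alongside the main table
    String lockDatasetName = mainDatasetName + ApiUtils.LOCK_INFO_DATASET_SUFFIX;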
@@ -19,8 +19,6 @@
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.*;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
@@ -30,8 +28,8 @@
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.transformer.TransformOptions;
import org.finos.legend.engine.persistence.components.transformer.Transformer;
import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.finos.legend.engine.persistence.components.util.SinkCleanupAuditDataset;
import org.immutables.value.Value;
import org.immutables.value.Value.Default;
import org.slf4j.Logger;
@@ -40,6 +38,9 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.finos.legend.engine.persistence.components.relational.api.ApiUtils.LOCK_INFO_DATASET_SUFFIX;

@Value.Immutable
@Value.Style(
@@ -60,14 +61,10 @@ public abstract class RelationalSinkCleanerAbstract

public abstract String requestedBy();

@Default
public SinkCleanupAuditDataset auditDataset()
{
return SinkCleanupAuditDataset.builder().build();
}

public abstract MetadataDataset metadataDataset();

public abstract Optional<LockInfoDataset> lockDataset();

@Default
public Clock executionTimestampClock()
{
@@ -89,17 +86,12 @@ public SinkCleanupGeneratorResult generateOperationsForSinkCleanup()
{
Transformer<SqlGen, SqlPlan> transformer = new RelationalTransformer(relationalSink(), transformOptions());

// Pre-action SQL
LogicalPlan preActionsLogicalPlan = buildLogicalPlanForPreActions();
SqlPlan preActionsSqlPlan = transformer.generatePhysicalPlan(preActionsLogicalPlan);

//Sink clean-up SQL's
LogicalPlan dropLogicalPlan = buildLogicalPlanForDropActions();
SqlPlan dropSqlPlan = transformer.generatePhysicalPlan(dropLogicalPlan);
LogicalPlan cleanupLogicalPlan = buildLogicalPlanForCleanupAndAuditActions();
LogicalPlan cleanupLogicalPlan = buildLogicalPlanForMetadataCleanup();
SqlPlan cleanupSqlPlan = transformer.generatePhysicalPlan(cleanupLogicalPlan);
return SinkCleanupGeneratorResult.builder()
.preActionsSqlPlan(preActionsSqlPlan)
.cleanupSqlPlan(cleanupSqlPlan)
.dropSqlPlan(dropSqlPlan)
.build();
@@ -114,11 +106,7 @@ public SinkCleanupIngestorResult executeOperationsForSinkCleanup(RelationalConne
LOGGER.info("Generating SQL's for sink cleanup");
SinkCleanupGeneratorResult result = generateOperationsForSinkCleanup();

//3. Create datasets
LOGGER.info("Creating the datasets");
executor.executePhysicalPlan(result.preActionsSqlPlan());

//4. Execute sink cleanup operations
//3. Execute sink cleanup operations
return executeSinkCleanup(result);
}

@@ -145,25 +133,38 @@

// ---------- UTILITY METHODS ----------

private LogicalPlan buildLogicalPlanForPreActions()
private LogicalPlan buildLogicalPlanForDropActions()
{
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, auditDataset().get()));
operations.add(Drop.of(true, mainDataset(), false));
operations.add(buildDropPlanForLockTable());
return LogicalPlan.of(operations);
}

private LogicalPlan buildLogicalPlanForDropActions()
private Operation buildDropPlanForLockTable()
{
List<Operation> operations = new ArrayList<>();
operations.add(Drop.of(true, mainDataset(), false));
return LogicalPlan.of(operations);
LockInfoDataset lockInfoDataset;
if (lockDataset().isPresent())
{
lockInfoDataset = lockDataset().get();
}
else
{
String datasetName = mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new);
String lockDatasetName = datasetName + LOCK_INFO_DATASET_SUFFIX;
lockInfoDataset = LockInfoDataset.builder()
.database(mainDataset().datasetReference().database())
.group(mainDataset().datasetReference().group())
.name(lockDatasetName)
.build();
}
return Drop.of(true, lockInfoDataset.get(), false);
}

private LogicalPlan buildLogicalPlanForCleanupAndAuditActions()
private LogicalPlan buildLogicalPlanForMetadataCleanup()
{
List<Operation> operations = new ArrayList<>();
operations.add(buildDeleteCondition());
operations.add(buildInsertCondition(AuditTableStatus.SUCCEEDED));
return LogicalPlan.of(operations);
}

@@ -179,29 +180,6 @@ private Operation buildDeleteCondition()
return Delete.of(metadataDataset().get(), whereCondition);
}

private Operation buildInsertCondition(AuditTableStatus status)
{
DatasetReference auditTableRef = this.auditDataset().get().datasetReference();
FieldValue tableName = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().tableNameField()).build();
FieldValue executionTs = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().executionTimeField()).build();
FieldValue auditStatus = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().statusField()).build();
FieldValue requestedBy = FieldValue.builder().datasetRef(auditTableRef).fieldName(auditDataset().requestedBy()).build();

List<org.finos.legend.engine.persistence.components.logicalplan.values.Value> fieldsToInsert = new ArrayList<>();
fieldsToInsert.add(tableName);
fieldsToInsert.add(executionTs);
fieldsToInsert.add(auditStatus);
fieldsToInsert.add(requestedBy);

List<org.finos.legend.engine.persistence.components.logicalplan.values.Value> selectFields = new ArrayList<>();
selectFields.add(getMainTable());
selectFields.add(BatchStartTimestamp.INSTANCE);
selectFields.add(StringValue.of(status.name()));
selectFields.add(StringValue.of(requestedBy()));

return Insert.of(auditDataset().get(), Selection.builder().addAllFields(selectFields).build(), fieldsToInsert);
}

private StringValue getMainTable()
{
return StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
@@ -31,18 +31,11 @@
)
public abstract class SinkCleanupGeneratorResultAbstract
{
public abstract SqlPlan preActionsSqlPlan();

public abstract SqlPlan dropSqlPlan();

public abstract SqlPlan cleanupSqlPlan();


public List<String> preActionsSql()
{
return preActionsSqlPlan().getSqlList();
}

public List<String> cleanupSql()
{
return cleanupSqlPlan().getSqlList();
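Callers that only need the generated SQL, without executing it through an Executor, can still read it off the result; a small sketch under the same assumptions as the example above:

    // Inspect the generated statements: DROPs for the main and lock tables, DELETE from batch_metadata
    SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();
    result.dropSql().forEach(System.out::println);
    result.cleanupSql().forEach(System.out::println);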