Skip to content

Commit

Permalink
Adding the API's and logic for sink clean-up (#2674)
Browse files Browse the repository at this point in the history
  • Loading branch information
rengam32 authored Mar 15, 2024
1 parent 0fecc5f commit 2346a00
Show file tree
Hide file tree
Showing 8 changed files with 638 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class LogicalPlanUtils
{
public static final String INFINITE_BATCH_TIME = "9999-12-31 23:59:59";
public static final String DEFAULT_META_TABLE = "batch_metadata";
public static final String DEFAULT_SINK_CLEAN_UP_AUDIT_TABLE = "sink_cleanup_audit";
public static final String DATA_SPLIT_LOWER_BOUND_PLACEHOLDER = "{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}";
public static final String DATA_SPLIT_UPPER_BOUND_PLACEHOLDER = "{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}";
public static final String UNDERSCORE = "_";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package org.finos.legend.engine.persistence.components.util;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;

import java.util.Optional;

import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.DEFAULT_SINK_CLEAN_UP_AUDIT_TABLE;
import static org.immutables.value.Value.*;

@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface SinkCleanupAuditDatasetAbstract
{
Optional<String> auditDatasetDatabaseName();

Optional<String> auditDatasetGroupName();

@Default
default String auditDatasetName()
{
return DEFAULT_SINK_CLEAN_UP_AUDIT_TABLE;
}

@Default
default String tableNameField()
{
return "table_name";
}

@Default
default String requestedBy()
{
return "requested_by";
}

@Default
default String executionTimeField()
{
return "execution_ts_utc";
}

@Default
default String statusField()
{
return "status";
}


@Derived
default Dataset get()
{
return DatasetDefinition.builder()
.database(auditDatasetDatabaseName())
.group(auditDatasetGroupName())
.name(auditDatasetName())
.schema(SchemaDefinition.builder()
.addFields(Field.builder().name(tableNameField()).type(FieldType.of(DataType.VARCHAR, 255, null)).build())
.addFields(Field.builder().name(executionTimeField()).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).build())
.addFields(Field.builder().name(statusField()).type(FieldType.of(DataType.VARCHAR, 32, null)).build())
.addFields(Field.builder().name(requestedBy()).type(FieldType.of(DataType.VARCHAR, 64, null)).build())
.build())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package org.finos.legend.engine.persistence.components;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;
import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink;
import org.finos.legend.engine.persistence.components.relational.api.RelationalSinkCleaner;
import org.finos.legend.engine.persistence.components.relational.api.SinkCleanupGeneratorResult;
import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;

public class RelationalSinkCleanerTest extends IngestModeTest
{
private static DatasetDefinition mainTable;
protected SchemaDefinition mainTableSchema = SchemaDefinition.builder()
.addFields(id)
.addFields(name)
.addFields(batchIdIn)
.addFields(batchIdOut)
.build();
private final MetadataDataset metadata = MetadataDataset.builder().metadataDatasetName("batch_metadata").build();
private final String auditTableCreationQuery = "CREATE TABLE IF NOT EXISTS sink_cleanup_audit(\"table_name\" VARCHAR(255),\"execution_ts_utc\" DATETIME,\"status\" VARCHAR(32),\"requested_by\" VARCHAR(64))";
private final String dropMainTableQuery = "DROP TABLE IF EXISTS \"mydb\".\"main\"";
private final String deleteFromMetadataTableQuery = "DELETE FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'";
private final String insertToAuditTableQuery = "INSERT INTO sink_cleanup_audit (\"table_name\", \"execution_ts_utc\", \"status\", \"requested_by\") (SELECT 'main','2000-01-01 00:00:00.000000','SUCCEEDED','lh_dev')";


@BeforeEach
void initializeTables()
{
mainTable = DatasetDefinition.builder()
.database(mainDbName).name(mainTableName).alias(mainTableAlias)
.schema(mainTableSchema)
.build();
}

@Test
void testGenerateOperationsForSinkCleanup()
{
RelationalSinkCleaner sinkCleaner = RelationalSinkCleaner.builder()
.relationalSink(AnsiSqlSink.get())
.mainDataset(mainTable)
.metadataDataset(metadata)
.executionTimestampClock(fixedClock_2000_01_01)
.requestedBy("lh_dev")
.build();
SinkCleanupGeneratorResult result = sinkCleaner.generateOperationsForSinkCleanup();

List<String> preActionsSql = result.preActionsSql();

Assertions.assertEquals(auditTableCreationQuery, preActionsSql.get(0));

List<String> cleanupSql = result.cleanupSql();
Assertions.assertEquals(dropMainTableQuery, result.dropSql().get(0));
Assertions.assertEquals(deleteFromMetadataTableQuery, cleanupSql.get(0));
Assertions.assertEquals(insertToAuditTableQuery, cleanupSql.get(1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.api;

public enum AuditTableStatus
{
INITIALIZED, SUCCEEDED, FAILED
}
Loading

0 comments on commit 2346a00

Please sign in to comment.