Commit

Bulk Load Batch ID and Task ID & PK Validation (#11)

* Add PK validation in bulk load

* Resolve conflict

* Remove unnecessary delete

* Introduce bulk load batch id and bulk load task id

* Rename variable
kumuwu authored Oct 4, 2023
1 parent 577082c commit 8d7316a
Showing 35 changed files with 379 additions and 235 deletions.
----------------------------------------
@@ -149,7 +149,7 @@ public Dataset visitBulkLoad(BulkLoadAbstract bulkLoad)
        }
        Field batchIdField = Field.builder()
            .name(bulkLoad.batchIdField())
-            .type(FieldType.of(DataType.VARCHAR, Optional.empty(), Optional.empty()))
+            .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty()))
            .primaryKey(false)
            .build();
        mainSchemaFields.add(batchIdField);
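Note: the batch-id column switches from VARCHAR to INT because the id is now computed numerically from the bulk-load metadata table (COALESCE(MAX(batch_id), 0) + 1, via the getBatchId utility added further down) instead of being passed in as an opaque string value.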
----------------------------------------
@@ -15,7 +15,8 @@
package org.finos.legend.engine.persistence.components.logicalplan;

import org.finos.legend.engine.persistence.components.common.Datasets;
-import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
+import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad;
+import org.finos.legend.engine.persistence.components.ingestmode.IngestMode;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
@@ -32,12 +33,13 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.TabularValues;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
+import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset;
+import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataUtils;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.finos.legend.engine.persistence.components.util.MetadataUtils;

import java.util.List;
-import java.util.Optional;

public class LogicalPlanFactory
{
@@ -91,14 +93,23 @@ public static LogicalPlan getLogicalPlanForConstantStats(String stats, Long valu
            .build();
    }

-    public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets)
+    public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets, IngestMode ingestMode)
    {
        StringValue mainTable = StringValue.of(datasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
-        MetadataDataset metadataDataset = datasets.metadataDataset().isPresent()
-            ? datasets.metadataDataset().get()
-            : MetadataDataset.builder().build();
-        MetadataUtils metadataUtils = new MetadataUtils(metadataDataset);
-        Selection selection = metadataUtils.getBatchId(mainTable).selection();
+        Selection selection;
+        if (ingestMode instanceof BulkLoad)
+        {
+            BulkLoadMetadataDataset bulkLoadMetadataDataset = datasets.bulkLoadMetadataDataset().orElse(BulkLoadMetadataDataset.builder().build());
+            BulkLoadMetadataUtils bulkLoadMetadataUtils = new BulkLoadMetadataUtils(bulkLoadMetadataDataset);
+            selection = bulkLoadMetadataUtils.getBatchId(mainTable).selection();
+        }
+        else
+        {
+            MetadataDataset metadataDataset = datasets.metadataDataset().orElse(MetadataDataset.builder().build());
+            MetadataUtils metadataUtils = new MetadataUtils(metadataDataset);
+            selection = metadataUtils.getBatchId(mainTable).selection();
+        }

        return LogicalPlan.builder().addOps(selection).build();
    }

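A caller-side sketch of the new signature (the wiring below is hypothetical and not part of this commit; RelationalTransformer and SqlPlan are the codebase's usual route from a LogicalPlan to SQL):

    // Hypothetical usage -- dataset and transformer setup elided.
    LogicalPlan nextBatchIdPlan = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets, ingestMode);
    SqlPlan physicalPlan = transformer.generatePhysicalPlan(nextBatchIdPlan);
    // For a BulkLoad ingest mode the selection reads the bulk-load metadata table;
    // for every other ingest mode it reads the regular batch_metadata table.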

----------------------------------------
This file was deleted.

----------------------------------------
@@ -27,8 +27,8 @@
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesSelection;
-import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Drop;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
@@ -46,7 +46,6 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
-import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchIdValue;
import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset;
import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataUtils;
import org.finos.legend.engine.persistence.components.util.Capability;
Expand All @@ -66,17 +65,20 @@ class BulkLoadPlanner extends Planner
    private Dataset tempDataset;
    private StagedFilesDataset stagedFilesDataset;
    private BulkLoadMetadataDataset bulkLoadMetadataDataset;
+    private Optional<String> bulkLoadTaskIdValue;

    BulkLoadPlanner(Datasets datasets, BulkLoad ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
    {
        super(datasets, ingestMode, plannerOptions, capabilities);

        // validation
+        validateNoPrimaryKeysInStageAndMain();
        if (!(datasets.stagingDataset() instanceof StagedFilesDataset))
        {
            throw new IllegalArgumentException("Only StagedFilesDataset are allowed under Bulk Load");
        }

+        bulkLoadTaskIdValue = plannerOptions.bulkLoadTaskIdValue();
        stagedFilesDataset = (StagedFilesDataset) datasets.stagingDataset();
        bulkLoadMetadataDataset = bulkLoadMetadataDataset().orElseThrow(IllegalStateException::new);

@@ -93,6 +95,15 @@ class BulkLoadPlanner
        }
    }

+    private void validateNoPrimaryKeysInStageAndMain()
+    {
+        List<String> primaryKeysFromMain = mainDataset().schema().fields().stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toList());
+        validatePrimaryKeysIsEmpty(primaryKeysFromMain);
+
+        List<String> primaryKeysFromStage = stagingDataset().schema().fields().stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toList());
+        validatePrimaryKeysIsEmpty(primaryKeysFromStage);
+    }
+
    @Override
    protected BulkLoad ingestMode()
    {
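The helper validatePrimaryKeysIsEmpty is not shown in this diff (it lives on the Planner base class); a minimal sketch of the expected contract, with a hypothetical message:

    // Sketch only -- the real helper is defined in Planner, outside this commit.
    protected void validatePrimaryKeysIsEmpty(List<String> primaryKeys)
    {
        if (!primaryKeys.isEmpty())
        {
            throw new IllegalStateException("Primary keys are not supported for bulk load, found: " + primaryKeys);
        }
    }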
@@ -122,7 +133,7 @@ private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources)

        // Add batch_id field
        fieldsToInsert.add(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(ingestMode().batchIdField()).build());
-        fieldsToSelect.add(BulkLoadBatchIdValue.INSTANCE);
+        fieldsToSelect.add(new BulkLoadMetadataUtils(bulkLoadMetadataDataset).getBatchId(StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new))));

        // Add auditing
        if (ingestMode().auditing().accept(AUDIT_ENABLED))
@@ -146,23 +157,23 @@ private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources)


        // Operation 2: Transfer from temp table into target table, adding extra columns at the same time
-        List<Value> fieldsToSelectFromTemp = new ArrayList<>(tempDataset.schemaReference().fieldValues());
+        List<Value> fieldsToSelect = new ArrayList<>(tempDataset.schemaReference().fieldValues());
        List<Value> fieldsToInsertIntoMain = new ArrayList<>(tempDataset.schemaReference().fieldValues());

        // Add digest
-        ingestMode().digestGenStrategy().accept(new DigestGeneration(mainDataset(), tempDataset, fieldsToSelectFromTemp, fieldsToInsertIntoMain));
+        ingestMode().digestGenStrategy().accept(new DigestGeneration(mainDataset(), tempDataset, fieldsToSelect, fieldsToInsertIntoMain));

        // Add batch_id field
        fieldsToInsertIntoMain.add(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(ingestMode().batchIdField()).build());
-        fieldsToSelectFromTemp.add(BulkLoadBatchIdValue.INSTANCE);
+        fieldsToSelect.add(new BulkLoadMetadataUtils(bulkLoadMetadataDataset).getBatchId(StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new))));

        // Add auditing
        if (ingestMode().auditing().accept(AUDIT_ENABLED))
        {
-            addAuditing(fieldsToInsertIntoMain, fieldsToSelectFromTemp);
+            addAuditing(fieldsToInsertIntoMain, fieldsToSelect);
        }

-        operations.add(Insert.of(mainDataset(), Selection.builder().source(tempDataset).addAllFields(fieldsToSelectFromTemp).build(), fieldsToInsertIntoMain));
+        operations.add(Insert.of(mainDataset(), Selection.builder().source(tempDataset).addAllFields(fieldsToSelect).build(), fieldsToInsertIntoMain));


        return LogicalPlan.of(operations);
@@ -192,11 +203,8 @@ public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
    @Override
    public LogicalPlan buildLogicalPlanForPostActions(Resources resources)
    {
+        // there is no need to delete from the temp table for big query because we always use "overwrite" when loading
        List<Operation> operations = new ArrayList<>();
-        if (!transformWhileCopy)
-        {
-            operations.add(Delete.builder().dataset(tempDataset).build());
-        }
        return LogicalPlan.of(operations);
    }

@@ -251,9 +259,10 @@ private Selection getRowsBasedOnAppendTimestamp(Dataset dataset, String field, S

    private String jsonifyBatchSourceInfo(StagedFilesDatasetProperties stagedFilesDatasetProperties)
    {
-        List<String> files = stagedFilesDatasetProperties.files();
        Map<String, Object> batchSourceMap = new HashMap();
+        List<String> files = stagedFilesDatasetProperties.files();
        batchSourceMap.put("files", files);
+        bulkLoadTaskIdValue.ifPresent(taskId -> batchSourceMap.put("task_id", taskId));
        ObjectMapper objectMapper = new ObjectMapper();
        try
        {
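With a task id supplied through the planner options, the serialized batch-source info would look roughly like this (file path and task id are illustrative):

    {"files": ["/staging/orders_20231004.csv"], "task_id": "task-123"}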
----------------------------------------
@@ -97,6 +97,8 @@ default boolean enableConcurrentSafety()
        {
            return false;
        }
+
+        Optional<String> bulkLoadTaskIdValue();
    }

    private final Datasets datasets;
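Assuming the usual Immutables-generated builder for PlannerOptions (the builder itself is not shown in this diff), the new option would be wired in like so:

    // Hypothetical wiring -- option names follow the interface above.
    PlannerOptions plannerOptions = PlannerOptions.builder()
            .collectStatistics(true)
            .bulkLoadTaskIdValue("task-123") // surfaces in batch_source_info as "task_id"
            .build();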
----------------------------------------
@@ -177,7 +177,7 @@ protected Insert sqlToUpsertRows()
sink."batch_id_out" = 999999999 and
not exists
(
sink."digest" <> stage."digest" and sink.primaryKeys = stage.primaryKeys
sink."digest" = stage."digest" and sink.primaryKeys = stage.primaryKeys
)
Partition :
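This hunk only corrects the documentation comment: the upsert inserts a staged row when no open row (batch_id_out = 999999999) with the same primary keys and the same digest already exists, so the earlier <> described the inverse of the generated predicate.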
----------------------------------------
@@ -95,7 +95,7 @@ default Dataset get()
            .group(group())
            .name(name())
            .schema(SchemaDefinition.builder()
-                .addFields(Field.builder().name(batchIdField()).type(FieldType.of(DataType.VARCHAR, 255, null)).build())
+                .addFields(Field.builder().name(batchIdField()).type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())).build())
                .addFields(Field.builder().name(tableNameField()).type(FieldType.of(DataType.VARCHAR, 255, null)).build())
                .addFields(Field.builder().name(batchStartTimeField()).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).build())
                .addFields(Field.builder().name(batchEndTimeField()).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).build())
----------------------------------------
@@ -14,17 +14,23 @@

package org.finos.legend.engine.persistence.components.util;

+import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert;
+import org.finos.legend.engine.persistence.components.logicalplan.values.BatchIdValue;
+import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
+import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;
+import org.finos.legend.engine.persistence.components.logicalplan.values.NumericalValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
+import org.finos.legend.engine.persistence.components.logicalplan.values.SumBinaryValueOperator;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.logicalplan.values.ParseJsonFunction;
-import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchIdValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchStatusValue;

import java.util.ArrayList;
@@ -41,6 +47,27 @@ public BulkLoadMetadataUtils(BulkLoadMetadataDataset bulkLoadMetadataDataset)
        this.dataset = bulkLoadMetadataDataset.get();
    }

+    /*
+    SELECT COALESCE(MAX("table_batch_id"),0)+1 FROM batch_metadata WHERE "table_name" = mainTableName
+    */
+    public BatchIdValue getBatchId(StringValue mainTableName)
+    {
+        FieldValue tableNameFieldValue = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(bulkLoadMetadataDataset.tableNameField()).build();
+        FunctionImpl tableNameInUpperCase = FunctionImpl.builder().functionName(FunctionName.UPPER).addValue(tableNameFieldValue).build();
+        StringValue mainTableNameInUpperCase = StringValue.builder().value(mainTableName.value().map(field -> field.toUpperCase()))
+            .alias(mainTableName.alias()).build();
+        Condition whereCondition = Equals.of(tableNameInUpperCase, mainTableNameInUpperCase);
+        FieldValue tableBatchIdFieldValue = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(bulkLoadMetadataDataset.batchIdField()).build();
+        FunctionImpl maxBatchId = FunctionImpl.builder().functionName(FunctionName.MAX).addValue(tableBatchIdFieldValue).build();
+        FunctionImpl coalesce = FunctionImpl.builder().functionName(FunctionName.COALESCE).addValue(maxBatchId, NumericalValue.of(0L)).build();
+
+        return BatchIdValue.of(Selection.builder()
+            .source(dataset)
+            .condition(whereCondition)
+            .addFields(SumBinaryValueOperator.of(coalesce, NumericalValue.of(1L)))
+            .build());
+    }
+
    /*
    INSERT INTO batch_metadata ("batchIdField", "tableNameField", "batchStartTimeField", "batchEndTimeField",
    "batchStatusField","batchSourceInfoField")
@@ -63,7 +90,7 @@ public Insert insertMetaData(StringValue tableNameValue, StringValue batchSource
List<Value> metaSelectFields = new ArrayList<>();

metaInsertFields.add(batchId);
metaSelectFields.add(BulkLoadBatchIdValue.INSTANCE);
metaSelectFields.add(getBatchId(tableNameValue));

metaInsertFields.add(tableName);
metaSelectFields.add(tableNameValue);
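Rendered against the default bulk-load metadata names (the table and column names below are assumed builder defaults, not shown in this diff), getBatchId produces a subquery along these lines for a main table named my_table:

    SELECT COALESCE(MAX("batch_id"), 0) + 1
    FROM bulk_load_batch_metadata
    WHERE UPPER("table_name") = 'MY_TABLE'

Unlike the SELECT sketched in the in-code comment, the generated predicate upper-cases both sides of the table-name comparison, so the match is case-insensitive.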
----------------------------------------
@@ -69,7 +69,6 @@ protected VisitorContext createContext(TransformOptions options)
            .batchStartTimestamp(options.batchStartTimestampValue())
            .batchIdPattern(options.batchIdPattern())
            .infiniteBatchIdValue(options.infiniteBatchIdValue())
-            .bulkLoadBatchIdValue(options.bulkLoadBatchIdValue())
            .bulkLoadBatchStatusPattern(options.bulkLoadBatchStatusPattern())
            .addAllOptimizers(options.optimizers())
            .quoteIdentifier(sink.quoteIdentifier())
----------------------------------------
@@ -51,8 +51,6 @@ interface VisitorContextAbstract

Optional<Long> infiniteBatchIdValue();

Optional<String> bulkLoadBatchIdValue();

Optional<String> bulkLoadBatchStatusPattern();

List<Optimizer> optimizers();
----------------------------------------
@@ -57,8 +57,6 @@ public Clock executionTimestampClock()

public abstract Optional<Long> infiniteBatchIdValue();

public abstract Optional<String> bulkLoadBatchIdValue();

public abstract Optional<String> bulkLoadBatchStatusPattern();

public abstract List<Optimizer> optimizers();
----------------------------------------
@@ -61,7 +61,6 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchIdValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
-import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchIdValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchStatusValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Case;
import org.finos.legend.engine.persistence.components.logicalplan.values.DatetimeValue;
@@ -95,7 +94,6 @@
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BatchEndTimestampVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BatchIdValueVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BatchStartTimestampVisitor;
-import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BulkLoadBatchIdValueVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BulkLoadBatchStatusValueVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.CaseVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DatasetAdditionalPropertiesVisitor;
@@ -235,7 +233,6 @@ public class AnsiSqlSink extends RelationalSink
logicalPlanVisitorByClass.put(Show.class, new ShowVisitor());
logicalPlanVisitorByClass.put(BatchIdValue.class, new BatchIdValueVisitor());
logicalPlanVisitorByClass.put(InfiniteBatchIdValue.class, new InfiniteBatchIdValueVisitor());
-logicalPlanVisitorByClass.put(BulkLoadBatchIdValue.class, new BulkLoadBatchIdValueVisitor());
logicalPlanVisitorByClass.put(BulkLoadBatchStatusValue.class, new BulkLoadBatchStatusValueVisitor());

LOGICAL_PLAN_VISITOR_BY_CLASS = Collections.unmodifiableMap(logicalPlanVisitorByClass);
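With the batch id now derived from a metadata-table subquery (BulkLoadMetadataUtils.getBatchId), the BulkLoadBatchIdValue placeholder, its visitor, and the bulkLoadBatchIdValue transform option are all retired; the removals in these last few files complete that cleanup.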
