diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/DeriveMainDatasetSchemaFromStaging.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/DeriveMainDatasetSchemaFromStaging.java index cf333ca3b41..b92a06436fc 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/DeriveMainDatasetSchemaFromStaging.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/DeriveMainDatasetSchemaFromStaging.java @@ -149,7 +149,7 @@ public Dataset visitBulkLoad(BulkLoadAbstract bulkLoad) } Field batchIdField = Field.builder() .name(bulkLoad.batchIdField()) - .type(FieldType.of(DataType.VARCHAR, Optional.empty(), Optional.empty())) + .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())) .primaryKey(false) .build(); mainSchemaFields.add(batchIdField); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java index 33ae2fcad34..b16e938fbc8 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java @@ -15,7 +15,8 @@ package org.finos.legend.engine.persistence.components.logicalplan; import org.finos.legend.engine.persistence.components.common.Datasets; -import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition; +import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad; +import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; @@ -32,12 +33,13 @@ import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue; import org.finos.legend.engine.persistence.components.logicalplan.values.TabularValues; import org.finos.legend.engine.persistence.components.logicalplan.values.Value; +import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset; +import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataUtils; import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils; import 
org.finos.legend.engine.persistence.components.util.MetadataDataset; import org.finos.legend.engine.persistence.components.util.MetadataUtils; import java.util.List; -import java.util.Optional; public class LogicalPlanFactory { @@ -91,14 +93,23 @@ public static LogicalPlan getLogicalPlanForConstantStats(String stats, Long valu .build(); } - public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets) + public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets, IngestMode ingestMode) { StringValue mainTable = StringValue.of(datasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new)); - MetadataDataset metadataDataset = datasets.metadataDataset().isPresent() - ? datasets.metadataDataset().get() - : MetadataDataset.builder().build(); - MetadataUtils metadataUtils = new MetadataUtils(metadataDataset); - Selection selection = metadataUtils.getBatchId(mainTable).selection(); + Selection selection; + if (ingestMode instanceof BulkLoad) + { + BulkLoadMetadataDataset bulkLoadMetadataDataset = datasets.bulkLoadMetadataDataset().orElse(BulkLoadMetadataDataset.builder().build()); + BulkLoadMetadataUtils bulkLoadMetadataUtils = new BulkLoadMetadataUtils(bulkLoadMetadataDataset); + selection = bulkLoadMetadataUtils.getBatchId(mainTable).selection(); + } + else + { + MetadataDataset metadataDataset = datasets.metadataDataset().orElse(MetadataDataset.builder().build()); + MetadataUtils metadataUtils = new MetadataUtils(metadataDataset); + selection = metadataUtils.getBatchId(mainTable).selection(); + } + return LogicalPlan.builder().addOps(selection).build(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/BulkLoadBatchIdValueAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/BulkLoadBatchIdValueAbstract.java deleted file mode 100644 index 9e7b5001aad..00000000000 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/BulkLoadBatchIdValueAbstract.java +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2023 Goldman Sachs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
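[Annotation, not part of the patch] The factory method above now derives the next batch id per ingest mode: a BulkLoad reads it from the bulk-load metadata table, every other mode keeps using the regular batch metadata table. A minimal caller sketch (assumed setup; `datasets` and `bulkLoad` are presumed configured elsewhere, and the rendered SQL shape is taken from the expected strings in the tests further down):

    // Sketch: resolving the next batch id for a bulk load (assumed setup).
    LogicalPlan nextBatchIdPlan = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets, bulkLoad);
    // For a BulkLoad ingest mode the selection renders roughly as:
    //   SELECT COALESCE(MAX("batch_id"),0)+1 FROM bulk_load_batch_metadata
    //   WHERE UPPER("table_name") = 'MY_TABLE'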
- -package org.finos.legend.engine.persistence.components.logicalplan.values; - -import static org.immutables.value.Value.Immutable; -import static org.immutables.value.Value.Style; - -@Immutable -@Style( - typeAbstract = "*Abstract", - typeImmutable = "*", - jdkOnly = true, - optionalAcceptNullable = true, - strictBuilder = true -) -public interface BulkLoadBatchIdValueAbstract extends Value -{ - BulkLoadBatchIdValue INSTANCE = BulkLoadBatchIdValue.builder().build(); -} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java index 6d2c38027c3..8a49dd3a5fe 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java @@ -27,8 +27,8 @@ import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesSelection; -import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete; import org.finos.legend.engine.persistence.components.logicalplan.operations.Drop; import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert; import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl; @@ -46,7 +46,6 @@ import org.finos.legend.engine.persistence.components.logicalplan.values.Value; import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp; import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; -import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchIdValue; import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset; import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataUtils; import org.finos.legend.engine.persistence.components.util.Capability; @@ -66,17 +65,20 @@ class BulkLoadPlanner extends Planner private Dataset tempDataset; private StagedFilesDataset stagedFilesDataset; private BulkLoadMetadataDataset bulkLoadMetadataDataset; + private Optional<String> bulkLoadTaskIdValue; BulkLoadPlanner(Datasets datasets, BulkLoad ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities) { super(datasets, ingestMode, plannerOptions, capabilities); // validation + validateNoPrimaryKeysInStageAndMain(); if (!(datasets.stagingDataset() instanceof StagedFilesDataset)) { throw new IllegalArgumentException("Only StagedFilesDataset are allowed under Bulk Load"); } + bulkLoadTaskIdValue = plannerOptions.bulkLoadTaskIdValue(); stagedFilesDataset = (StagedFilesDataset) datasets.stagingDataset();
bulkLoadMetadataDataset = bulkLoadMetadataDataset().orElseThrow(IllegalStateException::new); @@ -93,6 +95,15 @@ class BulkLoadPlanner extends Planner } } + private void validateNoPrimaryKeysInStageAndMain() + { + List<String> primaryKeysFromMain = mainDataset().schema().fields().stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toList()); + validatePrimaryKeysIsEmpty(primaryKeysFromMain); + + List<String> primaryKeysFromStage = stagingDataset().schema().fields().stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toList()); + validatePrimaryKeysIsEmpty(primaryKeysFromStage); + } + @Override protected BulkLoad ingestMode() { @@ -122,7 +133,7 @@ private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources) // Add batch_id field fieldsToInsert.add(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(ingestMode().batchIdField()).build()); - fieldsToSelect.add(BulkLoadBatchIdValue.INSTANCE); + fieldsToSelect.add(new BulkLoadMetadataUtils(bulkLoadMetadataDataset).getBatchId(StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new)))); // Add auditing if (ingestMode().auditing().accept(AUDIT_ENABLED)) @@ -146,23 +157,23 @@ private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources) // Operation 2: Transfer from temp table into target table, adding extra columns at the same time - List<Value> fieldsToSelectFromTemp = new ArrayList<>(tempDataset.schemaReference().fieldValues()); + List<Value> fieldsToSelect = new ArrayList<>(tempDataset.schemaReference().fieldValues()); List<Value> fieldsToInsertIntoMain = new ArrayList<>(tempDataset.schemaReference().fieldValues()); // Add digest - ingestMode().digestGenStrategy().accept(new DigestGeneration(mainDataset(), tempDataset, fieldsToSelectFromTemp, fieldsToInsertIntoMain)); + ingestMode().digestGenStrategy().accept(new DigestGeneration(mainDataset(), tempDataset, fieldsToSelect, fieldsToInsertIntoMain)); // Add batch_id field fieldsToInsertIntoMain.add(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(ingestMode().batchIdField()).build()); - fieldsToSelectFromTemp.add(BulkLoadBatchIdValue.INSTANCE); + fieldsToSelect.add(new BulkLoadMetadataUtils(bulkLoadMetadataDataset).getBatchId(StringValue.of(mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new)))); // Add auditing if (ingestMode().auditing().accept(AUDIT_ENABLED)) { - addAuditing(fieldsToInsertIntoMain, fieldsToSelectFromTemp); + addAuditing(fieldsToInsertIntoMain, fieldsToSelect); } - operations.add(Insert.of(mainDataset(), Selection.builder().source(tempDataset).addAllFields(fieldsToSelectFromTemp).build(), fieldsToInsertIntoMain)); + operations.add(Insert.of(mainDataset(), Selection.builder().source(tempDataset).addAllFields(fieldsToSelect).build(), fieldsToInsertIntoMain)); return LogicalPlan.of(operations); @@ -192,11 +203,8 @@ public LogicalPlan buildLogicalPlanForPreActions(Resources resources) @Override public LogicalPlan buildLogicalPlanForPostActions(Resources resources) { + // there is no need to delete from the temp table for big query because we always use "overwrite" when loading List<Operation> operations = new ArrayList<>(); - if (!transformWhileCopy) - { - operations.add(Delete.builder().dataset(tempDataset).build()); - } return LogicalPlan.of(operations); } @@ -251,9 +259,10 @@ private Selection getRowsBasedOnAppendTimestamp(Dataset dataset, String field, S private String jsonifyBatchSourceInfo(StagedFilesDatasetProperties
stagedFilesDatasetProperties) { - List<String> files = stagedFilesDatasetProperties.files(); Map<String, Object> batchSourceMap = new HashMap<>(); + List<String> files = stagedFilesDatasetProperties.files(); batchSourceMap.put("files", files); + bulkLoadTaskIdValue.ifPresent(taskId -> batchSourceMap.put("task_id", taskId)); ObjectMapper objectMapper = new ObjectMapper(); try { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java index a083e1dd370..6123b86eec8 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java @@ -97,6 +97,8 @@ default boolean enableConcurrentSafety() { return false; } + + Optional<String> bulkLoadTaskIdValue(); } private final Datasets datasets; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java index 007e3f3054e..da241423103 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java @@ -177,7 +177,7 @@ protected Insert sqlToUpsertRows() sink."batch_id_out" = 999999999 and not exists ( - sink."digest" <> stage."digest" and sink.primaryKeys = stage.primaryKeys + sink."digest" = stage."digest" and sink.primaryKeys = stage.primaryKeys ) Partition : diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataDatasetAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataDatasetAbstract.java index 65054f86e39..3cfea00878f 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataDatasetAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataDatasetAbstract.java @@
-95,7 +95,7 @@ default Dataset get() .group(group()) .name(name()) .schema(SchemaDefinition.builder() - .addFields(Field.builder().name(batchIdField()).type(FieldType.of(DataType.VARCHAR, 255, null)).build()) + .addFields(Field.builder().name(batchIdField()).type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())).build()) .addFields(Field.builder().name(tableNameField()).type(FieldType.of(DataType.VARCHAR, 255, null)).build()) .addFields(Field.builder().name(batchStartTimeField()).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).build()) .addFields(Field.builder().name(batchEndTimeField()).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).build()) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataUtils.java index 161a25e345b..0ff58bbbcdc 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/BulkLoadMetadataUtils.java @@ -14,17 +14,23 @@ package org.finos.legend.engine.persistence.components.util; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert; +import org.finos.legend.engine.persistence.components.logicalplan.values.BatchIdValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName; +import org.finos.legend.engine.persistence.components.logicalplan.values.NumericalValue; import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue; import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp; import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp; import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.SumBinaryValueOperator; import org.finos.legend.engine.persistence.components.logicalplan.values.Value; import org.finos.legend.engine.persistence.components.logicalplan.values.ParseJsonFunction; -import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchIdValue; import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchStatusValue; import java.util.ArrayList; @@ -41,6 +47,27 @@ public BulkLoadMetadataUtils(BulkLoadMetadataDataset bulkLoadMetadataDataset) 
this.dataset = bulkLoadMetadataDataset.get(); } + /* + SELECT COALESCE(MAX("table_batch_id"),0)+1 FROM batch_metadata WHERE "table_name" = mainTableName + */ + public BatchIdValue getBatchId(StringValue mainTableName) + { + FieldValue tableNameFieldValue = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(bulkLoadMetadataDataset.tableNameField()).build(); + FunctionImpl tableNameInUpperCase = FunctionImpl.builder().functionName(FunctionName.UPPER).addValue(tableNameFieldValue).build(); + StringValue mainTableNameInUpperCase = StringValue.builder().value(mainTableName.value().map(field -> field.toUpperCase())) + .alias(mainTableName.alias()).build(); + Condition whereCondition = Equals.of(tableNameInUpperCase, mainTableNameInUpperCase); + FieldValue tableBatchIdFieldValue = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(bulkLoadMetadataDataset.batchIdField()).build(); + FunctionImpl maxBatchId = FunctionImpl.builder().functionName(FunctionName.MAX).addValue(tableBatchIdFieldValue).build(); + FunctionImpl coalesce = FunctionImpl.builder().functionName(FunctionName.COALESCE).addValue(maxBatchId, NumericalValue.of(0L)).build(); + + return BatchIdValue.of(Selection.builder() + .source(dataset) + .condition(whereCondition) + .addFields(SumBinaryValueOperator.of(coalesce, NumericalValue.of(1L))) + .build()); + } + /* INSERT INTO batch_metadata ("batchIdField", "tableNameField", "batchStartTimeField", "batchEndTimeField", "batchStatusField","batchSourceInfoField") @@ -63,7 +90,7 @@ public Insert insertMetaData(StringValue tableNameValue, StringValue batchSource List<Value> metaSelectFields = new ArrayList<>(); metaInsertFields.add(batchId); - metaSelectFields.add(BulkLoadBatchIdValue.INSTANCE); + metaSelectFields.add(getBatchId(tableNameValue)); metaInsertFields.add(tableName); metaSelectFields.add(tableNameValue); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/AbstractTransformer.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/AbstractTransformer.java index e9cab94d2d7..7f7e667044f 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/AbstractTransformer.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/AbstractTransformer.java @@ -69,7 +69,6 @@ protected VisitorContext createContext(TransformOptions options) .batchStartTimestamp(options.batchStartTimestampValue()) .batchIdPattern(options.batchIdPattern()) .infiniteBatchIdValue(options.infiniteBatchIdValue()) - .bulkLoadBatchIdValue(options.bulkLoadBatchIdValue()) .bulkLoadBatchStatusPattern(options.bulkLoadBatchStatusPattern()) .addAllOptimizers(options.optimizers()) .quoteIdentifier(sink.quoteIdentifier()) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/LogicalPlanVisitor.java
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/LogicalPlanVisitor.java index 7b59312d55f..5801b636aa0 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/LogicalPlanVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/LogicalPlanVisitor.java @@ -51,8 +51,6 @@ interface VisitorContextAbstract Optional infiniteBatchIdValue(); - Optional bulkLoadBatchIdValue(); - Optional bulkLoadBatchStatusPattern(); List optimizers(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/Transformer.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/Transformer.java index be6f7a440d6..3e663deb408 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/Transformer.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/transformer/Transformer.java @@ -57,8 +57,6 @@ public Clock executionTimestampClock() public abstract Optional infiniteBatchIdValue(); - public abstract Optional bulkLoadBatchIdValue(); - public abstract Optional bulkLoadBatchStatusPattern(); public abstract List optimizers(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/AnsiSqlSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/AnsiSqlSink.java index 63a2d109bb4..f7e3d5e6ac4 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/AnsiSqlSink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/AnsiSqlSink.java @@ -61,7 +61,6 @@ import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp; import org.finos.legend.engine.persistence.components.logicalplan.values.BatchIdValue; import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp; -import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchIdValue; import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchStatusValue; import org.finos.legend.engine.persistence.components.logicalplan.values.Case; 
import org.finos.legend.engine.persistence.components.logicalplan.values.DatetimeValue; @@ -95,7 +94,6 @@ import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BatchEndTimestampVisitor; import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BatchIdValueVisitor; import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BatchStartTimestampVisitor; -import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BulkLoadBatchIdValueVisitor; import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.BulkLoadBatchStatusValueVisitor; import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.CaseVisitor; import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DatasetAdditionalPropertiesVisitor; @@ -235,7 +233,6 @@ public class AnsiSqlSink extends RelationalSink logicalPlanVisitorByClass.put(Show.class, new ShowVisitor()); logicalPlanVisitorByClass.put(BatchIdValue.class, new BatchIdValueVisitor()); logicalPlanVisitorByClass.put(InfiniteBatchIdValue.class, new InfiniteBatchIdValueVisitor()); - logicalPlanVisitorByClass.put(BulkLoadBatchIdValue.class, new BulkLoadBatchIdValueVisitor()); logicalPlanVisitorByClass.put(BulkLoadBatchStatusValue.class, new BulkLoadBatchStatusValueVisitor()); LOGICAL_PLAN_VISITOR_BY_CLASS = Collections.unmodifiableMap(logicalPlanVisitorByClass); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/BulkLoadBatchIdValueVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/BulkLoadBatchIdValueVisitor.java deleted file mode 100644 index faf24aac182..00000000000 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/BulkLoadBatchIdValueVisitor.java +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2023 Goldman Sachs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
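[Annotation, not part of the patch] The visitor deleted below was the last consumer of the removed context value: it pushed a pre-generated batch-id string directly into the physical plan. Since the batch id is now a BatchIdValue wrapping the COALESCE(MAX(...),0)+1 selection, the BatchIdValueVisitor already registered above renders it, so no bulk-load-specific visitor remains necessary. In outline (a paraphrase of the two code paths, not new API):

    // Old path (deleted below): batch id supplied as a transformer-context string.
    //   prev.push(new StringValue(context.bulkLoadBatchIdValue().orElseThrow(...), ...));
    // New path: BulkLoadMetadataUtils.getBatchId(...) returns a BatchIdValue whose
    // selection the existing BatchIdValueVisitor renders as a SQL subquery.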
- -package org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors; - -import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchIdValue; -import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode; -import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.StringValue; -import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor; -import org.finos.legend.engine.persistence.components.transformer.VisitorContext; - -public class BulkLoadBatchIdValueVisitor implements LogicalPlanVisitor -{ - @Override - public VisitorResult visit(PhysicalPlanNode prev, BulkLoadBatchIdValue current, VisitorContext context) - { - prev.push(new StringValue(context.bulkLoadBatchIdValue().orElseThrow(IllegalStateException::new), context.quoteIdentifier())); - return new VisitorResult(); - } -} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsAnsiTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsAnsiTest.java index 67a3337de07..2a6d727b17f 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsAnsiTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsAnsiTest.java @@ -24,13 +24,13 @@ public String getExpectedSqlForMetadata() { return "INSERT INTO bulk_load_batch_metadata " + "(\"batch_id\", \"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"batch_source_info\")" + - " (SELECT 'batch_id_123','appeng_log_table_name','2000-01-01 00:00:00',CURRENT_TIMESTAMP(),'',PARSE_JSON('my_lineage_value'))"; + " (SELECT (SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'APPENG_LOG_TABLE_NAME'),'appeng_log_table_name','2000-01-01 00:00:00',CURRENT_TIMESTAMP(),'',PARSE_JSON('my_lineage_value'))"; } public String getExpectedSqlForMetadataUpperCase() { return "INSERT INTO BULK_LOAD_BATCH_METADATA (\"BATCH_ID\", \"TABLE_NAME\", \"BATCH_START_TS_UTC\", \"BATCH_END_TS_UTC\", \"BATCH_STATUS\", \"BATCH_SOURCE_INFO\") " + - "(SELECT 'batch_id_123','BULK_LOAD_TABLE_NAME','2000-01-01 00:00:00',CURRENT_TIMESTAMP(),'',PARSE_JSON('my_lineage_value'))"; + "(SELECT (SELECT COALESCE(MAX(bulk_load_batch_metadata.\"BATCH_ID\"),0)+1 FROM BULK_LOAD_BATCH_METADATA as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"TABLE_NAME\") = 'BULK_LOAD_TABLE_NAME'),'BULK_LOAD_TABLE_NAME','2000-01-01 00:00:00',CURRENT_TIMESTAMP(),'',PARSE_JSON('my_lineage_value'))"; } public RelationalSink getRelationalSink() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsTest.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsTest.java index 65e5861a277..6e563621a28 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsTest.java @@ -37,7 +37,6 @@ public abstract class BulkLoadDatasetUtilsTest private final TransformOptions transformOptions = TransformOptions .builder() .executionTimestampClock(Clock.fixed(executionZonedDateTime.toInstant(), ZoneOffset.UTC)) - .bulkLoadBatchIdValue("batch_id_123") .bulkLoadBatchStatusPattern("") .build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java index 9ea03a4b618..62c3df96ff0 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java @@ -82,6 +82,7 @@ import java.util.Optional; import java.util.Set; +import static org.finos.legend.engine.persistence.components.relational.api.RelationalIngestorAbstract.BATCH_ID_PATTERN; import static org.finos.legend.engine.persistence.components.relational.api.RelationalIngestorAbstract.BATCH_START_TS_PATTERN; public class BigQuerySink extends AnsiSqlSink @@ -263,7 +264,8 @@ public IngestorResult performBulkLoad(Datasets datasets, Executor preActionsSql = operations.preActionsSql(); List ingestSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS `my_db`.`my_name`" + - "(`col_int` INT64 NOT NULL PRIMARY KEY NOT ENFORCED,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`batch_id` STRING,`append_time` DATETIME)"; + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`batch_id` INT64,`append_time` DATETIME)"; String expectedCopySql = "LOAD DATA OVERWRITE `my_db`.`my_name_legend_persistence_temp` " + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON) " + @@ -136,12 +137,16 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledNoExtraOptions() String expectedInsertSql = "INSERT INTO `my_db`.`my_name` " + "(`col_int`, `col_string`, `col_decimal`, `col_datetime`, `col_variant`, `batch_id`, `append_time`) " + - "(SELECT 
legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,'xyz123',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + + "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,{NEXT_BATCH_ID},PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + "FROM `my_db`.`my_name_legend_persistence_temp` as legend_persistence_temp)"; + String expectedMetadataIngestSql = "INSERT INTO bulk_load_batch_metadata (`batch_id`, `table_name`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`, `batch_source_info`) " + + "(SELECT {NEXT_BATCH_ID},'my_name',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00'),CURRENT_DATETIME(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"],\"task_id\":\"xyz123\"}'))"; + Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedCopySql, ingestSql.get(0)); Assertions.assertEquals(expectedInsertSql, ingestSql.get(1)); + Assertions.assertEquals(expectedMetadataIngestSql, metadataIngestSql.get(0)); Assertions.assertEquals("SELECT 0 as `rowsDeleted`", statsSql.get(ROWS_DELETED)); Assertions.assertEquals("SELECT 0 as `rowsTerminated`", statsSql.get(ROWS_TERMINATED)); @@ -150,7 +155,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledNoExtraOptions() } @Test - public void testBulkLoadWithDigestNotGeneratedAuditEnabledAllOptions() + public void testBulkLoadWithDigestNotGeneratedAuditEnabledAllOptionsNoTaskId() { BulkLoad bulkLoad = BulkLoad.builder() .batchIdField(BATCH_ID) @@ -185,17 +190,17 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledAllOptions() .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); List preActionsSql = operations.preActionsSql(); List ingestSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS `my_db`.`my_name`" + - "(`col_int` INT64 NOT NULL PRIMARY KEY NOT ENFORCED,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`batch_id` STRING,`append_time` DATETIME)"; + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`batch_id` INT64,`append_time` DATETIME)"; String expectedCopySql = "LOAD DATA OVERWRITE `my_db`.`my_name_legend_persistence_temp` " + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON) " + @@ -204,12 +209,17 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledAllOptions() String expectedInsertSql = "INSERT INTO `my_db`.`my_name` " + "(`col_int`, `col_string`, `col_decimal`, `col_datetime`, `col_variant`, `batch_id`, `append_time`) " + - "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,'xyz123',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + + "(SELECT 
legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,(SELECT COALESCE(MAX(bulk_load_batch_metadata.`batch_id`),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.`table_name`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + "FROM `my_db`.`my_name_legend_persistence_temp` as legend_persistence_temp)"; + String expectedMetadataIngestSql = "INSERT INTO bulk_load_batch_metadata (`batch_id`, `table_name`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`, `batch_source_info`) " + + "(SELECT (SELECT COALESCE(MAX(bulk_load_batch_metadata.`batch_id`),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.`table_name`) = 'MY_NAME'),'my_name',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00'),CURRENT_DATETIME(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}'," + + "PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; + Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedCopySql, ingestSql.get(0)); Assertions.assertEquals(expectedInsertSql, ingestSql.get(1)); + Assertions.assertEquals(expectedMetadataIngestSql, metadataIngestSql.get(0)); Assertions.assertEquals("SELECT 0 as `rowsDeleted`", statsSql.get(ROWS_DELETED)); Assertions.assertEquals("SELECT 0 as `rowsTerminated`", statsSql.get(ROWS_TERMINATED)); @@ -244,7 +254,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabledNoExtraOptions() .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE) .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -254,7 +264,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabledNoExtraOptions() Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS `my_db`.`my_name`" + - "(`col_int` INT64 NOT NULL PRIMARY KEY NOT ENFORCED,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`batch_id` STRING)"; + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`batch_id` INT64)"; String expectedCopySql = "LOAD DATA OVERWRITE `my_db`.`my_name_legend_persistence_temp` " + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON) " + @@ -262,7 +272,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabledNoExtraOptions() String expectedInsertSql = "INSERT INTO `my_db`.`my_name` " + "(`col_int`, `col_string`, `col_decimal`, `col_datetime`, `col_variant`, `batch_id`) " + - "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,'xyz123' " + + "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,(SELECT COALESCE(MAX(bulk_load_batch_metadata.`batch_id`),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.`table_name`) = 'MY_NAME') " + "FROM 
`my_db`.`my_name_legend_persistence_temp` as legend_persistence_temp)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); @@ -302,7 +312,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptions() .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE) .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -312,7 +322,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptions() Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS `my_db`.`my_name`" + - "(`col_int` INT64 NOT NULL PRIMARY KEY NOT ENFORCED,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`digest` STRING,`batch_id` STRING,`append_time` DATETIME)"; + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON,`digest` STRING,`batch_id` INT64,`append_time` DATETIME)"; String expectedCopySql = "LOAD DATA OVERWRITE `my_db`.`my_name_legend_persistence_temp` " + "(`col_int` INT64,`col_string` STRING,`col_decimal` NUMERIC(5,2),`col_datetime` DATETIME,`col_variant` JSON) " + @@ -320,7 +330,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptions() String expectedInsertSql = "INSERT INTO `my_db`.`my_name` " + "(`col_int`, `col_string`, `col_decimal`, `col_datetime`, `col_variant`, `digest`, `batch_id`, `append_time`) " + - "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,LAKEHOUSE_MD5(TO_JSON(legend_persistence_temp)),'xyz123',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + + "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`,LAKEHOUSE_MD5(TO_JSON(legend_persistence_temp)),(SELECT COALESCE(MAX(bulk_load_batch_metadata.`batch_id`),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.`table_name`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + "FROM `my_db`.`my_name_legend_persistence_temp` as legend_persistence_temp)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); @@ -360,7 +370,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptionsUpperCase() .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE) .caseConversion(CaseConversion.TO_UPPER) .build(); @@ -371,7 +381,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptionsUpperCase() Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS `MY_DB`.`MY_NAME`" + - "(`COL_INT` INT64 NOT NULL PRIMARY KEY NOT ENFORCED,`COL_STRING` STRING,`COL_DECIMAL` NUMERIC(5,2),`COL_DATETIME` DATETIME,`COL_VARIANT` JSON,`DIGEST` STRING,`BATCH_ID` STRING,`APPEND_TIME` DATETIME)"; + "(`COL_INT` INT64,`COL_STRING` STRING,`COL_DECIMAL` NUMERIC(5,2),`COL_DATETIME` DATETIME,`COL_VARIANT` JSON,`DIGEST` STRING,`BATCH_ID` INT64,`APPEND_TIME` DATETIME)"; String expectedCopySql = "LOAD DATA 
OVERWRITE `MY_DB`.`MY_NAME_LEGEND_PERSISTENCE_TEMP` " + "(`COL_INT` INT64,`COL_STRING` STRING,`COL_DECIMAL` NUMERIC(5,2),`COL_DATETIME` DATETIME,`COL_VARIANT` JSON) " + @@ -379,7 +389,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptionsUpperCase() String expectedInsertSql = "INSERT INTO `MY_DB`.`MY_NAME` " + "(`COL_INT`, `COL_STRING`, `COL_DECIMAL`, `COL_DATETIME`, `COL_VARIANT`, `DIGEST`, `BATCH_ID`, `APPEND_TIME`) " + - "(SELECT legend_persistence_temp.`COL_INT`,legend_persistence_temp.`COL_STRING`,legend_persistence_temp.`COL_DECIMAL`,legend_persistence_temp.`COL_DATETIME`,legend_persistence_temp.`COL_VARIANT`,LAKEHOUSE_MD5(TO_JSON(legend_persistence_temp)),'xyz123',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + + "(SELECT legend_persistence_temp.`COL_INT`,legend_persistence_temp.`COL_STRING`,legend_persistence_temp.`COL_DECIMAL`,legend_persistence_temp.`COL_DATETIME`,legend_persistence_temp.`COL_VARIANT`,LAKEHOUSE_MD5(TO_JSON(legend_persistence_temp)),(SELECT COALESCE(MAX(BULK_LOAD_BATCH_METADATA.`BATCH_ID`),0)+1 FROM BULK_LOAD_BATCH_METADATA as BULK_LOAD_BATCH_METADATA WHERE UPPER(BULK_LOAD_BATCH_METADATA.`TABLE_NAME`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00') " + "FROM `MY_DB`.`MY_NAME_LEGEND_PERSISTENCE_TEMP` as legend_persistence_temp)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); @@ -452,7 +462,7 @@ public void testBulkLoadStagedFilesDatasetNotProvided() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(bulkLoad) .relationalSink(BigQuerySink.get()) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) .build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsBigQueryTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsBigQueryTest.java index efcf49965fc..739b22c7274 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsBigQueryTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsBigQueryTest.java @@ -24,14 +24,14 @@ public String getExpectedSqlForMetadata() { return "INSERT INTO bulk_load_batch_metadata " + "(`batch_id`, `table_name`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`, `batch_source_info`) " + - "(SELECT 'batch_id_123','appeng_log_table_name',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00'),CURRENT_DATETIME(),'',PARSE_JSON('my_lineage_value'))"; + "(SELECT (SELECT COALESCE(MAX(bulk_load_batch_metadata.`batch_id`),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.`table_name`) = 'APPENG_LOG_TABLE_NAME'),'appeng_log_table_name',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00'),CURRENT_DATETIME(),'',PARSE_JSON('my_lineage_value'))"; } public String getExpectedSqlForMetadataUpperCase() { return 
"INSERT INTO BULK_LOAD_BATCH_METADATA " + "(`BATCH_ID`, `TABLE_NAME`, `BATCH_START_TS_UTC`, `BATCH_END_TS_UTC`, `BATCH_STATUS`, `BATCH_SOURCE_INFO`) " + - "(SELECT 'batch_id_123','BULK_LOAD_TABLE_NAME',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00'),CURRENT_DATETIME(),'',PARSE_JSON('my_lineage_value'))"; + "(SELECT (SELECT COALESCE(MAX(bulk_load_batch_metadata.`BATCH_ID`),0)+1 FROM BULK_LOAD_BATCH_METADATA as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.`TABLE_NAME`) = 'BULK_LOAD_TABLE_NAME'),'BULK_LOAD_TABLE_NAME',PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2000-01-01 00:00:00'),CURRENT_DATETIME(),'',PARSE_JSON('my_lineage_value'))"; } public RelationalSink getRelationalSink() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table1.csv index 259d7359904..e7a4d4b5f4b 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table1.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table1.csv @@ -1,7 +1,7 @@ -0,Candy,999.99,2022-01-15T00:00:00,xyz123,2000-01-01T00:00:00 -1,Andy,5.2,2022-01-11T00:00:00,xyz123,2000-01-01T00:00:00 -1,Andy,5.2,2022-01-11T00:00:00,xyz123,2000-01-01T00:00:00 -2,Bella,99.99,2022-01-12T00:00:00,xyz123,2000-01-01T00:00:00 -2,Bella,99.99,2022-01-12T00:00:00,xyz123,2000-01-01T00:00:00 -49,Sandy,123.45,2022-01-13T00:00:00,xyz123,2000-01-01T00:00:00 -50,Mindy,0,2022-01-14T00:00:00,xyz123,2000-01-01T00:00:00 \ No newline at end of file +0,Candy,999.99,2022-01-15T00:00:00,1,2000-01-01T00:00:00 +1,Andy,5.2,2022-01-11T00:00:00,1,2000-01-01T00:00:00 +1,Andy,5.2,2022-01-11T00:00:00,1,2000-01-01T00:00:00 +2,Bella,99.99,2022-01-12T00:00:00,1,2000-01-01T00:00:00 +2,Bella,99.99,2022-01-12T00:00:00,1,2000-01-01T00:00:00 +49,Sandy,123.45,2022-01-13T00:00:00,1,2000-01-01T00:00:00 +50,Mindy,0,2022-01-14T00:00:00,1,2000-01-01T00:00:00 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table2.csv index 4dfc256dd31..c1da46d0fb6 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table2.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/resources/expected/bulk_load/expected_table2.csv @@ -1,4 +1,4 @@ -1,Andy,5.2,2022-01-11T00:00:00,xyz123,2000-01-01T00:00:00 -2,Bella,99.99,2022-01-12T00:00:00,xyz123,2000-01-01T00:00:00 -11,Success,123.45,2022-01-13T00:00:00,xyz123,2000-01-01T00:00:00 -49,Sandy,123.45,2022-01-13T00:00:00,xyz123,2000-01-01T00:00:00 \ No newline at end of file 
+1,Andy,5.2,2022-01-11T00:00:00,1,2000-01-01T00:00:00 +2,Bella,99.99,2022-01-12T00:00:00,1,2000-01-01T00:00:00 +11,Success,123.45,2022-01-13T00:00:00,1,2000-01-01T00:00:00 +49,Sandy,123.45,2022-01-13T00:00:00,1,2000-01-01T00:00:00 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java index 24689071f88..e597d6451bb 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java @@ -114,11 +114,7 @@ public boolean enableConcurrentSafety() public abstract Optional infiniteBatchIdValue(); - @Default - public String bulkLoadBatchIdValue() - { - return UUID.randomUUID().toString(); - } + public abstract Optional bulkLoadTaskIdValue(); @Default public String bulkLoadBatchStatusPattern() @@ -141,6 +137,7 @@ protected PlannerOptions plannerOptions() .enableSchemaEvolution(enableSchemaEvolution()) .createStagingDataset(createStagingDataset()) .enableConcurrentSafety(enableConcurrentSafety()) + .bulkLoadTaskIdValue(bulkLoadTaskIdValue()) .build(); } @@ -152,7 +149,6 @@ protected TransformOptions transformOptions() .batchStartTimestampPattern(batchStartTimestampPattern()) .batchEndTimestampPattern(batchEndTimestampPattern()) .infiniteBatchIdValue(infiniteBatchIdValue()) - .bulkLoadBatchIdValue(bulkLoadBatchIdValue()) .bulkLoadBatchStatusPattern(bulkLoadBatchStatusPattern()) .batchIdPattern(batchIdPattern()); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index c2d31164dfe..c7f1f7ab612 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -77,7 +77,6 @@ import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; -import java.util.UUID; import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MAX_OF_FIELD; import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MIN_OF_FIELD; @@ -98,8 +97,8 @@ public 
abstract class RelationalIngestorAbstract private static final String STAGING = "staging"; private static final String UNDERSCORE = "_"; private static final String SINGLE_QUOTE = "'"; - private static final String BATCH_ID_PATTERN = "{NEXT_BATCH_ID_PATTERN}"; + public static final String BATCH_ID_PATTERN = "{NEXT_BATCH_ID_PATTERN}"; public static final String BATCH_START_TS_PATTERN = "{BATCH_START_TIMESTAMP_PLACEHOLDER}"; private static final String BATCH_END_TS_PATTERN = "{BATCH_END_TIMESTAMP_PLACEHOLDER}"; @@ -161,18 +160,14 @@ public Set schemaEvolutionCapabilitySet() return Collections.emptySet(); } - @Default - public String bulkLoadBatchIdValue() - { - return UUID.randomUUID().toString(); - } - //---------- FIELDS ---------- public abstract IngestMode ingestMode(); public abstract RelationalSink relationalSink(); + public abstract Optional bulkLoadTaskIdValue(); + @Derived protected PlannerOptions plannerOptions() { @@ -182,6 +177,7 @@ protected PlannerOptions plannerOptions() .enableSchemaEvolution(enableSchemaEvolution()) .createStagingDataset(createStagingDataset()) .enableConcurrentSafety(enableConcurrentSafety()) + .bulkLoadTaskIdValue(bulkLoadTaskIdValue()) .build(); } @@ -494,7 +490,7 @@ private void init(Datasets datasets) .batchStartTimestampPattern(BATCH_START_TS_PATTERN) .batchEndTimestampPattern(BATCH_END_TS_PATTERN) .batchIdPattern(BATCH_ID_PATTERN) - .bulkLoadBatchIdValue(bulkLoadBatchIdValue()) + .bulkLoadTaskIdValue(bulkLoadTaskIdValue()) .build(); planner = Planners.get(enrichedDatasets, enrichedIngestMode, plannerOptions(), relationalSink().capabilities()); @@ -701,9 +697,9 @@ else if (lowerBound instanceof Number) private Optional getNextBatchId(Datasets datasets, Executor executor, Transformer transformer, IngestMode ingestMode) { - if (ingestMode.accept(IngestModeVisitors.IS_INGEST_MODE_TEMPORAL)) + if (ingestMode.accept(IngestModeVisitors.IS_INGEST_MODE_TEMPORAL) || ingestMode instanceof BulkLoad) { - LogicalPlan logicalPlanForNextBatchId = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets); + LogicalPlan logicalPlanForNextBatchId = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets, ingestMode); List tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForNextBatchId)); Optional nextBatchId = Optional.ofNullable(tabularData.stream() .findFirst() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java index 528fb07dec6..9b8ac1db944 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java @@ -77,6 +77,7 @@ import java.util.Map; import java.util.Set; +import static org.finos.legend.engine.persistence.components.relational.api.RelationalIngestorAbstract.BATCH_ID_PATTERN; import static 
org.finos.legend.engine.persistence.components.relational.api.RelationalIngestorAbstract.BATCH_START_TS_PATTERN; public class H2Sink extends AnsiSqlSink @@ -226,6 +227,7 @@ public IngestorResult performBulkLoad(Datasets datasets, Executor statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST_DB\".\"TEST\".\"main\"" + - "(\"col_int\" INTEGER NOT NULL PRIMARY KEY,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"batch_id\" VARCHAR,\"append_time\" TIMESTAMP)"; + "(\"col_int\" INTEGER,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"batch_id\" INTEGER,\"append_time\" TIMESTAMP)"; String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"main\" " + "(\"col_int\", \"col_string\", \"col_decimal\", \"col_datetime\", \"batch_id\", \"append_time\") " + "SELECT CONVERT(\"col_int\",INTEGER),CONVERT(\"col_string\",VARCHAR),CONVERT(\"col_decimal\",DECIMAL(5,2)),CONVERT(\"col_datetime\",TIMESTAMP)," + - "'xyz123','2000-01-01 00:00:00' FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file1.csv'," + + "{NEXT_BATCH_ID_PATTERN},'2000-01-01 00:00:00' FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file1.csv'," + "'col_int,col_string,col_decimal,col_datetime',NULL)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); @@ -156,11 +156,11 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabled() throws Exception String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table1.csv"; - RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE); + RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE, Optional.empty()); executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false); Map appendMetadata = h2Sink.executeQuery("select * from bulk_load_batch_metadata").get(0); - verifyBulkLoadMetadata(appendMetadata, filePath); + verifyBulkLoadMetadata(appendMetadata, filePath, 1, Optional.empty()); } @Test @@ -195,7 +195,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabled() throws Exception .relationalSink(H2Sink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE_1) .build(); GeneratorResult operations = generator.generateOperations(datasets); @@ -205,12 +205,12 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabled() throws Exception Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST_DB\".\"TEST\".\"main\"" + - "(\"col_int\" INTEGER NOT NULL PRIMARY KEY,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"batch_id\" VARCHAR)"; + "(\"col_int\" INTEGER,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"batch_id\" INTEGER)"; String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"main\" " + "(\"col_int\", \"col_string\", \"col_decimal\", \"col_datetime\", \"batch_id\") " + "SELECT CONVERT(\"col_int\",INTEGER),CONVERT(\"col_string\",VARCHAR),CONVERT(\"col_decimal\",DECIMAL(5,2)),CONVERT(\"col_datetime\",TIMESTAMP)," + - "'xyz123' FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file2.csv','col_int,col_string,col_decimal,col_datetime',NULL)"; + "(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 
FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MAIN') FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file2.csv','col_int,col_string,col_decimal,col_datetime',NULL)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -227,10 +227,10 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabled() throws Exception String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table2.csv"; - RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE); + RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE, Optional.of(TASK_ID_VALUE_1)); executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false); Map appendMetadata = h2Sink.executeQuery("select * from bulk_load_batch_metadata").get(0); - verifyBulkLoadMetadata(appendMetadata, filePath); + verifyBulkLoadMetadata(appendMetadata, filePath, 1, Optional.of(TASK_ID_VALUE_1)); } @Test @@ -267,7 +267,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabled() throws Exception .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) .collectStatistics(true) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE_1) .executionTimestampClock(fixedClock_2000_01_01) .build(); @@ -278,13 +278,13 @@ public void testBulkLoadWithDigestGeneratedAuditEnabled() throws Exception Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST_DB\".\"TEST\".\"main\"" + - "(\"col_int\" INTEGER NOT NULL PRIMARY KEY,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"digest\" VARCHAR,\"batch_id\" VARCHAR,\"append_time\" TIMESTAMP)"; + "(\"col_int\" INTEGER,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" TIMESTAMP)"; String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"main\" " + "(\"col_int\", \"col_string\", \"col_decimal\", \"col_datetime\", \"digest\", \"batch_id\", \"append_time\") " + "SELECT CONVERT(\"col_int\",INTEGER),CONVERT(\"col_string\",VARCHAR),CONVERT(\"col_decimal\",DECIMAL(5,2)),CONVERT(\"col_datetime\",TIMESTAMP)," + "LAKEHOUSE_MD5(ARRAY['col_int','col_string','col_decimal','col_datetime'],ARRAY[\"col_int\",\"col_string\",\"col_decimal\",\"col_datetime\"])," + - "'xyz123','2000-01-01 00:00:00' FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file3.csv','col_int,col_string,col_decimal,col_datetime',NULL)"; + "(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MAIN'),'2000-01-01 00:00:00' FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file3.csv','col_int,col_string,col_decimal,col_datetime',NULL)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -302,10 +302,10 @@ public void testBulkLoadWithDigestGeneratedAuditEnabled() throws Exception String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table3.csv"; - RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE); + RelationalIngestor ingestor = 
getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE, Optional.of(TASK_ID_VALUE_1)); executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false); Map appendMetadata = h2Sink.executeQuery("select * from bulk_load_batch_metadata").get(0); - verifyBulkLoadMetadata(appendMetadata, filePath); + verifyBulkLoadMetadata(appendMetadata, filePath, 1, Optional.of(TASK_ID_VALUE_1)); } @Test @@ -342,7 +342,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledUpperCase() throws Except .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) .collectStatistics(true) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE_1) .executionTimestampClock(fixedClock_2000_01_01) .caseConversion(CaseConversion.TO_UPPER) .build(); @@ -354,13 +354,13 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledUpperCase() throws Except Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST_DB\".\"TEST\".\"MAIN\"" + - "(\"COL_INT\" INTEGER NOT NULL PRIMARY KEY,\"COL_STRING\" VARCHAR,\"COL_DECIMAL\" DECIMAL(5,2),\"COL_DATETIME\" TIMESTAMP,\"DIGEST\" VARCHAR,\"BATCH_ID\" VARCHAR,\"APPEND_TIME\" TIMESTAMP)"; + "(\"COL_INT\" INTEGER,\"COL_STRING\" VARCHAR,\"COL_DECIMAL\" DECIMAL(5,2),\"COL_DATETIME\" TIMESTAMP,\"DIGEST\" VARCHAR,\"BATCH_ID\" INTEGER,\"APPEND_TIME\" TIMESTAMP)"; String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"MAIN\" " + "(\"COL_INT\", \"COL_STRING\", \"COL_DECIMAL\", \"COL_DATETIME\", \"DIGEST\", \"BATCH_ID\", \"APPEND_TIME\") " + "SELECT CONVERT(\"COL_INT\",INTEGER),CONVERT(\"COL_STRING\",VARCHAR),CONVERT(\"COL_DECIMAL\",DECIMAL(5,2)),CONVERT(\"COL_DATETIME\",TIMESTAMP)," + "LAKEHOUSE_MD5(ARRAY['COL_INT','COL_STRING','COL_DECIMAL','COL_DATETIME'],ARRAY[\"COL_INT\",\"COL_STRING\",\"COL_DECIMAL\",\"COL_DATETIME\"])," + - "'xyz123','2000-01-01 00:00:00' " + + "(SELECT COALESCE(MAX(BULK_LOAD_BATCH_METADATA.\"BATCH_ID\"),0)+1 FROM BULK_LOAD_BATCH_METADATA as BULK_LOAD_BATCH_METADATA WHERE UPPER(BULK_LOAD_BATCH_METADATA.\"TABLE_NAME\") = 'MAIN'),'2000-01-01 00:00:00' " + "FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file4.csv','COL_INT,COL_STRING,COL_DECIMAL,COL_DATETIME',NULL)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); @@ -379,10 +379,64 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledUpperCase() throws Except String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table4.csv"; - RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.TO_UPPER); + RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.TO_UPPER, Optional.of(TASK_ID_VALUE_1)); executePlansAndVerifyForCaseConversion(ingestor, datasets, schema, expectedDataPath, expectedStats); Map appendMetadata = h2Sink.executeQuery("select * from BULK_LOAD_BATCH_METADATA").get(0); - verifyBulkLoadMetadataForUpperCase(appendMetadata, filePath); + verifyBulkLoadMetadataForUpperCase(appendMetadata, filePath, 1, Optional.of(TASK_ID_VALUE_1)); + } + + @Test + public void testBulkLoadWithDigestNotGeneratedAuditDisabledTwoBatches() throws Exception + { + String filePath = "src/test/resources/data/bulk-load/input/staged_file2.csv"; + + BulkLoad bulkLoad = BulkLoad.builder() + .digestGenStrategy(NoDigestGenStrategy.builder().build()) + .auditing(NoAuditing.builder().build()) + .batchIdField(BATCH_ID) + .build(); + + 
Dataset stagedFilesDataset = StagedFilesDataset.builder() + .stagedFilesDatasetProperties( + H2StagedFilesDatasetProperties.builder() + .fileFormat(FileFormat.CSV) + .addAllFiles(Collections.singletonList(filePath)).build()) + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) + .build(); + + Dataset mainDataset = DatasetDefinition.builder() + .database(testDatabaseName).group(testSchemaName).name(mainTableName).alias("my_alias") + .schema(SchemaDefinition.builder().build()) + .build(); + + Datasets datasets = Datasets.of(mainDataset, stagedFilesDataset); + + + // Verify execution using ingestor (first batch) + PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); + String[] schema = new String[]{COL_INT, COL_STRING, COL_DECIMAL, COL_DATETIME, BATCH_ID}; + + Map expectedStats = new HashMap<>(); + expectedStats.put(StatisticName.FILES_LOADED.name(), 1); + expectedStats.put(StatisticName.ROWS_WITH_ERRORS.name(), 0); + + String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table2.csv"; + + RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE, Optional.of(TASK_ID_VALUE_1)); + executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false); + Map appendMetadata = h2Sink.executeQuery("select * from bulk_load_batch_metadata").get(0); + verifyBulkLoadMetadata(appendMetadata, filePath, 1, Optional.of(TASK_ID_VALUE_1)); + + + // Verify execution using ingestor (second batch) + expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table5.csv"; + + ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE, Optional.of(TASK_ID_VALUE_2)); + executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false); + appendMetadata = h2Sink.executeQuery("select * from bulk_load_batch_metadata").get(0); + verifyBulkLoadMetadata(appendMetadata, filePath, 1, Optional.of(TASK_ID_VALUE_1)); + appendMetadata = h2Sink.executeQuery("select * from bulk_load_batch_metadata").get(1); + verifyBulkLoadMetadata(appendMetadata, filePath, 2, Optional.of(TASK_ID_VALUE_2)); } @Test @@ -445,7 +499,7 @@ public void testBulkLoadStagedFilesDatasetNotProvided() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(TASK_ID_VALUE_1) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) .build(); @@ -459,6 +513,100 @@ public void testBulkLoadStagedFilesDatasetNotProvided() } } + @Test + public void testBulkLoadStageHasPrimaryKey() + { + try + { + Field pkCol = Field.builder() + .name("some_pk") + .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())) + .primaryKey(true) + .build(); + + BulkLoad bulkLoad = BulkLoad.builder() + .batchIdField(BATCH_ID) + .digestGenStrategy(NoDigestGenStrategy.builder().build()) + .auditing(NoAuditing.builder().build()) + .build(); + + Dataset stagedFilesDataset = StagedFilesDataset.builder() + .stagedFilesDatasetProperties( + H2StagedFilesDatasetProperties.builder() + .fileFormat(FileFormat.CSV) + .addAllFiles(Collections.singletonList("src/test/resources/data/bulk-load/input/staged_file1.csv")).build()) + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, pkCol)).build()) + .build(); + + Dataset mainDataset = DatasetDefinition.builder() + 
.database("my_db").name("my_name").alias("my_alias") + .schema(SchemaDefinition.builder().build()) + .build(); + + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(bulkLoad) + .relationalSink(H2Sink.get()) + .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .build(); + + GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); + Assertions.fail("Exception was not thrown"); + } + catch (Exception e) + { + Assertions.assertTrue(e.getMessage().contains("Primary key list must be empty")); + } + } + + @Test + public void testBulkLoadMainHasPrimaryKey() + { + try + { + Field pkCol = Field.builder() + .name("some_pk") + .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())) + .primaryKey(true) + .build(); + + BulkLoad bulkLoad = BulkLoad.builder() + .batchIdField(BATCH_ID) + .digestGenStrategy(NoDigestGenStrategy.builder().build()) + .auditing(NoAuditing.builder().build()) + .build(); + + Dataset stagedFilesDataset = StagedFilesDataset.builder() + .stagedFilesDatasetProperties( + H2StagedFilesDatasetProperties.builder() + .fileFormat(FileFormat.CSV) + .addAllFiles(Collections.singletonList("src/test/resources/data/bulk-load/input/staged_file1.csv")).build()) + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) + .build(); + + Dataset mainDataset = DatasetDefinition.builder() + .database("my_db").name("my_name").alias("my_alias") + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, pkCol)).build()) + .build(); + + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(bulkLoad) + .relationalSink(H2Sink.get()) + .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .build(); + + GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); + Assertions.fail("Exception was not thrown"); + } + catch (Exception e) + { + Assertions.assertTrue(e.getMessage().contains("Primary key list must be empty")); + } + } + @Test public void testBulkLoadMoreThanOneFile() { @@ -499,7 +647,7 @@ public void testBulkLoadNotCsvFile() } } - RelationalIngestor getRelationalIngestor(IngestMode ingestMode, PlannerOptions options, Clock executionTimestampClock, CaseConversion caseConversion) + RelationalIngestor getRelationalIngestor(IngestMode ingestMode, PlannerOptions options, Clock executionTimestampClock, CaseConversion caseConversion, Optional taskId) { return RelationalIngestor.builder() .ingestMode(ingestMode) @@ -507,30 +655,46 @@ RelationalIngestor getRelationalIngestor(IngestMode ingestMode, PlannerOptions o .executionTimestampClock(executionTimestampClock) .cleanupStagingData(options.cleanupStagingData()) .collectStatistics(options.collectStatistics()) - .bulkLoadBatchIdValue(BATCH_ID_VALUE) + .bulkLoadTaskIdValue(taskId) .enableConcurrentSafety(true) .caseConversion(caseConversion) .build(); } - private void verifyBulkLoadMetadata(Map appendMetadata, String fileName) + private void verifyBulkLoadMetadata(Map appendMetadata, String fileName, int batchId, Optional taskId) { - Assertions.assertEquals("xyz123", appendMetadata.get("batch_id")); + Assertions.assertEquals(batchId, appendMetadata.get("batch_id")); Assertions.assertEquals("SUCCEEDED", appendMetadata.get("batch_status")); Assertions.assertEquals("main", appendMetadata.get("table_name")); - 
Assertions.assertEquals(String.format("{\"files\":[\"%s\"]}", fileName), appendMetadata.get("batch_source_info")); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("batch_start_ts_utc").toString()); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("batch_end_ts_utc").toString()); + Assertions.assertTrue(appendMetadata.get("batch_source_info").toString().contains(String.format("\"files\":[\"%s\"]", fileName))); + if (taskId.isPresent()) + { + Assertions.assertTrue(appendMetadata.get("batch_source_info").toString().contains(String.format("\"task_id\":\"%s\"", taskId.get()))); + } + else + { + Assertions.assertFalse(appendMetadata.get("batch_source_info").toString().contains("\"task_id\"")); + } } - private void verifyBulkLoadMetadataForUpperCase(Map appendMetadata, String fileName) + private void verifyBulkLoadMetadataForUpperCase(Map appendMetadata, String fileName, int batchId, Optional taskId) { - Assertions.assertEquals("xyz123", appendMetadata.get("BATCH_ID")); + Assertions.assertEquals(batchId, appendMetadata.get("BATCH_ID")); Assertions.assertEquals("SUCCEEDED", appendMetadata.get("BATCH_STATUS")); Assertions.assertEquals("MAIN", appendMetadata.get("TABLE_NAME")); - Assertions.assertEquals(String.format("{\"files\":[\"%s\"]}", fileName), appendMetadata.get("BATCH_SOURCE_INFO")); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("BATCH_START_TS_UTC").toString()); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("BATCH_END_TS_UTC").toString()); + Assertions.assertTrue(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains(String.format("\"files\":[\"%s\"]", fileName))); + if (taskId.isPresent()) + { + Assertions.assertTrue(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains(String.format("\"task_id\":\"%s\"", taskId.get()))); + } + else + { + Assertions.assertFalse(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains("\"task_id\"")); + } } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table1.csv index b68e9aa646b..022020ba331 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table1.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table1.csv @@ -1,3 +1,3 @@ -1,Andy,5.20,2022-01-11 00:00:00.0,xyz123,2000-01-01 00:00:00.0 -2,Bella,99.99,2022-01-12 00:00:00.0,xyz123,2000-01-01 00:00:00.0 -49,Sandy,123.45,2022-01-13 00:00:00.0,xyz123,2000-01-01 00:00:00.0 \ No newline at end of file +1,Andy,5.20,2022-01-11 00:00:00.0,1,2000-01-01 00:00:00.0 +2,Bella,99.99,2022-01-12 00:00:00.0,1,2000-01-01 00:00:00.0 +49,Sandy,123.45,2022-01-13 00:00:00.0,1,2000-01-01 00:00:00.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table2.csv 
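[Editor's sketch, not part of the patch] verifyBulkLoadMetadata above switches from full-string equality on batch_source_info to substring assertions, because the JSON now carries the file list plus an optional task_id (present only when bulkLoadTaskIdValue is supplied). Illustrative payload shapes with hypothetical values, mirroring the contains-based checks in the patch:

    public class BatchSourceInfoShapes
    {
        public static void main(String[] args)
        {
            // Placeholder values, not output captured from the tests
            String withTaskId = "{\"files\":[\"staged_file2.csv\"],\"task_id\":\"task123\"}";
            String withoutTaskId = "{\"files\":[\"staged_file1.csv\"]}";

            System.out.println(withTaskId.contains("\"task_id\":\"task123\"")); // true
            System.out.println(withoutTaskId.contains("\"task_id\""));          // false
        }
    }
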
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table2.csv index c807b1c4764..92b02b8f19c 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table2.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table2.csv @@ -1,3 +1,3 @@ -1,Andy,5.20,2022-01-11 00:00:00.0,xyz123 -2,Bella,99.99,2022-01-12 00:00:00.0,xyz123 -49,Sandy,123.45,2022-01-13 00:00:00.0,xyz123 \ No newline at end of file +1,Andy,5.20,2022-01-11 00:00:00.0,1 +2,Bella,99.99,2022-01-12 00:00:00.0,1 +49,Sandy,123.45,2022-01-13 00:00:00.0,1 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table3.csv index c6774c43774..b9421520b4a 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table3.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table3.csv @@ -1,3 +1,3 @@ -1,Andy,5.20,2022-01-11 00:00:00.0,9fc62c73317227ab0760aed72f4fee17,xyz123,2000-01-01 00:00:00.0 -2,Bella,99.99,2022-01-12 00:00:00.0,b0383f1a479eb2a6c5186f045af4c51f,xyz123,2000-01-01 00:00:00.0 -49,Sandy,123.45,2022-01-13 00:00:00.0,dc170980c8540e2a667753e793dad94c,xyz123,2000-01-01 00:00:00.0 \ No newline at end of file +1,Andy,5.20,2022-01-11 00:00:00.0,9fc62c73317227ab0760aed72f4fee17,1,2000-01-01 00:00:00.0 +2,Bella,99.99,2022-01-12 00:00:00.0,b0383f1a479eb2a6c5186f045af4c51f,1,2000-01-01 00:00:00.0 +49,Sandy,123.45,2022-01-13 00:00:00.0,dc170980c8540e2a667753e793dad94c,1,2000-01-01 00:00:00.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table4.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table4.csv index 7888259500d..0b162ed75bd 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table4.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table4.csv @@ -1,3 +1,3 @@ -1,Andy,5.20,2022-01-11 00:00:00.0,e7dc92b208f2244b9ece45d706474f55,xyz123,2000-01-01 00:00:00.0 -2,Bella,99.99,2022-01-12 00:00:00.0,278cf3ee2c2981bb8aeade81cc21e87a,xyz123,2000-01-01 00:00:00.0 -49,Sandy,123.45,2022-01-13 00:00:00.0,e8ff35a6699515eaca0a798a7f989978,xyz123,2000-01-01 00:00:00.0 \ No newline at end of file +1,Andy,5.20,2022-01-11 
00:00:00.0,e7dc92b208f2244b9ece45d706474f55,1,2000-01-01 00:00:00.0 +2,Bella,99.99,2022-01-12 00:00:00.0,278cf3ee2c2981bb8aeade81cc21e87a,1,2000-01-01 00:00:00.0 +49,Sandy,123.45,2022-01-13 00:00:00.0,e8ff35a6699515eaca0a798a7f989978,1,2000-01-01 00:00:00.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table5.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table5.csv index 7d90d71c952..a20715af7c5 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table5.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/expected/expected_table5.csv @@ -1,3 +1,6 @@ -1,Andy,5.20,2022-01-11 00:00:00.0,4B39799C7A1FB5EFC4BC328966A159E0,2000-01-01 00:00:00.0,src/test/resources/data/bulk-load/input/staged_file5.csv -2,Bella,99.99,2022-01-12 00:00:00.0,58467B440BCED7607369DC8A260B0607,2000-01-01 00:00:00.0,src/test/resources/data/bulk-load/input/staged_file5.csv -49,Sandy,123.45,2022-01-13 00:00:00.0,29B8C8A6CD28B069290372E6B54B6C72,2000-01-01 00:00:00.0,src/test/resources/data/bulk-load/input/staged_file5.csv \ No newline at end of file +1,Andy,5.20,2022-01-11 00:00:00.0,1 +2,Bella,99.99,2022-01-12 00:00:00.0,1 +49,Sandy,123.45,2022-01-13 00:00:00.0,1 +1,Andy,5.20,2022-01-11 00:00:00.0,2 +2,Bella,99.99,2022-01-12 00:00:00.0,2 +49,Sandy,123.45,2022-01-13 00:00:00.0,2 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/input/staged_file5.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/input/staged_file5.csv deleted file mode 100644 index dd2941bedb8..00000000000 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bulk-load/input/staged_file5.csv +++ /dev/null @@ -1,3 +0,0 @@ -1,Andy,5.20,2022-01-11 00:00:00.0 -2,Bella,99.99,2022-01-12 00:00:00.0 -49,Sandy,123.45,2022-01-13 00:00:00.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java index ea78819d696..46465426e97 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java @@ -85,6 +85,7 @@ import java.util.Objects; import java.util.ArrayList; +import static org.finos.legend.engine.persistence.components.relational.api.RelationalIngestorAbstract.BATCH_ID_PATTERN; import static org.finos.legend.engine.persistence.components.relational.api.RelationalIngestorAbstract.BATCH_START_TS_PATTERN; public class SnowflakeSink extends AnsiSqlSink @@ -264,7 +265,8 @@ public IngestorResult performBulkLoad(Datasets datasets, Executor metadataIngestSql = operations.metadataIngestSql(); Map statsSql = operations.postIngestStatisticsSql(); - String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER NOT NULL PRIMARY KEY,\"col_integer\" INTEGER,\"batch_id\" VARCHAR,\"append_time\" DATETIME)"; + String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER,\"col_integer\" INTEGER,\"batch_id\" INTEGER,\"append_time\" DATETIME)"; String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + "(\"col_int\", \"col_integer\", \"batch_id\", \"append_time\") " + "FROM " + - "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\",'batch123','2000-01-01 00:00:00' " + + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\",{NEXT_BATCH_ID},'2000-01-01 00:00:00' " + "FROM my_location (FILE_FORMAT => 'my_file_format', PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as legend_persistence_stage)" + " on_error = 'ABORT_STATEMENT'"; String expectedMetadataIngestSql = "INSERT INTO bulk_load_batch_metadata (\"batch_id\", \"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"batch_source_info\") " + - "(SELECT 'batch123','my_name','2000-01-01 00:00:00',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; + "(SELECT {NEXT_BATCH_ID},'my_name','2000-01-01 00:00:00',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"],\"task_id\":\"task123\"}'))"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -163,7 +161,7 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersProvided() .ingestMode(bulkLoad) .relationalSink(SnowflakeSink.get()) .collectStatistics(true) - .bulkLoadBatchIdValue("batch123") + .bulkLoadTaskIdValue("task123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -172,11 +170,11 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersProvided() List ingestSql = operations.ingestSql(); Map statsSql = operations.postIngestStatisticsSql(); - String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_bigint\" BIGINT,\"col_variant\" VARIANT,\"batch_id\" VARCHAR)"; + String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_bigint\" BIGINT,\"col_variant\" VARIANT,\"batch_id\" INTEGER)"; String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + "(\"col_bigint\", \"col_variant\", \"batch_id\") " + "FROM " + - "(SELECT t.$4 as \"col_bigint\",TO_VARIANT(PARSE_JSON(t.$5)) as 
\"col_variant\",'batch123' " + + "(SELECT t.$4 as \"col_bigint\",TO_VARIANT(PARSE_JSON(t.$5)) as \"col_variant\",(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MY_NAME') " + "FROM my_location (FILE_FORMAT => 'my_file_format', PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as t) " + "on_error = 'ABORT_STATEMENT'"; @@ -189,7 +187,7 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersProvided() } @Test - public void testBulkLoadWithUpperCaseConversionAndDefaultBatchId() + public void testBulkLoadWithUpperCaseConversionAndNoTaskId() { BulkLoad bulkLoad = BulkLoad.builder() .batchIdField("batch_id") @@ -223,31 +221,28 @@ public void testBulkLoadWithUpperCaseConversionAndDefaultBatchId() List preActionsSql = operations.preActionsSql(); List ingestSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); Map statsSql = operations.postIngestStatisticsSql(); - // Extract the generated UUID - Pattern pattern = Pattern.compile("[a-f0-9]{8}(?:-[a-f0-9]{4}){4}[a-f0-9]{8}"); - Matcher matcher = pattern.matcher(ingestSql.get(0)); - String uuid = ""; - if (matcher.find()) - { - uuid = matcher.group(); - } - - String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"MY_DB\".\"MY_NAME\"(\"COL_INT\" INTEGER NOT NULL PRIMARY KEY," + - "\"COL_INTEGER\" INTEGER,\"DIGEST\" VARCHAR,\"BATCH_ID\" VARCHAR,\"APPEND_TIME\" DATETIME)"; + String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"MY_DB\".\"MY_NAME\"(\"COL_INT\" INTEGER," + + "\"COL_INTEGER\" INTEGER,\"DIGEST\" VARCHAR,\"BATCH_ID\" INTEGER,\"APPEND_TIME\" DATETIME)"; String expectedIngestSql = "COPY INTO \"MY_DB\".\"MY_NAME\" " + "(\"COL_INT\", \"COL_INTEGER\", \"DIGEST\", \"BATCH_ID\", \"APPEND_TIME\") " + "FROM " + "(SELECT legend_persistence_stage.$1 as \"COL_INT\",legend_persistence_stage.$2 as \"COL_INTEGER\"," + "LAKEHOUSE_MD5(OBJECT_CONSTRUCT('COL_INT',legend_persistence_stage.$1,'COL_INTEGER',legend_persistence_stage.$2))," + - "'%s','2000-01-01 00:00:00' " + + "(SELECT COALESCE(MAX(BULK_LOAD_BATCH_METADATA.\"BATCH_ID\"),0)+1 FROM BULK_LOAD_BATCH_METADATA as BULK_LOAD_BATCH_METADATA WHERE UPPER(BULK_LOAD_BATCH_METADATA.\"TABLE_NAME\") = 'MY_NAME'),'2000-01-01 00:00:00' " + "FROM my_location (FILE_FORMAT => 'my_file_format', " + "PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as legend_persistence_stage) " + "on_error = 'ABORT_STATEMENT'"; + String expectedMetadataIngestSql = "INSERT INTO BULK_LOAD_BATCH_METADATA (\"BATCH_ID\", \"TABLE_NAME\", \"BATCH_START_TS_UTC\", \"BATCH_END_TS_UTC\", \"BATCH_STATUS\", \"BATCH_SOURCE_INFO\") " + + "(SELECT (SELECT COALESCE(MAX(BULK_LOAD_BATCH_METADATA.\"BATCH_ID\"),0)+1 FROM BULK_LOAD_BATCH_METADATA as BULK_LOAD_BATCH_METADATA WHERE UPPER(BULK_LOAD_BATCH_METADATA.\"TABLE_NAME\") = 'MY_NAME')," + + "'MY_NAME','2000-01-01 00:00:00',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; + Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); - Assertions.assertEquals(String.format(expectedIngestSql, uuid), ingestSql.get(0)); + Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); + Assertions.assertEquals(expectedMetadataIngestSql, metadataIngestSql.get(0)); Assertions.assertEquals("SELECT 0 as \"ROWSDELETED\"", statsSql.get(ROWS_DELETED)); Assertions.assertEquals("SELECT 0 as \"ROWSTERMINATED\"", 
statsSql.get(ROWS_TERMINATED)); @@ -317,7 +312,7 @@ public void testBulkLoadStagedFilesDatasetNotProvided() .relationalSink(SnowflakeSink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadBatchIdValue("batch123") + .bulkLoadTaskIdValue("batch123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagingDataset)); @@ -357,7 +352,7 @@ public void testBulkLoadWithDigest() .relationalSink(SnowflakeSink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadBatchIdValue("batch123") + .bulkLoadTaskIdValue("task123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -366,14 +361,14 @@ public void testBulkLoadWithDigest() List ingestSql = operations.ingestSql(); Map statsSql = operations.postIngestStatisticsSql(); - String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER NOT NULL PRIMARY KEY,\"col_integer\" INTEGER,\"digest\" VARCHAR,\"batch_id\" VARCHAR,\"append_time\" DATETIME)"; + String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER,\"col_integer\" INTEGER,\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" DATETIME)"; String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + "(\"col_int\", \"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + "FROM " + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + "LAKEHOUSE_UDF(OBJECT_CONSTRUCT('col_int',legend_persistence_stage.$1,'col_integer',legend_persistence_stage.$2))," + - "'batch123','2000-01-01 00:00:00' " + + "(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00' " + "FROM my_location (FILE_FORMAT => 'my_file_format', " + "PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as legend_persistence_stage) " + "on_error = 'ABORT_STATEMENT'"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsSnowflakeTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsSnowflakeTest.java index 8ad9c6351ef..4a5a9dd4992 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsSnowflakeTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/util/BulkLoadDatasetUtilsSnowflakeTest.java @@ -24,13 +24,13 @@ public String getExpectedSqlForMetadata() { return "INSERT INTO bulk_load_batch_metadata " + "(\"batch_id\", \"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"batch_source_info\") " + - "(SELECT 'batch_id_123','appeng_log_table_name','2000-01-01 00:00:00',SYSDATE(),'',PARSE_JSON('my_lineage_value'))"; + "(SELECT (SELECT 
COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'APPENG_LOG_TABLE_NAME'),'appeng_log_table_name','2000-01-01 00:00:00',SYSDATE(),'',PARSE_JSON('my_lineage_value'))"; } public String getExpectedSqlForMetadataUpperCase() { return "INSERT INTO BULK_LOAD_BATCH_METADATA (\"BATCH_ID\", \"TABLE_NAME\", \"BATCH_START_TS_UTC\", \"BATCH_END_TS_UTC\", \"BATCH_STATUS\", \"BATCH_SOURCE_INFO\") " + - "(SELECT 'batch_id_123','BULK_LOAD_TABLE_NAME','2000-01-01 00:00:00',SYSDATE(),'',PARSE_JSON('my_lineage_value'))"; + "(SELECT (SELECT COALESCE(MAX(bulk_load_batch_metadata.\"BATCH_ID\"),0)+1 FROM BULK_LOAD_BATCH_METADATA as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"TABLE_NAME\") = 'BULK_LOAD_TABLE_NAME'),'BULK_LOAD_TABLE_NAME','2000-01-01 00:00:00',SYSDATE(),'',PARSE_JSON('my_lineage_value'))"; } public RelationalSink getRelationalSink()
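[Editor's sketch, not part of the patch] At the API level this patch removes bulkLoadBatchIdValue (which defaulted to a random UUID) from both RelationalGenerator and RelationalIngestor and replaces it with an optional bulkLoadTaskIdValue; the integer batch_id is derived at ingest time instead. A condensed usage sketch assembled from the test code in this patch (builder calls as they appear above; the two strategy import paths are assumed, the rest come from this diff's file headers):

    import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad;
    import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing;           // assumed path
    import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy; // assumed path
    import org.finos.legend.engine.persistence.components.relational.api.RelationalGenerator;
    import org.finos.legend.engine.persistence.components.relational.snowflake.SnowflakeSink;

    public class BulkLoadUsageSketch
    {
        public static RelationalGenerator newGenerator()
        {
            // Bulk load mode: primary keys must be absent on both staged and main
            // datasets (see the two new negative tests above), and the batch_id
            // column is created as INTEGER.
            BulkLoad bulkLoad = BulkLoad.builder()
                    .batchIdField("batch_id")
                    .digestGenStrategy(NoDigestGenStrategy.builder().build())
                    .auditing(NoAuditing.builder().build())
                    .build();

            // bulkLoadTaskIdValue is optional: when supplied it is recorded as
            // "task_id" inside batch_source_info; the batch_id value itself is
            // resolved via the COALESCE(MAX(batch_id),0)+1 subquery shown in the
            // expected SQL above.
            return RelationalGenerator.builder()
                    .ingestMode(bulkLoad)
                    .relationalSink(SnowflakeSink.get())
                    .collectStatistics(true)
                    .bulkLoadTaskIdValue("task123")
                    .build();
        }
    }
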