diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java index 8f11a6bfa94..32280e1e4f4 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java @@ -102,7 +102,12 @@ public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets) return LogicalPlan.builder().addOps(selection).build(); } - public static LogicalPlan getLogicalPlanForIdempotencyCheck(MetadataDataset metadataDataset, String ingestRequestId, String mainDatasetName) + /* + SELECT * FROM batch_metadata as batch_metadata WHERE + (batch_metadata."ingest_request_id" = '') + AND (batch_metadata."table_name" = '')"; + */ + public static LogicalPlan getLogicalPlanForBatchMetaRowsWithExistingIngestRequestId(MetadataDataset metadataDataset, String ingestRequestId, String mainDatasetName) { Dataset dataset = metadataDataset.get(); FieldValue ingestRequestField = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(metadataDataset.ingestRequestIdField()).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/LogicalPlanFactoryTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/LogicalPlanFactoryTest.java index e98c063ab37..7ef28954942 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/LogicalPlanFactoryTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/LogicalPlanFactoryTest.java @@ -101,7 +101,7 @@ public void testLogicalPlanForIdempotencyCheck() { MetadataDataset dataset = MetadataDataset.builder().build(); RelationalTransformer transformer = new RelationalTransformer(AnsiSqlSink.get()); - LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForIdempotencyCheck(dataset, "123xyz", "main"); + LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForBatchMetaRowsWithExistingIngestRequestId(dataset, "123xyz", "main"); SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan); List list = physicalPlan.getSqlList(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index 075b1bac308..b377e6a6c10 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -578,7 +578,7 @@ private List verifyIfRequestAlreadyProcessedPreviously(SchemaEvo if (enableConcurrentSafety() && enableIdempotencyCheck()) { LOGGER.info("Perform Idempotency Check"); - LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForIdempotencyCheck( + LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForBatchMetaRowsWithExistingIngestRequestId( enrichedDatasets.metadataDataset().orElseThrow(IllegalStateException::new), ingestRequestId().orElseThrow(IllegalStateException::new), enrichedDatasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new)); @@ -591,11 +591,11 @@ private List verifyIfRequestAlreadyProcessedPreviously(SchemaEvo for (Map metadata: metadataResults) { Timestamp ingestionTimestampUTC = (Timestamp) metadata.get(metadataDataset.batchStartTimeField()); - String batchStatus = (String) metadata.get(metadataDataset.batchStatusField()); + String batchStatus = String.valueOf(metadata.get(metadataDataset.batchStatusField())); batchStatus = batchStatus.equalsIgnoreCase(batchSuccessStatusValue()) ? IngestStatus.SUCCEEDED.name() : batchStatus; IngestorResult ingestorResult = IngestorResult.builder() .batchId((int) metadata.get(metadataDataset.tableBatchIdField())) - .putAllStatisticByName(readValueAsMap((String) metadata.get(metadataDataset.batchStatisticsField()))) + .putAllStatisticByName(readValueAsMap(String.valueOf(metadata.get(metadataDataset.batchStatisticsField())))) .status(IngestStatus.valueOf(batchStatus)) .updatedDatasets(enrichedDatasets) .schemaEvolutionSql(schemaEvolutionResult.schemaEvolutionSql())