Skip to content

Commit

Permalink
Addressed code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Jun 6, 2024
1 parent 356e48d commit 878a957
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets)
return LogicalPlan.builder().addOps(selection).build();
}

public static LogicalPlan getLogicalPlanForIdempotencyCheck(MetadataDataset metadataDataset, String ingestRequestId, String mainDatasetName)
/*
SELECT * FROM batch_metadata as batch_metadata WHERE
(batch_metadata."ingest_request_id" = '<ingestRequestId>')
AND (batch_metadata."table_name" = '<mainDatasetName>')";
*/
public static LogicalPlan getLogicalPlanForBatchMetaRowsWithExistingIngestRequestId(MetadataDataset metadataDataset, String ingestRequestId, String mainDatasetName)
{
Dataset dataset = metadataDataset.get();
FieldValue ingestRequestField = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(metadataDataset.ingestRequestIdField()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testLogicalPlanForIdempotencyCheck()
{
MetadataDataset dataset = MetadataDataset.builder().build();
RelationalTransformer transformer = new RelationalTransformer(AnsiSqlSink.get());
LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForIdempotencyCheck(dataset, "123xyz", "main");
LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForBatchMetaRowsWithExistingIngestRequestId(dataset, "123xyz", "main");
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ private List<IngestorResult> verifyIfRequestAlreadyProcessedPreviously(SchemaEvo
if (enableConcurrentSafety() && enableIdempotencyCheck())
{
LOGGER.info("Perform Idempotency Check");
LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForIdempotencyCheck(
LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForBatchMetaRowsWithExistingIngestRequestId(
enrichedDatasets.metadataDataset().orElseThrow(IllegalStateException::new),
ingestRequestId().orElseThrow(IllegalStateException::new),
enrichedDatasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
Expand All @@ -591,11 +591,11 @@ private List<IngestorResult> verifyIfRequestAlreadyProcessedPreviously(SchemaEvo
for (Map<String, Object> metadata: metadataResults)
{
Timestamp ingestionTimestampUTC = (Timestamp) metadata.get(metadataDataset.batchStartTimeField());
String batchStatus = (String) metadata.get(metadataDataset.batchStatusField());
String batchStatus = String.valueOf(metadata.get(metadataDataset.batchStatusField()));
batchStatus = batchStatus.equalsIgnoreCase(batchSuccessStatusValue()) ? IngestStatus.SUCCEEDED.name() : batchStatus;
IngestorResult ingestorResult = IngestorResult.builder()
.batchId((int) metadata.get(metadataDataset.tableBatchIdField()))
.putAllStatisticByName(readValueAsMap((String) metadata.get(metadataDataset.batchStatisticsField())))
.putAllStatisticByName(readValueAsMap(String.valueOf(metadata.get(metadataDataset.batchStatisticsField()))))
.status(IngestStatus.valueOf(batchStatus))
.updatedDatasets(enrichedDatasets)
.schemaEvolutionSql(schemaEvolutionResult.schemaEvolutionSql())
Expand Down

0 comments on commit 878a957

Please sign in to comment.