Skip to content

Commit

Permalink
Reformat the lock method
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Jun 4, 2024
1 parent 2dc4bad commit 356e48d
Showing 1 changed file with 41 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,17 @@ public List<IngestorResult> ingest()
LOGGER.info("Invoked ingest method, will perform the ingestion");
validateDatasetsInitialization();
SchemaEvolutionResult schemaEvolutionResult = SchemaEvolutionResult.builder().updatedDatasets(enrichedDatasets).build();
List<IngestorResult> result = acquireLockAndIdempotencyCheck(schemaEvolutionResult);
acquireLock();

// idempotency Check
List<IngestorResult> result = verifyIfRequestAlreadyProcessedPreviously(schemaEvolutionResult);
if (result.isEmpty())
{
dedupAndVersion();
List<DataSplitRange> dataSplitRanges = ApiUtils.getDataSplitRanges(executor, planner, transformer, ingestMode());
result = ingest(dataSplitRanges, schemaEvolutionResult);
LOGGER.info("Ingestion completed");
}
LOGGER.info("Ingestion completed");
return result;
}

Expand Down Expand Up @@ -558,45 +561,48 @@ private void initializeLock()
}
}

private List<IngestorResult> acquireLockAndIdempotencyCheck(SchemaEvolutionResult schemaEvolutionResult)
private void acquireLock()
{
List<IngestorResult> result = new ArrayList<>();
if (enableConcurrentSafety())
{
LOGGER.info("Concurrent safety is enabled, Acquiring lock");
Map<String, PlaceholderValue> placeHolderKeyValues = new HashMap<>();
placeHolderKeyValues.put(BATCH_START_TS_PATTERN, PlaceholderValue.of(LocalDateTime.now(executionTimestampClock()).format(DATE_TIME_FORMATTER), false));
executor.executePhysicalPlan(generatorResult.acquireLockSqlPlan().orElseThrow(IllegalStateException::new), placeHolderKeyValues);
}
}

if (enableIdempotencyCheck())
private List<IngestorResult> verifyIfRequestAlreadyProcessedPreviously(SchemaEvolutionResult schemaEvolutionResult)
{
List<IngestorResult> result = new ArrayList<>();
if (enableConcurrentSafety() && enableIdempotencyCheck())
{
LOGGER.info("Perform Idempotency Check");
LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForIdempotencyCheck(
enrichedDatasets.metadataDataset().orElseThrow(IllegalStateException::new),
ingestRequestId().orElseThrow(IllegalStateException::new),
enrichedDatasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<TabularData> tabularDataList = executor.executePhysicalPlanAndGetResults(physicalPlan);
MetadataDataset metadataDataset = enrichedDatasets.metadataDataset().get();
if (!tabularDataList.isEmpty())
{
LOGGER.info("Perform Idempotency Check");
LogicalPlan logicalPlan = LogicalPlanFactory.getLogicalPlanForIdempotencyCheck(
enrichedDatasets.metadataDataset().orElseThrow(IllegalStateException::new),
ingestRequestId().orElseThrow(IllegalStateException::new),
enrichedDatasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<TabularData> tabularDataList = executor.executePhysicalPlanAndGetResults(physicalPlan);
MetadataDataset metadataDataset = enrichedDatasets.metadataDataset().get();
if (!tabularDataList.isEmpty())
List<Map<String, Object>> metadataResults = tabularDataList.get(0).getData();
for (Map<String, Object> metadata: metadataResults)
{
List<Map<String, Object>> metadataResults = tabularDataList.get(0).getData();
for (Map<String, Object> metadata: metadataResults)
{
Timestamp ingestionTimestampUTC = (Timestamp) metadata.get(metadataDataset.batchStartTimeField());
String batchStatus = (String) metadata.get(metadataDataset.batchStatusField());
batchStatus = batchStatus.equalsIgnoreCase(batchSuccessStatusValue()) ? IngestStatus.SUCCEEDED.name() : batchStatus;
IngestorResult ingestorResult = IngestorResult.builder()
.batchId((int) metadata.get(metadataDataset.tableBatchIdField()))
.putAllStatisticByName(readValueAsMap((String) metadata.get(metadataDataset.batchStatisticsField())))
.status(IngestStatus.valueOf(batchStatus))
.updatedDatasets(enrichedDatasets)
.schemaEvolutionSql(schemaEvolutionResult.schemaEvolutionSql())
.ingestionTimestampUTC(ingestionTimestampUTC.toLocalDateTime().format(DATE_TIME_FORMATTER))
.previouslyProcessed(true)
.build();
result.add(ingestorResult);
}
Timestamp ingestionTimestampUTC = (Timestamp) metadata.get(metadataDataset.batchStartTimeField());
String batchStatus = (String) metadata.get(metadataDataset.batchStatusField());
batchStatus = batchStatus.equalsIgnoreCase(batchSuccessStatusValue()) ? IngestStatus.SUCCEEDED.name() : batchStatus;
IngestorResult ingestorResult = IngestorResult.builder()
.batchId((int) metadata.get(metadataDataset.tableBatchIdField()))
.putAllStatisticByName(readValueAsMap((String) metadata.get(metadataDataset.batchStatisticsField())))
.status(IngestStatus.valueOf(batchStatus))
.updatedDatasets(enrichedDatasets)
.schemaEvolutionSql(schemaEvolutionResult.schemaEvolutionSql())
.ingestionTimestampUTC(ingestionTimestampUTC.toLocalDateTime().format(DATE_TIME_FORMATTER))
.previouslyProcessed(true)
.build();
result.add(ingestorResult);
}
}
}
Expand Down Expand Up @@ -661,7 +667,10 @@ private List<IngestorResult> performFullIngestion(RelationalConnection connectio
try
{
executor.begin();
result = acquireLockAndIdempotencyCheck(schemaEvolutionResult);
acquireLock();

// idempotency Check
result = verifyIfRequestAlreadyProcessedPreviously(schemaEvolutionResult);
if (result.isEmpty())
{
// Dedup and Version
Expand Down

0 comments on commit 356e48d

Please sign in to comment.