Skip to content

Commit

Permalink
Release 4.12.0
Browse files Browse the repository at this point in the history
  • Loading branch information
quim3ra committed Jun 13, 2022
1 parent 57e2ca6 commit 566b065
Show file tree
Hide file tree
Showing 107 changed files with 3,091 additions and 970 deletions.
101 changes: 51 additions & 50 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
springBoot = 2.6.6
dependencyManagement = 1.0.11.RELEASE
projectVersion = 4.11.0
projectVersion = 4.12.0
paypalHyperwalletDockerRepository = hyperwallet-mirakl-connector
org.gradle.jvmargs = -XX:PermSize=1024M -XX:MaxPermSize=1024M
2 changes: 2 additions & 0 deletions infrastructure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies {
testImplementation 'com.callibrity.logging:log-tracker:1.0.1'
testImplementation 'org.awaitility:awaitility'

testAnnotationProcessor "org.mapstruct:mapstruct-processor:1.4.2.Final"

runtimeOnly 'net.minidev:json-smart:2.3'
api 'com.hyperwallet:sdk:2.2.5'
api 'com.mirakl:mmp-sdk-operator:4.10.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.Collection;
import java.util.Optional;

/**
* Base class for all batch jobs. Managed the job execution main flow including batch
Expand All @@ -15,6 +16,18 @@ public abstract class AbstractBatchJob<C extends BatchJobContext, T extends Batc

protected abstract BatchJobItemsExtractor<C, T> getBatchJobItemsExtractor();

protected Optional<BatchJobItemValidator<C, T>> getBatchJobItemValidator() {
return Optional.empty();
}

protected Optional<BatchJobPreProcessor<C, T>> getBatchJobPreProcessor() {
return Optional.empty();
}

protected Optional<BatchJobItemEnricher<C, T>> getBatchJobItemEnricher() {
return Optional.empty();
}

/**
* {@inheritDoc}
*/
Expand All @@ -31,4 +44,26 @@ public void processItem(final C ctx, final T jobItem) {
getBatchJobItemProcessor().processItem(ctx, jobItem);
}

@Override
public void prepareForItemProcessing(C ctx, Collection<T> itemsToBeProcessed) {
getBatchJobPreProcessor().ifPresent(it -> it.prepareForProcessing(ctx, itemsToBeProcessed));
}

@Override
public T enrichItem(C ctx, T jobItem) {
BatchJobItemEnricher<C, T> batchJobItemEnricher = getBatchJobItemEnricher().orElse(null);
return batchJobItemEnricher != null ? batchJobItemEnricher.enrichItem(ctx, jobItem) : jobItem;
}

@Override
public BatchJobItemValidationResult validateItem(C ctx, T jobItem) {
BatchJobItemValidator<C, T> batchJobItemValidator = getBatchJobItemValidator().orElse(null);
return batchJobItemValidator != null ? batchJobItemValidator.validateItem(ctx, jobItem)
: buildValidValidationResult();
}

private BatchJobItemValidationResult buildValidValidationResult() {
return BatchJobItemValidationResult.builder().status(BatchJobItemValidationStatus.VALID).build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.paypal.infrastructure.batchjob;

/**
* Abstract class for all jobs of type Extract
*/
public abstract class AbstractExtractBatchJob<C extends BatchJobContext, T extends BatchJobItem<?>>
extends AbstractBatchJob<C, T> {

@Override
public BatchJobType getType() {
return BatchJobType.EXTRACT;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.paypal.infrastructure.batchjob;

/**
* Abstract class for all jobs of type Retry
*/
public abstract class AbstractRetryBatchJob<C extends BatchJobContext, T extends BatchJobItem<?>>
extends AbstractBatchJob<C, T> {

@Override
public BatchJobType getType() {
return BatchJobType.RETRY;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,23 @@ public interface BatchJob<C extends BatchJobContext, T extends BatchJobItem<?>>
*/
Collection<T> getItems(C ctx);

void prepareForItemProcessing(C ctx, Collection<T> itemsToBeProcessed);

T enrichItem(C ctx, final T jobItem);

BatchJobItemValidationResult validateItem(final C ctx, final T jobItem);

/**
* Process an item with the given context.
* @param ctx the batch job context.
* @param jobItem the batch job item.
*/
void processItem(final C ctx, final T jobItem);

/**
* Return the type of this job
* @return a {@link BatchJobType}+
*/
BatchJobType getType();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public interface BatchJobContext {
*/
String getJobName();

/**
* Returns the job unique identifier.
* @return the job unique identifier.
*/
String getJobUuid();

/**
Expand Down Expand Up @@ -87,4 +91,10 @@ public interface BatchJobContext {
*/
JobExecutionContext getJobExecutionContext();

/**
* Returns the job which is being executed.
* @return the {@link BatchJob}.
*/
<C extends BatchJobContext, T extends BatchJobItem<?>> BatchJob<C, T> getBatchJob();

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ public <C extends BatchJobContext, T extends BatchJobItem<?>> void execute(Batch
try {
reportBatchJobStarted(ctx);

retrieveBatchItems(job, ctx).forEach(i -> processItem(job, ctx, i));
Collection<T> itemsToBeProcessed = retrieveBatchItems(job, ctx);

prepareForProcessing(job, ctx, itemsToBeProcessed);

itemsToBeProcessed.forEach(i -> processItem(job, ctx, i));

reportBatchJobFinished(ctx);
}
Expand All @@ -47,12 +51,42 @@ private <C extends BatchJobContext, T extends BatchJobItem<?>> Collection<T> ret
}
}

private <T extends BatchJobItem<?>, C extends BatchJobContext> void prepareForProcessing(BatchJob<C, T> job,
C context, Collection<T> itemsToBeProcessed) {
try {
reportPreparationForProcessingStarted(context);

job.prepareForItemProcessing(context, itemsToBeProcessed);

reportPreparationForProcessingFinished(context);
}
catch (final RuntimeException e) {
reportPreparationForProcessingFailure(context, e);
}
}

private <C extends BatchJobContext, T extends BatchJobItem<?>> void processItem(BatchJob<C, T> job, final C context,
final T item) {
try {
reportItemProcessingStarted(context, item);
job.processItem(context, item);
reportItemProcessingFinished(context, item);

T enrichedItem = job.enrichItem(context, item);
BatchJobItemValidationResult validationResult = job.validateItem(context, enrichedItem);
switch (validationResult.getStatus()) {
case INVALID:
reportItemProcessingValidationFailure(context, item, validationResult);
reportItemProcessingFailure(context, item, null);
break;
case WARNING:
reportItemProcessingValidationFailure(context, item, validationResult);
job.processItem(context, enrichedItem);
reportItemProcessingFinished(context, item);
break;
case VALID:
job.processItem(context, enrichedItem);
reportItemProcessingFinished(context, item);
break;
}
}
catch (final RuntimeException e) {
reportItemProcessingFailure(context, item, e);
Expand Down Expand Up @@ -98,6 +132,18 @@ private <C extends BatchJobContext> void reportBatchJobFailure(final C ctx, fina
}
}

private <C extends BatchJobContext, T extends BatchJobItem<?>> void reportItemProcessingValidationFailure(C ctx,
T item, BatchJobItemValidationResult validationResult) {
for (final var batchJobProcessingListener : batchJobProcessingListeners) {
try {
batchJobProcessingListener.onItemProcessingValidationFailure(ctx, item, validationResult);
}
catch (final RuntimeException e) {
log.error(MSG_ERROR_WHILE_INVOKING_BATCH_JOB_LISTENER, e);
}
}
}

private <C extends BatchJobContext, T extends BatchJobItem<?>> void reportItemProcessingFailure(final C ctx,
final T item, final RuntimeException e) {
ctx.incrementFailedItems();
Expand Down Expand Up @@ -174,4 +220,39 @@ private <C extends BatchJobContext> void reportItemExtractionFailure(final C ctx
}
}

private <C extends BatchJobContext> void reportPreparationForProcessingStarted(C context) {
for (final var batchJobProcessingListener : batchJobProcessingListeners) {
try {
batchJobProcessingListener.onPreparationForProcessingStarted(context);
}
catch (final RuntimeException e) {
log.error(MSG_ERROR_WHILE_INVOKING_BATCH_JOB_LISTENER, e);
}
}

}

private <C extends BatchJobContext> void reportPreparationForProcessingFinished(C context) {
for (final var batchJobProcessingListener : batchJobProcessingListeners) {
try {
batchJobProcessingListener.onPreparationForProcessingFinished(context);
}
catch (final RuntimeException e) {
log.error(MSG_ERROR_WHILE_INVOKING_BATCH_JOB_LISTENER, e);
}
}
}

private <C extends BatchJobContext> void reportPreparationForProcessingFailure(C context, RuntimeException e) {
for (final var batchJobProcessingListener : batchJobProcessingListeners) {
try {
batchJobProcessingListener.onPreparationForProcessingFailure(context, e);
}
catch (final RuntimeException e1) {
log.error(MSG_ERROR_WHILE_INVOKING_BATCH_JOB_LISTENER, e1);
}
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.paypal.infrastructure.batchjob;

import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

Expand All @@ -21,6 +22,20 @@ public interface BatchJobFailedItemRepository extends JpaRepository<BatchJobFail
*/
List<BatchJobFailedItem> findByTypeAndStatus(String type, BatchJobFailedItemStatus status);

/**
* Retrieves a {@link List} of {@link BatchJobFailedItem} by the given type and
* {@link BatchJobFailedItemStatus} status ordered by
* {@link BatchJobFailedItem#getLastRetryTimestamp()} asc.
* @param type the {@link BatchJobFailedItem} type.
* @param status the {@link BatchJobFailedItem} status.
* @param pageable the {@link Pageable} pageable data.
* @return a {@link List} of {@link BatchJobFailedItem} by the given type and
* {@link BatchJobFailedItemStatus} status ordered by
* {@link BatchJobFailedItem#getLastRetryTimestamp()} asc.
*/
List<BatchJobFailedItem> findByTypeAndStatusOrderByLastRetryTimestampAsc(String type,
BatchJobFailedItemStatus status, Pageable pageable);

/**
* Retrieves a {@link List} of {@link BatchJobFailedItem} by the given type.
* @param type the {@link BatchJobFailedItem} type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import com.paypal.infrastructure.batchjob.entities.BatchJobItemTrackInfoEntity;
import com.paypal.infrastructure.mail.MailNotificationUtil;
import com.paypal.infrastructure.util.TimeMachine;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -26,6 +29,9 @@ public class BatchJobFailedItemServiceImpl implements BatchJobFailedItemService

private final MailNotificationUtil mailNotificationUtil;

@Value("${retry.maxFailedItemsToProcessed}")
private int maxNumberOfFailedItems;

public BatchJobFailedItemServiceImpl(final BatchJobFailedItemRepository failedItemRepository,
BatchJobTrackingService batchJobTrackingService,
List<BatchJobFailedItemRetryPolicy> batchJobFailedItemRetryPolicies,
Expand Down Expand Up @@ -90,8 +96,8 @@ public <T extends BatchJobItem<?>> void removeItemProcessed(final T item) {
*/
@Override
public List<BatchJobFailedItem> getFailedItemsForRetry(String itemType) {
List<BatchJobFailedItem> failedItems = failedItemRepository.findByTypeAndStatus(itemType,
BatchJobFailedItemStatus.RETRY_PENDING);
List<BatchJobFailedItem> failedItems = failedItemRepository.findByTypeAndStatusOrderByLastRetryTimestampAsc(
itemType, BatchJobFailedItemStatus.RETRY_PENDING, Pageable.ofSize(getMaxNumberOfFailedItems()));

Set<String> itemsBeingProcessedIds = batchJobTrackingService.getItemsBeingProcessedOrEnquedToProcess(itemType)
.stream().map(BatchJobItemTrackInfoEntity::getItemId).collect(Collectors.toSet());
Expand All @@ -112,7 +118,18 @@ public List<BatchJobFailedItem> getFailedItems(String itemType, BatchJobFailedIt

@Override
public <T extends BatchJobItem<?>> void checkUpdatedFailedItems(Collection<T> extractedItems) {
// Nothing to do

//@formatter:off
extractedItems.stream()
.map(batchJobItem -> new BatchJobFailedItemId(batchJobItem.getItemId(), batchJobItem.getItemType()))
.map(failedItemRepository::findById)
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(batchJobFailedItem -> {
batchJobFailedItem.setNumberOfRetries(0);
failedItemRepository.save(batchJobFailedItem);
});
//@formatter:on
}

private boolean shouldRetryFailedItem(BatchJobFailedItem item) {
Expand All @@ -138,4 +155,8 @@ private BatchJobFailedItem updateFailedItem(BatchJobFailedItem failedItem) {
return failedItem;
}

protected int getMaxNumberOfFailedItems() {
return maxNumberOfFailedItems;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.paypal.infrastructure.batchjob;

/**
* BatchJobs will use classes implementing this interface for enriching items before
* processing them.
*
* @param <C> the job context type.
* @param <T> the item type.
*/
public interface BatchJobItemEnricher<C extends BatchJobContext, T extends BatchJobItem<?>> {

/**
* Enrichs the information of an item.
* @param ctx the batch job context.
* @param jobItem the item to be processed.
* @return the enriched item.
*/
T enrichItem(C ctx, T jobItem);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.paypal.infrastructure.batchjob;

import lombok.Builder;
import lombok.Value;

import java.util.Optional;

/**
* This class holds the result of a batch job item validation.
*/
@Value
@Builder
public class BatchJobItemValidationResult {

private BatchJobItemValidationStatus status;

@Builder.Default
private Optional<String> reason = Optional.empty();

}
Loading

0 comments on commit 566b065

Please sign in to comment.