Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added compensation tx #29

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package com.nashtech.common.event;

import com.nashtech.common.model.PaymentStatus;
import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class PaymentCancelledEvent {
String paymentId;
String orderId;
String paymentId;
Integer quantity;
String userId;
String reasonToFailed;
String productId;
PaymentStatus paymentStatus;

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,5 @@ public class ProductReserveCancelledEvent {
String orderId;
String userId;
Integer quantity;
String reasonToFailed;

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ public InventoryAggregate(CreateProductCommand createProductCommand) {
@CommandHandler
public void handle(ReserveProductCommand reserveProductCommand) {
log.info("ReserveProductCommand started with productId {}",reserveProductCommand.getProductId());
if(quantity < reserveProductCommand.getQuantity()) {
if(quantity<=0 || quantity < reserveProductCommand.getQuantity()) {
log.warn("Current stock is {} of the product {}", quantity, reserveProductCommand.getProductId());
ProductReserveFailedEvent productFailedEvent = ProductReserveFailedEvent.builder()
.productId(reserveProductCommand.getProductId())
.userId(reserveProductCommand.getUserId())
.orderId(reserveProductCommand.getOrderId())
.reasonToFailed("Insufficient number of items in stock").build();
.reasonToFailed("Insufficient number of items in stock for product "+reserveProductCommand.getProductId())
.build();
AggregateLifecycle.apply(productFailedEvent);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.nashtech.inventory.events.ProductCreatedEvent;
import com.nashtech.inventory.repository.ProductEntity;
import com.nashtech.inventory.repository.ProductsRepository;
import com.nashtech.inventory.repository.ProductsSoldEntity;
import com.nashtech.inventory.repository.ProductsSoldRepository;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
Expand All @@ -22,11 +20,9 @@
public class ProductEventsHandler {

private final ProductsRepository productsRepository;
private final ProductsSoldRepository productsSoldRepository;

public ProductEventsHandler(ProductsRepository productsRepository, ProductsSoldRepository productsSoldRepository) {
public ProductEventsHandler(ProductsRepository productsRepository) {
this.productsRepository = productsRepository;
this.productsSoldRepository = productsSoldRepository;
}

@EventHandler
Expand All @@ -42,33 +38,21 @@ public void on(ProductReservedEvent productReservedEvent) {
ProductEntity productEntity = productsRepository.findByProductId(productReservedEvent.getProductId());
productEntity.setQuantity(productEntity.getQuantity() - productReservedEvent.getQuantity());
productsRepository.save(productEntity);

ProductsSoldEntity soldProduct = new ProductsSoldEntity(productEntity.getProductId(), productReservedEvent.getOrderId(),
productReservedEvent.getUserId(), productReservedEvent.getQuantity(),productReservedEvent.getSubTotal(),
productReservedEvent.getTotal(),productReservedEvent.getTax());
productsSoldRepository.save(soldProduct);
}

@EventHandler
public void on(ProductReserveCancelledEvent productReservationCancelledEvent) {
log.info("ProductReservationCancelledEvent: Reversing product {} quantity {}",
log.info("ProductReservationCancelledEvent: Revert product {} quantity {}",
productReservationCancelledEvent.getQuantity(), productReservationCancelledEvent.getProductId());

ProductEntity currentlyStoredProductEntity = productsRepository.findByProductId(productReservationCancelledEvent.getOrderId());
ProductEntity currentlyStoredProductEntity = productsRepository.findByProductId(productReservationCancelledEvent.getProductId());
currentlyStoredProductEntity.setQuantity(currentlyStoredProductEntity.getQuantity() +
productReservationCancelledEvent.getQuantity());

productsRepository.save(currentlyStoredProductEntity);

ProductsSoldEntity currentlyStoredSoldProduct =
productsSoldRepository.findByProductIdAndOrderId(productReservationCancelledEvent.getProductId(),
productReservationCancelledEvent.getOrderId());
if (currentlyStoredSoldProduct != null) {
productsSoldRepository.delete(currentlyStoredSoldProduct);
}
}

@ExceptionHandler(resultType = Exception.class)
@ExceptionHandler
public void handle(Exception exception) throws Exception {
throw exception;
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import jakarta.persistence.Table;
import lombok.Data;

import java.util.Date;


@Entity
Expand All @@ -21,5 +22,6 @@ public class ProductEntity {
private Double basePrice;
private Integer quantity;
private Float tax;
private Date timestamp = new Date();

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ public void on(OrderApprovedEvent event) {
public void handle(RejectOrderCommand rejectOrderCommand) {
OrderCancelledEvent orderCancelledEvent = OrderCancelledEvent.builder()
.orderId(rejectOrderCommand.getOrderId())
.productId(rejectOrderCommand.getProductId())
.paymentId(rejectOrderCommand.getPaymentId())
.shipmentId(rejectOrderCommand.getShipmentId())
.userId(rejectOrderCommand.getUserId())
.reasonToFailed(rejectOrderCommand.getReasonToFailed())
.orderStatus(rejectOrderCommand.getOrderStatus())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class OrderCancelledEvent {
String productId;
String paymentId;
String shipmentId;
String userId;
String reasonToFailed;
OrderStatus orderStatus;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ public class CompensateOrder {
private String paymentId;
private String shipmentId;
private String reasonToFailed;
private String errorMessage;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ public void on(OrderCancelledEvent event) {
log.error("Order failed status did not persist {}",event.getOrderId());
return;
}
FailedOrderEntity order = orderOptional.get();
order.setProductId(event.getProductId());
order.setPaymentId(event.getPaymentId());
order.setShipmentId(event.getShipmentId());
order.setOrderStatus(event.getOrderStatus().toString());
order.setReasonToFailed(event.getReasonToFailed());
failedOrderRepository.save(order);
FailedOrderEntity failedOrder = orderOptional.get();
failedOrder.setProductId(event.getProductId());
failedOrder.setPaymentId(event.getPaymentId());
failedOrder.setShipmentId(event.getShipmentId());
failedOrder.setUserId(event.getUserId());
failedOrder.setOrderStatus(event.getOrderStatus().toString());
failedOrder.setReasonToFailed(event.getReasonToFailed());
failedOrderRepository.save(failedOrder);
}

@ExceptionHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.nashtech.order.query.FindOrderQuery;
import com.nashtech.order.restapi.response.OrderSummary;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.modelling.saga.EndSaga;
import org.axonframework.modelling.saga.SagaEventHandler;
Expand Down Expand Up @@ -57,9 +58,9 @@ private void handle(OrderCreatedEvent orderCreatedEvent) {
compensateOrder.setOrderId(reserveProductCommand.getOrderId());
compensateOrder.setProductId(orderCreatedEvent.getProductId());
compensateOrder.setUserId(orderCreatedEvent.getUserId());
compensateOrder.setReasonToFailed(OderFailure.INVENTORY_SERVICE_NOT_AVAILABLE.toString());
compensateOrder.setErrorMessage(commandResultMessage.exceptionResult().getMessage());
handleCompensatingTransaction(compensateOrder);
compensateOrder.setReasonToFailed(simplifyErrorMessage(commandResultMessage,
OderFailure.INVENTORY_SERVICE_NOT_AVAILABLE));
orderRejectedCommand(compensateOrder);
}
});
}
Expand Down Expand Up @@ -91,36 +92,32 @@ public void handle(ProductReservedEvent productReservedEvent) {
compensateOrder.setProductId(productReservedEvent.getProductId());
compensateOrder.setUserId(productReservedEvent.getUserId());
compensateOrder.setPaymentId(processPaymentCommand.getPaymentId());
compensateOrder.setReasonToFailed(OderFailure.PAYMENT_SERVICE_NOT_AVAILABLE.toString());
compensateOrder.setErrorMessage(commandResultMessage.exceptionResult().getMessage());
handleCompensatingTransaction(compensateOrder);
compensateOrder.setReasonToFailed(simplifyErrorMessage(commandResultMessage,
OderFailure.PAYMENT_SERVICE_NOT_AVAILABLE));

CancelProductReserveCommand cancelProductReserveCommand = CancelProductReserveCommand.builder()
.productId(productReservedEvent.getProductId())
.userId(productReservedEvent.getUserId())
.orderId(productReservedEvent.getOrderId())
.quantity(productReservedEvent.getQuantity())
.build();
commandGateway.send(cancelProductReserveCommand);

orderRejectedCommand(compensateOrder);
}
});
}

@SagaEventHandler(associationProperty = "orderId")
private void handle(ProductReserveCancelledEvent productReserveCancelledEvent) {
log.info("ProductReserveCancelledEvent started for orderId : {}", productReserveCancelledEvent.getOrderId());
// Start the compensating transaction
CompensateOrder compensateOrder = new CompensateOrder();
compensateOrder.setOrderId(productReserveCancelledEvent.getOrderId());
compensateOrder.setProductId(productReserveCancelledEvent.getProductId());
compensateOrder.setUserId(productReserveCancelledEvent.getUserId());
compensateOrder.setReasonToFailed(productReserveCancelledEvent.getReasonToFailed());

handleCompensatingTransaction(compensateOrder);
}
@SagaEventHandler(associationProperty = "orderId")
private void handle(ProductReserveFailedEvent productReserveFailedEvent) {
log.info("ProductReserveFailedEvent started for productId : {}", productReserveFailedEvent.getProductId());
// Start the compensating transaction
log.info("ProductReserveFailedEvent started for orderId : {}", productReserveFailedEvent.getOrderId());
CompensateOrder compensateOrder = new CompensateOrder();
compensateOrder.setOrderId(productReserveFailedEvent.getOrderId());
compensateOrder.setProductId(productReserveFailedEvent.getProductId());
compensateOrder.setUserId(productReserveFailedEvent.getUserId());
compensateOrder.setReasonToFailed(productReserveFailedEvent.getReasonToFailed());

handleCompensatingTransaction(compensateOrder);
orderRejectedCommand(compensateOrder);
}

@SagaEventHandler(associationProperty = "orderId")
Expand Down Expand Up @@ -152,9 +149,10 @@ private void handle(PaymentApprovedEvent paymentApprovedEvent) {
compensateOrder.setUserId(paymentApprovedEvent.getUser().getUserId());
compensateOrder.setPaymentId(paymentApprovedEvent.getPaymentId());
compensateOrder.setShipmentId(createShipmentCommand.getShipmentId());
compensateOrder.setReasonToFailed(OderFailure.SHIPMENT_SERVICE_NOT_AVAILABLE.toString());
compensateOrder.setErrorMessage(commandResultMessage.exceptionResult().getMessage());
handleCompensatingTransaction(compensateOrder);
compensateOrder.setReasonToFailed(simplifyErrorMessage(commandResultMessage,
OderFailure.SHIPMENT_SERVICE_NOT_AVAILABLE));

orderRejectedCommand(compensateOrder);
}
});
}
Expand All @@ -178,7 +176,8 @@ private void handle(PaymentCancelledEvent paymentCancelledEvent) {
compensateOrder.setPaymentId(paymentCancelledEvent.getPaymentId());
compensateOrder.setUserId(paymentCancelledEvent.getUserId());
compensateOrder.setReasonToFailed(paymentCancelledEvent.getReasonToFailed());
handleCompensatingTransaction(compensateOrder);

orderRejectedCommand(compensateOrder);
}

@SagaEventHandler(associationProperty = "orderId")
Expand Down Expand Up @@ -224,7 +223,8 @@ private void handle(ShipmentCancelledEvent shipmentCancelledEvent) {
compensateOrder.setShipmentId(shipmentCancelledEvent.getShipmentId());
compensateOrder.setUserId(shipmentCancelledEvent.getUserId());
compensateOrder.setReasonToFailed(shipmentCancelledEvent.getReasonToFailed());
handleCompensatingTransaction(compensateOrder);

orderRejectedCommand(compensateOrder);
}

@SagaEventHandler(associationProperty = "orderId")
Expand Down Expand Up @@ -252,11 +252,11 @@ public void handle(OrderCancelledEvent event) {
queryUpdateEmitter.emit(FindOrderQuery.class, query -> true, orderSummary);
}

private void orderRejectedCommand(CompensateOrder compensateOrder,String reason) {
private void orderRejectedCommand(CompensateOrder compensateOrder) {
RejectOrderCommand rejectOrderCommand = RejectOrderCommand.builder()
.orderId(compensateOrder.getOrderId())
.userId(compensateOrder.getUserId())
.reasonToFailed(reason)
.reasonToFailed(compensateOrder.getReasonToFailed())
.paymentId(compensateOrder.getPaymentId())
.shipmentId(compensateOrder.getShipmentId())
.orderStatus(OrderStatus.ORDER_NOT_APPROVED)
Expand All @@ -265,14 +265,9 @@ private void orderRejectedCommand(CompensateOrder compensateOrder,String reason)
commandGateway.send(rejectOrderCommand);
}

private void handleCompensatingTransaction(CompensateOrder compensateOrder) {
log.error("The resulted is exception {} . Initiating a compensating transaction", compensateOrder.getErrorMessage());
if (compensateOrder.getErrorMessage() != null &&
compensateOrder.getErrorMessage().contains("No Handler for command:")) {
orderRejectedCommand(compensateOrder, compensateOrder.getErrorMessage());
} else {
orderRejectedCommand(compensateOrder,compensateOrder.getReasonToFailed());
}
private String simplifyErrorMessage(CommandResultMessage commandResultMessage, OderFailure errorMessage) {
return commandResultMessage.exceptionResult().getMessage() != null
&& commandResultMessage.exceptionResult().getMessage().contains("No Handler for command:")?
errorMessage.toString():commandResultMessage.exceptionResult().getMessage();
}

}
Loading