Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Jan 12, 2024
1 parent 67208a1 commit af65436
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

public class OrderStatusService
extends OrderStatusServiceRestate.OrderStatusServiceRestateImplBase {
private static final StateKey<String> ORDER_STATUS = StateKey.of("order-status", CoreSerdes.STRING_UTF8);
private static final StateKey<String> ORDER_STATUS =
StateKey.of("order-status", CoreSerdes.STRING_UTF8);
private static final StateKey<Long> ORDER_ETA = StateKey.of("order-eta", CoreSerdes.LONG);

/** Gets called by the webUI frontend to display the status of an order. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,52 +32,52 @@ public void handleOrderCreationEvent(RestateContext ctx, KafkaOrderEvent event)
throws TerminalException {
var orderStatusSend = OrderStatusServiceRestate.newClient(ctx);

ObjectMapper mapper = new ObjectMapper();
OrderRequest order;
try {
ObjectMapper mapper = new ObjectMapper();
OrderRequest order = mapper.readValue(event.getPayload().toStringUtf8(), OrderRequest.class);
String id = order.getOrderId();

// 1. Set status
orderStatusSend.oneWay().setStatus(statusToProto(id, StatusEnum.CREATED));
order = mapper.readValue(event.getPayload().toStringUtf8(), OrderRequest.class);
} catch (JsonProcessingException e) {
throw new TerminalException("Parsing raw JSON order failed: " + e.getMessage());
}
String id = order.getOrderId();

// 2. Handle payment
String token = ctx.sideEffect(CoreSerdes.STRING_UTF8, () -> UUID.randomUUID().toString());
boolean paid =
ctx.sideEffect(
CoreSerdes.BOOLEAN, () -> paymentClnt.charge(id, token, order.getTotalCost()));
// 1. Set status
orderStatusSend.oneWay().setStatus(statusToProto(id, StatusEnum.CREATED));

if (!paid) {
orderStatusSend.oneWay().setStatus(statusToProto(id, StatusEnum.REJECTED));
return;
}
// 2. Handle payment
String token = ctx.sideEffect(CoreSerdes.STRING_UTF8, () -> UUID.randomUUID().toString());
boolean paid =
ctx.sideEffect(
CoreSerdes.BOOLEAN, () -> paymentClnt.charge(id, token, order.getTotalCost()));

// 3. Schedule preparation
orderStatusSend.setStatus(statusToProto(order.getOrderId(), StatusEnum.SCHEDULED));
ctx.sleep(Duration.ofMillis(order.getDeliveryDelay()));
if (!paid) {
orderStatusSend.oneWay().setStatus(statusToProto(id, StatusEnum.REJECTED));
return;
}

// 4. Trigger preparation
var preparationAwakeable = ctx.awakeable(CoreSerdes.VOID);
ctx.sideEffect(() -> restaurant.prepare(id, preparationAwakeable.id()));
orderStatusSend.setStatus(statusToProto(id, StatusEnum.IN_PREPARATION));
// 3. Schedule preparation
orderStatusSend.setStatus(statusToProto(order.getOrderId(), StatusEnum.SCHEDULED));
ctx.sleep(Duration.ofMillis(order.getDeliveryDelay()));

preparationAwakeable.await();
orderStatusSend.setStatus(statusToProto(id, StatusEnum.SCHEDULING_DELIVERY));
// 4. Trigger preparation
var preparationAwakeable = ctx.awakeable(CoreSerdes.VOID);
ctx.sideEffect(() -> restaurant.prepare(id, preparationAwakeable.id()));
orderStatusSend.setStatus(statusToProto(id, StatusEnum.IN_PREPARATION));

// 5. Find a driver and start delivery
var deliveryAwakeable = ctx.awakeable(CoreSerdes.VOID);
preparationAwakeable.await();
orderStatusSend.setStatus(statusToProto(id, StatusEnum.SCHEDULING_DELIVERY));

var deliveryRequest =
DeliveryRequest.newBuilder()
.setOrderId(id)
.setRestaurantId(order.getRestaurantId())
.setCallback(deliveryAwakeable.id())
.build();
DeliveryManagerRestate.newClient(ctx).oneWay().start(deliveryRequest);
deliveryAwakeable.await();
orderStatusSend.setStatus(statusToProto(order.getOrderId(), StatusEnum.DELIVERED));
// 5. Find a driver and start delivery
var deliveryAwakeable = ctx.awakeable(CoreSerdes.VOID);

} catch (JsonProcessingException e) {
throw new TerminalException("Parsing raw JSON order failed: " + e.getMessage());
}
var deliveryRequest =
DeliveryRequest.newBuilder()
.setOrderId(id)
.setRestaurantId(order.getRestaurantId())
.setCallback(deliveryAwakeable.id())
.build();
DeliveryManagerRestate.newClient(ctx).oneWay().start(deliveryRequest);
deliveryAwakeable.await();
orderStatusSend.setStatus(statusToProto(order.getOrderId(), StatusEnum.DELIVERED));
}
}

0 comments on commit af65436

Please sign in to comment.