Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Jan 12, 2024
1 parent 09bfed1 commit e5a234a
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 237 deletions.
9 changes: 8 additions & 1 deletion java/food-ordering/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ It also interacts with the delivery services to get the order delivered to the c

## Running locally with Docker compose

Build the docker containers:

```shell
cd app
./gradlew clean build jibDockerBuild
```

Launch the Docker compose setup:
```shell
docker compose up
Expand Down Expand Up @@ -90,4 +97,4 @@ Then run the example via Docker compose.

### Upgrading Restate runtime
The Docker Compose setup uses the latest Restate runtime version.
Test run the example via Docker compose.
Test run the example via Docker compose.
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package dev.restate.sdk.examples;

import static dev.restate.sdk.examples.generated.OrderProto.*;
import static dev.restate.sdk.examples.utils.TypeUtils.statusToProto;

import dev.restate.sdk.RestateContext;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.examples.generated.*;
import dev.restate.sdk.examples.generated.DeliveryManagerRestate;
import dev.restate.sdk.examples.generated.DriverDeliveryMatcherRestate;
import dev.restate.sdk.examples.generated.DriverDigitalTwinRestate;
import dev.restate.sdk.examples.generated.OrderStatusServiceRestate;
import dev.restate.sdk.examples.types.DeliveryInformation;
import dev.restate.sdk.examples.types.Location;
import dev.restate.sdk.examples.types.Status;
import dev.restate.sdk.examples.types.StatusEnum;
import dev.restate.sdk.examples.utils.GeoUtils;
import dev.restate.sdk.serde.jackson.JacksonSerdes;

Expand All @@ -23,19 +28,18 @@ public class DeliveryManager extends DeliveryManagerRestate.DeliveryManagerResta
StateKey<DeliveryInformation> DELIVERY_INFO =
StateKey.of("delivery-info", JacksonSerdes.of(DeliveryInformation.class));

private static final Serde<Location> locationSerde = JacksonSerdes.of(Location.class);

/**
* Finds a driver, assigns the delivery job to the driver, and updates the status of the order.
* Gets called by the OrderService when a new order has been prepared and needs to be delivered.
*/
@Override
public void start(RestateContext ctx, OrderProto.DeliveryRequest request)
throws TerminalException {
public void start(RestateContext ctx, DeliveryRequest request) throws TerminalException {

// Temporary placeholder: random location
var restaurantLocation =
ctx.sideEffect(JacksonSerdes.of(Location.class), () -> GeoUtils.randomLocation());
var customerLocation =
ctx.sideEffect(JacksonSerdes.of(Location.class), () -> GeoUtils.randomLocation());
var restaurantLocation = ctx.sideEffect(locationSerde, () -> GeoUtils.randomLocation());
var customerLocation = ctx.sideEffect(locationSerde, () -> GeoUtils.randomLocation());

// Store the delivery information in Restate's state store
DeliveryInformation deliveryInfo =
Expand All @@ -53,7 +57,7 @@ public void start(RestateContext ctx, OrderProto.DeliveryRequest request)
DriverDeliveryMatcherRestate.newClient(ctx)
.oneWay()
.requestDriverForDelivery(
OrderProto.DeliveryCallback.newBuilder()
DeliveryCallback.newBuilder()
.setRegion(GeoUtils.DEMO_REGION)
.setDeliveryCallbackId(driverAwakeable.id())
.build());
Expand All @@ -65,7 +69,7 @@ public void start(RestateContext ctx, OrderProto.DeliveryRequest request)
// Assign the driver to the job
DriverDigitalTwinRestate.newClient(ctx)
.assignDeliveryJob(
OrderProto.AssignDeliveryRequest.newBuilder()
AssignDeliveryRequest.newBuilder()
.setDriverId(driverId)
.setOrderId(request.getOrderId())
.setRestaurantId(request.getRestaurantId())
Expand All @@ -77,16 +81,15 @@ public void start(RestateContext ctx, OrderProto.DeliveryRequest request)
// Update the status of the order to "waiting for the driver"
OrderStatusServiceRestate.newClient(ctx)
.oneWay()
.setStatus(statusToProto(request.getOrderId(), Status.WAITING_FOR_DRIVER));
.setStatus(statusToProto(request.getOrderId(), StatusEnum.WAITING_FOR_DRIVER));
}

/**
* Notifies that the delivery was picked up. Gets called by the DriverService.NotifyDeliveryPickup
* when the driver has arrived at the restaurant.
*/
@Override
public void notifyDeliveryPickup(RestateContext ctx, OrderProto.OrderId request)
throws TerminalException {
public void notifyDeliveryPickup(RestateContext ctx, OrderId request) throws TerminalException {
// Retrieve the delivery information for this delivery
var delivery =
ctx.get(DELIVERY_INFO)
Expand All @@ -95,21 +98,21 @@ public void notifyDeliveryPickup(RestateContext ctx, OrderProto.OrderId request)
new TerminalException(
"Delivery was picked up but there is no ongoing delivery."));
// Update the status of the delivery to "picked up"
delivery.setOrderPickedUp(true);
delivery.notifyPickup();
ctx.set(DELIVERY_INFO, delivery);

// Update the status of the order to "in delivery"
OrderStatusServiceRestate.newClient(ctx)
.oneWay()
.setStatus(statusToProto(delivery.getOrderId(), Status.IN_DELIVERY));
.setStatus(statusToProto(delivery.getOrderId(), StatusEnum.IN_DELIVERY));
}

/**
* Notifies that the order was delivered. Gets called by the DriverService.NotifyDeliveryDelivered
* when the driver has delivered the order to the customer.
*/
@Override
public void notifyDeliveryDelivered(RestateContext ctx, OrderProto.OrderId request)
public void notifyDeliveryDelivered(RestateContext ctx, OrderId request)
throws TerminalException {
// Retrieve the delivery information for this delivery
var delivery =
Expand All @@ -131,8 +134,8 @@ public void notifyDeliveryDelivered(RestateContext ctx, OrderProto.OrderId reque
* has moved to a new location.
*/
@Override
public void handleDriverLocationUpdate(
RestateContext ctx, OrderProto.DeliveryLocationUpdate request) throws TerminalException {
public void handleDriverLocationUpdate(RestateContext ctx, DeliveryLocationUpdate request)
throws TerminalException {
// Retrieve the delivery information for this delivery
var delivery =
ctx.get(DELIVERY_INFO)
Expand All @@ -153,10 +156,6 @@ public void handleDriverLocationUpdate(
// Update the ETA of the order
OrderStatusServiceRestate.newClient(ctx)
.oneWay()
.setETA(
OrderProto.OrderStatus.newBuilder()
.setOrderId(delivery.getOrderId())
.setEta(eta)
.build());
.setETA(OrderStatus.newBuilder().setOrderId(delivery.getOrderId()).setEta(eta).build());
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package dev.restate.sdk.examples;

import static dev.restate.sdk.examples.generated.OrderProto.*;

import com.fasterxml.jackson.core.type.TypeReference;
import dev.restate.sdk.RestateContext;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.examples.generated.DriverDeliveryMatcherRestate;
import dev.restate.sdk.examples.generated.OrderProto;
import dev.restate.sdk.serde.jackson.JacksonSerdes;
import java.util.LinkedList;
import java.util.Queue;
Expand All @@ -32,8 +33,7 @@ public class DriverDeliveryMatcher
* in line. If no pending deliveries, driver is added to the available driver pool
*/
@Override
public void setDriverAvailable(
RestateContext ctx, OrderProto.DriverPoolAvailableNotification request)
public void setDriverAvailable(RestateContext ctx, DriverPoolAvailableNotification request)
throws TerminalException {
var pendingDeliveries = ctx.get(PENDING_DELIVERIES).orElse(new LinkedList<>());

Expand All @@ -58,7 +58,7 @@ public void setDriverAvailable(
* available. If no available drivers, the delivery is added to the pending deliveries queue
*/
@Override
public void requestDriverForDelivery(RestateContext ctx, OrderProto.DeliveryCallback request)
public void requestDriverForDelivery(RestateContext ctx, DeliveryCallback request)
throws TerminalException {
var availableDrivers = ctx.get(AVAILABLE_DRIVERS).orElse(new LinkedList<>());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.restate.sdk.examples;

import static dev.restate.sdk.examples.generated.OrderProto.*;
import static dev.restate.sdk.examples.utils.TypeUtils.toOrderIdProto;

import com.google.protobuf.Empty;
Expand All @@ -9,17 +10,15 @@
import dev.restate.sdk.examples.generated.DeliveryManagerRestate;
import dev.restate.sdk.examples.generated.DriverDeliveryMatcherRestate;
import dev.restate.sdk.examples.generated.DriverDigitalTwinRestate;
import dev.restate.sdk.examples.generated.OrderProto;
import dev.restate.sdk.examples.generated.OrderProto.AssignDeliveryRequest;
import dev.restate.sdk.examples.types.AssignedDelivery;
import dev.restate.sdk.examples.types.DriverStatus;
import dev.restate.sdk.examples.types.Location;
import dev.restate.sdk.serde.jackson.JacksonSerdes;

/**
* Digital twin for the driver. Represents a driver and his status, assigned delivery, and location.
* Keyed by driver ID. The actual driver would have an application (mocked by DriverMobileAppSimulator
* ) that calls this service.
* Keyed by driver ID. The actual driver would have an application (mocked by
* DriverMobileAppSimulator ) that calls this service.
*/
public class DriverDigitalTwin extends DriverDigitalTwinRestate.DriverDigitalTwinRestateImplBase {

Expand All @@ -36,19 +35,19 @@ public class DriverDigitalTwin extends DriverDigitalTwinRestate.DriverDigitalTwi
StateKey.of("driver-location", JacksonSerdes.of(Location.class));

/**
* When the driver starts his work day or finishes a delivery, his application (DriverMobileAppSimulator)
* calls this method.
* When the driver starts his work day or finishes a delivery, his application
* (DriverMobileAppSimulator) calls this method.
*/
@Override
public void setDriverAvailable(RestateContext ctx, OrderProto.DriverAvailableNotification request)
public void setDriverAvailable(RestateContext ctx, DriverAvailableNotification request)
throws TerminalException {
expectStatus(ctx, DriverStatus.IDLE);

ctx.set(DRIVER_STATUS, DriverStatus.WAITING_FOR_WORK);
DriverDeliveryMatcherRestate.newClient(ctx)
.oneWay()
.setDriverAvailable(
OrderProto.DriverPoolAvailableNotification.newBuilder()
DriverPoolAvailableNotification.newBuilder()
.setRegion(request.getRegion())
.setDriverId(request.getDriverId())
.build());
Expand Down Expand Up @@ -82,7 +81,7 @@ public void assignDeliveryJob(RestateContext ctx, AssignDeliveryRequest request)
DeliveryManagerRestate.newClient(ctx)
.oneWay()
.handleDriverLocationUpdate(
OrderProto.DeliveryLocationUpdate.newBuilder()
DeliveryLocationUpdate.newBuilder()
.setOrderId(request.getOrderId())
.setLocation(loc.toProto())
.build()));
Expand All @@ -92,8 +91,7 @@ public void assignDeliveryJob(RestateContext ctx, AssignDeliveryRequest request)
* Gets called by the driver's mobile app when he has picked up the delivery from the restaurant.
*/
@Override
public void notifyDeliveryPickup(RestateContext ctx, OrderProto.DriverId request)
throws TerminalException {
public void notifyDeliveryPickup(RestateContext ctx, DriverId request) throws TerminalException {
expectStatus(ctx, DriverStatus.DELIVERING);

// Retrieve the ongoing delivery and update its status
Expand All @@ -109,14 +107,12 @@ public void notifyDeliveryPickup(RestateContext ctx, OrderProto.DriverId request
// Update the status of the delivery in the delivery manager
DeliveryManagerRestate.newClient(ctx)
.oneWay()
.notifyDeliveryPickup(toOrderIdProto(currentDelivery.orderId));
.notifyDeliveryPickup(toOrderIdProto(currentDelivery.getOrderId()));
}

/**
* Gets called by the driver's mobile app when he has delivered the order to the customer.
*/
/** Gets called by the driver's mobile app when he has delivered the order to the customer. */
@Override
public void notifyDeliveryDelivered(RestateContext ctx, OrderProto.DriverId request)
public void notifyDeliveryDelivered(RestateContext ctx, DriverId request)
throws TerminalException {
expectStatus(ctx, DriverStatus.DELIVERING);

Expand All @@ -133,16 +129,16 @@ public void notifyDeliveryDelivered(RestateContext ctx, OrderProto.DriverId requ
// Notify the delivery service that the delivery was delivered
DeliveryManagerRestate.newClient(ctx)
.oneWay()
.notifyDeliveryDelivered(toOrderIdProto(assignedDelivery.orderId));
.notifyDeliveryDelivered(toOrderIdProto(assignedDelivery.getOrderId()));

// Update the status of the driver to idle
ctx.set(DRIVER_STATUS, DriverStatus.IDLE);
}

/** Gets called by the driver's mobile app when he has moved to a new location. */
@Override
public void handleDriverLocationUpdateEvent(
RestateContext ctx, OrderProto.KafkaDriverLocationEvent request) throws TerminalException {
public void handleDriverLocationUpdateEvent(RestateContext ctx, KafkaDriverLocationEvent request)
throws TerminalException {
// Update the location of the driver
Location location = JacksonSerdes.of(Location.class).deserialize(request.getLocation());
ctx.set(DRIVER_LOCATION, location);
Expand All @@ -154,8 +150,8 @@ public void handleDriverLocationUpdateEvent(
DeliveryManagerRestate.newClient(ctx)
.oneWay()
.handleDriverLocationUpdate(
OrderProto.DeliveryLocationUpdate.newBuilder()
.setOrderId(delivery.orderId)
DeliveryLocationUpdate.newBuilder()
.setOrderId(delivery.getOrderId())
.setLocation(location.toProto())
.build()));
}
Expand All @@ -166,27 +162,24 @@ public void handleDriverLocationUpdateEvent(
* got assigned to him.
*/
@Override
public OrderProto.AssignedDeliveryResponse getAssignedDelivery(
RestateContext ctx, OrderProto.DriverId request) throws TerminalException {
public AssignedDeliveryResponse getAssignedDelivery(RestateContext ctx, DriverId request)
throws TerminalException {
var assignedDelivery = ctx.get(ASSIGNED_DELIVERY);

return assignedDelivery
.map(
delivery ->
OrderProto.AssignedDeliveryResponse.newBuilder()
AssignedDeliveryResponse.newBuilder()
.setDelivery(
OrderProto.Delivery.newBuilder()
.setDriverId(delivery.driverId)
.setOrderId(delivery.orderId)
.setRestaurantId(delivery.restaurantId)
.setCustomerLocation(delivery.customerLocation.toProto())
.setRestaurantLocation(delivery.restaurantLocation.toProto())
Delivery.newBuilder()
.setDriverId(delivery.getDriverId())
.setOrderId(delivery.getOrderId())
.setRestaurantId(delivery.getRestaurantId())
.setCustomerLocation(delivery.getCustomerLocation().toProto())
.setRestaurantLocation(delivery.getRestaurantLocation().toProto())
.build())
.build())
.orElse(
OrderProto.AssignedDeliveryResponse.newBuilder()
.setEmpty(Empty.getDefaultInstance())
.build());
.orElse(AssignedDeliveryResponse.newBuilder().setEmpty(Empty.getDefaultInstance()).build());
}

// Utility function to check if the driver is in the expected state
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package dev.restate.sdk.examples;

import static dev.restate.sdk.examples.generated.OrderProto.*;

import dev.restate.sdk.RestateContext;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.examples.generated.OrderProto;
import dev.restate.sdk.examples.generated.OrderStatusServiceRestate;
import dev.restate.sdk.examples.types.Status;
import dev.restate.sdk.examples.types.StatusEnum;

public class OrderStatusService
extends OrderStatusServiceRestate.OrderStatusServiceRestateImplBase {
Expand All @@ -15,26 +16,24 @@ public class OrderStatusService

/** Gets called by the webUI frontend to display the status of an order. */
@Override
public OrderProto.OrderStatus get(RestateContext ctx, OrderProto.OrderId request)
throws TerminalException {
public OrderStatus get(RestateContext ctx, OrderId request) throws TerminalException {
var orderStatusState = ctx.get(ORDER_STATUS).orElse("NEW");
var status = Status.valueOf(orderStatusState);
var status = StatusEnum.valueOf(orderStatusState);
var eta = ctx.get(ORDER_ETA).orElse(-1L);
return OrderProto.OrderStatus.newBuilder()
return OrderStatus.newBuilder()
.setOrderId(request.getOrderId())
.setStatus(OrderProto.Status.forNumber(status.getValue()))
.setStatus(Status.forNumber(status.getValue()))
.setEta(eta)
.build();
}

@Override
public void setStatus(RestateContext ctx, OrderProto.OrderStatus request)
throws TerminalException {
public void setStatus(RestateContext ctx, OrderStatus request) throws TerminalException {
ctx.set(ORDER_STATUS, request.getStatus().name());
}

@Override
public void setETA(RestateContext ctx, OrderProto.OrderStatus request) throws TerminalException {
public void setETA(RestateContext ctx, OrderStatus request) throws TerminalException {
ctx.set(ORDER_ETA, request.getEta());
}
}
Loading

0 comments on commit e5a234a

Please sign in to comment.