Skip to content

Commit

Permalink
Move Java patterns together
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 19, 2024
1 parent 41a5cc3 commit 1bfc3a7
Show file tree
Hide file tree
Showing 51 changed files with 1,723 additions and 96 deletions.
11 changes: 0 additions & 11 deletions java/patterns-use-cases/async-tasks-parallelize-work/README.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import my.example.types.PaymentRequest;
import my.example.utils.PaymentUtils;
import my.example.utils.StripeUtils;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -53,6 +52,8 @@ public class PaymentService {
intent -> intent.toJson().getBytes(),
bytes -> ApiResource.GSON.fromJson(new String(bytes), PaymentIntent.class));

public record PaymentRequest(Long amount, String paymentMethodId, boolean delayedStatus) {}

@Handler
public void processPayment(Context ctx, PaymentRequest request) {
PaymentUtils.verifyPaymentRequest(request);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import my.example.types.SocialMediaPost;

import java.time.Duration;

Expand All @@ -16,6 +15,8 @@
@VirtualObject
public class UserFeed {

public record SocialMediaPost(String content, String metadata) {}

// Connect a handler to a Kafka topic. Restate manages the Kafka subscription and offsets.
// Events are pushed in order to the Virtual Object (Kafka key = object key).
@Handler
Expand Down

This file was deleted.

Empty file.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
@Workflow
public class BookingWorkflow {

public record BookingRequest(
Flights.FlightBookingRequest flights,
CarRentals.CarRentalRequest car,
PaymentClient.PaymentInfo paymentInfo
) {}

@Workflow
public void run(WorkflowContext ctx, BookingRequest req) throws TerminalException {
// create a list of undo actions
Expand Down Expand Up @@ -78,4 +84,12 @@ public void run(WorkflowContext ctx, BookingRequest req) throws TerminalExceptio
throw e;
}
}

public static void main(String[] args) {
RestateHttpEndpointBuilder.builder()
.bind(new BookingWorkflow())
.bind(new CarRentals())
.bind(new Flights())
.buildAndListen();
}
}

This file was deleted.

467 changes: 467 additions & 0 deletions java/patterns-use-cases/patterns-use-cases/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ dependencies {
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.16.1")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.1")

// Stripe
implementation("com.stripe:stripe-java:25.7.0")
implementation("com.google.code.gson:gson:2.10.1")

// Logging (optional)
implementation("org.apache.logging.log4j:log4j-core:2.24.1")
}

// Set main class
application {
mainClass.set(project.property("mainClass") as String)
mainClass.set("my.example.FanOutWorker")
}

tasks.withType<JavaCompile> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package my.example.dataupload;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.SharedWorkflowContext;
import dev.restate.sdk.WorkflowContext;
import dev.restate.sdk.annotation.Shared;
import dev.restate.sdk.annotation.Workflow;
import dev.restate.sdk.common.DurablePromiseKey;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import my.example.dataupload.utils.EmailClient;

import static my.example.dataupload.utils.DataOperations.createS3Bucket;
import static my.example.dataupload.utils.DataOperations.uploadData;

@Workflow
public class DataUploadService {

private static final DurablePromiseKey<String> URL_PROMISE =
DurablePromiseKey.of("url", JsonSerdes.STRING);

@Workflow
public String run(WorkflowContext ctx) {
String url = ctx.run(JsonSerdes.STRING, () -> createS3Bucket());
ctx.run(() -> uploadData(url));

ctx.promiseHandle(URL_PROMISE).resolve(url);
return url;
}

@Shared
public void resultAsEmail(SharedWorkflowContext ctx, String email) {
String url = ctx.promise(URL_PROMISE).awaitable().await();
ctx.run(() -> EmailClient.send(url, email));
}

public static void main(String[] args) {
RestateHttpEndpointBuilder.builder().bind(new DataUploadService()).buildAndListen(9082);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package my.example.dataupload;

import dev.restate.sdk.client.Client;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// The upload client calls the data upload workflow and awaits the result for 5 seconds.
// If the workflow doesn't complete within that time, it asks the
// workflow to send the upload url via email instead.
public class UploadClient {
private static final Logger logger = LogManager.getLogger(UploadClient.class);

private static final String RESTATE_URL = "http://localhost:8080";

public void uploadData(String userId, String email) {
logger.info("Uploading data for user {}", userId);

// Submit the workflow
Client restateClient = Client.connect(RESTATE_URL);
var uploadClient = DataUploadServiceClient.fromClient(restateClient, userId);
uploadClient.submit();

String url;
try {
// Wait for the workflow to complete or timeout
url = uploadClient.workflowHandle().attachAsync()
.orTimeout(5, TimeUnit.SECONDS)
.join();
} catch (Exception e) {
if (e.getCause() instanceof TimeoutException) {
logger.info("Slow upload... Mail the link later");
uploadClient.resultAsEmail(email);
return;
}
throw e;
}

// ... process directly ...
logger.info("Fast upload... URL was {}", url);
}

//--------------------------------------------------------------------------------
// This client would be used in some other part of the system.
// For the sake of this example, we are calling it here from the main method, so you can test the example.
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("Specify the userId as the argument: " +
"./gradlew run -PmainClass=my.example.UploadClient --args=\"userId123\"");
System.exit(1);
}
new UploadClient().uploadData(args[0], args[0] + "@example.com");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package my.example.dataupload.utils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DataOperations {
private static final Logger logger = LogManager.getLogger(DataOperations.class);

public static String createS3Bucket(){
String bucket = String.valueOf((long) (Math.random() * 1_000_000_000));
String bucketUrl = "https://s3-eu-central-1.amazonaws.com/" + bucket + "/";
logger.info("Creating bucket with URL {}", bucketUrl);
return bucketUrl;
}

public static void uploadData(String url){
long timeRemaining = Math.random() < 0.5 ? 1500 : 10000;
logger.info("Uploading data to target {}. ETA: {} ms", url, timeRemaining);
try {
Thread.sleep(timeRemaining);
} catch (InterruptedException e) {
logger.error("Upload failed: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package my.example.dataupload.utils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class EmailClient {
private static final Logger logger = LogManager.getLogger(EmailClient.class);

public static void send(String email, String url){
logger.info("Sending email to {} with url {}", email, url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package my.example.durablerpc;

import dev.restate.sdk.client.CallRequestOptions;
import dev.restate.sdk.client.Client;

public class MyClient {

public static String RESTATE_URL = "http://localhost:8080";

public boolean reserveProduct(String productId, String reservationId) {
Client restateClient = Client.connect(RESTATE_URL);

// Durable RPC call to the product service
// Restate registers the request and makes sure runs to completion exactly once
boolean reserved = ProductServiceClient.fromClient(restateClient, productId)
// Restate deduplicates requests with the same idempotency key
.reserve(CallRequestOptions.DEFAULT.withIdempotency(reservationId));

return reserved;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package my.example.durablerpc;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/*
* Have a look at the ProductServiceClient class to see how to call Restate handlers
* programmatically from another process.
*/
@VirtualObject
public class ProductService {
private static final Logger logger = LogManager.getLogger(ProductService.class);

private static final StateKey<Boolean> RESERVED = StateKey.of("reserved", JsonSerdes.BOOLEAN);


@Handler
public boolean reserve(ObjectContext ctx) {
if (ctx.get(RESERVED).orElse(false)){
logger.info("Product already reserved");
return false;
}
logger.info("Reserving product");
ctx.set(RESERVED, true);
return true;
}

public static void main(String[] args) {
RestateHttpEndpointBuilder.builder()
.bind(new ProductService())
.buildAndListen();
}
}
Loading

0 comments on commit 1bfc3a7

Please sign in to comment.