From c5b3687c6827efd718f9e1485cb73436b4e0c64d Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 13 Aug 2024 12:51:44 +1000 Subject: [PATCH] feat: java lease support (#2333) Tests will come later as part of a larger testing effort --- .../block/ftl/deployment/FtlProcessor.java | 9 +++- .../main/java/xyz/block/ftl/LeaseClient.java | 11 ++++ .../xyz/block/ftl/LeaseFailedException.java | 26 ++++++++++ .../xyz/block/ftl/runtime/FTLController.java | 52 ++++++++++++++++++- .../xyz/block/ftl/runtime/FTLRecorder.java | 34 ++++++++++++ 5 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java create mode 100644 java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseFailedException.java diff --git a/java-runtime/ftl-runtime/deployment/src/main/java/xyz/block/ftl/deployment/FtlProcessor.java b/java-runtime/ftl-runtime/deployment/src/main/java/xyz/block/ftl/deployment/FtlProcessor.java index 529d4ed36..a0737d5e8 100644 --- a/java-runtime/ftl-runtime/deployment/src/main/java/xyz/block/ftl/deployment/FtlProcessor.java +++ b/java-runtime/ftl-runtime/deployment/src/main/java/xyz/block/ftl/deployment/FtlProcessor.java @@ -66,6 +66,7 @@ import xyz.block.ftl.Cron; import xyz.block.ftl.Export; import xyz.block.ftl.GeneratedRef; +import xyz.block.ftl.LeaseClient; import xyz.block.ftl.Retry; import xyz.block.ftl.Secret; import xyz.block.ftl.Subscription; @@ -120,6 +121,7 @@ class FtlProcessor { public static final DotName CONFIG = DotName.createSimple(Config.class); public static final DotName OFFSET_DATE_TIME = DotName.createSimple(OffsetDateTime.class.getName()); public static final DotName GENERATED_REF = DotName.createSimple(GeneratedRef.class); + public static final DotName LEASE_CLIENT = DotName.createSimple(LeaseClient.class); @BuildStep ModuleNameBuildItem moduleName(ApplicationInfoBuildItem applicationInfoBuildItem) { @@ -187,12 +189,12 @@ public ParameterExtractor handleCustomParameter(org.jboss.jandex.Type type, return new VerbRegistry.ConfigSupplier(name, paramType); } else if (topics.getTopics().containsKey(type.name())) { var topic = topics.getTopics().get(type.name()); - Class paramType = loadClass(type); return recorder.topicParamExtractor(topic.generatedProducer()); } else if (verbClients.getVerbClients().containsKey(type.name())) { var client = verbClients.getVerbClients().get(type.name()); - Class paramType = loadClass(type); return recorder.verbParamExtractor(client.generatedClient()); + } else if (LEASE_CLIENT.equals(type.name())) { + return recorder.leaseClientExtractor(); } return null; } catch (ClassNotFoundException e) { @@ -451,6 +453,9 @@ private void handleVerbMethod(ExtractionContext context, MethodInfo method, Stri parameterTypes.add(paramType); paramMappers.add(context.recorder().verbClientSupplier(client.generatedClient())); callsMetadata.addCalls(Ref.newBuilder().setName(client.name()).setModule(client.module()).build()); + } else if (LEASE_CLIENT.equals(param.type().name())) { + parameterTypes.add(LeaseClient.class); + paramMappers.add(context.recorder().leaseClientSupplier()); } else if (bodyType != BodyType.DISALLOWED && bodyParamType == null) { bodyParamType = param.type(); Class paramType = loadClass(param.type()); diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java new file mode 100644 index 000000000..4f5cc5124 --- /dev/null +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java @@ -0,0 +1,11 @@ +package xyz.block.ftl; + +import java.time.Duration; + +/** + * Client that can be used to acquire a FTL lease. If the lease cannot be acquired a {@link LeaseFailedException} is thrown. + */ +public interface LeaseClient { + + void acquireLease(Duration duration, String... keys) throws LeaseFailedException; +} diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseFailedException.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseFailedException.java new file mode 100644 index 000000000..0a9bb45ef --- /dev/null +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseFailedException.java @@ -0,0 +1,26 @@ +package xyz.block.ftl; + +/** + * Checked exception that is thrown when a lease cannot be acquired + */ +public class LeaseFailedException extends Exception { + + public LeaseFailedException() { + } + + public LeaseFailedException(String message) { + super(message); + } + + public LeaseFailedException(String message, Throwable cause) { + super(message, cause); + } + + public LeaseFailedException(Throwable cause) { + super(cause); + } + + public LeaseFailedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java index 28b65e3cd..25eb62fb6 100644 --- a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java @@ -1,8 +1,12 @@ package xyz.block.ftl.runtime; import java.net.URI; +import java.time.Duration; +import java.util.Arrays; +import java.util.Deque; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; import jakarta.inject.Singleton; @@ -14,6 +18,10 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import io.quarkus.runtime.Startup; +import xyz.block.ftl.LeaseClient; +import xyz.block.ftl.LeaseFailedException; +import xyz.block.ftl.v1.AcquireLeaseRequest; +import xyz.block.ftl.v1.AcquireLeaseResponse; import xyz.block.ftl.v1.CallRequest; import xyz.block.ftl.v1.CallResponse; import xyz.block.ftl.v1.ModuleContextRequest; @@ -25,9 +33,11 @@ @Singleton @Startup -public class FTLController { +public class FTLController implements LeaseClient { private static final Logger log = Logger.getLogger(FTLController.class); final String moduleName; + private StreamObserver leaseClient; + private final Deque> leaseWaiters = new LinkedBlockingDeque<>(); private Throwable currentError; private volatile ModuleContextResponse moduleContextResponse; @@ -76,7 +86,29 @@ public FTLController(@ConfigProperty(name = "ftl.endpoint", defaultValue = "http var channel = channelBuilder.build(); verbService = VerbServiceGrpc.newStub(channel); verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver); + synchronized (this) { + this.leaseClient = verbService.acquireLease(new StreamObserver() { + @Override + public void onNext(AcquireLeaseResponse value) { + leaseWaiters.pop().complete(null); + } + + @Override + public void onError(Throwable t) { + leaseWaiters.pop().completeExceptionally(t); + } + @Override + public void onCompleted() { + synchronized (FTLController.this) { + while (!leaseWaiters.isEmpty()) { + leaseWaiters.pop().completeExceptionally(new RuntimeException("connection closed")); + } + leaseClient = verbService.acquireLease(this); + } + } + }); + } } public byte[] getSecret(String secretName) { @@ -97,6 +129,7 @@ public byte[] getConfig(String secretName) { public byte[] callVerb(String name, String module, byte[] payload) { CompletableFuture cf = new CompletableFuture<>(); + verbService.call(CallRequest.newBuilder().setVerb(Ref.newBuilder().setModule(module).setName(name)) .setBody(ByteString.copyFrom(payload)).build(), new StreamObserver<>() { @@ -154,6 +187,23 @@ public void onCompleted() { } } + public void acquireLease(Duration duration, String... keys) throws LeaseFailedException { + CompletableFuture cf = new CompletableFuture<>(); + synchronized (this) { + leaseWaiters.push(cf); + leaseClient.onNext(AcquireLeaseRequest.newBuilder().setModule(moduleName) + .addAllKey(Arrays.asList(keys)) + .setTtl(com.google.protobuf.Duration.newBuilder() + .setSeconds(duration.toSeconds())) + .build()); + } + try { + cf.get(); + } catch (Exception e) { + throw new LeaseFailedException(e); + } + } + private ModuleContextResponse getModuleContext() { var moduleContext = moduleContextResponse; if (moduleContext != null) { diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java index 6b839518c..ed9b901d5 100644 --- a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java @@ -12,6 +12,7 @@ import io.quarkus.arc.Arc; import io.quarkus.runtime.annotations.Recorder; +import xyz.block.ftl.LeaseClient; import xyz.block.ftl.v1.CallRequest; @Recorder @@ -71,6 +72,20 @@ public Object apply(ObjectMapper mapper, CallRequest callRequest) { } } + public BiFunction leaseClientSupplier() { + return new BiFunction() { + volatile LeaseClient leaseClient; + + @Override + public Object apply(ObjectMapper mapper, CallRequest callRequest) { + if (leaseClient == null) { + leaseClient = Arc.container().instance(LeaseClient.class).get(); + } + return leaseClient; + } + }; + } + public ParameterExtractor topicParamExtractor(String className) { try { @@ -107,4 +122,23 @@ public Object extractParameter(ResteasyReactiveRequestContext context) { throw new RuntimeException(e); } } + + public ParameterExtractor leaseClientExtractor() { + try { + return new ParameterExtractor() { + + volatile LeaseClient leaseClient; + + @Override + public Object extractParameter(ResteasyReactiveRequestContext context) { + if (leaseClient == null) { + leaseClient = Arc.container().instance(LeaseClient.class).get(); + } + return leaseClient; + } + }; + } catch (Exception e) { + throw new RuntimeException(e); + } + } }