Skip to content

Commit

Permalink
feat: java lease support (#2333)
Browse files Browse the repository at this point in the history
Tests will come later as part of a larger testing effort
  • Loading branch information
stuartwdouglas authored Aug 13, 2024
1 parent 7b6a421 commit c5b3687
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import xyz.block.ftl.Cron;
import xyz.block.ftl.Export;
import xyz.block.ftl.GeneratedRef;
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.Retry;
import xyz.block.ftl.Secret;
import xyz.block.ftl.Subscription;
Expand Down Expand Up @@ -120,6 +121,7 @@ class FtlProcessor {
public static final DotName CONFIG = DotName.createSimple(Config.class);
public static final DotName OFFSET_DATE_TIME = DotName.createSimple(OffsetDateTime.class.getName());
public static final DotName GENERATED_REF = DotName.createSimple(GeneratedRef.class);
public static final DotName LEASE_CLIENT = DotName.createSimple(LeaseClient.class);

@BuildStep
ModuleNameBuildItem moduleName(ApplicationInfoBuildItem applicationInfoBuildItem) {
Expand Down Expand Up @@ -187,12 +189,12 @@ public ParameterExtractor handleCustomParameter(org.jboss.jandex.Type type,
return new VerbRegistry.ConfigSupplier(name, paramType);
} else if (topics.getTopics().containsKey(type.name())) {
var topic = topics.getTopics().get(type.name());
Class<?> paramType = loadClass(type);
return recorder.topicParamExtractor(topic.generatedProducer());
} else if (verbClients.getVerbClients().containsKey(type.name())) {
var client = verbClients.getVerbClients().get(type.name());
Class<?> paramType = loadClass(type);
return recorder.verbParamExtractor(client.generatedClient());
} else if (LEASE_CLIENT.equals(type.name())) {
return recorder.leaseClientExtractor();
}
return null;
} catch (ClassNotFoundException e) {
Expand Down Expand Up @@ -451,6 +453,9 @@ private void handleVerbMethod(ExtractionContext context, MethodInfo method, Stri
parameterTypes.add(paramType);
paramMappers.add(context.recorder().verbClientSupplier(client.generatedClient()));
callsMetadata.addCalls(Ref.newBuilder().setName(client.name()).setModule(client.module()).build());
} else if (LEASE_CLIENT.equals(param.type().name())) {
parameterTypes.add(LeaseClient.class);
paramMappers.add(context.recorder().leaseClientSupplier());
} else if (bodyType != BodyType.DISALLOWED && bodyParamType == null) {
bodyParamType = param.type();
Class<?> paramType = loadClass(param.type());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package xyz.block.ftl;

import java.time.Duration;

/**
* Client that can be used to acquire a FTL lease. If the lease cannot be acquired a {@link LeaseFailedException} is thrown.
*/
public interface LeaseClient {

void acquireLease(Duration duration, String... keys) throws LeaseFailedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package xyz.block.ftl;

/**
* Checked exception that is thrown when a lease cannot be acquired
*/
public class LeaseFailedException extends Exception {

public LeaseFailedException() {
}

public LeaseFailedException(String message) {
super(message);
}

public LeaseFailedException(String message, Throwable cause) {
super(message, cause);
}

public LeaseFailedException(Throwable cause) {
super(cause);
}

public LeaseFailedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package xyz.block.ftl.runtime;

import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;

import jakarta.inject.Singleton;

Expand All @@ -14,6 +18,10 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.quarkus.runtime.Startup;
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.LeaseFailedException;
import xyz.block.ftl.v1.AcquireLeaseRequest;
import xyz.block.ftl.v1.AcquireLeaseResponse;
import xyz.block.ftl.v1.CallRequest;
import xyz.block.ftl.v1.CallResponse;
import xyz.block.ftl.v1.ModuleContextRequest;
Expand All @@ -25,9 +33,11 @@

@Singleton
@Startup
public class FTLController {
public class FTLController implements LeaseClient {
private static final Logger log = Logger.getLogger(FTLController.class);
final String moduleName;
private StreamObserver<AcquireLeaseRequest> leaseClient;
private final Deque<CompletableFuture<?>> leaseWaiters = new LinkedBlockingDeque<>();

private Throwable currentError;
private volatile ModuleContextResponse moduleContextResponse;
Expand Down Expand Up @@ -76,7 +86,29 @@ public FTLController(@ConfigProperty(name = "ftl.endpoint", defaultValue = "http
var channel = channelBuilder.build();
verbService = VerbServiceGrpc.newStub(channel);
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
synchronized (this) {
this.leaseClient = verbService.acquireLease(new StreamObserver<AcquireLeaseResponse>() {
@Override
public void onNext(AcquireLeaseResponse value) {
leaseWaiters.pop().complete(null);
}

@Override
public void onError(Throwable t) {
leaseWaiters.pop().completeExceptionally(t);
}

@Override
public void onCompleted() {
synchronized (FTLController.this) {
while (!leaseWaiters.isEmpty()) {
leaseWaiters.pop().completeExceptionally(new RuntimeException("connection closed"));
}
leaseClient = verbService.acquireLease(this);
}
}
});
}
}

public byte[] getSecret(String secretName) {
Expand All @@ -97,6 +129,7 @@ public byte[] getConfig(String secretName) {

public byte[] callVerb(String name, String module, byte[] payload) {
CompletableFuture<byte[]> cf = new CompletableFuture<>();

verbService.call(CallRequest.newBuilder().setVerb(Ref.newBuilder().setModule(module).setName(name))
.setBody(ByteString.copyFrom(payload)).build(), new StreamObserver<>() {

Expand Down Expand Up @@ -154,6 +187,23 @@ public void onCompleted() {
}
}

public void acquireLease(Duration duration, String... keys) throws LeaseFailedException {
CompletableFuture<?> cf = new CompletableFuture<>();
synchronized (this) {
leaseWaiters.push(cf);
leaseClient.onNext(AcquireLeaseRequest.newBuilder().setModule(moduleName)
.addAllKey(Arrays.asList(keys))
.setTtl(com.google.protobuf.Duration.newBuilder()
.setSeconds(duration.toSeconds()))
.build());
}
try {
cf.get();
} catch (Exception e) {
throw new LeaseFailedException(e);
}
}

private ModuleContextResponse getModuleContext() {
var moduleContext = moduleContextResponse;
if (moduleContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import io.quarkus.arc.Arc;
import io.quarkus.runtime.annotations.Recorder;
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.v1.CallRequest;

@Recorder
Expand Down Expand Up @@ -71,6 +72,20 @@ public Object apply(ObjectMapper mapper, CallRequest callRequest) {
}
}

public BiFunction<ObjectMapper, CallRequest, Object> leaseClientSupplier() {
return new BiFunction<ObjectMapper, CallRequest, Object>() {
volatile LeaseClient leaseClient;

@Override
public Object apply(ObjectMapper mapper, CallRequest callRequest) {
if (leaseClient == null) {
leaseClient = Arc.container().instance(LeaseClient.class).get();
}
return leaseClient;
}
};
}

public ParameterExtractor topicParamExtractor(String className) {

try {
Expand Down Expand Up @@ -107,4 +122,23 @@ public Object extractParameter(ResteasyReactiveRequestContext context) {
throw new RuntimeException(e);
}
}

public ParameterExtractor leaseClientExtractor() {
try {
return new ParameterExtractor() {

volatile LeaseClient leaseClient;

@Override
public Object extractParameter(ResteasyReactiveRequestContext context) {
if (leaseClient == null) {
leaseClient = Arc.container().instance(LeaseClient.class).get();
}
return leaseClient;
}
};
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit c5b3687

Please sign in to comment.