Skip to content

Commit

Permalink
fix: fix Java leases and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Aug 14, 2024
1 parent adfa922 commit 2ce1deb
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 49 deletions.
5 changes: 4 additions & 1 deletion backend/controller/leases/lease_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (

func TestLease(t *testing.T) {
in.Run(t,
in.WithLanguages("go", "java"),
in.WithJava(),
in.CopyModule("leases"),
in.Build("leases"),
// checks if leases work in a unit test environment
in.ExecModuleTest("leases"),
in.IfLanguage("go", in.ExecModuleTest("leases")),
in.Deploy("leases"),
// checks if it leases work with a real controller
func(t testing.TB, ic in.TestContext) {
Expand All @@ -34,6 +36,7 @@ func TestLease(t *testing.T) {
Verb: &schemapb.Ref{Module: "leases", Name: "acquire"},
Body: []byte("{}"),
}))
assert.NoError(t, err)
if respErr := resp.Msg.GetError(); respErr != nil {
return fmt.Errorf("received error on first call: %v", respErr)
}
Expand Down
2 changes: 2 additions & 0 deletions backend/controller/leases/testdata/java/leases/ftl.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module = "leases"
language = "java"
141 changes: 141 additions & 0 deletions backend/controller/leases/testdata/java/leases/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>xyz.block.ftl.examples</groupId>
<artifactId>leases</artifactId>
<version>1.0.0-SNAPSHOT</version>

<properties>
<ftl.version>1.0-SNAPSHOT</ftl.version>
<compiler-plugin.version>3.13.0</compiler-plugin.version>
<kotlin.version>2.0.0</kotlin.version>
<maven.compiler.release>17</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.12.3</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.2.5</surefire-plugin.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>xyz.block</groupId>
<artifactId>ftl-java-runtime</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kotlin</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>kotlin-extensions</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
<goal>native-image-agent</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
<configuration>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<systemPropertyVariables>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<properties>
<skipITs>false</skipITs>
<quarkus.native.enabled>true</quarkus.native.enabled>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package xyz.block.ftl.java.test.leases;

import io.quarkus.logging.Log;
import xyz.block.ftl.Export;
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.Verb;

import java.time.Duration;

public class TestLeases {

@Export
@Verb
public void acquire(LeaseClient leaseClient) throws Exception {
Log.info("Acquiring lease");
try (var lease = leaseClient.acquireLease(Duration.ofSeconds(10), "lease")) {
Log.info("Acquired lease");
Thread.sleep(5000);
}
}

}
13 changes: 13 additions & 0 deletions integration/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/url"
"os"
"path/filepath"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -514,6 +515,18 @@ func HttpCall(method string, path string, headers map[string][]string, body []by
}
}

func IfLanguage(language string, action Action) Action {
return IfLanguages(action, language)
}

func IfLanguages(action Action, languages ...string) Action {
return func(t testing.TB, ic TestContext) {
if slices.Contains(languages, ic.language) {
action(t, ic)
}
}
}

// Run "go test" in the given module.
func ExecModuleTest(module string) Action {
return Chdir(module, Exec("go", "test", "./..."))
Expand Down
5 changes: 5 additions & 0 deletions integration/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
})

for _, language := range opts.languages {
ctx, done := context.WithCancel(ctx)
t.Run(language, func(t *testing.T) {
verbs := rpc.Dial(ftlv1connect.NewVerbServiceClient, "http://localhost:8892", log.Debug)

Expand All @@ -204,6 +205,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
binDir: binDir,
Verbs: verbs,
realT: t,
language: language,
}

if opts.startController {
Expand All @@ -223,6 +225,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
ic.AssertWithRetry(t, action)
}
})
done()
}
}

Expand All @@ -236,6 +239,8 @@ type TestContext struct {
testData string
// Path to the "bin" directory.
binDir string
// The Language under test
language string

Controller ftlv1connect.ControllerServiceClient
Console pbconsoleconnect.ConsoleServiceClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,13 @@
*/
public interface LeaseClient {

void acquireLease(Duration duration, String... keys) throws LeaseFailedException;
/**
* Acquire a lease for the given keys. The lease will be held for the given duration.
*
* @param duration The time to acquire the lease for
* @param keys The lease keys
* @return A handle that can be used to release the lease
* @throws LeaseFailedException
*/
LeaseHandle acquireLease(Duration duration, String... keys) throws LeaseFailedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package xyz.block.ftl;

public interface LeaseHandle extends AutoCloseable {

public void close();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;

import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
Expand All @@ -21,6 +19,7 @@
import io.quarkus.runtime.Startup;
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.LeaseFailedException;
import xyz.block.ftl.LeaseHandle;
import xyz.block.ftl.v1.AcquireLeaseRequest;
import xyz.block.ftl.v1.AcquireLeaseResponse;
import xyz.block.ftl.v1.CallRequest;
Expand All @@ -37,8 +36,6 @@
public class FTLController implements LeaseClient {
private static final Logger log = Logger.getLogger(FTLController.class);
final String moduleName;
private StreamObserver<AcquireLeaseRequest> leaseClient;
private final Deque<CompletableFuture<?>> leaseWaiters = new LinkedBlockingDeque<>();

private Throwable currentError;
private volatile ModuleContextResponse moduleContextResponse;
Expand Down Expand Up @@ -96,33 +93,6 @@ public FTLController(@ConfigProperty(name = "ftl.endpoint", defaultValue = "http
var channel = channelBuilder.build();
verbService = VerbServiceGrpc.newStub(channel);
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
synchronized (this) {
this.leaseClient = verbService.acquireLease(new StreamObserver<AcquireLeaseResponse>() {
@Override
public void onNext(AcquireLeaseResponse value) {
leaseWaiters.pop().complete(null);
}

@Override
public void onError(Throwable t) {
synchronized (FTLController.this) {
while (!leaseWaiters.isEmpty()) {
leaseWaiters.pop().completeExceptionally(t);
}
if (!closed) {
leaseClient = verbService.acquireLease(this);
}
}
}

@Override
public void onCompleted() {
//if we have any waiters error them out
//if we have not shut down we can try and connect again
onError(new RuntimeException("stream closed"));
}
});
}
}

public byte[] getSecret(String secretName) {
Expand Down Expand Up @@ -201,21 +171,42 @@ public void onCompleted() {
}
}

public void acquireLease(Duration duration, String... keys) throws LeaseFailedException {
public LeaseHandle acquireLease(Duration duration, String... keys) throws LeaseFailedException {
CompletableFuture<?> cf = new CompletableFuture<>();
synchronized (this) {
leaseWaiters.push(cf);
leaseClient.onNext(AcquireLeaseRequest.newBuilder().setModule(moduleName)
.addAllKey(Arrays.asList(keys))
.setTtl(com.google.protobuf.Duration.newBuilder()
.setSeconds(duration.toSeconds()))
.build());
}
var client = verbService.acquireLease(new StreamObserver<AcquireLeaseResponse>() {
@Override
public void onNext(AcquireLeaseResponse value) {
cf.complete(null);
}

@Override
public void onError(Throwable t) {
cf.completeExceptionally(t);
}

@Override
public void onCompleted() {
if (!cf.isDone()) {
onError(new RuntimeException("stream closed"));
}
}
});
client.onNext(AcquireLeaseRequest.newBuilder().setModule(moduleName)
.addAllKey(Arrays.asList(keys))
.setTtl(com.google.protobuf.Duration.newBuilder()
.setSeconds(duration.toSeconds()))
.build());
try {
cf.get();
} catch (Exception e) {
throw new LeaseFailedException(e);
throw new LeaseFailedException("lease already held", e);
}
return new LeaseHandle() {
@Override
public void close() {
client.onCompleted();
}
};
}

private ModuleContextResponse getModuleContext() {
Expand Down
Loading

0 comments on commit 2ce1deb

Please sign in to comment.