From 1da95a34c07826eb5a6be0f6940cd2a7a220812a Mon Sep 17 00:00:00 2001 From: Martin Nyholm Jelle Date: Sat, 2 Nov 2024 13:52:04 +0000 Subject: [PATCH 1/5] initial jetstream support --- pom.xml | 1 + .../shedlock-provider-jetstream/pom.xml | 78 +++++++++++++ .../jetstream/NatsJetStreamLockProvider.java | 105 ++++++++++++++++++ .../jetstream/NatsJetStreamContainer.java | 23 ++++ ...sJetStreamLockProviderIntegrationTest.java | 86 ++++++++++++++ .../src/test/resources/logback.xml | 29 +++++ 6 files changed, 322 insertions(+) create mode 100644 providers/jetstream/shedlock-provider-jetstream/pom.xml create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/test/resources/logback.xml diff --git a/pom.xml b/pom.xml index 88cbb54ea..bad43234d 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ providers/memcached/shedlock-provider-memcached-spy providers/datastore/shedlock-provider-datastore providers/spanner/shedlock-provider-spanner + providers/jetstream/shedlock-provider-jetstream providers/neo4j/shedlock-provider-neo4j diff --git a/providers/jetstream/shedlock-provider-jetstream/pom.xml b/providers/jetstream/shedlock-provider-jetstream/pom.xml new file mode 100644 index 000000000..a782906dd --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/pom.xml @@ -0,0 +1,78 @@ + + + + shedlock-parent + net.javacrumbs.shedlock + 5.16.1-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + shedlock-provider-jetstream + 5.16.1-SNAPSHOT + + + 2.20.2 + + + + + net.javacrumbs.shedlock + shedlock-core + ${project.version} + + + + net.javacrumbs.shedlock + shedlock-test-support + ${project.version} + test + + + + io.nats + jnats + ${nats.version} + + + + org.testcontainers + testcontainers + ${test-containers.ver} + test + + + + org.testcontainers + junit-jupiter + ${test-containers.ver} + test + + + + ch.qos.logback + logback-classic + ${logback.ver} + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + + net.javacrumbs.shedlock.provider.nats.jetstream + + + + + + + + + diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java new file mode 100644 index 000000000..0afbb3a9d --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java @@ -0,0 +1,105 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import static net.javacrumbs.shedlock.support.Utils.getHostname; +import static net.javacrumbs.shedlock.support.Utils.toIsoString; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +import io.nats.client.Connection; +import io.nats.client.api.KeyValueConfiguration; +import net.javacrumbs.shedlock.core.AbstractSimpleLock; +import net.javacrumbs.shedlock.core.ClockProvider; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.core.LockProvider; +import net.javacrumbs.shedlock.core.SimpleLock; +import net.javacrumbs.shedlock.support.LockException; +import net.javacrumbs.shedlock.support.annotation.NonNull; + +/** + * Lock Provider for NATS JetStream + * + * @see JetStream + */ +public class NatsJetStreamLockProvider implements LockProvider { + + /** KEY PREFIX */ + private static final String KEY_PREFIX = "shedlock"; + + private final Connection connection; + + /** + * Create NatsJetStreamLockProvider + * + * @param connection + * io.nats.client.Connection + */ + public NatsJetStreamLockProvider(@NonNull Connection connection) { + this.connection = connection; + } + + @Override + @NonNull + public Optional lock(@NonNull LockConfiguration lockConfiguration) { + var expireTime = getSecondUntil(lockConfiguration.getLockAtMostUntil()); + var key = buildKey(lockConfiguration.getName()); + + try { + var kvm = connection.keyValueManagement(); + kvm.create(KeyValueConfiguration.builder().name("shedlock").build()); + + var kv = connection.keyValue("shedlock"); + kv.put(key, key.getBytes()); + } catch(Exception e) { + return Optional.empty(); + } + + return Optional.of(new NatsJetStreamLock(key, connection, lockConfiguration)); + } + + private static long getSecondUntil(Instant instant) { + var millis = Duration.between(ClockProvider.now(), instant).toMillis(); + return millis / 1000; + } + + static String buildKey(String lockName) { + return String.format("%s_%s", KEY_PREFIX, lockName); + } + + private static String buildValue() { + return String.format("ADDED:%s@%s", toIsoString(ClockProvider.now()), getHostname()); + } + + private static final class NatsJetStreamLock extends AbstractSimpleLock { + + private final String key; + + private final Connection connection; + + private NatsJetStreamLock( + @NonNull String key, @NonNull Connection connection, @NonNull LockConfiguration lockConfiguration) { + super(lockConfiguration); + this.key = key; + this.connection = connection; + } + + @Override + protected void doUnlock() { + var keepLockFor = getSecondUntil(lockConfiguration.getLockAtLeastUntil()); + if (keepLockFor <= 0) { + try { + connection.keyValue("shedlock").delete(key); + } catch(Exception e) { + throw new LockException("Can not remove node. " + e.getMessage()); + } + } else { + try { + connection.keyValue("shedlock").update(key, KEY_PREFIX, 1); + } catch(Exception e) { + throw new LockException("Can not replace node. " + e.getMessage()); + } + } + } + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java new file mode 100644 index 000000000..0857e0ba6 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java @@ -0,0 +1,23 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class NatsJetStreamContainer extends GenericContainer { + + private static final Logger LOGGER = LoggerFactory.getLogger(NatsJetStreamContainer.class); + + public static final DockerImageName NATS_IMAGE = DockerImageName.parse("nats:2.10-alpine"); + + private static final Integer NATS_PORT = 4222; + private static final Integer NATS_HTTP_PORT = 8222; + + public NatsJetStreamContainer() { + super(NATS_IMAGE.asCanonicalNameString()); + this.withExposedPorts(NATS_PORT, NATS_HTTP_PORT) + .withLogConsumer(frame -> LOGGER.info(frame.getUtf8String())) + .withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString()); + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java new file mode 100644 index 000000000..c92e72886 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java @@ -0,0 +1,86 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import static java.lang.Thread.sleep; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.time.Duration; +import java.util.Optional; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.core.LockProvider; +import net.javacrumbs.shedlock.core.SimpleLock; +import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import io.nats.client.Connection; +import io.nats.client.Nats; + +@Testcontainers +public class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest { + + @Container + public static final NatsJetStreamContainer container = new NatsJetStreamContainer(); + + static final String ENV = "test"; + + private LockProvider lockProvider; + + private Connection nc; + + @BeforeEach + public void createLockProvider() throws Exception { + var natsUrl = String.format("nats://%s:%d", container.getHost(), container.getFirstMappedPort()); + nc = Nats.connect(natsUrl); + + lockProvider = new NatsJetStreamLockProvider(nc); + } + + @Override + protected void assertUnlocked(String lockName) { + assertThat(getLock(lockName)).isNull(); + } + + @Override + protected void assertLocked(String lockName) { + assertThat(getLock(lockName)).isNotNull(); + } + + @Override + @Test + public void shouldTimeout() throws InterruptedException { + this.doTestTimeout(Duration.ofSeconds(1)); + } + + @Override + protected void doTestTimeout(Duration lockAtMostFor) throws InterruptedException { + LockConfiguration configWithShortTimeout = lockConfig(LOCK_NAME1, lockAtMostFor, Duration.ZERO); + Optional lock1 = getLockProvider().lock(configWithShortTimeout); + assertThat(lock1).isNotEmpty(); + + sleep(lockAtMostFor.toMillis() * 2); + assertUnlocked(LOCK_NAME1); + + Optional lock2 = + getLockProvider().lock(lockConfig(LOCK_NAME1, Duration.ofSeconds(1), Duration.ZERO)); + assertThat(lock2).isNotEmpty(); + lock2.get().unlock(); + } + + @Override + protected LockProvider getLockProvider() { + return lockProvider; + } + + private String getLock(String lockName) { + try { + return (String) nc.keyValue(NatsJetStreamLockProvider.buildKey(lockName)).getBucketName(); + } catch (IOException e) { + fail(e); + } + return null; + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/resources/logback.xml b/providers/jetstream/shedlock-provider-jetstream/src/test/resources/logback.xml new file mode 100644 index 000000000..8ee4ac478 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/resources/logback.xml @@ -0,0 +1,29 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + From 6056c5b64811e5359e2467cd77e6c770d318104b Mon Sep 17 00:00:00 2001 From: Martin Nyholm Jelle Date: Mon, 18 Nov 2024 12:28:43 +0000 Subject: [PATCH 2/5] Tests runs now --- .../provider/nats/jetstream/LockContent.java | 9 ++ .../nats/jetstream/LockContentHandler.java | 18 +++ .../nats/jetstream/NatsJetStreamLock.java | 47 +++++++ .../jetstream/NatsJetStreamLockProvider.java | 91 +++++--------- .../jetstream/NatsJetStreamContainer.java | 10 +- ...sJetStreamLockProviderIntegrationTest.java | 117 ++++++++++++++---- 6 files changed, 204 insertions(+), 88 deletions(-) create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java new file mode 100644 index 000000000..de12ca006 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java @@ -0,0 +1,9 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import java.time.Instant; + +import net.javacrumbs.shedlock.support.annotation.NonNull; + +public record LockContent(@NonNull Instant lockAtLeastUntil, @NonNull Instant lockAtMostUntil) { + +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java new file mode 100644 index 000000000..3d3c7defb --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java @@ -0,0 +1,18 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.javacrumbs.shedlock.support.annotation.NonNull; + +public class LockContentHandler { + + private static final Logger log = LoggerFactory.getLogger(LockContentHandler.class); + + private LockContentHandler() {} + + public static byte[] writeContent(@NonNull LockContent lockContent) { + log.debug("write lock: {}", lockContent); + return lockContent.lockAtLeastUntil().toString().getBytes(); + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java new file mode 100644 index 000000000..ad2f6dc67 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java @@ -0,0 +1,47 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import java.time.Duration; +import java.time.Instant; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.nats.client.Connection; +import net.javacrumbs.shedlock.core.AbstractSimpleLock; +import net.javacrumbs.shedlock.core.ClockProvider; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.support.LockException; +import net.javacrumbs.shedlock.support.annotation.NonNull; + +public final class NatsJetStreamLock extends AbstractSimpleLock { + + private final Logger log = LoggerFactory.getLogger(NatsJetStreamLock.class); + + private final Connection connection; + + protected NatsJetStreamLock( + @NonNull Connection connection, @NonNull LockConfiguration lockConfiguration) { + super(lockConfiguration); + this.connection = connection; + } + + @Override + protected void doUnlock() { + var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName()); + log.debug("Unlocking for bucketName: {}", bucketName); + var keepLockFor = getSecondUntil(lockConfiguration.getLockAtLeastUntil()); + if (keepLockFor <= 0) { + try { + log.debug("Calling delete on key"); + + connection.keyValue(bucketName).delete("LOCKED"); + } catch (Exception e) { + throw new LockException("Can not remove node. " + e.getMessage()); + } + } + } + + private static long getSecondUntil(Instant instant) { + return Duration.between(ClockProvider.now(), instant).toMillis(); + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java index 0afbb3a9d..e01816deb 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java @@ -1,20 +1,18 @@ package net.javacrumbs.shedlock.provider.nats.jetstream; -import static net.javacrumbs.shedlock.support.Utils.getHostname; -import static net.javacrumbs.shedlock.support.Utils.toIsoString; - +import java.io.IOException; import java.time.Duration; -import java.time.Instant; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; import io.nats.client.api.KeyValueConfiguration; -import net.javacrumbs.shedlock.core.AbstractSimpleLock; -import net.javacrumbs.shedlock.core.ClockProvider; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.core.LockProvider; import net.javacrumbs.shedlock.core.SimpleLock; -import net.javacrumbs.shedlock.support.LockException; import net.javacrumbs.shedlock.support.annotation.NonNull; /** @@ -24,8 +22,7 @@ */ public class NatsJetStreamLockProvider implements LockProvider { - /** KEY PREFIX */ - private static final String KEY_PREFIX = "shedlock"; + private final Logger log = LoggerFactory.getLogger(NatsJetStreamLockProvider.class); private final Connection connection; @@ -33,7 +30,7 @@ public class NatsJetStreamLockProvider implements LockProvider { * Create NatsJetStreamLockProvider * * @param connection - * io.nats.client.Connection + * io.nats.client.Connection */ public NatsJetStreamLockProvider(@NonNull Connection connection) { this.connection = connection; @@ -42,64 +39,34 @@ public NatsJetStreamLockProvider(@NonNull Connection connection) { @Override @NonNull public Optional lock(@NonNull LockConfiguration lockConfiguration) { - var expireTime = getSecondUntil(lockConfiguration.getLockAtMostUntil()); - var key = buildKey(lockConfiguration.getName()); - + var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName()); + log.debug("Attempting lock for bucketName: {}", bucketName); try { - var kvm = connection.keyValueManagement(); - kvm.create(KeyValueConfiguration.builder().name("shedlock").build()); - - var kv = connection.keyValue("shedlock"); - kv.put(key, key.getBytes()); - } catch(Exception e) { - return Optional.empty(); - } - - return Optional.of(new NatsJetStreamLock(key, connection, lockConfiguration)); - } - - private static long getSecondUntil(Instant instant) { - var millis = Duration.between(ClockProvider.now(), instant).toMillis(); - return millis / 1000; - } - - static String buildKey(String lockName) { - return String.format("%s_%s", KEY_PREFIX, lockName); - } - - private static String buildValue() { - return String.format("ADDED:%s@%s", toIsoString(ClockProvider.now()), getHostname()); - } - - private static final class NatsJetStreamLock extends AbstractSimpleLock { + var lockTime = lockConfiguration.getLockAtMostFor(); - private final String key; + // nats cannot accept below 100ms + if(lockTime.toMillis() < 100) { + log.debug("NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!", lockTime.toMillis()); + lockTime = Duration.ofMillis(100L); + } - private final Connection connection; + connection.keyValueManagement().create( + KeyValueConfiguration.builder().name(bucketName).ttl(lockTime).build()); + connection.keyValue(bucketName).create("LOCKED", LockContentHandler.writeContent( + new LockContent(lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil()))); - private NatsJetStreamLock( - @NonNull String key, @NonNull Connection connection, @NonNull LockConfiguration lockConfiguration) { - super(lockConfiguration); - this.key = key; - this.connection = connection; - } + log.debug("Accuired lock for bucketName: {}", bucketName); - @Override - protected void doUnlock() { - var keepLockFor = getSecondUntil(lockConfiguration.getLockAtLeastUntil()); - if (keepLockFor <= 0) { - try { - connection.keyValue("shedlock").delete(key); - } catch(Exception e) { - throw new LockException("Can not remove node. " + e.getMessage()); - } - } else { - try { - connection.keyValue("shedlock").update(key, KEY_PREFIX, 1); - } catch(Exception e) { - throw new LockException("Can not replace node. " + e.getMessage()); - } + return Optional.of(new NatsJetStreamLock(connection, lockConfiguration)); + } catch (JetStreamApiException e) { + if (e.getApiErrorCode() == 10058 || e.getApiErrorCode() == 10071) { + log.debug("Rejected lock for bucketName: {}, message: {}", bucketName, e.getMessage()); + return Optional.empty(); } + log.warn("Rejected lock for bucketName: {}", bucketName); + throw new IllegalStateException(e); + } catch (IOException e) { + throw new IllegalStateException(e); } } } diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java index 0857e0ba6..93c6e152c 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java @@ -1,8 +1,11 @@ package net.javacrumbs.shedlock.provider.nats.jetstream; +import java.time.Duration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.utility.DockerImageName; public class NatsJetStreamContainer extends GenericContainer { @@ -17,7 +20,10 @@ public class NatsJetStreamContainer extends GenericContainer LOGGER.info(frame.getUtf8String())) - .withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString()); + .withNetworkAliases("nats") + .withLogConsumer(frame -> LOGGER.info(frame.getUtf8String().replace("\n", ""))) + .withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString()) + .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*")) + .withStartupTimeout(Duration.ofSeconds(180L)); } } diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java index c92e72886..41af6124b 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java @@ -1,23 +1,29 @@ package net.javacrumbs.shedlock.provider.nats.jetstream; -import static java.lang.Thread.sleep; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; -import java.io.IOException; import java.time.Duration; -import java.util.Optional; -import net.javacrumbs.shedlock.core.LockConfiguration; +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; + +import static java.lang.Thread.sleep; + import net.javacrumbs.shedlock.core.LockProvider; -import net.javacrumbs.shedlock.core.SimpleLock; import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import io.nats.client.Connection; +import io.nats.client.ConnectionListener; +import io.nats.client.JetStreamApiException; import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.api.KeyValueEntry; @Testcontainers public class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest { @@ -25,18 +31,31 @@ public class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProvid @Container public static final NatsJetStreamContainer container = new NatsJetStreamContainer(); - static final String ENV = "test"; - private LockProvider lockProvider; - - private Connection nc; + private Connection connection; @BeforeEach public void createLockProvider() throws Exception { var natsUrl = String.format("nats://%s:%d", container.getHost(), container.getFirstMappedPort()); - nc = Nats.connect(natsUrl); + connection = Nats.connect(Options.builder() + .server(natsUrl) + .connectionListener(new ConnectionListener() { + + private final Logger log = LoggerFactory.getLogger("ConnectionListener"); + + @Override + public void connectionEvent(Connection conn, Events type) { + log.debug("Received event: {}, on conn: {}", type, conn); + } + }) + .build()); + + lockProvider = new NatsJetStreamLockProvider(connection); + } - lockProvider = new NatsJetStreamLockProvider(nc); + @AfterEach + public void stopLockProvider() throws Exception { + connection.close(); } @Override @@ -52,35 +71,85 @@ protected void assertLocked(String lockName) { @Override @Test public void shouldTimeout() throws InterruptedException { - this.doTestTimeout(Duration.ofSeconds(1)); + /** jetstreams smallest allowed unit is 100 milliseconds. */ + this.doTestTimeout(Duration.ofMillis(100)); } @Override protected void doTestTimeout(Duration lockAtMostFor) throws InterruptedException { - LockConfiguration configWithShortTimeout = lockConfig(LOCK_NAME1, lockAtMostFor, Duration.ZERO); - Optional lock1 = getLockProvider().lock(configWithShortTimeout); + final var SHORT_LOCK_NAME = UUID.randomUUID().toString(); + + var configWithShortTimeout = lockConfig(SHORT_LOCK_NAME, lockAtMostFor, Duration.ZERO); + var lock1 = getLockProvider().lock(configWithShortTimeout); assertThat(lock1).isNotEmpty(); - sleep(lockAtMostFor.toMillis() * 2); - assertUnlocked(LOCK_NAME1); + // there is no config to control how fast NATS actually honors its TTL.. ie reaper timers ect + sleep(Duration.ofSeconds(2).toMillis()); + assertUnlocked(SHORT_LOCK_NAME); - Optional lock2 = - getLockProvider().lock(lockConfig(LOCK_NAME1, Duration.ofSeconds(1), Duration.ZERO)); + var lock2 = getLockProvider().lock(lockConfig(SHORT_LOCK_NAME, lockAtMostFor, Duration.ZERO)); assertThat(lock2).isNotEmpty(); lock2.get().unlock(); } + @Override + @Test + public void shouldLockAtLeastFor() throws InterruptedException { + doTestShouldLockAtLeastFor(100); + } + + @Override + protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedException { + final var ATLEAST_LOCK_NAME = UUID.randomUUID().toString(); + final var lockAtLeastFor = Duration.ofSeconds(1L); + + var lockConfig = lockConfig(ATLEAST_LOCK_NAME, lockAtLeastFor.multipliedBy(2), lockAtLeastFor); + + // Lock for LOCK_AT_LEAST_FOR - we do not expect the lock to be released before + // this time + var lock1 = + getLockProvider().lock(lockConfig); + assertThat(lock1).describedAs("Should be locked").isNotEmpty(); + lock1.get().unlock(); + + // Even though we have unlocked the lock, it will be held for some time + assertThat(getLockProvider().lock(lockConfig)) + .describedAs(getClass().getName() + "Can not acquire lock, grace period did not pass yet") + .isEmpty(); + + // Let's wait for the lock to be automatically released + // there is no config to control how fast NATS actually honors its TTL.. ie reaper timers ect + sleep(lockAtLeastFor.toMillis() * 4); + + // Should be able to acquire now + var lock3 = getLockProvider().lock(lockConfig); + assertThat(lock3) + .describedAs(getClass().getName() + "Can acquire the lock after grace period") + .isNotEmpty(); + lock3.get().unlock(); + } + + @Override protected LockProvider getLockProvider() { return lockProvider; } - private String getLock(String lockName) { + private KeyValueEntry getLock(String lockName) { try { - return (String) nc.keyValue(NatsJetStreamLockProvider.buildKey(lockName)).getBucketName(); - } catch (IOException e) { - fail(e); + var bucketName = String.format("SHEDLOCK-%s", lockName); + var keyValueEntry = connection.keyValue(bucketName).get("LOCKED"); + if (keyValueEntry != null) { + + } + return keyValueEntry; + } catch (JetStreamApiException e) { + return null; + } catch (Exception e) { + if (e.getCause() instanceof JetStreamApiException ex && ex.getApiErrorCode() == 10059) { + return null; + } + throw new IllegalStateException(e); } - return null; } } From 921930c937077958a28b14b60befbde7a889b111 Mon Sep 17 00:00:00 2001 From: Martin Nyholm Jelle Date: Mon, 18 Nov 2024 12:52:51 +0000 Subject: [PATCH 3/5] updated readme --- README.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/README.md b/README.md index 170ed48fc..9c75524d1 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ executed repeatedly. Moreover, the locks are time-based and ShedLock assumes tha - [In-Memory](#in-memory) - [Memcached](#memcached-using-spymemcached) - [Datastore](#datastore) + - [NATS JetStream](#jetstream) + [Multi-tenancy](#multi-tenancy) + [Customization](#customization) + [Duration specification](#duration-specification) @@ -885,6 +886,38 @@ public LockProvider lockProvider(DatabaseClient databaseClient) { } ``` +#### JetStream +NatsJetStreamLockProvider has some limitations due to how NATS have implemented TTL. +TTL is currently defined on 'bucket' level, meaning a lockname cannot have multiple lock timings. +Also 100ms is the smallest lock timing supported. Reaping the TTL expired locks cannot be configured in NATS currently, +so timing is best effort. + +Buckets are auto created, but never deleted. So any timing changes will require manual deletion of the bucket. + +NATS 2.11 should make it possible to define TTL on key level when released... so fingers crossed :D + +Import the project + +```xml + + net.javacrumbs.shedlock + shedlock-provider-jetstream + 5.16.0 + +``` + +Configure: + +```java +import net.javacrumbs.shedlock.provider.nats.jetstream.NatsJetStreamLockProvider; + +... + +@Bean +public NatsJetStreamLockProvider lockProvider(io.nats.client.Connection connection) { + return new NatsJetStreamLockProvider(connection); +} +``` ## Multi-tenancy If you have multi-tenancy use-case you can use a lock provider similar to this one From 99551245855e359c22ceded080593b90ad4af8aa Mon Sep 17 00:00:00 2001 From: Lukas Krecan Date: Mon, 18 Nov 2024 20:11:15 +0100 Subject: [PATCH 4/5] Cleanup --- .../provider/nats/jetstream/LockContent.java | 6 +- .../nats/jetstream/LockContentHandler.java | 16 +++--- .../nats/jetstream/NatsJetStreamLock.java | 57 +++++++++---------- .../jetstream/NatsJetStreamLockProvider.java | 40 +++++++------ .../provider/nats/jetstream/package-info.java | 6 ++ .../jetstream/NatsJetStreamContainer.java | 11 ++-- ...sJetStreamLockProviderIntegrationTest.java | 38 +++++-------- 7 files changed, 82 insertions(+), 92 deletions(-) create mode 100644 providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/package-info.java diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java index de12ca006..d773b3531 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java @@ -2,8 +2,4 @@ import java.time.Instant; -import net.javacrumbs.shedlock.support.annotation.NonNull; - -public record LockContent(@NonNull Instant lockAtLeastUntil, @NonNull Instant lockAtMostUntil) { - -} +record LockContent(Instant lockAtLeastUntil, Instant lockAtMostUntil) {} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java index 3d3c7defb..01937fe87 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java @@ -3,16 +3,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import net.javacrumbs.shedlock.support.annotation.NonNull; +class LockContentHandler { -public class LockContentHandler { + private static final Logger log = LoggerFactory.getLogger(LockContentHandler.class); - private static final Logger log = LoggerFactory.getLogger(LockContentHandler.class); + private LockContentHandler() {} - private LockContentHandler() {} - - public static byte[] writeContent(@NonNull LockContent lockContent) { - log.debug("write lock: {}", lockContent); - return lockContent.lockAtLeastUntil().toString().getBytes(); - } + static byte[] writeContent(LockContent lockContent) { + log.debug("write lock: {}", lockContent); + return lockContent.lockAtLeastUntil().toString().getBytes(); + } } diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java index ad2f6dc67..554612bcb 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java @@ -1,47 +1,42 @@ package net.javacrumbs.shedlock.provider.nats.jetstream; +import io.nats.client.Connection; import java.time.Duration; import java.time.Instant; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.nats.client.Connection; import net.javacrumbs.shedlock.core.AbstractSimpleLock; import net.javacrumbs.shedlock.core.ClockProvider; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.support.LockException; -import net.javacrumbs.shedlock.support.annotation.NonNull; - -public final class NatsJetStreamLock extends AbstractSimpleLock { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - private final Logger log = LoggerFactory.getLogger(NatsJetStreamLock.class); +final class NatsJetStreamLock extends AbstractSimpleLock { - private final Connection connection; + private final Logger log = LoggerFactory.getLogger(NatsJetStreamLock.class); - protected NatsJetStreamLock( - @NonNull Connection connection, @NonNull LockConfiguration lockConfiguration) { - super(lockConfiguration); - this.connection = connection; - } + private final Connection connection; - @Override - protected void doUnlock() { - var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName()); - log.debug("Unlocking for bucketName: {}", bucketName); - var keepLockFor = getSecondUntil(lockConfiguration.getLockAtLeastUntil()); - if (keepLockFor <= 0) { - try { - log.debug("Calling delete on key"); + NatsJetStreamLock(Connection connection, LockConfiguration lockConfiguration) { + super(lockConfiguration); + this.connection = connection; + } - connection.keyValue(bucketName).delete("LOCKED"); - } catch (Exception e) { - throw new LockException("Can not remove node. " + e.getMessage()); - } + @Override + protected void doUnlock() { + var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName()); + log.debug("Unlocking for bucketName: {}", bucketName); + var keepLockFor = geMillisUntil(lockConfiguration.getLockAtLeastUntil()); + if (keepLockFor <= 0) { + try { + log.debug("Calling delete on key"); + connection.keyValue(bucketName).delete("LOCKED"); + } catch (Exception e) { + throw new LockException("Can not remove node. " + e.getMessage()); + } + } } - } - private static long getSecondUntil(Instant instant) { - return Duration.between(ClockProvider.now(), instant).toMillis(); - } + private static long geMillisUntil(Instant instant) { + return Duration.between(ClockProvider.now(), instant).toMillis(); + } } diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java index e01816deb..f49bd3045 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java @@ -1,19 +1,16 @@ package net.javacrumbs.shedlock.provider.nats.jetstream; -import java.io.IOException; -import java.time.Duration; -import java.util.Optional; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.nats.client.Connection; import io.nats.client.JetStreamApiException; import io.nats.client.api.KeyValueConfiguration; +import java.io.IOException; +import java.time.Duration; +import java.util.Optional; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.core.LockProvider; import net.javacrumbs.shedlock.core.SimpleLock; -import net.javacrumbs.shedlock.support.annotation.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Lock Provider for NATS JetStream @@ -32,28 +29,37 @@ public class NatsJetStreamLockProvider implements LockProvider { * @param connection * io.nats.client.Connection */ - public NatsJetStreamLockProvider(@NonNull Connection connection) { + public NatsJetStreamLockProvider(Connection connection) { this.connection = connection; } @Override - @NonNull - public Optional lock(@NonNull LockConfiguration lockConfiguration) { + public Optional lock(LockConfiguration lockConfiguration) { var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName()); log.debug("Attempting lock for bucketName: {}", bucketName); try { var lockTime = lockConfiguration.getLockAtMostFor(); // nats cannot accept below 100ms - if(lockTime.toMillis() < 100) { - log.debug("NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!", lockTime.toMillis()); + if (lockTime.toMillis() < 100) { + log.debug( + "NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!", + lockTime.toMillis()); lockTime = Duration.ofMillis(100L); } - connection.keyValueManagement().create( - KeyValueConfiguration.builder().name(bucketName).ttl(lockTime).build()); - connection.keyValue(bucketName).create("LOCKED", LockContentHandler.writeContent( - new LockContent(lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil()))); + connection + .keyValueManagement() + .create(KeyValueConfiguration.builder() + .name(bucketName) + .ttl(lockTime) + .build()); + connection + .keyValue(bucketName) + .create( + "LOCKED", + LockContentHandler.writeContent(new LockContent( + lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil()))); log.debug("Accuired lock for bucketName: {}", bucketName); diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/package-info.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/package-info.java new file mode 100644 index 000000000..704797d49 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/package-info.java @@ -0,0 +1,6 @@ +@NonNullApi +@NonNullFields +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import net.javacrumbs.shedlock.support.annotation.NonNullApi; +import net.javacrumbs.shedlock.support.annotation.NonNullFields; diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java index 93c6e152c..3673273cd 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java @@ -1,7 +1,6 @@ package net.javacrumbs.shedlock.provider.nats.jetstream; import java.time.Duration; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; @@ -20,10 +19,10 @@ public class NatsJetStreamContainer extends GenericContainer LOGGER.info(frame.getUtf8String().replace("\n", ""))) - .withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString()) - .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*")) - .withStartupTimeout(Duration.ofSeconds(180L)); + .withNetworkAliases("nats") + .withLogConsumer(frame -> LOGGER.info(frame.getUtf8String().replace("\n", ""))) + .withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString()) + .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*")) + .withStartupTimeout(Duration.ofSeconds(180L)); } } diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java index 41af6124b..edd0ec641 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java @@ -1,16 +1,19 @@ package net.javacrumbs.shedlock.provider.nats.jetstream; +import static java.lang.Thread.sleep; import static org.assertj.core.api.Assertions.assertThat; +import io.nats.client.Connection; +import io.nats.client.ConnectionListener; +import io.nats.client.JetStreamApiException; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.api.KeyValueEntry; import java.time.Duration; import java.util.UUID; - -import org.junit.jupiter.api.AfterEach; - -import static java.lang.Thread.sleep; - import net.javacrumbs.shedlock.core.LockProvider; import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -18,15 +21,8 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import io.nats.client.Connection; -import io.nats.client.ConnectionListener; -import io.nats.client.JetStreamApiException; -import io.nats.client.Nats; -import io.nats.client.Options; -import io.nats.client.api.KeyValueEntry; - @Testcontainers -public class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest { +class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest { @Container public static final NatsJetStreamContainer container = new NatsJetStreamContainer(); @@ -35,7 +31,7 @@ public class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProvid private Connection connection; @BeforeEach - public void createLockProvider() throws Exception { + void createLockProvider() throws Exception { var natsUrl = String.format("nats://%s:%d", container.getHost(), container.getFirstMappedPort()); connection = Nats.connect(Options.builder() .server(natsUrl) @@ -54,7 +50,7 @@ public void connectionEvent(Connection conn, Events type) { } @AfterEach - public void stopLockProvider() throws Exception { + void stopLockProvider() throws Exception { connection.close(); } @@ -107,8 +103,7 @@ protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedExce // Lock for LOCK_AT_LEAST_FOR - we do not expect the lock to be released before // this time - var lock1 = - getLockProvider().lock(lockConfig); + var lock1 = getLockProvider().lock(lockConfig); assertThat(lock1).describedAs("Should be locked").isNotEmpty(); lock1.get().unlock(); @@ -129,20 +124,15 @@ protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedExce lock3.get().unlock(); } - @Override protected LockProvider getLockProvider() { return lockProvider; } - private KeyValueEntry getLock(String lockName) { + private @Nullable KeyValueEntry getLock(String lockName) { try { var bucketName = String.format("SHEDLOCK-%s", lockName); - var keyValueEntry = connection.keyValue(bucketName).get("LOCKED"); - if (keyValueEntry != null) { - - } - return keyValueEntry; + return connection.keyValue(bucketName).get("LOCKED"); } catch (JetStreamApiException e) { return null; } catch (Exception e) { From 61b32ac5b13805364102373585a87959f2554455 Mon Sep 17 00:00:00 2001 From: Lukas Krecan Date: Mon, 18 Nov 2024 20:32:04 +0100 Subject: [PATCH 5/5] Test cleanup --- .../EtcdLockProviderIntegrationTest.java | 2 +- .../jetstream/NatsJetStreamLockProvider.java | 2 +- ...sJetStreamLockProviderIntegrationTest.java | 32 ++++--------------- .../AbstractLockProviderIntegrationTest.java | 6 +++- 4 files changed, 14 insertions(+), 28 deletions(-) diff --git a/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java b/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java index efcab26e5..f4adacb4e 100644 --- a/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java +++ b/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java @@ -81,7 +81,7 @@ public void shouldTimeout() throws InterruptedException { @Test @Override public void shouldLockAtLeastFor() throws InterruptedException { - doTestShouldLockAtLeastFor(2000); + doTestShouldLockAtLeastFor(LOCK_AT_LEAST_FOR, 2000); } @Override diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java index f49bd3045..0ccdb56f9 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java @@ -61,7 +61,7 @@ public Optional lock(LockConfiguration lockConfiguration) { LockContentHandler.writeContent(new LockContent( lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil()))); - log.debug("Accuired lock for bucketName: {}", bucketName); + log.debug("Acquired lock for bucketName: {}", bucketName); return Optional.of(new NatsJetStreamLock(connection, lockConfiguration)); } catch (JetStreamApiException e) { diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java index edd0ec641..9456f2ec8 100644 --- a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java @@ -10,8 +10,8 @@ import io.nats.client.Options; import io.nats.client.api.KeyValueEntry; import java.time.Duration; -import java.util.UUID; import net.javacrumbs.shedlock.core.LockProvider; +import net.javacrumbs.shedlock.support.annotation.Nullable; import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -68,38 +68,19 @@ protected void assertLocked(String lockName) { @Test public void shouldTimeout() throws InterruptedException { /** jetstreams smallest allowed unit is 100 milliseconds. */ - this.doTestTimeout(Duration.ofMillis(100)); - } - - @Override - protected void doTestTimeout(Duration lockAtMostFor) throws InterruptedException { - final var SHORT_LOCK_NAME = UUID.randomUUID().toString(); - - var configWithShortTimeout = lockConfig(SHORT_LOCK_NAME, lockAtMostFor, Duration.ZERO); - var lock1 = getLockProvider().lock(configWithShortTimeout); - assertThat(lock1).isNotEmpty(); - - // there is no config to control how fast NATS actually honors its TTL.. ie reaper timers ect - sleep(Duration.ofSeconds(2).toMillis()); - assertUnlocked(SHORT_LOCK_NAME); - - var lock2 = getLockProvider().lock(lockConfig(SHORT_LOCK_NAME, lockAtMostFor, Duration.ZERO)); - assertThat(lock2).isNotEmpty(); - lock2.get().unlock(); + this.doTestTimeout(Duration.ofMillis(100), Duration.ofSeconds(2)); } @Override @Test public void shouldLockAtLeastFor() throws InterruptedException { - doTestShouldLockAtLeastFor(100); + doTestShouldLockAtLeastFor(3_000); } @Override protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedException { - final var ATLEAST_LOCK_NAME = UUID.randomUUID().toString(); - final var lockAtLeastFor = Duration.ofSeconds(1L); - - var lockConfig = lockConfig(ATLEAST_LOCK_NAME, lockAtLeastFor.multipliedBy(2), lockAtLeastFor); + Duration lockAtLeastFor = LOCK_AT_LEAST_FOR; + var lockConfig = lockConfig(LOCK_NAME1, lockAtLeastFor.multipliedBy(2), lockAtLeastFor); // Lock for LOCK_AT_LEAST_FOR - we do not expect the lock to be released before // this time @@ -108,7 +89,7 @@ protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedExce lock1.get().unlock(); // Even though we have unlocked the lock, it will be held for some time - assertThat(getLockProvider().lock(lockConfig)) + assertThat(getLockProvider().lock(lockConfig(LOCK_NAME1))) .describedAs(getClass().getName() + "Can not acquire lock, grace period did not pass yet") .isEmpty(); @@ -117,6 +98,7 @@ protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedExce sleep(lockAtLeastFor.toMillis() * 4); // Should be able to acquire now + // THIS is the difference when compared to the standard test var lock3 = getLockProvider().lock(lockConfig); assertThat(lock3) .describedAs(getClass().getName() + "Can acquire the lock after grace period") diff --git a/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java b/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java index 5f6954591..855175e73 100644 --- a/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java +++ b/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java @@ -87,11 +87,15 @@ public void shouldTimeout() throws InterruptedException { } protected void doTestTimeout(Duration lockAtMostFor) throws InterruptedException { + doTestTimeout(lockAtMostFor, lockAtMostFor.multipliedBy(2)); + } + + protected void doTestTimeout(Duration lockAtMostFor, Duration waitFor) throws InterruptedException { LockConfiguration configWithShortTimeout = lockConfig(LOCK_NAME1, lockAtMostFor, Duration.ZERO); Optional lock1 = getLockProvider().lock(configWithShortTimeout); assertThat(lock1).isNotEmpty(); - sleep(lockAtMostFor.toMillis() * 2); + sleep(waitFor.toMillis()); assertUnlocked(LOCK_NAME1); Optional lock2 =