diff --git a/README.md b/README.md index 170ed48fc..9c75524d1 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ executed repeatedly. Moreover, the locks are time-based and ShedLock assumes tha - [In-Memory](#in-memory) - [Memcached](#memcached-using-spymemcached) - [Datastore](#datastore) + - [NATS JetStream](#jetstream) + [Multi-tenancy](#multi-tenancy) + [Customization](#customization) + [Duration specification](#duration-specification) @@ -885,6 +886,38 @@ public LockProvider lockProvider(DatabaseClient databaseClient) { } ``` +#### JetStream +NatsJetStreamLockProvider has some limitations due to how NATS have implemented TTL. +TTL is currently defined on 'bucket' level, meaning a lockname cannot have multiple lock timings. +Also 100ms is the smallest lock timing supported. Reaping the TTL expired locks cannot be configured in NATS currently, +so timing is best effort. + +Buckets are auto created, but never deleted. So any timing changes will require manual deletion of the bucket. + +NATS 2.11 should make it possible to define TTL on key level when released... so fingers crossed :D + +Import the project + +```xml + + net.javacrumbs.shedlock + shedlock-provider-jetstream + 5.16.0 + +``` + +Configure: + +```java +import net.javacrumbs.shedlock.provider.nats.jetstream.NatsJetStreamLockProvider; + +... + +@Bean +public NatsJetStreamLockProvider lockProvider(io.nats.client.Connection connection) { + return new NatsJetStreamLockProvider(connection); +} +``` ## Multi-tenancy If you have multi-tenancy use-case you can use a lock provider similar to this one diff --git a/pom.xml b/pom.xml index 88cbb54ea..bad43234d 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ providers/memcached/shedlock-provider-memcached-spy providers/datastore/shedlock-provider-datastore providers/spanner/shedlock-provider-spanner + providers/jetstream/shedlock-provider-jetstream providers/neo4j/shedlock-provider-neo4j diff --git a/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java b/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java index efcab26e5..f4adacb4e 100644 --- a/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java +++ b/providers/etcd/shedlock-provider-etcd-jetcd/src/test/java/net/javacrumbs/shedlock/provider/etcd/jetcd/EtcdLockProviderIntegrationTest.java @@ -81,7 +81,7 @@ public void shouldTimeout() throws InterruptedException { @Test @Override public void shouldLockAtLeastFor() throws InterruptedException { - doTestShouldLockAtLeastFor(2000); + doTestShouldLockAtLeastFor(LOCK_AT_LEAST_FOR, 2000); } @Override diff --git a/providers/jetstream/shedlock-provider-jetstream/pom.xml b/providers/jetstream/shedlock-provider-jetstream/pom.xml new file mode 100644 index 000000000..a782906dd --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/pom.xml @@ -0,0 +1,78 @@ + + + + shedlock-parent + net.javacrumbs.shedlock + 5.16.1-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + shedlock-provider-jetstream + 5.16.1-SNAPSHOT + + + 2.20.2 + + + + + net.javacrumbs.shedlock + shedlock-core + ${project.version} + + + + net.javacrumbs.shedlock + shedlock-test-support + ${project.version} + test + + + + io.nats + jnats + ${nats.version} + + + + org.testcontainers + testcontainers + ${test-containers.ver} + test + + + + org.testcontainers + junit-jupiter + ${test-containers.ver} + test + + + + ch.qos.logback + logback-classic + ${logback.ver} + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + + net.javacrumbs.shedlock.provider.nats.jetstream + + + + + + + + + diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java new file mode 100644 index 000000000..d773b3531 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContent.java @@ -0,0 +1,5 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import java.time.Instant; + +record LockContent(Instant lockAtLeastUntil, Instant lockAtMostUntil) {} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java new file mode 100644 index 000000000..01937fe87 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/LockContentHandler.java @@ -0,0 +1,16 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class LockContentHandler { + + private static final Logger log = LoggerFactory.getLogger(LockContentHandler.class); + + private LockContentHandler() {} + + static byte[] writeContent(LockContent lockContent) { + log.debug("write lock: {}", lockContent); + return lockContent.lockAtLeastUntil().toString().getBytes(); + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java new file mode 100644 index 000000000..554612bcb --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLock.java @@ -0,0 +1,42 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import io.nats.client.Connection; +import java.time.Duration; +import java.time.Instant; +import net.javacrumbs.shedlock.core.AbstractSimpleLock; +import net.javacrumbs.shedlock.core.ClockProvider; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.support.LockException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class NatsJetStreamLock extends AbstractSimpleLock { + + private final Logger log = LoggerFactory.getLogger(NatsJetStreamLock.class); + + private final Connection connection; + + NatsJetStreamLock(Connection connection, LockConfiguration lockConfiguration) { + super(lockConfiguration); + this.connection = connection; + } + + @Override + protected void doUnlock() { + var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName()); + log.debug("Unlocking for bucketName: {}", bucketName); + var keepLockFor = geMillisUntil(lockConfiguration.getLockAtLeastUntil()); + if (keepLockFor <= 0) { + try { + log.debug("Calling delete on key"); + connection.keyValue(bucketName).delete("LOCKED"); + } catch (Exception e) { + throw new LockException("Can not remove node. " + e.getMessage()); + } + } + } + + private static long geMillisUntil(Instant instant) { + return Duration.between(ClockProvider.now(), instant).toMillis(); + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java new file mode 100644 index 000000000..0ccdb56f9 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProvider.java @@ -0,0 +1,78 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.api.KeyValueConfiguration; +import java.io.IOException; +import java.time.Duration; +import java.util.Optional; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.core.LockProvider; +import net.javacrumbs.shedlock.core.SimpleLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Lock Provider for NATS JetStream + * + * @see JetStream + */ +public class NatsJetStreamLockProvider implements LockProvider { + + private final Logger log = LoggerFactory.getLogger(NatsJetStreamLockProvider.class); + + private final Connection connection; + + /** + * Create NatsJetStreamLockProvider + * + * @param connection + * io.nats.client.Connection + */ + public NatsJetStreamLockProvider(Connection connection) { + this.connection = connection; + } + + @Override + public Optional lock(LockConfiguration lockConfiguration) { + var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName()); + log.debug("Attempting lock for bucketName: {}", bucketName); + try { + var lockTime = lockConfiguration.getLockAtMostFor(); + + // nats cannot accept below 100ms + if (lockTime.toMillis() < 100) { + log.debug( + "NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!", + lockTime.toMillis()); + lockTime = Duration.ofMillis(100L); + } + + connection + .keyValueManagement() + .create(KeyValueConfiguration.builder() + .name(bucketName) + .ttl(lockTime) + .build()); + connection + .keyValue(bucketName) + .create( + "LOCKED", + LockContentHandler.writeContent(new LockContent( + lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil()))); + + log.debug("Acquired lock for bucketName: {}", bucketName); + + return Optional.of(new NatsJetStreamLock(connection, lockConfiguration)); + } catch (JetStreamApiException e) { + if (e.getApiErrorCode() == 10058 || e.getApiErrorCode() == 10071) { + log.debug("Rejected lock for bucketName: {}, message: {}", bucketName, e.getMessage()); + return Optional.empty(); + } + log.warn("Rejected lock for bucketName: {}", bucketName); + throw new IllegalStateException(e); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/package-info.java b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/package-info.java new file mode 100644 index 000000000..704797d49 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/main/java/net/javacrumbs/shedlock/provider/nats/jetstream/package-info.java @@ -0,0 +1,6 @@ +@NonNullApi +@NonNullFields +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import net.javacrumbs.shedlock.support.annotation.NonNullApi; +import net.javacrumbs.shedlock.support.annotation.NonNullFields; diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java new file mode 100644 index 000000000..3673273cd --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamContainer.java @@ -0,0 +1,28 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import java.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.utility.DockerImageName; + +public class NatsJetStreamContainer extends GenericContainer { + + private static final Logger LOGGER = LoggerFactory.getLogger(NatsJetStreamContainer.class); + + public static final DockerImageName NATS_IMAGE = DockerImageName.parse("nats:2.10-alpine"); + + private static final Integer NATS_PORT = 4222; + private static final Integer NATS_HTTP_PORT = 8222; + + public NatsJetStreamContainer() { + super(NATS_IMAGE.asCanonicalNameString()); + this.withExposedPorts(NATS_PORT, NATS_HTTP_PORT) + .withNetworkAliases("nats") + .withLogConsumer(frame -> LOGGER.info(frame.getUtf8String().replace("\n", ""))) + .withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString()) + .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*")) + .withStartupTimeout(Duration.ofSeconds(180L)); + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java new file mode 100644 index 000000000..9456f2ec8 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/java/net/javacrumbs/shedlock/provider/nats/jetstream/NatsJetStreamLockProviderIntegrationTest.java @@ -0,0 +1,127 @@ +package net.javacrumbs.shedlock.provider.nats.jetstream; + +import static java.lang.Thread.sleep; +import static org.assertj.core.api.Assertions.assertThat; + +import io.nats.client.Connection; +import io.nats.client.ConnectionListener; +import io.nats.client.JetStreamApiException; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.api.KeyValueEntry; +import java.time.Duration; +import net.javacrumbs.shedlock.core.LockProvider; +import net.javacrumbs.shedlock.support.annotation.Nullable; +import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest { + + @Container + public static final NatsJetStreamContainer container = new NatsJetStreamContainer(); + + private LockProvider lockProvider; + private Connection connection; + + @BeforeEach + void createLockProvider() throws Exception { + var natsUrl = String.format("nats://%s:%d", container.getHost(), container.getFirstMappedPort()); + connection = Nats.connect(Options.builder() + .server(natsUrl) + .connectionListener(new ConnectionListener() { + + private final Logger log = LoggerFactory.getLogger("ConnectionListener"); + + @Override + public void connectionEvent(Connection conn, Events type) { + log.debug("Received event: {}, on conn: {}", type, conn); + } + }) + .build()); + + lockProvider = new NatsJetStreamLockProvider(connection); + } + + @AfterEach + void stopLockProvider() throws Exception { + connection.close(); + } + + @Override + protected void assertUnlocked(String lockName) { + assertThat(getLock(lockName)).isNull(); + } + + @Override + protected void assertLocked(String lockName) { + assertThat(getLock(lockName)).isNotNull(); + } + + @Override + @Test + public void shouldTimeout() throws InterruptedException { + /** jetstreams smallest allowed unit is 100 milliseconds. */ + this.doTestTimeout(Duration.ofMillis(100), Duration.ofSeconds(2)); + } + + @Override + @Test + public void shouldLockAtLeastFor() throws InterruptedException { + doTestShouldLockAtLeastFor(3_000); + } + + @Override + protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedException { + Duration lockAtLeastFor = LOCK_AT_LEAST_FOR; + var lockConfig = lockConfig(LOCK_NAME1, lockAtLeastFor.multipliedBy(2), lockAtLeastFor); + + // Lock for LOCK_AT_LEAST_FOR - we do not expect the lock to be released before + // this time + var lock1 = getLockProvider().lock(lockConfig); + assertThat(lock1).describedAs("Should be locked").isNotEmpty(); + lock1.get().unlock(); + + // Even though we have unlocked the lock, it will be held for some time + assertThat(getLockProvider().lock(lockConfig(LOCK_NAME1))) + .describedAs(getClass().getName() + "Can not acquire lock, grace period did not pass yet") + .isEmpty(); + + // Let's wait for the lock to be automatically released + // there is no config to control how fast NATS actually honors its TTL.. ie reaper timers ect + sleep(lockAtLeastFor.toMillis() * 4); + + // Should be able to acquire now + // THIS is the difference when compared to the standard test + var lock3 = getLockProvider().lock(lockConfig); + assertThat(lock3) + .describedAs(getClass().getName() + "Can acquire the lock after grace period") + .isNotEmpty(); + lock3.get().unlock(); + } + + @Override + protected LockProvider getLockProvider() { + return lockProvider; + } + + private @Nullable KeyValueEntry getLock(String lockName) { + try { + var bucketName = String.format("SHEDLOCK-%s", lockName); + return connection.keyValue(bucketName).get("LOCKED"); + } catch (JetStreamApiException e) { + return null; + } catch (Exception e) { + if (e.getCause() instanceof JetStreamApiException ex && ex.getApiErrorCode() == 10059) { + return null; + } + throw new IllegalStateException(e); + } + } +} diff --git a/providers/jetstream/shedlock-provider-jetstream/src/test/resources/logback.xml b/providers/jetstream/shedlock-provider-jetstream/src/test/resources/logback.xml new file mode 100644 index 000000000..8ee4ac478 --- /dev/null +++ b/providers/jetstream/shedlock-provider-jetstream/src/test/resources/logback.xml @@ -0,0 +1,29 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java b/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java index 5f6954591..855175e73 100644 --- a/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java +++ b/shedlock-test-support/src/main/java/net/javacrumbs/shedlock/test/support/AbstractLockProviderIntegrationTest.java @@ -87,11 +87,15 @@ public void shouldTimeout() throws InterruptedException { } protected void doTestTimeout(Duration lockAtMostFor) throws InterruptedException { + doTestTimeout(lockAtMostFor, lockAtMostFor.multipliedBy(2)); + } + + protected void doTestTimeout(Duration lockAtMostFor, Duration waitFor) throws InterruptedException { LockConfiguration configWithShortTimeout = lockConfig(LOCK_NAME1, lockAtMostFor, Duration.ZERO); Optional lock1 = getLockProvider().lock(configWithShortTimeout); assertThat(lock1).isNotEmpty(); - sleep(lockAtMostFor.toMillis() * 2); + sleep(waitFor.toMillis()); assertUnlocked(LOCK_NAME1); Optional lock2 =