Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats #2260

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open

Nats #2260

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ executed repeatedly. Moreover, the locks are time-based and ShedLock assumes tha
- [In-Memory](#in-memory)
- [Memcached](#memcached-using-spymemcached)
- [Datastore](#datastore)
- [NATS JetStream](#jetstream)
+ [Multi-tenancy](#multi-tenancy)
+ [Customization](#customization)
+ [Duration specification](#duration-specification)
Expand Down Expand Up @@ -885,6 +886,38 @@ public LockProvider lockProvider(DatabaseClient databaseClient) {
}
```

#### JetStream
NatsJetStreamLockProvider has some limitations due to how NATS have implemented TTL.
TTL is currently defined on 'bucket' level, meaning a lockname cannot have multiple lock timings.
Also 100ms is the smallest lock timing supported. Reaping the TTL expired locks cannot be configured in NATS currently,
so timing is best effort.

Buckets are auto created, but never deleted. So any timing changes will require manual deletion of the bucket.

NATS 2.11 should make it possible to define TTL on key level when released... so fingers crossed :D

Import the project

```xml
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-jetstream</artifactId>
<version>5.16.0</version>
</dependency>
```

Configure:

```java
import net.javacrumbs.shedlock.provider.nats.jetstream.NatsJetStreamLockProvider;

...

@Bean
public NatsJetStreamLockProvider lockProvider(io.nats.client.Connection connection) {
return new NatsJetStreamLockProvider(connection);
}
```

## Multi-tenancy
If you have multi-tenancy use-case you can use a lock provider similar to this one
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<module>providers/memcached/shedlock-provider-memcached-spy</module>
<module>providers/datastore/shedlock-provider-datastore</module>
<module>providers/spanner/shedlock-provider-spanner</module>
<module>providers/jetstream/shedlock-provider-jetstream</module>
<module>providers/neo4j/shedlock-provider-neo4j</module>
</modules>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void shouldTimeout() throws InterruptedException {
@Test
@Override
public void shouldLockAtLeastFor() throws InterruptedException {
doTestShouldLockAtLeastFor(2000);
doTestShouldLockAtLeastFor(LOCK_AT_LEAST_FOR, 2000);
}

@Override
Expand Down
78 changes: 78 additions & 0 deletions providers/jetstream/shedlock-provider-jetstream/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>shedlock-parent</artifactId>
<groupId>net.javacrumbs.shedlock</groupId>
<version>5.16.1-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>shedlock-provider-jetstream</artifactId>
<version>5.16.1-SNAPSHOT</version>

<properties>
<nats.version>2.20.2</nats.version>
</properties>

<dependencies>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-test-support</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>${nats.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.ver}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>
net.javacrumbs.shedlock.provider.nats.jetstream
</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import java.time.Instant;

record LockContent(Instant lockAtLeastUntil, Instant lockAtMostUntil) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LockContentHandler {

private static final Logger log = LoggerFactory.getLogger(LockContentHandler.class);

private LockContentHandler() {}

static byte[] writeContent(LockContent lockContent) {
log.debug("write lock: {}", lockContent);
return lockContent.lockAtLeastUntil().toString().getBytes();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import io.nats.client.Connection;
import java.time.Duration;
import java.time.Instant;
import net.javacrumbs.shedlock.core.AbstractSimpleLock;
import net.javacrumbs.shedlock.core.ClockProvider;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.support.LockException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class NatsJetStreamLock extends AbstractSimpleLock {

private final Logger log = LoggerFactory.getLogger(NatsJetStreamLock.class);

private final Connection connection;

NatsJetStreamLock(Connection connection, LockConfiguration lockConfiguration) {
super(lockConfiguration);
this.connection = connection;
}

@Override
protected void doUnlock() {
var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName());
log.debug("Unlocking for bucketName: {}", bucketName);
var keepLockFor = geMillisUntil(lockConfiguration.getLockAtLeastUntil());
if (keepLockFor <= 0) {
try {
log.debug("Calling delete on key");
connection.keyValue(bucketName).delete("LOCKED");
} catch (Exception e) {
throw new LockException("Can not remove node. " + e.getMessage());
}
}
}

private static long geMillisUntil(Instant instant) {
return Duration.between(ClockProvider.now(), instant).toMillis();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.api.KeyValueConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Lock Provider for NATS JetStream
*
* @see <a href="https://docs.nats.io/nats-concepts/jetstream">JetStream</a>
*/
public class NatsJetStreamLockProvider implements LockProvider {

private final Logger log = LoggerFactory.getLogger(NatsJetStreamLockProvider.class);

private final Connection connection;

/**
* Create NatsJetStreamLockProvider
*
* @param connection
* io.nats.client.Connection
*/
public NatsJetStreamLockProvider(Connection connection) {
this.connection = connection;
}

@Override
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName());
log.debug("Attempting lock for bucketName: {}", bucketName);
try {
var lockTime = lockConfiguration.getLockAtMostFor();

// nats cannot accept below 100ms
if (lockTime.toMillis() < 100) {
log.debug(
"NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!",
lockTime.toMillis());
lockTime = Duration.ofMillis(100L);
}

connection
.keyValueManagement()
.create(KeyValueConfiguration.builder()
.name(bucketName)
.ttl(lockTime)
.build());
connection
.keyValue(bucketName)
.create(
"LOCKED",
LockContentHandler.writeContent(new LockContent(
lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil())));

log.debug("Acquired lock for bucketName: {}", bucketName);

return Optional.of(new NatsJetStreamLock(connection, lockConfiguration));
} catch (JetStreamApiException e) {
if (e.getApiErrorCode() == 10058 || e.getApiErrorCode() == 10071) {
log.debug("Rejected lock for bucketName: {}, message: {}", bucketName, e.getMessage());
return Optional.empty();
}
log.warn("Rejected lock for bucketName: {}", bucketName);
throw new IllegalStateException(e);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
@NonNullApi
@NonNullFields
package net.javacrumbs.shedlock.provider.nats.jetstream;

import net.javacrumbs.shedlock.support.annotation.NonNullApi;
import net.javacrumbs.shedlock.support.annotation.NonNullFields;
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.utility.DockerImageName;

public class NatsJetStreamContainer extends GenericContainer<NatsJetStreamContainer> {

private static final Logger LOGGER = LoggerFactory.getLogger(NatsJetStreamContainer.class);

public static final DockerImageName NATS_IMAGE = DockerImageName.parse("nats:2.10-alpine");

private static final Integer NATS_PORT = 4222;
private static final Integer NATS_HTTP_PORT = 8222;

public NatsJetStreamContainer() {
super(NATS_IMAGE.asCanonicalNameString());
this.withExposedPorts(NATS_PORT, NATS_HTTP_PORT)
.withNetworkAliases("nats")
.withLogConsumer(frame -> LOGGER.info(frame.getUtf8String().replace("\n", "")))
.withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString())
.waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*"))
.withStartupTimeout(Duration.ofSeconds(180L));
}
}
Loading
Loading