Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lukas-krecan committed Nov 18, 2024
1 parent b34c4e1 commit 5241bc9
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,4 @@

import java.time.Instant;

import net.javacrumbs.shedlock.support.annotation.NonNull;

public record LockContent(@NonNull Instant lockAtLeastUntil, @NonNull Instant lockAtMostUntil) {

}
record LockContent(Instant lockAtLeastUntil, Instant lockAtMostUntil) {}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.javacrumbs.shedlock.support.annotation.NonNull;
class LockContentHandler {

public class LockContentHandler {
private static final Logger log = LoggerFactory.getLogger(LockContentHandler.class);

private static final Logger log = LoggerFactory.getLogger(LockContentHandler.class);
private LockContentHandler() {}

private LockContentHandler() {}

public static byte[] writeContent(@NonNull LockContent lockContent) {
log.debug("write lock: {}", lockContent);
return lockContent.lockAtLeastUntil().toString().getBytes();
}
static byte[] writeContent(LockContent lockContent) {
log.debug("write lock: {}", lockContent);
return lockContent.lockAtLeastUntil().toString().getBytes();
}
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,42 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import io.nats.client.Connection;
import java.time.Duration;
import java.time.Instant;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.nats.client.Connection;
import net.javacrumbs.shedlock.core.AbstractSimpleLock;
import net.javacrumbs.shedlock.core.ClockProvider;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.support.LockException;
import net.javacrumbs.shedlock.support.annotation.NonNull;

public final class NatsJetStreamLock extends AbstractSimpleLock {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private final Logger log = LoggerFactory.getLogger(NatsJetStreamLock.class);
final class NatsJetStreamLock extends AbstractSimpleLock {

private final Connection connection;
private final Logger log = LoggerFactory.getLogger(NatsJetStreamLock.class);

protected NatsJetStreamLock(
@NonNull Connection connection, @NonNull LockConfiguration lockConfiguration) {
super(lockConfiguration);
this.connection = connection;
}
private final Connection connection;

@Override
protected void doUnlock() {
var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName());
log.debug("Unlocking for bucketName: {}", bucketName);
var keepLockFor = getSecondUntil(lockConfiguration.getLockAtLeastUntil());
if (keepLockFor <= 0) {
try {
log.debug("Calling delete on key");
NatsJetStreamLock(Connection connection, LockConfiguration lockConfiguration) {
super(lockConfiguration);
this.connection = connection;
}

connection.keyValue(bucketName).delete("LOCKED");
} catch (Exception e) {
throw new LockException("Can not remove node. " + e.getMessage());
}
@Override
protected void doUnlock() {
var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName());
log.debug("Unlocking for bucketName: {}", bucketName);
var keepLockFor = geMillisUntil(lockConfiguration.getLockAtLeastUntil());
if (keepLockFor <= 0) {
try {
log.debug("Calling delete on key");
connection.keyValue(bucketName).delete("LOCKED");
} catch (Exception e) {
throw new LockException("Can not remove node. " + e.getMessage());
}
}
}
}

private static long getSecondUntil(Instant instant) {
return Duration.between(ClockProvider.now(), instant).toMillis();
}
private static long geMillisUntil(Instant instant) {
return Duration.between(ClockProvider.now(), instant).toMillis();
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.api.KeyValueConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import net.javacrumbs.shedlock.support.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Lock Provider for NATS JetStream
Expand All @@ -32,28 +29,37 @@ public class NatsJetStreamLockProvider implements LockProvider {
* @param connection
* io.nats.client.Connection
*/
public NatsJetStreamLockProvider(@NonNull Connection connection) {
public NatsJetStreamLockProvider(Connection connection) {
this.connection = connection;
}

@Override
@NonNull
public Optional<SimpleLock> lock(@NonNull LockConfiguration lockConfiguration) {
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName());
log.debug("Attempting lock for bucketName: {}", bucketName);
try {
var lockTime = lockConfiguration.getLockAtMostFor();

// nats cannot accept below 100ms
if(lockTime.toMillis() < 100) {
log.debug("NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!", lockTime.toMillis());
if (lockTime.toMillis() < 100) {
log.debug(
"NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!",
lockTime.toMillis());
lockTime = Duration.ofMillis(100L);
}

connection.keyValueManagement().create(
KeyValueConfiguration.builder().name(bucketName).ttl(lockTime).build());
connection.keyValue(bucketName).create("LOCKED", LockContentHandler.writeContent(
new LockContent(lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil())));
connection
.keyValueManagement()
.create(KeyValueConfiguration.builder()
.name(bucketName)
.ttl(lockTime)
.build());
connection
.keyValue(bucketName)
.create(
"LOCKED",
LockContentHandler.writeContent(new LockContent(
lockConfiguration.getLockAtLeastUntil(), lockConfiguration.getLockAtMostUntil())));

log.debug("Accuired lock for bucketName: {}", bucketName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
@NonNullApi
@NonNullFields
package net.javacrumbs.shedlock.provider.nats.jetstream;

import net.javacrumbs.shedlock.support.annotation.NonNullApi;
import net.javacrumbs.shedlock.support.annotation.NonNullFields;
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -20,10 +19,10 @@ public class NatsJetStreamContainer extends GenericContainer<NatsJetStreamContai
public NatsJetStreamContainer() {
super(NATS_IMAGE.asCanonicalNameString());
this.withExposedPorts(NATS_PORT, NATS_HTTP_PORT)
.withNetworkAliases("nats")
.withLogConsumer(frame -> LOGGER.info(frame.getUtf8String().replace("\n", "")))
.withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString())
.waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*"))
.withStartupTimeout(Duration.ofSeconds(180L));
.withNetworkAliases("nats")
.withLogConsumer(frame -> LOGGER.info(frame.getUtf8String().replace("\n", "")))
.withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString())
.waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*"))
.withStartupTimeout(Duration.ofSeconds(180L));
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import static java.lang.Thread.sleep;
import static org.assertj.core.api.Assertions.assertThat;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStreamApiException;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.KeyValueEntry;
import java.time.Duration;
import java.util.UUID;

import org.junit.jupiter.api.AfterEach;

import static java.lang.Thread.sleep;

import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStreamApiException;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.KeyValueEntry;

@Testcontainers
public class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest {
class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest {

@Container
public static final NatsJetStreamContainer container = new NatsJetStreamContainer();
Expand All @@ -35,7 +31,7 @@ public class NatsJetStreamLockProviderIntegrationTest extends AbstractLockProvid
private Connection connection;

@BeforeEach
public void createLockProvider() throws Exception {
void createLockProvider() throws Exception {
var natsUrl = String.format("nats://%s:%d", container.getHost(), container.getFirstMappedPort());
connection = Nats.connect(Options.builder()
.server(natsUrl)
Expand All @@ -54,7 +50,7 @@ public void connectionEvent(Connection conn, Events type) {
}

@AfterEach
public void stopLockProvider() throws Exception {
void stopLockProvider() throws Exception {
connection.close();
}

Expand Down Expand Up @@ -107,8 +103,7 @@ protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedExce

// Lock for LOCK_AT_LEAST_FOR - we do not expect the lock to be released before
// this time
var lock1 =
getLockProvider().lock(lockConfig);
var lock1 = getLockProvider().lock(lockConfig);
assertThat(lock1).describedAs("Should be locked").isNotEmpty();
lock1.get().unlock();

Expand All @@ -129,7 +124,6 @@ protected void doTestShouldLockAtLeastFor(int sleepForMs) throws InterruptedExce
lock3.get().unlock();
}


@Override
protected LockProvider getLockProvider() {
return lockProvider;
Expand All @@ -139,9 +133,6 @@ private KeyValueEntry getLock(String lockName) {
try {
var bucketName = String.format("SHEDLOCK-%s", lockName);
var keyValueEntry = connection.keyValue(bucketName).get("LOCKED");
if (keyValueEntry != null) {

}
return keyValueEntry;
} catch (JetStreamApiException e) {
return null;
Expand Down

0 comments on commit 5241bc9

Please sign in to comment.