Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-1851 - Allow block allocation to recover from constraint violations #62

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;
import uk.ac.ebi.ampt2d.commons.accession.utils.exceptions.ExponentialBackOffMaxRetriesRuntimeException;

import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/**
Expand All @@ -32,6 +33,8 @@ public abstract class ExponentialBackOff {

static int DEFAULT_TOTAL_ATTEMPTS = 7;
private static int DEFAULT_TIME_BASE = 1000;
private static int MAX_JITTER = 1000;
private static int MIN_JITTER = 100;

public static void execute(Runnable function) {
execute(function, DEFAULT_TOTAL_ATTEMPTS, DEFAULT_TIME_BASE);
Expand Down Expand Up @@ -78,7 +81,8 @@ public static <T> T execute(Supplier<T> function, int totalAttempts, int timeBas

private static void doWait(int valueInTheSeries, int timeBase) {
try {
Thread.sleep(valueInTheSeries * timeBase);
Thread.sleep(valueInTheSeries * timeBase +
ThreadLocalRandom.current().nextInt(MIN_JITTER, MAX_JITTER));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories.ContiguousIdBlockRepository;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -34,6 +36,9 @@ public class ContiguousIdBlockService {

private Map<String, BlockParameters> categoryBlockInitializations;

@PersistenceContext
EntityManager entityManager;

public ContiguousIdBlockService(ContiguousIdBlockRepository repository, Map<String, BlockParameters>
categoryBlockInitializations) {
this.repository = repository;
Expand All @@ -49,14 +54,20 @@ public void save(Iterable<ContiguousIdBlock> blocks) {
public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) {
ContiguousIdBlock lastBlock = repository.findFirstByCategoryIdOrderByLastValueDesc(categoryId);
BlockParameters blockParameters = getBlockParameters(categoryId);
ContiguousIdBlock reservedBlock = null;
if (lastBlock != null) {
return repository.save(lastBlock.nextBlock(instanceId, blockParameters.getBlockSize(),
blockParameters.getNextBlockInterval(), blockParameters.getBlockStartValue()));
reservedBlock = repository.save(lastBlock.nextBlock(instanceId, blockParameters.getBlockSize(),
blockParameters.getNextBlockInterval(),
blockParameters.getBlockStartValue()));
entityManager.flush();
} else {
ContiguousIdBlock newBlock = new ContiguousIdBlock(categoryId, instanceId,
blockParameters.getBlockStartValue(), blockParameters.getBlockSize());
return repository.save(newBlock);
blockParameters.getBlockStartValue(),
blockParameters.getBlockSize());
reservedBlock = repository.save(newBlock);
entityManager.flush();
}
return reservedBlock;
}

public BlockParameters getBlockParameters(String categoryId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -31,15 +32,22 @@
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.ampt2d.test.configuration.MonotonicAccessionGeneratorTestConfiguration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@DataJpaTest
Expand All @@ -60,6 +68,11 @@ public class MonotonicAccessionGeneratorTest {
@Autowired
private ContiguousIdBlockService service;

@Before
public void initializeBlocks() {
repository.deleteAll();
}

@Test
public void assertNoBlockGeneratedAtLoadIfNoneExists() throws Exception {
MonotonicAccessionGenerator generator = getMonotonicAccessionGenerator();
Expand Down Expand Up @@ -433,4 +446,73 @@ private MonotonicAccessionGenerator getMonotonicAccessionGeneratorForCategoryHav
return new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service);
}

//See https://www.planetgeek.ch/2009/08/25/how-to-find-a-concurrency-bug-with-java/
public static void assertConcurrent(final String message, final List<? extends Runnable> runnables,
final int maxTimeoutSeconds) throws InterruptedException {
final int numThreads = runnables.size();
final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
try {
final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
final CountDownLatch afterInitBlocker = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(numThreads);
for (final Runnable submittedTestRunnable : runnables) {
threadPool.submit(new Runnable() {
public void run() {
allExecutorThreadsReady.countDown();
try {
afterInitBlocker.await();
submittedTestRunnable.run();
} catch (final Throwable e) {
exceptions.add(e);
} finally {
allDone.countDown();
}
}
});
}
// wait until all threads are ready
assertTrue(
"Timeout when initializing threads! " +
"Perform long lasting initializations before passing runnables to assertConcurrent",
allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS));
// start all test runners
afterInitBlocker.countDown();
assertTrue(message +" timeout! More than" + maxTimeoutSeconds + "seconds",
allDone.await(maxTimeoutSeconds, TimeUnit.SECONDS));
} finally {
threadPool.shutdownNow();
}
assertTrue(message + "failed with exception(s)" + exceptions, exceptions.isEmpty());
}

@Test
public void assertConcurrentAccessionGenerationWithSameInstance() throws InterruptedException {
List<Runnable> accessionGenerators = new ArrayList<>();
int numConcurrentGenerators = 32;

for (int i = 0; i < numConcurrentGenerators; i++) {
accessionGenerators.add(new Runnable() {
MonotonicAccessionGenerator generator = new MonotonicAccessionGenerator(CATEGORY_ID, INSTANCE_ID,
service);
@Override
public void run() {
try {
generator.generateAccessions(BLOCK_SIZE);
}
catch(Exception ex) {
ex.printStackTrace();
}
}
});
}

assertConcurrent("Concurrent accession generation", accessionGenerators, 120);

// Ensure that we arrive at the expected final block configuration
// despite constraint violations that occur above due to concurrent accession generation with the same instance
ContiguousIdBlock lastBlock = repository.findFirstByCategoryIdOrderByLastValueDesc(CATEGORY_ID);
assertEquals(numConcurrentGenerators * BLOCK_SIZE - 1, lastBlock.getLastValue());
}

}