Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3570 Added recovery agent for running recovery of blocks #88

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.MonotonicDatabaseService;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class MonotonicAccessionRecoveryAgent {
private final static Logger logger = LoggerFactory.getLogger(MonotonicAccessionRecoveryAgent.class);

private final ContiguousIdBlockService blockService;
private final MonotonicDatabaseService monotonicDatabaseService;

public MonotonicAccessionRecoveryAgent(ContiguousIdBlockService blockService,
MonotonicDatabaseService monotonicDatabaseService) {
this.blockService = blockService;
this.monotonicDatabaseService = monotonicDatabaseService;
}

public void runRecovery(String categoryId, String applicationInstanceId, LocalDateTime lastUpdatedTime) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we should add logging statement or have a way to report how many blocks were processed and how many blocks were recovered in each category.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added logger statements

logger.info("Starting recovering of blocks for category " + categoryId);
List<ContiguousIdBlock> blocksToRecover = blockService.allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(categoryId, lastUpdatedTime);
logger.info("List of block ids to recover : " + blocksToRecover.stream().map(b -> Long.toString(b.getId()))
.collect(Collectors.joining(",")));
for (ContiguousIdBlock block : blocksToRecover) {
logger.info("Recovering Block: " + block);
if (block.getLastCommitted() == block.getLastValue()) {
logger.info("Block is already completely used, not need to run recovery. Releasing the block.");
setAppInstanceIdAndReleaseBlock(applicationInstanceId, block);
continue;
}

// run recover state for a block using BlockManager's recover state method
Set<ContiguousIdBlock> blockSet = recoverStateForBlock(block);

if (blockSet.isEmpty()) {
// if block's last committed is correctly set, BlockManager's recover method will return an empty set
logger.info("Block's last committed is correct. No updates to last_committed. Releasing the block.");
setAppInstanceIdAndReleaseBlock(applicationInstanceId, block);
} else {
ContiguousIdBlock blockToUpdate = blockSet.iterator().next();
logger.info("Recovery ran successfully for block. Last committed updated to " + block.getLastCommitted()
+ ". Saving and releasing the block.");
setAppInstanceIdAndReleaseBlock(applicationInstanceId, blockToUpdate);
}
}
}

private Set<ContiguousIdBlock> recoverStateForBlock(ContiguousIdBlock block) {
BlockManager blockManager = new BlockManager();
blockManager.addBlock(block);
MonotonicRange monotonicRange = blockManager.getAvailableRanges().poll();
long[] committedElements = monotonicDatabaseService.getAccessionsInRanges(Collections.singletonList(monotonicRange));
return blockManager.recoverState(committedElements);
}

private void setAppInstanceIdAndReleaseBlock(String applicationInstanceId, ContiguousIdBlock block) {
block.setApplicationInstanceId(applicationInstanceId);
block.releaseReserved();
blockService.save(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,18 @@ public int compareTo(ContiguousIdBlock contiguousIdBlock) {
protected void onUpdate() {
this.lastUpdatedTimestamp = LocalDateTime.now();
}

@Override
public String toString() {
return "ContiguousIdBlock{" +
"id=" + id +
", categoryId='" + categoryId + '\'' +
", applicationInstanceId='" + applicationInstanceId + '\'' +
", firstValue=" + firstValue +
", lastValue=" + lastValue +
", lastCommitted=" + lastCommitted +
", reserved=" + reserved +
", lastUpdatedTimestamp=" + lastUpdatedTimestamp +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.stereotype.Repository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;

import java.time.LocalDateTime;
import java.util.List;

@Repository
Expand All @@ -31,4 +32,7 @@ public interface ContiguousIdBlockRepository extends CrudRepository<ContiguousId
List<ContiguousIdBlock> findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId);

ContiguousIdBlock findFirstByCategoryIdOrderByLastValueDesc(String categoryId);

List<ContiguousIdBlock> findByCategoryIdAndReservedIsTrueAndLastUpdatedTimestampLessThanEqualOrderByLastValueAsc(
String categoryId, LocalDateTime lastUpdatedTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -76,6 +77,16 @@ public void save(Iterable<ContiguousIdBlock> blocks) {
entityManager.flush();
}

@Transactional
public void save(ContiguousIdBlock block) {
// release block if full
if (block.isFull()) {
block.releaseReserved();
}
repository.save(block);
entityManager.flush();
}

@Transactional(isolation = Isolation.SERIALIZABLE)
public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) {
ContiguousIdBlock lastBlock = repository.findFirstByCategoryIdOrderByLastValueDesc(categoryId);
Expand Down Expand Up @@ -110,4 +121,9 @@ public List<ContiguousIdBlock> reserveUncompletedBlocksForCategoryIdAndApplicati
return blockList;
}

public List<ContiguousIdBlock> allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(String categoryId,
LocalDateTime lastUpdatedTimeStamp) {
return repository.findByCategoryIdAndReservedIsTrueAndLastUpdatedTimestampLessThanEqualOrderByLastValueAsc(categoryId, lastUpdatedTimeStamp);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import uk.ac.ebi.ampt2d.commons.accession.core.models.AccessionWrapper;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories.ContiguousIdBlockRepository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.ampt2d.commons.accession.service.BasicSpringDataRepositoryMonotonicDatabaseService;
import uk.ac.ebi.ampt2d.test.configuration.MonotonicAccessionGeneratorTestConfiguration;
import uk.ac.ebi.ampt2d.test.configuration.TestMonotonicDatabaseServiceTestConfiguration;
import uk.ac.ebi.ampt2d.test.models.TestModel;
import uk.ac.ebi.ampt2d.test.persistence.TestMonotonicEntity;

import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@DataJpaTest
@ContextConfiguration(classes = {MonotonicAccessionGeneratorTestConfiguration.class, TestMonotonicDatabaseServiceTestConfiguration.class})
public class MonotonicAccessionRecoveryAgentTest {
private static final String TEST_CATEGORY = "TEST_CATEGORY";
private static final String TEST_APP_INSTANCE_ID = "TEST_APP_INSTANCE_ID";
private static final String TEST_RECOVERY_AGENT_APP_INSTANCE_ID = "TEST_RECOVERY_AGENT_APP_INSTANCE_ID";

@Autowired
private BasicSpringDataRepositoryMonotonicDatabaseService<TestModel, TestMonotonicEntity> monotonicDBService;
@Autowired
private ContiguousIdBlockRepository repository;
@Autowired
private ContiguousIdBlockService service;

@Test
public void testRunRecovery() throws InterruptedException {
// block1 does not have any accessions used
ContiguousIdBlock block1 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 0, 100);
repository.save(block1);

// block2 is full and has all accessions used
ContiguousIdBlock block2 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 100, 100);
block2.setLastCommitted(199);
repository.save(block2);

// block3 has some of the accessions used but not captured in the block's table
ContiguousIdBlock block3 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 200, 100);
repository.save(block3);
// save some accessions in db that are not captured in block3
List<AccessionWrapper<TestModel, String, Long>> accessionsSet = LongStream.range(200l, 225l)
.boxed()
.map(longAcc -> new AccessionWrapper<>(longAcc, "hash-1" + longAcc, TestModel.of("test-obj-1-" + longAcc)))
.collect(Collectors.toList());
monotonicDBService.save(accessionsSet);

// block4 should not be recovered as it is after the recover cut off time
Thread.sleep(2000);
ContiguousIdBlock block4 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 300, 100);
repository.save(block4);

// run recovery through recovery agent
LocalDateTime recoverCutOffTime = block3.getLastUpdatedTimestamp();
MonotonicAccessionRecoveryAgent recoveryAgent = new MonotonicAccessionRecoveryAgent(service, monotonicDBService);
recoveryAgent.runRecovery(TEST_CATEGORY, TEST_RECOVERY_AGENT_APP_INSTANCE_ID, recoverCutOffTime);


List<ContiguousIdBlock> blockList = StreamSupport.stream(repository.findAll().spliterator(), false)
.sorted(Comparator.comparing(ContiguousIdBlock::getFirstValue))
.collect(Collectors.toList());
assertEquals(4, blockList.size());

assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(0).getApplicationInstanceId());
assertEquals(0, blockList.get(0).getFirstValue());
assertEquals(-1, blockList.get(0).getLastCommitted());
assertTrue(blockList.get(0).isNotReserved());

assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(1).getApplicationInstanceId());
assertEquals(100, blockList.get(1).getFirstValue());
assertEquals(199, blockList.get(1).getLastCommitted());
assertTrue(blockList.get(1).isNotReserved());

assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(2).getApplicationInstanceId());
assertEquals(200, blockList.get(2).getFirstValue());
assertEquals(224, blockList.get(2).getLastCommitted());
assertTrue(blockList.get(2).isNotReserved());

assertEquals(TEST_APP_INSTANCE_ID, blockList.get(3).getApplicationInstanceId());
assertEquals(300, blockList.get(3).getFirstValue());
assertEquals(299, blockList.get(3).getLastCommitted());
assertTrue(blockList.get(3).isReserved());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -275,8 +277,10 @@ public void testLastUpdateTimeStampAutoUpdate() {
entityManager.flush();

// assert block values
ContiguousIdBlock blockInDB = repository.findById(1L).get();
assertEquals(1L, blockInDB.getId());
List<ContiguousIdBlock> blockInDBList = StreamSupport.stream(repository.findAll().spliterator(), false)
.collect(Collectors.toList());
assertEquals(1, blockInDBList.size());
ContiguousIdBlock blockInDB = blockInDBList.get(0);
assertEquals(CATEGORY_ID, blockInDB.getCategoryId());
assertEquals(INSTANCE_ID, blockInDB.getApplicationInstanceId());
assertEquals(100, blockInDB.getFirstValue());
Expand All @@ -289,7 +293,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.setLastCommitted(100);
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertEquals(100, blockInDB.getLastCommitted());

LocalDateTime blockLastCommittedUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -298,7 +302,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.setApplicationInstanceId(INSTANCE_ID_2);
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertEquals(INSTANCE_ID_2, blockInDB.getApplicationInstanceId());

LocalDateTime blockApplicationInstanceUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -307,7 +311,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.releaseReserved();
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertTrue(blockInDB.isNotReserved());

LocalDateTime blockReleaseAsReservedUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -316,7 +320,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.markAsReserved();
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertTrue(blockInDB.isReserved());

LocalDateTime blockMarkAsReservedUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -327,4 +331,34 @@ public void testLastUpdateTimeStampAutoUpdate() {
assertTrue(blockReleaseAsReservedUpdateTime.isBefore(blockMarkAsReservedUpdateTime));
}

@Test
public void testGetBlocksWithLastUpdatedTimeStampLessThan() throws InterruptedException {
// reserved
ContiguousIdBlock block1 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100);
// reserved
ContiguousIdBlock block2 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 100, 100);
// not reserved
ContiguousIdBlock block3 = getUnreservedContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 200, 100);
// reserved but different category
ContiguousIdBlock block4 = new ContiguousIdBlock(CATEGORY_ID_2, INSTANCE_ID, 300, 100);
// reserved but after timestamp
Thread.sleep(2000L);
ContiguousIdBlock block5 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 400, 100);
repository.save(block1);
repository.save(block2);
repository.save(block3);
repository.save(block4);
repository.save(block5);
entityManager.flush();

LocalDateTime cutOffTimestamp = block4.getLastUpdatedTimestamp();
List<ContiguousIdBlock> blocksList = service.allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(CATEGORY_ID, cutOffTimestamp);

assertEquals(2, blocksList.size());
assertTrue(blocksList.get(0).isReserved());
assertEquals(0, blocksList.get(0).getFirstValue());
assertTrue(blocksList.get(1).isReserved());
assertEquals(100, blocksList.get(1).getFirstValue());
}

}
Loading