Skip to content

Commit

Permalink
end-point for running md5checksum updater
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed Jan 22, 2024
1 parent c79660e commit f0ec9c2
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand All @@ -31,6 +32,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@RequestMapping("/v1/admin")
@RestController
Expand Down Expand Up @@ -87,6 +90,30 @@ public ResponseEntity<?> fetchAndInsertAssemblyByAccession(
return new ResponseEntity<>("Accession Processing Result : " + accessionResult, HttpStatus.MULTI_STATUS);
}

@ApiOperation(value = "Given an assembly accession, retrieve MD5 checksum for all chromosomes belonging to assembly and update")
@PutMapping(value = "assemblies/{accession}/md5checksum")
public ResponseEntity<String> retrieveAndInsertMd5ChecksumForAssembly(@PathVariable(name = "accession")
@ApiParam(value = "INSDC or RefSeq assembly accession. Eg: " +
"GCA_000001405.10") String asmAccession) {
handler.retrieveAndInsertMd5ChecksumForAssembly(asmAccession);
return ResponseEntity.ok("A task has been submitted for updating md5checksum for all chromosomes " +
"in assembly " + asmAccession + ". Depending upon the number of chromosomes present in assembly, " +
"this might take some time to complete");
}

@ApiOperation(value = "Retrieve list of assemblies for which MD5 Checksum updates are running/going-to-run ")
@GetMapping(value = "assemblies/md5checksum/status")
public ResponseEntity<String> getMD5ChecksumUpdateTaskStatus() {
Map<String, Set<String>> md5ChecksumUpdateTasks = handler.getMD5ChecksumUpdateTaskStatus();
Set<String> runningTasks = md5ChecksumUpdateTasks.get("running");
Set<String> scheduledTasks = md5ChecksumUpdateTasks.get("scheduled");
String runningTaskRes = runningTasks == null || runningTasks.isEmpty() ? "No running MD5 checksum update tasks" :
runningTasks.stream().collect(Collectors.joining(","));
String scheduledTaskRes = scheduledTasks == null || scheduledTasks.isEmpty() ? "No scheduled MD5 checksum update tasks" :
scheduledTasks.stream().collect(Collectors.joining(","));
return ResponseEntity.ok("running: " + runningTaskRes + "\nscheduled: " + scheduledTaskRes);
}

// This endpoint can be enabled in the future when checksums for assemblies are added to the project.
// @ApiOperation(value = "Add MD5 and TRUNC512 checksums to an assembly by accession.",
// notes = "Given an INSDC or RefSeq accession along with a MD5 or a TRUNC512 checksum, this endpoint will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Service
public class AdminHandler {
Expand Down Expand Up @@ -54,6 +55,14 @@ public Map<String, List<String>> fetchAndInsertAssemblyByAccession(List<String>
return assemblyService.fetchAndInsertAssembly(accessions);
}

public void retrieveAndInsertMd5ChecksumForAssembly(String accession) {
assemblyService.retrieveAndInsertMd5ChecksumForAssembly(accession);
}

public Map<String, Set<String>> getMD5ChecksumUpdateTaskStatus() {
return assemblyService.getMD5ChecksumUpdateTaskStatus();
}

public void deleteAssemblyByAccession(String accession) {
assemblyService.deleteAssemblyByAccession(accession);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,17 @@ public Optional<Path> downloadAssemblyReport(ENABrowser enaBrowser, String acces
* @param optional {@link AssemblyEntity} to add ENA sequence names to
* @throws IOException Passes IOException thrown by {@link #getAssemblyByAccession(String)}
*/
public void addENASequenceNamesToAssembly(Optional<AssemblyEntity> optional) throws IOException {
if (optional.isPresent()) {
AssemblyEntity targetAssembly = optional.get();
if (!hasAllEnaSequenceNames(targetAssembly)) {
String insdcAccession = targetAssembly.getInsdcAccession();
Optional<AssemblyEntity> enaAssembly = getAssemblyByAccession(insdcAccession);

if (enaAssembly.isPresent()) {
AssemblyEntity sourceAssembly = enaAssembly.get();
addENASequenceNames(Objects.nonNull(sourceAssembly.getChromosomes()) ?
sourceAssembly.getChromosomes() : Collections.emptyList(),
Objects.nonNull(targetAssembly.getChromosomes()) ?
targetAssembly.getChromosomes() : Collections.emptyList());
}
public void addENASequenceNamesToAssembly(AssemblyEntity targetAssembly) throws IOException {
if (!hasAllEnaSequenceNames(targetAssembly)) {
String insdcAccession = targetAssembly.getInsdcAccession();
Optional<AssemblyEntity> enaAssembly = getAssemblyByAccession(insdcAccession);

if (enaAssembly.isPresent()) {
AssemblyEntity sourceAssembly = enaAssembly.get();
addENASequenceNames(Objects.nonNull(sourceAssembly.getChromosomes()) ?
sourceAssembly.getChromosomes() : Collections.emptyList(),
Objects.nonNull(targetAssembly.getChromosomes()) ?
targetAssembly.getChromosomes() : Collections.emptyList());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,21 @@
import uk.ac.ebi.eva.contigalias.entities.ChromosomeEntity;
import uk.ac.ebi.eva.contigalias.service.ChromosomeService;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

@Component
public class ChecksumSetter {
private final Logger logger = LoggerFactory.getLogger(ChecksumSetter.class);
private final Map<String, CompletableFuture<Void>> runningMD5ChecksumUpdateTasks = new ConcurrentHashMap<>();
private Set<String> scheduledToRunMD5ChecksumUpdateTasks = new HashSet<>();
private int DEFAULT_PAGE_SIZE = 10000;
private ChromosomeService chromosomeService;
private Md5ChecksumRetriever md5ChecksumRetriever;
Expand All @@ -28,27 +38,65 @@ public ChecksumSetter(ChromosomeService chromosomeService, Md5ChecksumRetriever

// @Scheduled(cron = "30 15 10 1 * ? 2023") -- the task to run at 10:15:30 AM on the 1st day of every month in the year 2023.
//Seconds: 30 Minutes: 15 Hours: 10 Day of the month: 1 Month: Every month Day of the week: Every day of the week Year: 2023
@Scheduled(initialDelay = 0, fixedDelay = 24 * 60 * 60 * 1000)
@Scheduled(initialDelay = 24 * 60 * 60 * 1000, fixedDelay = 24 * 60 * 60 * 1000)
public void updateMd5CheckSumForAllAssemblies() {
scheduledToRunMD5ChecksumUpdateTasks = new HashSet<>();
List<String> assemblyList = chromosomeService.getAssembliesWhereChromosomeMd5ChecksumIsNull();
logger.info("List of assemblies to be updated for MD5 Checksum: " + assemblyList);
scheduledToRunMD5ChecksumUpdateTasks.addAll(assemblyList.stream().collect(Collectors.toSet()));

for (String assembly : assemblyList) {
logger.info("Trying to update md5checksum for assembly: " + assembly);
updateMD5ChecksumForAllChromosomesInAssembly(assembly);
CompletableFuture<Void> future = updateMd5CheckSumForAssemblyAsync(assembly);
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Encountered an error when running MD5Checksum update for assembly: " + assembly);
} finally {
scheduledToRunMD5ChecksumUpdateTasks.remove(assembly);
}
}
}

public CompletableFuture<Void> updateMd5CheckSumForAssemblyAsync(String assembly) {
logger.info("Submitted job for updating MD5 Checksum for assembly (asynchronously)");
// Check if the async task for this assembly is already running
CompletableFuture<Void> existingTask = runningMD5ChecksumUpdateTasks.get(assembly);
if (existingTask != null && !existingTask.isDone()) {
logger.info("Async task is still running for assembly: " + assembly);
return existingTask;
}
// Start the async task (removing existing run if present)
runningMD5ChecksumUpdateTasks.remove(assembly);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
updateMD5ChecksumForAllChromosomesInAssembly(assembly);
});
// Store the future in the map for the given assembly
runningMD5ChecksumUpdateTasks.put(assembly, future);

// check the status of task upon completion and remove from running tasks
future.whenComplete((result, exception) -> {
if (exception != null) {
logger.error("Async task (MD5Checksum setter) failed for assembly: " + assembly, exception);
} else {
logger.info("Async task (MD5Checksum setter) completed successfully for assembly: " + assembly);
}
runningMD5ChecksumUpdateTasks.remove(assembly);
});

return future;
}

public void updateMD5ChecksumForAllChromosomesInAssembly(String assembly) {
int pageNumber = 0;
Pageable pageable = PageRequest.of(pageNumber, DEFAULT_PAGE_SIZE);
Slice<ChromosomeEntity> chrSlice = chromosomeService.getChromosomesByAssemblyInsdcAccessionWhereMd5ChecksumIsNull(assembly, pageable);
while (chrSlice.hasContent()) {
logger.info("Trying to update md5checksum for assembly: " + assembly);
Slice<ChromosomeEntity> chrSlice;
Pageable pageable = PageRequest.of(0, DEFAULT_PAGE_SIZE);
do {
chrSlice = chromosomeService.getChromosomesByAssemblyInsdcAccessionWhereMd5ChecksumIsNull(assembly, pageable);
List<ChromosomeEntity> chromosomeEntityList = chrSlice.getContent();
updateMd5ChecksumForChromosome(chromosomeEntityList);
} while (chrSlice.hasNext());

pageNumber++;
pageable = PageRequest.of(pageNumber, DEFAULT_PAGE_SIZE);
chrSlice = chromosomeService.getChromosomesByAssemblyInsdcAccessionWhereMd5ChecksumIsNull(assembly, pageable);
}
logger.info("Updating md5checksum for assembly " + assembly + " completed");
}

public void updateMd5ChecksumForChromosome(List<ChromosomeEntity> chromosomesList) {
Expand All @@ -63,4 +111,11 @@ public void updateMd5ChecksumForChromosome(List<ChromosomeEntity> chromosomesLis

chromosomeService.updateMd5ChecksumForAll(chromosomesList);
}

public Map<String, Set<String>> getMD5ChecksumUpdateTaskStatus() {
Map<String, Set<String>> taskStatus = new HashMap<>();
taskStatus.put("running", runningMD5ChecksumUpdateTasks.keySet());
taskStatus.put("scheduled", scheduledToRunMD5ChecksumUpdateTasks);
return taskStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import uk.ac.ebi.eva.contigalias.exception.AssemblyNotFoundException;
import uk.ac.ebi.eva.contigalias.exception.DuplicateAssemblyException;
import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository;
import uk.ac.ebi.eva.contigalias.scheduler.ChecksumSetter;

import javax.transaction.Transactional;
import java.io.IOException;
Expand All @@ -38,8 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Set;


@Service
Expand All @@ -51,14 +51,17 @@ public class AssemblyService {

private final ENAAssemblyDataSource enaDataSource;

private final ChecksumSetter checksumSetter;

private final Logger logger = LoggerFactory.getLogger(AssemblyService.class);

@Autowired
public AssemblyService(
AssemblyRepository repository, NCBIAssemblyDataSource ncbiDataSource, ENAAssemblyDataSource enaDataSource) {
public AssemblyService(AssemblyRepository repository, NCBIAssemblyDataSource ncbiDataSource,
ENAAssemblyDataSource enaDataSource, ChecksumSetter checksumSetter) {
this.repository = repository;
this.ncbiDataSource = ncbiDataSource;
this.enaDataSource = enaDataSource;
this.checksumSetter = checksumSetter;
}

public Optional<AssemblyEntity> getAssemblyByInsdcAccession(String insdcAccession) {
Expand Down Expand Up @@ -99,15 +102,30 @@ public void fetchAndInsertAssembly(String accession) throws IOException {
if (!fetchAssembly.isPresent()) {
throw new AssemblyNotFoundException(accession);
}
enaDataSource.addENASequenceNamesToAssembly(fetchAssembly);
if (fetchAssembly.get().getChromosomes() != null && fetchAssembly.get().getChromosomes().size() > 0) {
insertAssembly(fetchAssembly.get());
logger.info("Successfully inserted assembly for accession " + accession);
if (fetchAssembly.isPresent()) {
AssemblyEntity assemblyEntity = fetchAssembly.get();
enaDataSource.addENASequenceNamesToAssembly(assemblyEntity);
if (assemblyEntity.getChromosomes() != null && assemblyEntity.getChromosomes().size() > 0) {
insertAssembly(assemblyEntity);
logger.info("Successfully inserted assembly for accession " + accession);
// submit job for retrieving and updating MD5 Checksum for assembly (asynchronously)
checksumSetter.updateMd5CheckSumForAssemblyAsync(accession);
} else {
logger.error("Skipping inserting assembly : No chromosome in assembly " + accession);
}
} else {
logger.error("Skipping inserting assembly : No chromosome in assembly " + accession);
logger.error("Could not get assembly from NCBI");
}
}

public void retrieveAndInsertMd5ChecksumForAssembly(String assembly) {
checksumSetter.updateMd5CheckSumForAssemblyAsync(assembly);
}

public Map<String, Set<String>> getMD5ChecksumUpdateTaskStatus() {
return checksumSetter.getMD5ChecksumUpdateTaskStatus();
}

public Optional<AssemblyEntity> getAssemblyByAccession(String accession) {
Optional<AssemblyEntity> entity = repository.findAssemblyEntityByAccession(accession);
if (entity.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void getAssemblyByAccessionGCAHavingChromosomes() throws IOException {
@Test
public void getENASequenceNamesForAssembly() throws IOException {
Optional<AssemblyEntity> assembly = ncbiDataSource.getAssemblyByAccession(GCA_ACCESSION_HAVING_CHROMOSOMES);
enaDataSource.addENASequenceNamesToAssembly(assembly);
enaDataSource.addENASequenceNamesToAssembly(assembly.get());
assertTrue(assembly.isPresent());
assertTrue(enaDataSource.hasAllEnaSequenceNames(assembly.get()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import uk.ac.ebi.eva.contigalias.entities.AssemblyEntity;
import uk.ac.ebi.eva.contigalias.entitygenerator.AssemblyGenerator;
import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository;
import uk.ac.ebi.eva.contigalias.scheduler.ChecksumSetter;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -62,15 +64,18 @@ public class AssemblyServiceIntegrationTest {
void setup() throws IOException {
NCBIAssemblyDataSource mockNcbiDataSource = mock(NCBIAssemblyDataSource.class);
ENAAssemblyDataSource mockEnaDataSource = mock(ENAAssemblyDataSource.class);
ChecksumSetter mockChecksumSetter = mock(ChecksumSetter.class);
for (int i = 0; i < entities.length; i++) {
AssemblyEntity generate = AssemblyGenerator.generate(i);
entities[i] = generate;
Mockito.when(mockNcbiDataSource.getAssemblyByAccession(generate.getInsdcAccession()))
.thenReturn(Optional.of(generate));
Mockito.when(mockNcbiDataSource.getAssemblyByAccession(generate.getRefseq()))
.thenReturn(Optional.of(generate));
Mockito.when(mockChecksumSetter.updateMd5CheckSumForAssemblyAsync(generate.getInsdcAccession()))
.thenReturn(new CompletableFuture<>());
}
service = new AssemblyService(repository, mockNcbiDataSource, mockEnaDataSource);
service = new AssemblyService(repository, mockNcbiDataSource, mockEnaDataSource, mockChecksumSetter);
}

@AfterEach
Expand Down

0 comments on commit f0ec9c2

Please sign in to comment.