diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java b/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java index 1312301c..1042e548 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java @@ -21,6 +21,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -31,6 +32,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; @RequestMapping("/v1/admin") @RestController @@ -87,6 +90,30 @@ public ResponseEntity fetchAndInsertAssemblyByAccession( return new ResponseEntity<>("Accession Processing Result : " + accessionResult, HttpStatus.MULTI_STATUS); } + @ApiOperation(value = "Given an assembly accession, retrieve MD5 checksum for all chromosomes belonging to assembly and update") + @PutMapping(value = "assemblies/{accession}/md5checksum") + public ResponseEntity retrieveAndInsertMd5ChecksumForAssembly(@PathVariable(name = "accession") + @ApiParam(value = "INSDC or RefSeq assembly accession. Eg: " + + "GCA_000001405.10") String asmAccession) { + handler.retrieveAndInsertMd5ChecksumForAssembly(asmAccession); + return ResponseEntity.ok("A task has been submitted for updating md5checksum for all chromosomes " + + "in assembly " + asmAccession + ". Depending upon the number of chromosomes present in assembly, " + + "this might take some time to complete"); + } + + @ApiOperation(value = "Retrieve list of assemblies for which MD5 Checksum updates are running/going-to-run ") + @GetMapping(value = "assemblies/md5checksum/status") + public ResponseEntity getMD5ChecksumUpdateTaskStatus() { + Map> md5ChecksumUpdateTasks = handler.getMD5ChecksumUpdateTaskStatus(); + Set runningTasks = md5ChecksumUpdateTasks.get("running"); + Set scheduledTasks = md5ChecksumUpdateTasks.get("scheduled"); + String runningTaskRes = runningTasks == null || runningTasks.isEmpty() ? "No running MD5 checksum update tasks" : + runningTasks.stream().collect(Collectors.joining(",")); + String scheduledTaskRes = scheduledTasks == null || scheduledTasks.isEmpty() ? "No scheduled MD5 checksum update tasks" : + scheduledTasks.stream().collect(Collectors.joining(",")); + return ResponseEntity.ok("running: " + runningTaskRes + "\nscheduled: " + scheduledTaskRes); + } + // This endpoint can be enabled in the future when checksums for assemblies are added to the project. // @ApiOperation(value = "Add MD5 and TRUNC512 checksums to an assembly by accession.", // notes = "Given an INSDC or RefSeq accession along with a MD5 or a TRUNC512 checksum, this endpoint will diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminHandler.java b/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminHandler.java index 9cc50eb9..08586672 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminHandler.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminHandler.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; @Service public class AdminHandler { @@ -54,6 +55,14 @@ public Map> fetchAndInsertAssemblyByAccession(List return assemblyService.fetchAndInsertAssembly(accessions); } + public void retrieveAndInsertMd5ChecksumForAssembly(String accession) { + assemblyService.retrieveAndInsertMd5ChecksumForAssembly(accession); + } + + public Map> getMD5ChecksumUpdateTaskStatus() { + return assemblyService.getMD5ChecksumUpdateTaskStatus(); + } + public void deleteAssemblyByAccession(String accession) { assemblyService.deleteAssemblyByAccession(accession); } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java b/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java index d739dc7f..37a5c791 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java @@ -123,20 +123,17 @@ public Optional downloadAssemblyReport(ENABrowser enaBrowser, String acces * @param optional {@link AssemblyEntity} to add ENA sequence names to * @throws IOException Passes IOException thrown by {@link #getAssemblyByAccession(String)} */ - public void addENASequenceNamesToAssembly(Optional optional) throws IOException { - if (optional.isPresent()) { - AssemblyEntity targetAssembly = optional.get(); - if (!hasAllEnaSequenceNames(targetAssembly)) { - String insdcAccession = targetAssembly.getInsdcAccession(); - Optional enaAssembly = getAssemblyByAccession(insdcAccession); - - if (enaAssembly.isPresent()) { - AssemblyEntity sourceAssembly = enaAssembly.get(); - addENASequenceNames(Objects.nonNull(sourceAssembly.getChromosomes()) ? - sourceAssembly.getChromosomes() : Collections.emptyList(), - Objects.nonNull(targetAssembly.getChromosomes()) ? - targetAssembly.getChromosomes() : Collections.emptyList()); - } + public void addENASequenceNamesToAssembly(AssemblyEntity targetAssembly) throws IOException { + if (!hasAllEnaSequenceNames(targetAssembly)) { + String insdcAccession = targetAssembly.getInsdcAccession(); + Optional enaAssembly = getAssemblyByAccession(insdcAccession); + + if (enaAssembly.isPresent()) { + AssemblyEntity sourceAssembly = enaAssembly.get(); + addENASequenceNames(Objects.nonNull(sourceAssembly.getChromosomes()) ? + sourceAssembly.getChromosomes() : Collections.emptyList(), + Objects.nonNull(targetAssembly.getChromosomes()) ? + targetAssembly.getChromosomes() : Collections.emptyList()); } } } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChecksumSetter.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChecksumSetter.java index fe7c8cd2..d0b4cdf5 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChecksumSetter.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChecksumSetter.java @@ -11,11 +11,21 @@ import uk.ac.ebi.eva.contigalias.entities.ChromosomeEntity; import uk.ac.ebi.eva.contigalias.service.ChromosomeService; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; @Component public class ChecksumSetter { private final Logger logger = LoggerFactory.getLogger(ChecksumSetter.class); + private final Map> runningMD5ChecksumUpdateTasks = new ConcurrentHashMap<>(); + private Set scheduledToRunMD5ChecksumUpdateTasks = new HashSet<>(); private int DEFAULT_PAGE_SIZE = 10000; private ChromosomeService chromosomeService; private Md5ChecksumRetriever md5ChecksumRetriever; @@ -28,27 +38,65 @@ public ChecksumSetter(ChromosomeService chromosomeService, Md5ChecksumRetriever // @Scheduled(cron = "30 15 10 1 * ? 2023") -- the task to run at 10:15:30 AM on the 1st day of every month in the year 2023. //Seconds: 30 Minutes: 15 Hours: 10 Day of the month: 1 Month: Every month Day of the week: Every day of the week Year: 2023 - @Scheduled(initialDelay = 0, fixedDelay = 24 * 60 * 60 * 1000) + @Scheduled(initialDelay = 24 * 60 * 60 * 1000, fixedDelay = 24 * 60 * 60 * 1000) public void updateMd5CheckSumForAllAssemblies() { + scheduledToRunMD5ChecksumUpdateTasks = new HashSet<>(); List assemblyList = chromosomeService.getAssembliesWhereChromosomeMd5ChecksumIsNull(); + logger.info("List of assemblies to be updated for MD5 Checksum: " + assemblyList); + scheduledToRunMD5ChecksumUpdateTasks.addAll(assemblyList.stream().collect(Collectors.toSet())); + for (String assembly : assemblyList) { - logger.info("Trying to update md5checksum for assembly: " + assembly); - updateMD5ChecksumForAllChromosomesInAssembly(assembly); + CompletableFuture future = updateMd5CheckSumForAssemblyAsync(assembly); + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + logger.error("Encountered an error when running MD5Checksum update for assembly: " + assembly); + } finally { + scheduledToRunMD5ChecksumUpdateTasks.remove(assembly); + } } } + public CompletableFuture updateMd5CheckSumForAssemblyAsync(String assembly) { + logger.info("Submitted job for updating MD5 Checksum for assembly (asynchronously)"); + // Check if the async task for this assembly is already running + CompletableFuture existingTask = runningMD5ChecksumUpdateTasks.get(assembly); + if (existingTask != null && !existingTask.isDone()) { + logger.info("Async task is still running for assembly: " + assembly); + return existingTask; + } + // Start the async task (removing existing run if present) + runningMD5ChecksumUpdateTasks.remove(assembly); + CompletableFuture future = CompletableFuture.runAsync(() -> { + updateMD5ChecksumForAllChromosomesInAssembly(assembly); + }); + // Store the future in the map for the given assembly + runningMD5ChecksumUpdateTasks.put(assembly, future); + + // check the status of task upon completion and remove from running tasks + future.whenComplete((result, exception) -> { + if (exception != null) { + logger.error("Async task (MD5Checksum setter) failed for assembly: " + assembly, exception); + } else { + logger.info("Async task (MD5Checksum setter) completed successfully for assembly: " + assembly); + } + runningMD5ChecksumUpdateTasks.remove(assembly); + }); + + return future; + } + public void updateMD5ChecksumForAllChromosomesInAssembly(String assembly) { - int pageNumber = 0; - Pageable pageable = PageRequest.of(pageNumber, DEFAULT_PAGE_SIZE); - Slice chrSlice = chromosomeService.getChromosomesByAssemblyInsdcAccessionWhereMd5ChecksumIsNull(assembly, pageable); - while (chrSlice.hasContent()) { + logger.info("Trying to update md5checksum for assembly: " + assembly); + Slice chrSlice; + Pageable pageable = PageRequest.of(0, DEFAULT_PAGE_SIZE); + do { + chrSlice = chromosomeService.getChromosomesByAssemblyInsdcAccessionWhereMd5ChecksumIsNull(assembly, pageable); List chromosomeEntityList = chrSlice.getContent(); updateMd5ChecksumForChromosome(chromosomeEntityList); + } while (chrSlice.hasNext()); - pageNumber++; - pageable = PageRequest.of(pageNumber, DEFAULT_PAGE_SIZE); - chrSlice = chromosomeService.getChromosomesByAssemblyInsdcAccessionWhereMd5ChecksumIsNull(assembly, pageable); - } + logger.info("Updating md5checksum for assembly " + assembly + " completed"); } public void updateMd5ChecksumForChromosome(List chromosomesList) { @@ -63,4 +111,11 @@ public void updateMd5ChecksumForChromosome(List chromosomesLis chromosomeService.updateMd5ChecksumForAll(chromosomesList); } + + public Map> getMD5ChecksumUpdateTaskStatus() { + Map> taskStatus = new HashMap<>(); + taskStatus.put("running", runningMD5ChecksumUpdateTasks.keySet()); + taskStatus.put("scheduled", scheduledToRunMD5ChecksumUpdateTasks); + return taskStatus; + } } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java b/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java index 58a68d34..e97adc2f 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java @@ -29,6 +29,7 @@ import uk.ac.ebi.eva.contigalias.exception.AssemblyNotFoundException; import uk.ac.ebi.eva.contigalias.exception.DuplicateAssemblyException; import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository; +import uk.ac.ebi.eva.contigalias.scheduler.ChecksumSetter; import javax.transaction.Transactional; import java.io.IOException; @@ -38,8 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.Set; @Service @@ -51,14 +51,17 @@ public class AssemblyService { private final ENAAssemblyDataSource enaDataSource; + private final ChecksumSetter checksumSetter; + private final Logger logger = LoggerFactory.getLogger(AssemblyService.class); @Autowired - public AssemblyService( - AssemblyRepository repository, NCBIAssemblyDataSource ncbiDataSource, ENAAssemblyDataSource enaDataSource) { + public AssemblyService(AssemblyRepository repository, NCBIAssemblyDataSource ncbiDataSource, + ENAAssemblyDataSource enaDataSource, ChecksumSetter checksumSetter) { this.repository = repository; this.ncbiDataSource = ncbiDataSource; this.enaDataSource = enaDataSource; + this.checksumSetter = checksumSetter; } public Optional getAssemblyByInsdcAccession(String insdcAccession) { @@ -99,15 +102,30 @@ public void fetchAndInsertAssembly(String accession) throws IOException { if (!fetchAssembly.isPresent()) { throw new AssemblyNotFoundException(accession); } - enaDataSource.addENASequenceNamesToAssembly(fetchAssembly); - if (fetchAssembly.get().getChromosomes() != null && fetchAssembly.get().getChromosomes().size() > 0) { - insertAssembly(fetchAssembly.get()); - logger.info("Successfully inserted assembly for accession " + accession); + if (fetchAssembly.isPresent()) { + AssemblyEntity assemblyEntity = fetchAssembly.get(); + enaDataSource.addENASequenceNamesToAssembly(assemblyEntity); + if (assemblyEntity.getChromosomes() != null && assemblyEntity.getChromosomes().size() > 0) { + insertAssembly(assemblyEntity); + logger.info("Successfully inserted assembly for accession " + accession); + // submit job for retrieving and updating MD5 Checksum for assembly (asynchronously) + checksumSetter.updateMd5CheckSumForAssemblyAsync(accession); + } else { + logger.error("Skipping inserting assembly : No chromosome in assembly " + accession); + } } else { - logger.error("Skipping inserting assembly : No chromosome in assembly " + accession); + logger.error("Could not get assembly from NCBI"); } } + public void retrieveAndInsertMd5ChecksumForAssembly(String assembly) { + checksumSetter.updateMd5CheckSumForAssemblyAsync(assembly); + } + + public Map> getMD5ChecksumUpdateTaskStatus() { + return checksumSetter.getMD5ChecksumUpdateTaskStatus(); + } + public Optional getAssemblyByAccession(String accession) { Optional entity = repository.findAssemblyEntityByAccession(accession); if (entity.isPresent()) { diff --git a/src/test/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSourceTest.java b/src/test/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSourceTest.java index 89de6d66..11f7bd5c 100644 --- a/src/test/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSourceTest.java +++ b/src/test/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSourceTest.java @@ -56,7 +56,7 @@ public void getAssemblyByAccessionGCAHavingChromosomes() throws IOException { @Test public void getENASequenceNamesForAssembly() throws IOException { Optional assembly = ncbiDataSource.getAssemblyByAccession(GCA_ACCESSION_HAVING_CHROMOSOMES); - enaDataSource.addENASequenceNamesToAssembly(assembly); + enaDataSource.addENASequenceNamesToAssembly(assembly.get()); assertTrue(assembly.isPresent()); assertTrue(enaDataSource.hasAllEnaSequenceNames(assembly.get())); } diff --git a/src/test/java/uk/ac/ebi/eva/contigalias/service/AssemblyServiceIntegrationTest.java b/src/test/java/uk/ac/ebi/eva/contigalias/service/AssemblyServiceIntegrationTest.java index 2d0c8542..d713e2e9 100644 --- a/src/test/java/uk/ac/ebi/eva/contigalias/service/AssemblyServiceIntegrationTest.java +++ b/src/test/java/uk/ac/ebi/eva/contigalias/service/AssemblyServiceIntegrationTest.java @@ -31,10 +31,12 @@ import uk.ac.ebi.eva.contigalias.entities.AssemblyEntity; import uk.ac.ebi.eva.contigalias.entitygenerator.AssemblyGenerator; import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository; +import uk.ac.ebi.eva.contigalias.scheduler.ChecksumSetter; import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -62,6 +64,7 @@ public class AssemblyServiceIntegrationTest { void setup() throws IOException { NCBIAssemblyDataSource mockNcbiDataSource = mock(NCBIAssemblyDataSource.class); ENAAssemblyDataSource mockEnaDataSource = mock(ENAAssemblyDataSource.class); + ChecksumSetter mockChecksumSetter = mock(ChecksumSetter.class); for (int i = 0; i < entities.length; i++) { AssemblyEntity generate = AssemblyGenerator.generate(i); entities[i] = generate; @@ -69,8 +72,10 @@ void setup() throws IOException { .thenReturn(Optional.of(generate)); Mockito.when(mockNcbiDataSource.getAssemblyByAccession(generate.getRefseq())) .thenReturn(Optional.of(generate)); + Mockito.when(mockChecksumSetter.updateMd5CheckSumForAssemblyAsync(generate.getInsdcAccession())) + .thenReturn(new CompletableFuture<>()); } - service = new AssemblyService(repository, mockNcbiDataSource, mockEnaDataSource); + service = new AssemblyService(repository, mockNcbiDataSource, mockEnaDataSource, mockChecksumSetter); } @AfterEach