diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/ContigAliasApplication.java b/src/main/java/uk/ac/ebi/eva/contigalias/ContigAliasApplication.java index bd1f1109..9c4088c7 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/ContigAliasApplication.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/ContigAliasApplication.java @@ -22,10 +22,12 @@ import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; import org.springframework.hateoas.config.EnableHypermediaSupport; import org.springframework.retry.annotation.EnableRetry; +import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; @EnableScheduling +@EnableAsync @SpringBootApplication @EnableRetry @EnableTransactionManagement diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ApplicationContextHolder.java b/src/main/java/uk/ac/ebi/eva/contigalias/conf/ApplicationContextHolder.java similarity index 93% rename from src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ApplicationContextHolder.java rename to src/main/java/uk/ac/ebi/eva/contigalias/conf/ApplicationContextHolder.java index e9080ab9..ffd62ea6 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ApplicationContextHolder.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/conf/ApplicationContextHolder.java @@ -1,4 +1,4 @@ -package uk.ac.ebi.eva.contigalias.scheduler; +package uk.ac.ebi.eva.contigalias.conf; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java b/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java index 422fcbff..6226f176 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/controller/admin/AdminController.java @@ -61,6 +61,9 @@ public ResponseEntity fetchAndInsertAssemblyByAccession( "GCA_000001405.10") String asmAccession) throws IOException { try { handler.fetchAndInsertAssemblyByAccession(asmAccession); + // submit jobs for updating ena sequence name and md5 checksum for assembly + handler.retrieveAndInsertENASequenceNameForAssembly(asmAccession); + handler.retrieveAndInsertMd5ChecksumForAssembly(asmAccession); } catch (IllegalArgumentException e) { return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST); } @@ -86,6 +89,11 @@ public ResponseEntity fetchAndInsertAssemblyByAccession( return new ResponseEntity<>(HttpStatus.BAD_REQUEST); } Map> accessionResult = handler.fetchAndInsertAssemblyByAccession(accessions); + // submit jobs for updating ena sequence names and md5 checksum for all successfully inserted assemblies + if (accessionResult.get("SUCCESS").size() > 0) { + handler.retrieveAndInsertENASequenceNameForAssembly(accessionResult.get("SUCCESS")); + handler.retrieveAndInsertMd5ChecksumForAssembly(accessionResult.get("SUCCESS")); + } return new ResponseEntity<>("Accession Processing Result : " + accessionResult, HttpStatus.MULTI_STATUS); } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChromosomeUpdater.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChromosomeUpdater.java index 32376a63..016725b8 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChromosomeUpdater.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ChromosomeUpdater.java @@ -3,22 +3,29 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import uk.ac.ebi.eva.contigalias.conf.ApplicationContextHolder; +import uk.ac.ebi.eva.contigalias.scheduler.Job.Job; +import uk.ac.ebi.eva.contigalias.scheduler.Job.JobSubmittedEvent; +import uk.ac.ebi.eva.contigalias.scheduler.Job.JobType; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @Service -public class ChromosomeUpdater implements ApplicationListener { +public class ChromosomeUpdater { private final Logger logger = LoggerFactory.getLogger(ChromosomeUpdater.class); private final BlockingQueue jobQueue = new LinkedBlockingQueue<>(); private final ENASequenceNameUpdater enaSequenceNameUpdater; private final MD5ChecksumUpdater md5ChecksumUpdater; + private AtomicBoolean running = new AtomicBoolean(false); + private Job currentJob; @Autowired public ChromosomeUpdater(ENASequenceNameUpdater enaSequenceNameUpdater, MD5ChecksumUpdater md5ChecksumUpdater) { @@ -28,40 +35,51 @@ public ChromosomeUpdater(ENASequenceNameUpdater enaSequenceNameUpdater, MD5Check public void submitJob(Job job) { jobQueue.add(job); - logger.info("Submitted Job : " + job.getType() + " for assembly " + job.getParameter()); + logger.info("Submitted Job : " + job); JobSubmittedEvent event = new JobSubmittedEvent(this); ApplicationContextHolder.getApplicationContext().publishEvent(event); } public void submitJob(List jobList) { jobQueue.addAll(jobList); - jobList.stream().forEach(job -> logger.info("Submitted Job : " + job.getType() + " for assembly " + job.getParameter())); + jobList.stream().forEach(job -> logger.info("Submitted Job : " + job)); JobSubmittedEvent event = new JobSubmittedEvent(this); ApplicationContextHolder.getApplicationContext().publishEvent(event); } - @Override - public void onApplicationEvent(JobSubmittedEvent event) { - processJobs(); - } - @Async public void processJobs() { + running.set(true); + currentJob = null; while (!jobQueue.isEmpty()) { try { - Job job = jobQueue.take(); - if (job.getType() == JobType.ENA_SEQUENCE_NAME_UPDATE) { - enaSequenceNameUpdater.updateENASequenceNameForAssembly(job.getParameter()); - } else if (job.getType() == JobType.MD5_CHECKSUM_UPDATE) { - md5ChecksumUpdater.updateMD5ChecksumForAssembly(job.getParameter()); + Job currentJob = jobQueue.take(); + if (currentJob.getType() == JobType.ENA_SEQUENCE_NAME_UPDATE) { + enaSequenceNameUpdater.updateENASequenceNameForAssembly(currentJob.getParameter()); + } else if (currentJob.getType() == JobType.MD5_CHECKSUM_UPDATE) { + md5ChecksumUpdater.updateMD5ChecksumForAssembly(currentJob.getParameter()); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.error("Exception while running job : " + currentJob); } } + currentJob = null; + running.set(false); } public List getScheduledJobStatus() { - return jobQueue.stream().map(j -> j.getType().toString()).collect(Collectors.toList()); + List jobList = new ArrayList<>(); + if (currentJob != null) { + jobList.add(currentJob.toString()); + } + jobList.addAll(jobQueue.stream() + .map(j -> j.getType().toString() + " : " + j.getParameter()) + .collect(Collectors.toList())); + + return jobList; + } + + public AtomicBoolean isRunning() { + return running; } } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ENASequenceNameUpdater.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ENASequenceNameUpdater.java index 7b14c7e3..c7be1610 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ENASequenceNameUpdater.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/ENASequenceNameUpdater.java @@ -59,7 +59,7 @@ public void updateENASequenceNameForAssembly(String assembly) { } } - public void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedENAFilePath) throws IOException { + private void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedENAFilePath) throws IOException { try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedENAFilePath.toFile()))) { long chromosomesSavedTillNow = 0l; List chrLines = new ArrayList<>(); @@ -73,7 +73,7 @@ public void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedEN List chromosomeEntityList = enaDataSource.getChromosomeEntityList(chrLines); chromosomeService.updateENASequenceNameForAllChromosomeInAssembly(assembly, chromosomeEntityList); chromosomesSavedTillNow += chrLines.size(); - logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow); + logger.info("Number of chromosomes updated till now : " + chromosomesSavedTillNow); chrLines = new ArrayList<>(); } @@ -83,7 +83,7 @@ public void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedEN List chromosomeEntityList = enaDataSource.getChromosomeEntityList(chrLines); chromosomeService.updateENASequenceNameForAllChromosomeInAssembly(assembly, chromosomeEntityList); chromosomesSavedTillNow += chrLines.size(); - logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow); + logger.info("Number of chromosomes updated till now : " + chromosomesSavedTillNow); } } } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/Job.java similarity index 70% rename from src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job.java rename to src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/Job.java index a5c39ec4..77bd78b6 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/Job.java @@ -1,4 +1,4 @@ -package uk.ac.ebi.eva.contigalias.scheduler; +package uk.ac.ebi.eva.contigalias.scheduler.Job; public class Job { private final JobType type; @@ -16,4 +16,9 @@ public JobType getType() { public String getParameter() { return parameter; } + + @Override + public String toString() { + return type + " : " + parameter; + } } \ No newline at end of file diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/JobSubmittedEvent.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobSubmittedEvent.java similarity index 79% rename from src/main/java/uk/ac/ebi/eva/contigalias/scheduler/JobSubmittedEvent.java rename to src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobSubmittedEvent.java index df9eb94a..1a86340b 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/JobSubmittedEvent.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobSubmittedEvent.java @@ -1,4 +1,4 @@ -package uk.ac.ebi.eva.contigalias.scheduler; +package uk.ac.ebi.eva.contigalias.scheduler.Job; import org.springframework.context.ApplicationEvent; diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobSubmittedEventHandler.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobSubmittedEventHandler.java new file mode 100644 index 00000000..2b1dfb89 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobSubmittedEventHandler.java @@ -0,0 +1,23 @@ +package uk.ac.ebi.eva.contigalias.scheduler.Job; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; +import uk.ac.ebi.eva.contigalias.scheduler.ChromosomeUpdater; + +@Component +public class JobSubmittedEventHandler implements ApplicationListener { + private ChromosomeUpdater chromosomeUpdater; + + @Autowired + public JobSubmittedEventHandler(ChromosomeUpdater chromosomeUpdater) { + this.chromosomeUpdater = chromosomeUpdater; + } + + @Override + public void onApplicationEvent(JobSubmittedEvent event) { + if (!chromosomeUpdater.isRunning().get()) { + chromosomeUpdater.processJobs(); + } + } +} diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/JobType.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobType.java similarity index 61% rename from src/main/java/uk/ac/ebi/eva/contigalias/scheduler/JobType.java rename to src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobType.java index 204e423e..6bf8f58b 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/JobType.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/Job/JobType.java @@ -1,4 +1,4 @@ -package uk.ac.ebi.eva.contigalias.scheduler; +package uk.ac.ebi.eva.contigalias.scheduler.Job; public enum JobType { ENA_SEQUENCE_NAME_UPDATE, diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/MD5ChecksumUpdater.java b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/MD5ChecksumUpdater.java index 978d7580..a586dc80 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/MD5ChecksumUpdater.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/scheduler/MD5ChecksumUpdater.java @@ -66,7 +66,7 @@ public void updateMD5ChecksumForAssembly(String assembly) { } } - public void updateMd5ChecksumForChromosome(String assembly, List chromosomesList) { + private void updateMd5ChecksumForChromosome(String assembly, List chromosomesList) { chromosomesList.parallelStream().forEach(chromosome -> { try { String md5Checksum = retrieveMd5Checksum(chromosome.getInsdcAccession()); diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java b/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java index 65c06fcd..430e7a73 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java @@ -32,8 +32,8 @@ import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository; import uk.ac.ebi.eva.contigalias.repo.ChromosomeRepository; import uk.ac.ebi.eva.contigalias.scheduler.ChromosomeUpdater; -import uk.ac.ebi.eva.contigalias.scheduler.Job; -import uk.ac.ebi.eva.contigalias.scheduler.JobType; +import uk.ac.ebi.eva.contigalias.scheduler.Job.Job; +import uk.ac.ebi.eva.contigalias.scheduler.Job.JobType; import javax.transaction.Transactional; import java.io.BufferedReader; @@ -117,14 +117,6 @@ public void fetchAndInsertAssembly(String accession) { logger.info("Start inserting assembly for accession " + accession); parseFileAndInsertAssembly(accession); logger.info("Successfully inserted assembly for accession " + accession); - - // submit job for updating ENA Sequence name for assembly (asynchronously) - Job enaSequenceNameupdateJob = new Job(JobType.ENA_SEQUENCE_NAME_UPDATE, accession); - chromosomeUpdater.submitJob(enaSequenceNameupdateJob); - - // submit job for updating MD5 Checksum for assembly (asynchronously) - Job md5ChecksumupdateJob = new Job(JobType.MD5_CHECKSUM_UPDATE, accession); - chromosomeUpdater.submitJob(md5ChecksumupdateJob); } catch (Exception e) { // roll back inserted entries in case of any exception or error logger.error("Exception while inserting assembly " + accession + " Rolling back changes. \n" + e);