Skip to content

Commit

Permalink
prevent running multiple instances of job processing
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed Feb 9, 2024
1 parent 5ff968b commit bd1b07c
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.hateoas.config.EnableHypermediaSupport;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableScheduling
@EnableAsync
@SpringBootApplication
@EnableRetry
@EnableTransactionManagement
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.ac.ebi.eva.contigalias.scheduler;
package uk.ac.ebi.eva.contigalias.conf;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public ResponseEntity<?> fetchAndInsertAssemblyByAccession(
"GCA_000001405.10") String asmAccession) throws IOException {
try {
handler.fetchAndInsertAssemblyByAccession(asmAccession);
// submit jobs for updating ena sequence name and md5 checksum for assembly
handler.retrieveAndInsertENASequenceNameForAssembly(asmAccession);
handler.retrieveAndInsertMd5ChecksumForAssembly(asmAccession);
} catch (IllegalArgumentException e) {
return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
}
Expand All @@ -86,6 +89,11 @@ public ResponseEntity<?> fetchAndInsertAssemblyByAccession(
return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
}
Map<String, List<String>> accessionResult = handler.fetchAndInsertAssemblyByAccession(accessions);
// submit jobs for updating ena sequence names and md5 checksum for all successfully inserted assemblies
if (accessionResult.get("SUCCESS").size() > 0) {
handler.retrieveAndInsertENASequenceNameForAssembly(accessionResult.get("SUCCESS"));
handler.retrieveAndInsertMd5ChecksumForAssembly(accessionResult.get("SUCCESS"));
}
return new ResponseEntity<>("Accession Processing Result : " + accessionResult, HttpStatus.MULTI_STATUS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,29 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import uk.ac.ebi.eva.contigalias.conf.ApplicationContextHolder;
import uk.ac.ebi.eva.contigalias.scheduler.Job.Job;
import uk.ac.ebi.eva.contigalias.scheduler.Job.JobSubmittedEvent;
import uk.ac.ebi.eva.contigalias.scheduler.Job.JobType;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@Service
public class ChromosomeUpdater implements ApplicationListener<JobSubmittedEvent> {
public class ChromosomeUpdater {
private final Logger logger = LoggerFactory.getLogger(ChromosomeUpdater.class);
private final BlockingQueue<Job> jobQueue = new LinkedBlockingQueue<>();
private final ENASequenceNameUpdater enaSequenceNameUpdater;
private final MD5ChecksumUpdater md5ChecksumUpdater;
private AtomicBoolean running = new AtomicBoolean(false);

private Job currentJob;

@Autowired
public ChromosomeUpdater(ENASequenceNameUpdater enaSequenceNameUpdater, MD5ChecksumUpdater md5ChecksumUpdater) {
Expand All @@ -28,40 +35,51 @@ public ChromosomeUpdater(ENASequenceNameUpdater enaSequenceNameUpdater, MD5Check

/**
 * Adds a single job to the queue and publishes a {@link JobSubmittedEvent} for it.
 *
 * @param job the job to enqueue
 */
public void submitJob(Job job) {
    jobQueue.add(job);
    // Log once, using the job's own string form; parameterized so the message is only
    // built when INFO logging is enabled.
    logger.info("Submitted Job : {}", job);
    ApplicationContextHolder.getApplicationContext().publishEvent(new JobSubmittedEvent(this));
}

/**
 * Adds every job in the list to the queue and publishes one {@link JobSubmittedEvent}
 * for the whole batch.
 *
 * @param jobList the jobs to enqueue, in order
 */
public void submitJob(List<Job> jobList) {
    jobQueue.addAll(jobList);
    // Log each job once, using the job's own string form.
    for (Job job : jobList) {
        logger.info("Submitted Job : {}", job);
    }
    ApplicationContextHolder.getApplicationContext().publishEvent(new JobSubmittedEvent(this));
}

@Override
public void onApplicationEvent(JobSubmittedEvent event) {
processJobs();
}

@Async
public void processJobs() {
running.set(true);
currentJob = null;
while (!jobQueue.isEmpty()) {
try {
Job job = jobQueue.take();
if (job.getType() == JobType.ENA_SEQUENCE_NAME_UPDATE) {
enaSequenceNameUpdater.updateENASequenceNameForAssembly(job.getParameter());
} else if (job.getType() == JobType.MD5_CHECKSUM_UPDATE) {
md5ChecksumUpdater.updateMD5ChecksumForAssembly(job.getParameter());
Job currentJob = jobQueue.take();
if (currentJob.getType() == JobType.ENA_SEQUENCE_NAME_UPDATE) {
enaSequenceNameUpdater.updateENASequenceNameForAssembly(currentJob.getParameter());
} else if (currentJob.getType() == JobType.MD5_CHECKSUM_UPDATE) {
md5ChecksumUpdater.updateMD5ChecksumForAssembly(currentJob.getParameter());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Exception while running job : " + currentJob);
}
}
currentJob = null;
running.set(false);
}

/**
 * Describes the job recorded in {@code currentJob} (if any) followed by the jobs still queued.
 *
 * @return one {@code "<type> : <parameter>"} entry per job, current job first, then queue order
 */
public List<String> getScheduledJobStatus() {
    List<String> jobList = new ArrayList<>();

    // Read the field once: it may be reset by the processing thread between a null check and use.
    Job inProgress = currentJob;
    if (inProgress != null) {
        jobList.add(inProgress.toString());
    }

    for (Job queued : jobQueue) {
        jobList.add(queued.getType().toString() + " : " + queued.getParameter());
    }
    return jobList;
}

/**
 * Exposes the flag that {@link #processJobs()} sets while it is working through the queue.
 *
 * @return the live {@code running} flag itself, not a copy of its value
 */
public AtomicBoolean isRunning() {
    return this.running;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void updateENASequenceNameForAssembly(String assembly) {
}
}

public void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedENAFilePath) throws IOException {
private void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedENAFilePath) throws IOException {
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedENAFilePath.toFile()))) {
long chromosomesSavedTillNow = 0l;
List<String> chrLines = new ArrayList<>();
Expand All @@ -73,7 +73,7 @@ public void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedEN
List<ChromosomeEntity> chromosomeEntityList = enaDataSource.getChromosomeEntityList(chrLines);
chromosomeService.updateENASequenceNameForAllChromosomeInAssembly(assembly, chromosomeEntityList);
chromosomesSavedTillNow += chrLines.size();
logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow);
logger.info("Number of chromosomes updated till now : " + chromosomesSavedTillNow);

chrLines = new ArrayList<>();
}
Expand All @@ -83,7 +83,7 @@ public void retrieveAndUpdateENASequenceNames(String assembly, Path downloadedEN
List<ChromosomeEntity> chromosomeEntityList = enaDataSource.getChromosomeEntityList(chrLines);
chromosomeService.updateENASequenceNameForAllChromosomeInAssembly(assembly, chromosomeEntityList);
chromosomesSavedTillNow += chrLines.size();
logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow);
logger.info("Number of chromosomes updated till now : " + chromosomesSavedTillNow);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.ac.ebi.eva.contigalias.scheduler;
package uk.ac.ebi.eva.contigalias.scheduler.Job;

public class Job {
private final JobType type;
Expand All @@ -16,4 +16,9 @@ public JobType getType() {
public String getParameter() {
return parameter;
}

/**
 * Renders the job as {@code "<type> : <parameter>"}.
 */
@Override
public String toString() {
    return String.format("%s : %s", type, parameter);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.ac.ebi.eva.contigalias.scheduler;
package uk.ac.ebi.eva.contigalias.scheduler.Job;


import org.springframework.context.ApplicationEvent;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package uk.ac.ebi.eva.contigalias.scheduler.Job;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import uk.ac.ebi.eva.contigalias.scheduler.ChromosomeUpdater;

/**
 * Listens for {@link JobSubmittedEvent}s and starts job processing on the
 * {@link ChromosomeUpdater}, unless its running flag shows that processing is already active.
 */
@Component
public class JobSubmittedEventHandler implements ApplicationListener<JobSubmittedEvent> {
    private final ChromosomeUpdater chromosomeUpdater;

    @Autowired
    public JobSubmittedEventHandler(ChromosomeUpdater chromosomeUpdater) {
        this.chromosomeUpdater = chromosomeUpdater;
    }

    /**
     * Starts a processing run if none is flagged as active.
     *
     * <p>The running flag is claimed atomically with {@code compareAndSet}. A separate
     * {@code get()} followed by the call would let two events arriving close together both
     * observe {@code false} and both start processing.
     *
     * @param event the notification that one or more jobs were queued
     */
    @Override
    public void onApplicationEvent(JobSubmittedEvent event) {
        if (chromosomeUpdater.isRunning().compareAndSet(false, true)) {
            try {
                chromosomeUpdater.processJobs();
            } catch (RuntimeException e) {
                // The run could not be started or failed outright: release the claim so that
                // a later event is able to start processing again.
                chromosomeUpdater.isRunning().set(false);
                throw e;
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.ac.ebi.eva.contigalias.scheduler;
package uk.ac.ebi.eva.contigalias.scheduler.Job;

public enum JobType {
ENA_SEQUENCE_NAME_UPDATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void updateMD5ChecksumForAssembly(String assembly) {
}
}

public void updateMd5ChecksumForChromosome(String assembly, List<ChromosomeEntity> chromosomesList) {
private void updateMd5ChecksumForChromosome(String assembly, List<ChromosomeEntity> chromosomesList) {
chromosomesList.parallelStream().forEach(chromosome -> {
try {
String md5Checksum = retrieveMd5Checksum(chromosome.getInsdcAccession());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository;
import uk.ac.ebi.eva.contigalias.repo.ChromosomeRepository;
import uk.ac.ebi.eva.contigalias.scheduler.ChromosomeUpdater;
import uk.ac.ebi.eva.contigalias.scheduler.Job;
import uk.ac.ebi.eva.contigalias.scheduler.JobType;
import uk.ac.ebi.eva.contigalias.scheduler.Job.Job;
import uk.ac.ebi.eva.contigalias.scheduler.Job.JobType;

import javax.transaction.Transactional;
import java.io.BufferedReader;
Expand Down Expand Up @@ -117,14 +117,6 @@ public void fetchAndInsertAssembly(String accession) {
logger.info("Start inserting assembly for accession " + accession);
parseFileAndInsertAssembly(accession);
logger.info("Successfully inserted assembly for accession " + accession);

// submit job for updating ENA Sequence name for assembly (asynchronously)
Job enaSequenceNameupdateJob = new Job(JobType.ENA_SEQUENCE_NAME_UPDATE, accession);
chromosomeUpdater.submitJob(enaSequenceNameupdateJob);

// submit job for updating MD5 Checksum for assembly (asynchronously)
Job md5ChecksumupdateJob = new Job(JobType.MD5_CHECKSUM_UPDATE, accession);
chromosomeUpdater.submitJob(md5ChecksumupdateJob);
} catch (Exception e) {
// roll back inserted entries in case of any exception or error
logger.error("Exception while inserting assembly " + accession + " Rolling back changes. \n" + e);
Expand Down

0 comments on commit bd1b07c

Please sign in to comment.