Skip to content

Commit

Permalink
EVA-3540 Create new Step for Shutting down accessioning (#440)
Browse files Browse the repository at this point in the history
* shutdown Accession Generator and release blocks once accessioning is over
  • Loading branch information
nitin-ebi authored Apr 23, 2024
1 parent af7f442 commit 1348569
Show file tree
Hide file tree
Showing 23 changed files with 202 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class BeanNames {

public static final String PROGRESS_LISTENER = "PROGRESS_LISTENER";

public static final String ACCESSIONING_SHUTDOWN_STEP = "ACCESSIONING_SHUTDOWN_STEP";

public static final String CLUSTERING_FROM_VCF_STEP = "CLUSTERING_FROM_VCF_STEP";

public static final String CLUSTERING_CLUSTERED_VARIANTS_FROM_MONGO_STEP = "CLUSTERING_CLUSTERED_VARIANTS_FROM_MONGO_STEP";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTER_UNCLUSTERED_VARIANTS_JOB;
Expand All @@ -42,13 +43,15 @@ public Job clusteringFromMongoJob(
@Qualifier(PROCESS_RS_SPLIT_CANDIDATES_STEP) Step processRSSplitCandidatesStep,
@Qualifier(CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP) Step clearRSMergeAndSplitCandidatesStep,
@Qualifier(CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP) Step clusteringNonClusteredVariantsFromMongoStep,
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(CLUSTER_UNCLUSTERED_VARIANTS_JOB)
.incrementer(new RunIdIncrementer())
.start(processRSMergeCandidatesStep)
.next(processRSSplitCandidatesStep)
.next(clearRSMergeAndSplitCandidatesStep)
.next(clusteringNonClusteredVariantsFromMongoStep)
.next(accessioningShutdownStep)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import uk.ac.ebi.eva.accession.clustering.parameters.InputParameters;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.BACK_PROPAGATE_NEW_RS_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.BACK_PROPAGATE_SPLIT_OR_MERGED_RS_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP;
Expand Down Expand Up @@ -79,6 +80,7 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
@Qualifier(PROCESS_RS_SPLIT_CANDIDATES_STEP) Step processRSSplitCandidatesStep,
@Qualifier(CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP) Step clearRSMergeAndSplitCandidatesStep,
@Qualifier(CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP) Step clusteringNonClusteredVariantsFromMongoStep,
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
// Back-propagate RS that were newly created in the remapped assembly
@Qualifier(BACK_PROPAGATE_NEW_RS_STEP) Step backPropagateNewRSStep,
// Back-propagate RS in the remapped assembly that were split or merged
Expand All @@ -101,12 +103,14 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
.next(processRSSplitCandidatesStep)
.next(clearRSMergeAndSplitCandidatesStep)
.next(clusteringNonClusteredVariantsFromMongoStep)
.next(accessioningShutdownStep)
.next(backPropagateNewRSStep)
.next(backPropagateSplitMergedRSStep).build())
.on("*").end()
.from(jobExecutionDecider)
.on("FALSE")
.to(clusteringNonClusteredVariantsFromMongoStep)
.next(accessioningShutdownStep)
.on("*").end()
.end().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_STEP;

Expand All @@ -33,10 +34,12 @@ public class StudyClusteringJobConfiguration {

@Bean(STUDY_CLUSTERING_JOB)
public Job studyClusteringJob(@Qualifier(STUDY_CLUSTERING_STEP) Step clusteringStep,
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(STUDY_CLUSTERING_JOB)
.incrementer(new RunIdIncrementer())
.start(clusteringStep)
.next(accessioningShutdownStep)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.steps;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.accession.core.service.nonhuman.ClusteredVariantAccessioningService;
import uk.ac.ebi.eva.accession.core.service.nonhuman.SubmittedVariantAccessioningService;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;

@Configuration
@EnableBatchProcessing
public class AccessioningShutdownStepConfiguration {
@Autowired
private SubmittedVariantAccessioningService submittedVariantAccessioningService;

@Autowired
private ClusteredVariantAccessioningService clusteredVariantAccessioningService;

@Bean(ACCESSIONING_SHUTDOWN_STEP)
public Step accessioningShutDownStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get(ACCESSIONING_SHUTDOWN_STEP)
.tasklet((contribution, chunkContext) -> {
submittedVariantAccessioningService.shutDownAccessionGenerator();
clusteredVariantAccessioningService.shutDownAccessionGenerator();
return null;
})
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.springframework.test.web.client.match.MockRestRequestMatchers.method;
import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo;
import static org.springframework.test.web.client.response.MockRestResponseCreators.withStatus;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.BACK_PROPAGATE_NEW_RS_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.BACK_PROPAGATE_SPLIT_OR_MERGED_RS_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP;
Expand Down Expand Up @@ -160,6 +161,7 @@ public void jobFromMongo() throws Exception {
expectedSteps.add(PROCESS_RS_SPLIT_CANDIDATES_STEP);
expectedSteps.add(CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP);
expectedSteps.add(CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP);
expectedSteps.add(ACCESSIONING_SHUTDOWN_STEP);
expectedSteps.add(BACK_PROPAGATE_NEW_RS_STEP);
expectedSteps.add(BACK_PROPAGATE_SPLIT_OR_MERGED_RS_STEP);
assertStepsExecuted(expectedSteps, jobExecution);
Expand All @@ -171,7 +173,9 @@ public void jobFromMongo() throws Exception {
@UsingDataSet(locations = {"/test-data/submittedVariantEntityStudyReader.json"})
public void studyJobFromMongo() throws Exception {
JobExecution jobExecution = jobLauncherTestUtilsStudyFromMongo.launchJob();
List<String> expectedSteps = Collections.singletonList(STUDY_CLUSTERING_STEP);
List<String> expectedSteps = new ArrayList<>();
expectedSteps.add(STUDY_CLUSTERING_STEP);
expectedSteps.add(ACCESSIONING_SHUTDOWN_STEP);
assertStepsExecuted(expectedSteps, jobExecution);
assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import uk.ac.ebi.eva.accession.clustering.configuration.batch.listeners.ListenersConfiguration;
import uk.ac.ebi.eva.accession.clustering.configuration.batch.policies.ChunkSizeCompletionPolicyConfiguration;
import uk.ac.ebi.eva.accession.clustering.configuration.batch.processors.ClusteringVariantProcessorConfiguration;
import uk.ac.ebi.eva.accession.clustering.configuration.batch.steps.AccessioningShutdownStepConfiguration;
import uk.ac.ebi.eva.accession.clustering.configuration.batch.steps.ClusteringFromMongoStepConfiguration;
import uk.ac.ebi.eva.accession.clustering.configuration.batch.steps.ClusteringFromVcfStepConfiguration;
import uk.ac.ebi.eva.accession.clustering.runner.ClusteringCommandLineRunner;
Expand Down Expand Up @@ -71,7 +72,8 @@
BackPropagatedRSWriterConfiguration.class,
ListenersConfiguration.class,
ClusteringCommandLineRunner.class,
ChunkSizeCompletionPolicyConfiguration.class})
ChunkSizeCompletionPolicyConfiguration.class,
AccessioningShutdownStepConfiguration.class})
public class BatchTestConfiguration {

public static final String JOB_LAUNCHER_FROM_VCF = "JOB_LAUNCHER_FROM_VCF";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DbsnpMonotonicAccessionGenerator<MODEL> extends MonotonicAccessionG

public DbsnpMonotonicAccessionGenerator(String categoryId, String applicationInstanceId,
ContiguousIdBlockService contiguousIdBlockService) {
super(categoryId, applicationInstanceId, contiguousIdBlockService, (long[])null);
super(categoryId, applicationInstanceId, contiguousIdBlockService, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,9 @@ public AccessionWrapper<IClusteredVariant, String, Long> getLastInactive(Long ac
}
}

public void shutDownAccessionGenerator(){
accessioningService.shutDownAccessioning();
accessioningServiceDbsnp.shutDownAccessioning();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,21 @@
import org.springframework.batch.core.StepExecution;
import uk.ac.ebi.eva.accession.core.batch.listeners.GenericProgressListener;
import uk.ac.ebi.eva.accession.core.model.eva.SubmittedVariantEntity;
import uk.ac.ebi.eva.accession.core.service.nonhuman.SubmittedVariantAccessioningService;
import uk.ac.ebi.eva.accession.pipeline.parameters.InputParameters;
import uk.ac.ebi.eva.commons.core.models.pipeline.Variant;
import uk.ac.ebi.eva.metrics.metric.MetricCompute;

public class SubsnpAccessionsStepListener extends GenericProgressListener<Variant, SubmittedVariantEntity> {
private final MetricCompute metricCompute;
private final SubmittedVariantAccessioningService submittedVariantAccessioningService;

public SubsnpAccessionsStepListener(InputParameters inputParameters, MetricCompute metricCompute,
SubmittedVariantAccessioningService submittedVariantAccessioningService) {
public SubsnpAccessionsStepListener(InputParameters inputParameters, MetricCompute metricCompute) {
super(inputParameters.getChunkSize());
this.metricCompute = metricCompute;
this.submittedVariantAccessioningService = submittedVariantAccessioningService;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
ExitStatus status = super.afterStep(stepExecution);
submittedVariantAccessioningService.shutDownAccessionGenerator();
metricCompute.saveMetricsCountsInDB();
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public class BeanNames {
public static final String CREATE_SUBSNP_ACCESSION_JOB = "CREATE_SUBSNP_ACCESSION_JOB";

public static final String SUBSNP_ACCESSION_STEP_LISTENER = "SUBSNP_ACCESSION_STEP_LISTENER";

public static final String ACCESSIONING_SHUTDOWN_STEP = "ACCESSIONING_SHUTDOWN_STEP";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.BUILD_REPORT_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.CHECK_SUBSNP_ACCESSION_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.CREATE_SUBSNP_ACCESSION_JOB;
Expand All @@ -47,11 +48,16 @@ public class CreateSubsnpAccessionsJobConfiguration {
@Qualifier(BUILD_REPORT_STEP)
private Step buildReportStep;

@Autowired
@Qualifier(ACCESSIONING_SHUTDOWN_STEP)
private Step accessioningShutdownStep;

@Bean(CREATE_SUBSNP_ACCESSION_JOB)
public Job createSubsnpAccessionJob(JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(CREATE_SUBSNP_ACCESSION_JOB)
.incrementer(new RunIdIncrementer())
.start(createSubsnpAccessionStep)
.next(accessioningShutdownStep)
.next(buildReportStep)
.next(checkSubsnpAccessionStep)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.web.client.RestTemplate;
import uk.ac.ebi.eva.accession.core.service.nonhuman.SubmittedVariantAccessioningService;
import uk.ac.ebi.eva.accession.pipeline.batch.listeners.SubsnpAccessionsStepListener;
import uk.ac.ebi.eva.accession.pipeline.metric.AccessioningMetricCompute;
import uk.ac.ebi.eva.accession.pipeline.parameters.InputParameters;
Expand All @@ -19,9 +18,8 @@
@Import({MetricConfiguration.class})
public class ListenersConfiguration {
@Bean(SUBSNP_ACCESSION_STEP_LISTENER)
public SubsnpAccessionsStepListener clusteringProgressListener(InputParameters parameters, MetricCompute metricCompute,
SubmittedVariantAccessioningService submittedVariantAccessioningService) {
return new SubsnpAccessionsStepListener(parameters, metricCompute, submittedVariantAccessioningService);
public SubsnpAccessionsStepListener clusteringProgressListener(InputParameters parameters, MetricCompute metricCompute) {
return new SubsnpAccessionsStepListener(parameters, metricCompute);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package uk.ac.ebi.eva.accession.pipeline.configuration.batch.steps;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.accession.core.service.nonhuman.SubmittedVariantAccessioningService;

import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;

@Configuration
@EnableBatchProcessing
public class AccessioningShutdownStepConfiguration {
@Autowired
private SubmittedVariantAccessioningService submittedVariantAccessioningService;

@Bean(ACCESSIONING_SHUTDOWN_STEP)
public Step accessioningShutDownStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get(ACCESSIONING_SHUTDOWN_STEP)
.tasklet((contribution, chunkContext) -> {
submittedVariantAccessioningService.shutDownAccessionGenerator();
return null;
})
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.TreeSet;

import static org.junit.Assert.assertEquals;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.BUILD_REPORT_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.CHECK_SUBSNP_ACCESSION_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.CREATE_SUBSNP_ACCESSION_STEP;
Expand Down Expand Up @@ -111,9 +112,10 @@ public void executeJob() throws Exception {
}

private void assertStepNames(Collection<StepExecution> stepExecutions) {
assertEquals(3, stepExecutions.size());
assertEquals(4, stepExecutions.size());
Iterator<StepExecution> iterator = stepExecutions.iterator();
assertEquals(CREATE_SUBSNP_ACCESSION_STEP, iterator.next().getStepName());
assertEquals(ACCESSIONING_SHUTDOWN_STEP, iterator.next().getStepName());
assertEquals(BUILD_REPORT_STEP, iterator.next().getStepName());
assertEquals(CHECK_SUBSNP_ACCESSION_STEP, iterator.next().getStepName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.BUILD_REPORT_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.CHECK_SUBSNP_ACCESSION_STEP;
import static uk.ac.ebi.eva.accession.pipeline.configuration.BeanNames.CREATE_SUBSNP_ACCESSION_STEP;
Expand Down Expand Up @@ -128,9 +129,10 @@ private static int getVariantLineNumberByPosition(File output, String position)
}

private void assertStepNames(Collection<StepExecution> stepExecutions) {
assertEquals(3, stepExecutions.size());
assertEquals(4, stepExecutions.size());
Iterator<StepExecution> iterator = stepExecutions.iterator();
assertEquals(CREATE_SUBSNP_ACCESSION_STEP, iterator.next().getStepName());
assertEquals(ACCESSIONING_SHUTDOWN_STEP, iterator.next().getStepName());
assertEquals(BUILD_REPORT_STEP, iterator.next().getStepName());
assertEquals(CHECK_SUBSNP_ACCESSION_STEP, iterator.next().getStepName());
}
Expand Down
Loading

0 comments on commit 1348569

Please sign in to comment.