Skip to content

Commit

Permalink
pass application instance id
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed Apr 29, 2024
1 parent 1348569 commit 509b2ae
Show file tree
Hide file tree
Showing 22 changed files with 84 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.mongodb.MongoBulkWriteException;
import htsjdk.samtools.util.StringUtil;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.data.mongodb.core.BulkOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
Expand Down Expand Up @@ -116,14 +117,17 @@ public class ClusteringWriter implements ItemWriter<SubmittedVariantEntity> {

private Map<Long, SubmittedVariantOperationEntity> rsSplitCandidateSVOE;

private JobExecution jobExecution;

public ClusteringWriter(MongoTemplate mongoTemplate,
String assembly,
ClusteredVariantAccessioningService clusteredVariantAccessioningService,
Long accessioningMonotonicInitSs,
Long accessioningMonotonicInitRs,
MetricCompute metricCompute,
boolean processClusteredRemappedVariants,
File rsReportFile) throws IOException {
File rsReportFile,
JobExecution jobExecution) throws IOException {
this.mongoTemplate = mongoTemplate;
this.assembly = assembly;
this.clusteredService = clusteredVariantAccessioningService;
Expand All @@ -135,6 +139,7 @@ public ClusteringWriter(MongoTemplate mongoTemplate,
this.metricCompute = metricCompute;
this.processClusteredRemappedVariants = processClusteredRemappedVariants;
this.rsReportFile = rsReportFile;
this.jobExecution = jobExecution;
getSVOEWithMergeAndRSSplitCandidates();
}

Expand Down Expand Up @@ -194,7 +199,7 @@ private void getOrCreateClusteredVariantAccessions(List<? extends SubmittedVaria
.collect(Collectors.toList());
if (!clusteredVariants.isEmpty()) {
List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> accessionWrappers =
clusteredService.getOrCreate(clusteredVariants);
clusteredService.getOrCreate(clusteredVariants, jobExecution.getJobId().toString());
for (GetOrCreateAccessionWrapper<IClusteredVariant, String, Long> result : accessionWrappers) {
if (result.isNewAccession()) {
ClusteringWriter.writeRSReportEntry(this.rsReportFileWriter, result.getAccession(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.data.mongodb.core.MongoTemplate;

Expand Down Expand Up @@ -96,18 +97,22 @@ public class RSSplitWriter implements ItemWriter<SubmittedVariantOperationEntity

private FileWriter rsReportFileWriter;

private JobExecution jobExecution;

public RSSplitWriter(ClusteringWriter clusteringWriter,
ClusteredVariantAccessioningService clusteredVariantAccessioningService,
SubmittedVariantAccessioningService submittedVariantAccessioningService,
MongoTemplate mongoTemplate,
MetricCompute<ClusteringMetric> metricCompute,
File rsReportFile) throws IOException {
File rsReportFile,
JobExecution jobExecution) throws IOException {
this.clusteringWriter = clusteringWriter;
this.clusteredVariantAccessioningService = clusteredVariantAccessioningService;
this.submittedVariantAccessioningService = submittedVariantAccessioningService;
this.mongoTemplate = mongoTemplate;
this.metricCompute = metricCompute;
this.rsReportFile = rsReportFile;
this.jobExecution = jobExecution;
}

@Override
Expand Down Expand Up @@ -194,7 +199,8 @@ private void issueNewRSForHashes(List<String> hashesThatShouldGetNewRS,
rsHashAndAssociatedSS.get(rsHash).get(0));
Long newRSAccession =
this.clusteredVariantAccessioningService.getOrCreate(
Collections.singletonList(clusteredVariantEntity)).get(0).getAccession();
Collections.singletonList(clusteredVariantEntity), jobExecution.getJobId().toString())
.get(0).getAccession();
ClusteringWriter.writeRSReportEntry(this.rsReportFileWriter, newRSAccession, rsHash);
metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_CREATED, 1);
List<SubmittedVariantEntity> associatedSSEntries = rsHashAndAssociatedSS.get(rsHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.data.mongodb.core.MongoTemplate;

Expand Down Expand Up @@ -71,14 +72,18 @@ public class SSSplitWriter implements ItemWriter<SubmittedVariantEntity> {

private final MetricCompute<ClusteringMetric> metricCompute;

private JobExecution jobExecution;

public SSSplitWriter(String assembly, ClusteringWriter clusteringWriter,
SubmittedVariantAccessioningService submittedVariantAccessioningService,
MongoTemplate mongoTemplate, MetricCompute<ClusteringMetric> metricCompute) {
MongoTemplate mongoTemplate, MetricCompute<ClusteringMetric> metricCompute,
JobExecution jobExecution) {
this.assembly = assembly;
this.clusteringWriter = clusteringWriter;
this.submittedVariantAccessioningService = submittedVariantAccessioningService;
this.mongoTemplate = mongoTemplate;
this.metricCompute = metricCompute;
this.jobExecution = jobExecution;
}

@Override
Expand Down Expand Up @@ -127,7 +132,8 @@ protected void processSplitCandidates(List<SubmittedVariantOperationEntity> spli
excludeSSWithAlreadyUpdatedIDs(svesToCreateWithNewIDs);
removeCurrentSSEntriesInDBForSplitCandidates(svesToCreateWithNewIDs.keySet());
List<GetOrCreateAccessionWrapper<ISubmittedVariant, String, Long>> newlyCreatedSVEs =
this.submittedVariantAccessioningService.getOrCreate(new ArrayList<>(svesToCreateWithNewIDs.values()));
this.submittedVariantAccessioningService.getOrCreate(new ArrayList<>(svesToCreateWithNewIDs.values()),
jobExecution.getJobId().toString());
this.metricCompute.addCount(ClusteringMetric.SUBMITTED_VARIANTS_SS_SPLIT, newlyCreatedSVEs.size());
this.metricCompute.saveMetricsCountsInDB();
recordSplitOperation(svesToCreateWithNewIDs, newlyCreatedSVEs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand Down Expand Up @@ -49,11 +50,12 @@ public ClusteringWriter clusteredClusteringWriter(MongoTemplate mongoTemplate,
Long accessioningMonotonicInitSs,
Long accessioningMonotonicInitRs,
MetricCompute metricCompute,
File rsReportFile) throws IOException {
File rsReportFile,
JobExecution jobExecution) throws IOException {
return new ClusteringWriter(mongoTemplate, inputParameters.getAssemblyAccession(),
clusteredVariantAccessioningService, accessioningMonotonicInitSs,
accessioningMonotonicInitRs, metricCompute, true,
rsReportFile);
rsReportFile, jobExecution);
}

@Bean(NON_CLUSTERED_CLUSTERING_WRITER)
Expand All @@ -64,10 +66,11 @@ public ClusteringWriter nonClusteredClusteringWriter(MongoTemplate mongoTemplate
Long accessioningMonotonicInitSs,
Long accessioningMonotonicInitRs,
MetricCompute metricCompute,
File rsReportFile) throws IOException {
File rsReportFile,
JobExecution jobExecution) throws IOException {
return new ClusteringWriter(mongoTemplate, inputParameters.getAssemblyAccession(),
clusteredVariantAccessioningService, accessioningMonotonicInitSs,
accessioningMonotonicInitRs, metricCompute, false,
rsReportFile);
rsReportFile, jobExecution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package uk.ac.ebi.eva.accession.clustering.configuration.batch.io;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand Down Expand Up @@ -67,9 +68,10 @@ public ItemWriter<SubmittedVariantOperationEntity> rsSplitWriter(
SubmittedVariantAccessioningService submittedVariantAccessioningService,
MongoTemplate mongoTemplate,
MetricCompute metricCompute,
File rsReportFile) throws IOException {
File rsReportFile,
JobExecution jobExecution) throws IOException {
return new RSSplitWriter(clusteringWriter, clusteredVariantAccessioningService,
submittedVariantAccessioningService, mongoTemplate, metricCompute,
rsReportFile);
rsReportFile, jobExecution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package uk.ac.ebi.eva.accession.clustering.configuration.batch.io;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -44,8 +45,8 @@ public ItemWriter<SubmittedVariantEntity> ssSplitWriter(
InputParameters inputParameters,
@Qualifier(CLUSTERED_CLUSTERING_WRITER) ClusteringWriter clusteringWriter,
SubmittedVariantAccessioningService submittedVariantAccessioningService,
MongoTemplate mongoTemplate, MetricCompute metricCompute) {
MongoTemplate mongoTemplate, MetricCompute metricCompute, JobExecution jobExecution) {
return new SSSplitWriter(inputParameters.getAssemblyAccession(), clusteringWriter,
submittedVariantAccessioningService, mongoTemplate, metricCompute);
submittedVariantAccessioningService, mongoTemplate, metricCompute, jobExecution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
@ContextConfiguration(classes={BatchTestConfiguration.class})
@TestPropertySource("classpath:clustering-pipeline-test.properties")
public class ClusteringCommandLineRunnerTest {
private static String TEST_APPLICATION_INSTANCE_ID = "test-application-instance-id";

private static final String TEST_DB = "test-db";

Expand Down Expand Up @@ -843,7 +844,7 @@ private void setupRSAndSS() throws AccessionCouldNotBeGeneratedException {
// This is the easiest way to exhaust the accessions because
// the monotonic accession generator will only create accessions in the EVA collection (ClusteredVariantEntity)
// and not the dbSNP collection (DbsnpClusteredVariantEntity)
clusteredVariantAccessionGenerator.generateAccessions(10);
clusteredVariantAccessionGenerator.generateAccessions(10, TEST_APPLICATION_INSTANCE_ID);
}

private void createEVASS8InASM1WithUnassignedRS() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public MonotonicAccessionGenerator<IClusteredVariant> clusteredVariantAccessionG
logger.debug("Using application properties: " + properties.toString());
return new MonotonicAccessionGenerator<>(
properties.getClustered().getCategoryId(),
properties.getInstanceId(),
blockService,
clusteredVariantAccessioningDatabaseService());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public MonotonicAccessionGenerator<ISubmittedVariant> submittedVariantAccessionG
logger.debug("Using application properties: " + properties.toString());
return new MonotonicAccessionGenerator<>(
properties.getSubmitted().getCategoryId(),
properties.getInstanceId(),
blockService,
submittedVariantAccessioningDatabaseService());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public class DbsnpMonotonicAccessionGenerator<MODEL> extends MonotonicAccessionG

public DbsnpMonotonicAccessionGenerator(String categoryId, String applicationInstanceId,
ContiguousIdBlockService contiguousIdBlockService) {
super(categoryId, applicationInstanceId, contiguousIdBlockService, null);
super(categoryId, contiguousIdBlockService, null);
}

@Override
public synchronized long[] generateAccessions(int numAccessionsToGenerate) {
public synchronized long[] generateAccessions(int numAccessionsToGenerate, String applicationInstanceId) {
throw new UnsupportedOperationException("New accessions cannot be issued for dbSNP variants");
}

Expand All @@ -59,7 +59,8 @@ public synchronized void postSave(SaveResponse response) {
}

@Override
public <HASH> List<AccessionWrapper<MODEL, HASH, Long>> generateAccessions(Map<HASH, MODEL> messages) {
public <HASH> List<AccessionWrapper<MODEL, HASH, Long>> generateAccessions(Map<HASH, MODEL> messages,
String applicationInstanceId) {
throw new UnsupportedOperationException("New accessions cannot be issued for dbSNP variants");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ClusteredVariantAccessioningService(ClusteredVariantMonotonicAccessioning

@Override
public List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> getOrCreate(
List<? extends IClusteredVariant> variants)
List<? extends IClusteredVariant> variants, String applicationInstanceId)
throws AccessionCouldNotBeGeneratedException {
List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> dbsnpVariants =
accessioningServiceDbsnp.get(variants).stream()
Expand All @@ -81,7 +81,7 @@ public List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> getOrC
return dbsnpVariants;
} else {
List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> clusteredVariants =
accessioningService.getOrCreate(variantsNotInDbsnp);
accessioningService.getOrCreate(variantsNotInDbsnp, applicationInstanceId);

List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> allClusteredVariants = new ArrayList<>();
allClusteredVariants.addAll(dbsnpVariants);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public SubmittedVariantAccessioningService(SubmittedVariantMonotonicAccessioning

@Override
public List<GetOrCreateAccessionWrapper<ISubmittedVariant, String, Long>> getOrCreate(
List<? extends ISubmittedVariant> variants)
List<? extends ISubmittedVariant> variants, String applicationInstanceId)
throws AccessionCouldNotBeGeneratedException {
List<AccessionWrapper<ISubmittedVariant, String, Long>> dbsnpVariants = accessioningServiceDbsnp.get(variants);
List<ISubmittedVariant> variantsNotInDbsnp = removeFromList(variants, dbsnpVariants);
Expand All @@ -80,7 +80,7 @@ public List<GetOrCreateAccessionWrapper<ISubmittedVariant, String, Long>> getOrC
d.getData(), false)).collect(Collectors.toList());
} else {
List<AccessionWrapper<ISubmittedVariant, String, Long>> submittedVariants = new ArrayList<>();
accessioningService.getOrCreate(variantsNotInDbsnp)
accessioningService.getOrCreate(variantsNotInDbsnp, applicationInstanceId)
.forEach(getOrCreateAccessionWrapperObj -> submittedVariants.add(
new AccessionWrapper<ISubmittedVariant, String, Long>
(getOrCreateAccessionWrapperObj.getAccession(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
@ContextConfiguration(classes = {ClusteredVariantAccessioningConfiguration.class, MongoTestConfiguration.class})
public class ClusteredVariantAccessioningServiceTest {

private static String TEST_APPLICATION_INSTANCE_ID = "test-application-instance-id";

//Required by nosql-unit
@Autowired
private ApplicationContext applicationContext;
Expand Down Expand Up @@ -91,9 +93,9 @@ public void sameAccessionsAreReturnedForIdenticalVariants() throws Exception {
new ClusteredVariant("assembly", 1111, "contig_1", 200, VariantType.SNV, false, null));

List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> generatedAccessions =
service.getOrCreate(variants);
service.getOrCreate(variants, TEST_APPLICATION_INSTANCE_ID);
List<GetOrCreateAccessionWrapper<IClusteredVariant, String, Long>> retrievedAccessions =
service.getOrCreate(variants);
service.getOrCreate(variants, TEST_APPLICATION_INSTANCE_ID);

assertEquals(new HashSet<>(generatedAccessions), new HashSet<>(retrievedAccessions));
}
Expand Down
Loading

0 comments on commit 509b2ae

Please sign in to comment.