Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3494 Ingest assembly in batches #125

Merged
merged 13 commits into from
Feb 12, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.springframework.hateoas.config.EnableHypermediaSupport;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableScheduling
@SpringBootApplication
@EnableRetry
@EnableTransactionManagement
@EnableHypermediaSupport(type = EnableHypermediaSupport.HypermediaType.HAL)
public class ContigAliasApplication extends SpringBootServletInitializer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@RequestMapping("/v1/admin")
@RestController
Expand Down Expand Up @@ -82,7 +80,7 @@ public ResponseEntity<?> fetchAndInsertAssemblyByAccession(
"parallel manner.")
@PutMapping(value = "assemblies")
public ResponseEntity<?> fetchAndInsertAssemblyByAccession(
@RequestBody(required = false) @ApiParam(value = "A JSON array of INSDC or RefSeq assembly accessions. " +
@RequestBody @ApiParam(value = "A JSON array of INSDC or RefSeq assembly accessions. " +
"Eg: [\"GCA_000001405.10\",\"GCA_000001405.11\",\"GCA_000001405.12\"]") List<String> accessions) {
if (accessions == null || accessions.size() <= 0) {
return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
Expand All @@ -92,7 +90,7 @@ public ResponseEntity<?> fetchAndInsertAssemblyByAccession(
}

@ApiOperation(value = "Given an assembly accession, retrieve MD5 checksum for all chromosomes belonging to assembly and update")
@PutMapping(value = "assemblies/{accession}/md5checksum")
@PutMapping(value = "assemblies/md5checksum/{accession}")
public ResponseEntity<String> retrieveAndInsertMd5ChecksumForAssembly(@PathVariable(name = "accession")
@ApiParam(value = "INSDC or RefSeq assembly accession. Eg: " +
"GCA_000001405.10") String asmAccession) {
Expand All @@ -101,24 +99,60 @@ public ResponseEntity<String> retrieveAndInsertMd5ChecksumForAssembly(@PathVaria
handler.retrieveAndInsertMd5ChecksumForAssembly(asmAccession);
return ResponseEntity.ok("A task has been submitted for updating md5checksum for all chromosomes " +
"in assembly " + asmAccession + ". Depending upon the number of chromosomes present in assembly, " +
"this might take some time to complete");
"and other scheduled jobs, this might take some time to complete");
} catch (AssemblyNotFoundException e) {
return ResponseEntity.ok("Could not find assembly " + asmAccession +
". Please insert the assembly first (md5checksum will be updated as part of the insertion process");
}
}

@ApiOperation(value = "Given a list of assembly accessions, retrieve MD5 checksum for all chromosomes belonging to all the assemblies and update")
@PutMapping(value = "assemblies/md5checksum")
public ResponseEntity<String> retrieveAndInsertMd5ChecksumForAssembly(
@RequestBody @ApiParam(value = "A JSON array of INSDC or RefSeq assembly accessions. " +
"Eg: [\"GCA_000001405.10\",\"GCA_000001405.11\",\"GCA_000001405.12\"]") List<String> accessions) {
handler.retrieveAndInsertMd5ChecksumForAssembly(accessions);
return ResponseEntity.ok("A task has been submitted for updating md5checksum for all chromosomes " +
"in assemblies " + accessions + ". Depending upon the number of chromosomes present in assembly, " +
"and other scheduled jobs, this might take some time to complete");

}

@ApiOperation(value = "Given an assembly accession, retrieve ENA sequence name for all chromosomes belonging to assembly and update")
@PutMapping(value = "assemblies/enaSequenceName/{accession}")
public ResponseEntity<String> retrieveAndInsertENASequenceNameForAssembly(@PathVariable(name = "accession")
@ApiParam(value = "INSDC or RefSeq assembly accession. " +
"Eg: GCA_000001405.10") String asmAccession) {
try {
handler.getAssemblyByAccession(asmAccession);
handler.retrieveAndInsertENASequenceNameForAssembly(asmAccession);
return ResponseEntity.ok("A task has been submitted for updating ENA Sequence Name for all chromosomes " +
"in assembly " + asmAccession + ". Depending upon the number of chromosomes present in assembly, " +
"and other scheduled jobs, this might take some time to complete");
} catch (AssemblyNotFoundException e) {
return ResponseEntity.ok("Could not find assembly " + asmAccession +
". Please insert the assembly first (ENA sequence name will be updated as part of the insertion process");
}
}

@ApiOperation(value = "Given a list of assembly accessions, retrieve ENA sequence name for all chromosomes belonging to all the assemblies and update")
@PutMapping(value = "assemblies/enaSequenceName")
public ResponseEntity<String> retrieveAndInsertENASequenceNameForAssembly(
@RequestBody @ApiParam(value = "A JSON array of INSDC or RefSeq assembly accessions. " +
"Eg: [\"GCA_000001405.10\",\"GCA_000001405.11\",\"GCA_000001405.12\"]") List<String> accessions) {

handler.retrieveAndInsertENASequenceNameForAssembly(accessions);
return ResponseEntity.ok("A task has been submitted for updating ENA Sequence Name for all chromosomes " +
"in assembly " + accessions + ". Depending upon the number of chromosomes present in assembly, " +
"and other scheduled jobs, this might take some time to complete");
}


@ApiOperation(value = "Retrieve list of assemblies for which MD5 Checksum updates are running/going-to-run ")
nitin-ebi marked this conversation as resolved.
Show resolved Hide resolved
@GetMapping(value = "assemblies/md5checksum/status")
public ResponseEntity<String> getMD5ChecksumUpdateTaskStatus() {
Map<String, Set<String>> md5ChecksumUpdateTasks = handler.getMD5ChecksumUpdateTaskStatus();
Set<String> runningTasks = md5ChecksumUpdateTasks.get("running");
Set<String> scheduledTasks = md5ChecksumUpdateTasks.get("scheduled");
String runningTaskRes = runningTasks == null || runningTasks.isEmpty() ? "No running MD5 checksum update tasks" :
runningTasks.stream().collect(Collectors.joining(","));
String scheduledTaskRes = scheduledTasks == null || scheduledTasks.isEmpty() ? "No scheduled MD5 checksum update tasks" :
scheduledTasks.stream().collect(Collectors.joining(","));
return ResponseEntity.ok("running: " + runningTaskRes + "\nscheduled: " + scheduledTaskRes);
@GetMapping(value = "assemblies/scheduledJobs")
public ResponseEntity<List<String>> getMD5ChecksumUpdateTaskStatus() {
List<String> scheduledJobStatus = handler.getScheduledJobStatus();
return ResponseEntity.ok(scheduledJobStatus);
}

// This endpoint can be enabled in the future when checksums for assemblies are added to the project.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.web.PagedResourcesAssembler;
import org.springframework.stereotype.Service;

import uk.ac.ebi.eva.contigalias.entities.AssemblyEntity;
import uk.ac.ebi.eva.contigalias.service.AssemblyService;
import uk.ac.ebi.eva.contigalias.service.ChromosomeService;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@Service
public class AdminHandler {
Expand All @@ -52,7 +49,7 @@ public Optional<AssemblyEntity> getAssemblyByAccession(String accession) {
return assemblyService.getAssemblyByAccession(accession);
}

public void fetchAndInsertAssemblyByAccession(String accession) throws IOException {
public void fetchAndInsertAssemblyByAccession(String accession) {
assemblyService.fetchAndInsertAssembly(accession);
}

Expand All @@ -64,8 +61,20 @@ public void retrieveAndInsertMd5ChecksumForAssembly(String accession) {
assemblyService.retrieveAndInsertMd5ChecksumForAssembly(accession);
}

public Map<String, Set<String>> getMD5ChecksumUpdateTaskStatus() {
return assemblyService.getMD5ChecksumUpdateTaskStatus();
public void retrieveAndInsertMd5ChecksumForAssembly(List<String> accessions) {
assemblyService.retrieveAndInsertMd5ChecksumForAssembly(accessions);
}

public void retrieveAndInsertENASequenceNameForAssembly(String accession) {
assemblyService.retrieveAndInsertENASequenceNameForAssembly(accession);
}

public void retrieveAndInsertENASequenceNameForAssembly(List<String> accessions) {
assemblyService.retrieveAndInsertENASequenceNameForAssembly(accessions);
}

public List<String> getScheduledJobStatus() {
return assemblyService.getScheduledJobStatus();
}

public void deleteAssemblyByAccession(String accession) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.web.PagedResourcesAssembler;
import org.springframework.hateoas.EntityModel;
Expand Down Expand Up @@ -83,6 +82,7 @@ public PagedModel<EntityModel<AssemblyEntity>> getAssemblyByRefseq(String refseq

public PagedModel<EntityModel<AssemblyEntity>> getAssembliesByTaxid(long taxid, Pageable request) {
Page<AssemblyEntity> page = assemblyService.getAssembliesByTaxid(taxid, request);
page.forEach(it->it.setChromosomes(null));
return generatePagedModelFromPage(page, assemblyAssembler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

@Repository("ENADataSource")
public class ENAAssemblyDataSource implements AssemblyDataSource {
Expand Down Expand Up @@ -96,6 +97,24 @@ public Optional<AssemblyEntity> getAssemblyByAccession(String accession) throws

}

public Optional<Path> downloadAssemblyReport(String accession) throws IOException {
ENABrowser enaBrowser = factory.build();
enaBrowser.connect();
try {
enaBrowser.connect();
return downloadAssemblyReport(enaBrowser, accession);
} catch (Exception e){
logger.warn("Could not fetch Assembly Report from ENA for accession " + accession + "Exception: " + e);
return Optional.empty();
}finally {
try {
enaBrowser.disconnect();
} catch (IOException e) {
logger.warn("Error while trying to disconnect - enaBrowser (assembly: " + accession + ") : " + e);
}
}
}

@Retryable(value = Exception.class, maxAttempts = 5, backoff = @Backoff(delay = 2000, multiplier = 2))
public Optional<Path> downloadAssemblyReport(ENABrowser enaBrowser, String accession) throws IOException {
String dirPath = enaBrowser.getAssemblyDirPath(accession);
Expand All @@ -105,18 +124,33 @@ public Optional<Path> downloadAssemblyReport(ENABrowser enaBrowser, String acces
try {
boolean success = enaBrowser.downloadFTPFile(ftpFilePath, downloadFilePath, ftpFile.getSize());
if (success) {
logger.info("ENA assembly report downloaded successfully for accession "+ accession);
logger.info("ENA assembly report downloaded successfully for accession " + accession);
return Optional.of(downloadFilePath);
} else {
logger.warn("ENA assembly report could not be downloaded successfully for accession "+accession);
logger.warn("ENA assembly report could not be downloaded successfully for accession " + accession);
return Optional.empty();
}
} catch (IOException | DownloadFailedException e) {
logger.warn("Error downloading ENA assembly report for accession "+ accession + e);
logger.warn("Error downloading ENA assembly report for accession " + accession + e);
return Optional.empty();
}
}

public List<ChromosomeEntity> getChromosomeEntityList(List<String> chrDataList) {
List<ChromosomeEntity> chromosomeEntityList = new ArrayList<>();
for (String chrData : chrDataList) {
ChromosomeEntity chromosomeEntity = getChromosomeEntity(chrData);
if (chromosomeEntity != null) {
chromosomeEntityList.add(chromosomeEntity);
}
}
return chromosomeEntityList;
}

public ChromosomeEntity getChromosomeEntity(String chrLine) {
return ENAAssemblyReportReader.getChromosomeEntity(chrLine);
}

/**
* Adds ENA sequence names to chromosomes and scaffolds in an assembly. Will modify the AssemblyEntity in-place.
*
Expand Down Expand Up @@ -144,18 +178,18 @@ public boolean hasAllEnaSequenceNames(AssemblyEntity assembly) {
return chromosomes.stream().allMatch(sequence -> sequence.getEnaSequenceName() != null);
}

private void addENASequenceNames(
public void addENASequenceNames(
List<? extends SequenceEntity> sourceSequences, List<? extends SequenceEntity> targetSequences) {
Map<String, SequenceEntity> insdcToSequenceEntity = new HashMap<>();
for (SequenceEntity targetSeq : targetSequences) {
insdcToSequenceEntity.put(targetSeq.getInsdcAccession(), targetSeq);
if (targetSequences == null || sourceSequences == null || targetSequences.isEmpty() || sourceSequences.isEmpty()) {
return;
}
Map<String, SequenceEntity> insdcToSequenceEntityMap = targetSequences.stream()
.collect(Collectors.toMap(s->s.getInsdcAccession(), s->s));

for (SequenceEntity sourceSeq : sourceSequences) {
String sourceInsdcAccession = sourceSeq.getInsdcAccession();
if (insdcToSequenceEntity.containsKey(sourceInsdcAccession)) {
insdcToSequenceEntity.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName());
} else {
insdcToSequenceEntity.put(sourceInsdcAccession, sourceSeq);
if (insdcToSequenceEntityMap.containsKey(sourceInsdcAccession)) {
insdcToSequenceEntityMap.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import uk.ac.ebi.eva.contigalias.dus.NCBIBrowser;
import uk.ac.ebi.eva.contigalias.dus.NCBIBrowserFactory;
import uk.ac.ebi.eva.contigalias.entities.AssemblyEntity;
import uk.ac.ebi.eva.contigalias.entities.ChromosomeEntity;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@Repository("NCBIDataSource")
public class NCBIAssemblyDataSource implements AssemblyDataSource {
Expand Down Expand Up @@ -85,6 +89,53 @@ public Optional<AssemblyEntity> getAssemblyByAccession(
return Optional.of(assemblyEntity);
}

public AssemblyEntity getAssemblyEntity(Path downloadFilePath) throws IOException {
List<String> asmDataLines = Files.lines(downloadFilePath)
.filter(line -> line.startsWith("#"))
.collect(Collectors.toList());
return getAssemblyEntity(asmDataLines);
}

public AssemblyEntity getAssemblyEntity(List<String> asmDataLines) {
return NCBIAssemblyReportReader.getAssemblyEntity(asmDataLines);
}

public List<ChromosomeEntity> getChromosomeEntityList(AssemblyEntity assemblyEntity, List<String> chrDataList) {
List<ChromosomeEntity> chromosomeEntityList = new ArrayList<>();
for (String chrData : chrDataList) {
ChromosomeEntity chromosomeEntity = getChromosomeEntity(assemblyEntity, chrData);
if (chromosomeEntity != null) {
chromosomeEntityList.add(chromosomeEntity);
}
}
return chromosomeEntityList;
}

public ChromosomeEntity getChromosomeEntity(AssemblyEntity assemblyEntity, String chrLine) {
ChromosomeEntity chromosomeEntity = NCBIAssemblyReportReader.getChromosomeEntity(chrLine);
if (chromosomeEntity != null) {
chromosomeEntity.setAssembly(assemblyEntity);
}
return chromosomeEntity;
}

public Optional<Path> downloadAssemblyReport(String accession) throws IOException {
NCBIBrowser ncbiBrowser = factory.build();
Optional<Path> downloadPath;
try {
ncbiBrowser.connect();
downloadPath = downloadAssemblyReport(accession, ncbiBrowser);
} finally {
try {
ncbiBrowser.disconnect();
} catch (IOException e) {
logger.warn("Error while trying to disconnect - ncbiBrowser (assembly: " + accession + ") : " + e);
}
}

return downloadPath;
}

@Retryable(value = Exception.class, maxAttempts = 5, backoff = @Backoff(delay = 2000, multiplier = 2))
public Optional<Path> downloadAssemblyReport(String accession, NCBIBrowser ncbiBrowser) throws IOException {
Optional<String> directory = ncbiBrowser.getGenomeReportDirectory(accession);
Expand All @@ -105,5 +156,4 @@ public Optional<Path> downloadAssemblyReport(String accession, NCBIBrowser ncbiB
return Optional.empty();
}
}

}
Loading
Loading