Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3494 Ingest assembly in batches #125

Merged
merged 13 commits into from
Feb 12, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.springframework.hateoas.config.EnableHypermediaSupport;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableScheduling
@SpringBootApplication
@EnableRetry
@EnableTransactionManagement
@EnableHypermediaSupport(type = EnableHypermediaSupport.HypermediaType.HAL)
public class ContigAliasApplication extends SpringBootServletInitializer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@
import uk.ac.ebi.eva.contigalias.entities.SequenceEntity;
import uk.ac.ebi.eva.contigalias.exception.DownloadFailedException;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

@Repository("ENADataSource")
public class ENAAssemblyDataSource implements AssemblyDataSource {
Expand Down Expand Up @@ -96,6 +99,24 @@ public Optional<AssemblyEntity> getAssemblyByAccession(String accession) throws

}

public Optional<Path> downloadAssemblyReport(String accession) throws IOException {
ENABrowser enaBrowser = factory.build();
enaBrowser.connect();
try {
enaBrowser.connect();
return downloadAssemblyReport(enaBrowser, accession);
} catch (Exception e){
logger.warn("Could not fetch Assembly Report from ENA for accession " + accession + "Exception: " + e);
return Optional.empty();
}finally {
try {
enaBrowser.disconnect();
} catch (IOException e) {
logger.warn("Error while trying to disconnect - ncbiBrowser (assembly: " + accession + ") : " + e);
apriltuesday marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

@Retryable(value = Exception.class, maxAttempts = 5, backoff = @Backoff(delay = 2000, multiplier = 2))
public Optional<Path> downloadAssemblyReport(ENABrowser enaBrowser, String accession) throws IOException {
String dirPath = enaBrowser.getAssemblyDirPath(accession);
Expand All @@ -105,18 +126,33 @@ public Optional<Path> downloadAssemblyReport(ENABrowser enaBrowser, String acces
try {
boolean success = enaBrowser.downloadFTPFile(ftpFilePath, downloadFilePath, ftpFile.getSize());
if (success) {
logger.info("ENA assembly report downloaded successfully for accession "+ accession);
logger.info("ENA assembly report downloaded successfully for accession " + accession);
return Optional.of(downloadFilePath);
} else {
logger.warn("ENA assembly report could not be downloaded successfully for accession "+accession);
logger.warn("ENA assembly report could not be downloaded successfully for accession " + accession);
return Optional.empty();
}
} catch (IOException | DownloadFailedException e) {
logger.warn("Error downloading ENA assembly report for accession "+ accession + e);
logger.warn("Error downloading ENA assembly report for accession " + accession + e);
return Optional.empty();
}
}

/**
 * Converts raw report lines into chromosome entities, preserving input order.
 * Lines for which {@link #getChromosomeEntity(String)} yields {@code null} are dropped.
 *
 * @param chrDataList raw lines of an ENA assembly report
 * @return a new mutable list holding one entity per line that produced a non-null result
 */
public List<ChromosomeEntity> getChromosomeEntityList(List<String> chrDataList) {
    return chrDataList.stream()
            .map(chrData -> getChromosomeEntity(chrData))
            .filter(Objects::nonNull)
            .collect(Collectors.toCollection(ArrayList::new));
}

/**
 * Parses a single raw report line into a {@link ChromosomeEntity}.
 * Instance-level convenience that delegates to {@code ENAAssemblyReportReader.getChromosomeEntity}.
 *
 * @param chrLine one raw line of the report
 * @return the entity produced by the reader; {@link #getChromosomeEntityList(List)} treats a
 *         {@code null} result as "no entity for this line"
 */
public ChromosomeEntity getChromosomeEntity(String chrLine) {
return ENAAssemblyReportReader.getChromosomeEntity(chrLine);
}

/**
* Adds ENA sequence names to chromosomes and scaffolds in an assembly. Will modify the AssemblyEntity in-place.
*
Expand Down Expand Up @@ -144,18 +180,43 @@ public boolean hasAllEnaSequenceNames(AssemblyEntity assembly) {
return chromosomes.stream().allMatch(sequence -> sequence.getEnaSequenceName() != null);
}

private void addENASequenceNames(
public void addENASequenceNames(
List<? extends SequenceEntity> sourceSequences, List<? extends SequenceEntity> targetSequences) {
Map<String, SequenceEntity> insdcToSequenceEntity = new HashMap<>();
for (SequenceEntity targetSeq : targetSequences) {
insdcToSequenceEntity.put(targetSeq.getInsdcAccession(), targetSeq);
if (targetSequences == null || sourceSequences == null || targetSequences.isEmpty() || sourceSequences.isEmpty()) {
return;
}
Map<String, SequenceEntity> insdcToSequenceEntityMap = targetSequences.stream()
.collect(Collectors.toMap(s->s.getInsdcAccession(), s->s));

for (SequenceEntity sourceSeq : sourceSequences) {
String sourceInsdcAccession = sourceSeq.getInsdcAccession();
if (insdcToSequenceEntity.containsKey(sourceInsdcAccession)) {
insdcToSequenceEntity.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName());
} else {
insdcToSequenceEntity.put(sourceInsdcAccession, sourceSeq);
if (insdcToSequenceEntityMap.containsKey(sourceInsdcAccession)) {
insdcToSequenceEntityMap.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName());
}
}
}

/**
 * Streams through a downloaded ENA report and applies its sequence names to the given
 * chromosomes, processing the file in batches so that it is never held in memory as a whole.
 *
 * @param ncbiChromosomeList    chromosomes to be updated in place
 * @param downloadedENAFilePath location of the downloaded ENA report
 * @param BATCH_SIZE            number of report lines parsed and applied per batch
 * @throws IOException if the report cannot be opened or read
 */
public void addENASequenceNameToChromosomes(List<ChromosomeEntity> ncbiChromosomeList,
                                            Path downloadedENAFilePath, final int BATCH_SIZE) throws IOException {
    try (BufferedReader enaReportReader = new BufferedReader(new FileReader(downloadedENAFilePath.toFile()))) {
        List<String> pendingLines = new ArrayList<>();
        for (String row = enaReportReader.readLine(); row != null; row = enaReportReader.readLine()) {
            // Lines starting with "accession" (presumably the header row) carry no sequence data.
            if (!row.startsWith("accession")) {
                pendingLines.add(row);
                if (pendingLines.size() == BATCH_SIZE) {
                    addENASequenceNames(getChromosomeEntityList(pendingLines), ncbiChromosomeList);
                    pendingLines = new ArrayList<>();
                }
            }
        }
        // Apply whatever remains after the last full batch.
        if (!pendingLines.isEmpty()) {
            addENASequenceNames(getChromosomeEntityList(pendingLines), ncbiChromosomeList);
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@
import uk.ac.ebi.eva.contigalias.dus.NCBIBrowser;
import uk.ac.ebi.eva.contigalias.dus.NCBIBrowserFactory;
import uk.ac.ebi.eva.contigalias.entities.AssemblyEntity;
import uk.ac.ebi.eva.contigalias.entities.ChromosomeEntity;
import uk.ac.ebi.eva.contigalias.exception.AssemblyNotFoundException;
import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository;
import uk.ac.ebi.eva.contigalias.repo.ChromosomeRepository;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@Repository("NCBIDataSource")
public class NCBIAssemblyDataSource implements AssemblyDataSource {

private final Logger logger = LoggerFactory.getLogger(NCBIAssemblyDataSource.class);

private final int BATCH_SIZE = 100000;

private final NCBIBrowserFactory factory;

private final NCBIAssemblyReportReaderFactory readerFactory;
Expand Down Expand Up @@ -85,6 +96,53 @@ public Optional<AssemblyEntity> getAssemblyByAccession(
return Optional.of(assemblyEntity);
}

/**
 * Builds the assembly entity from the lines of a downloaded report that start with {@code #}.
 *
 * @param downloadFilePath location of the downloaded assembly report
 * @return the assembly entity produced by {@link #getAssemblyEntity(List)}
 * @throws IOException if the report cannot be opened or closed
 */
public AssemblyEntity getAssemblyEntity(Path downloadFilePath) throws IOException {
    // try-with-resources releases the file handle as soon as the '#' lines are collected;
    // a bare Files.lines(...) stream would keep the file open until garbage collection.
    try (BufferedReader reportReader = Files.newBufferedReader(downloadFilePath)) {
        List<String> asmDataLines = reportReader.lines()
                .filter(line -> line.startsWith("#"))
                .collect(Collectors.toList());
        return getAssemblyEntity(asmDataLines);
    }
}

/**
 * Builds the assembly entity from report lines by delegating to
 * {@code NCBIAssemblyReportReader.getAssemblyEntity}.
 *
 * @param asmDataLines report lines describing the assembly; the {@code Path} overload passes
 *                     the lines that start with {@code #}
 * @return the assembly entity produced by the reader
 */
public AssemblyEntity getAssemblyEntity(List<String> asmDataLines) {
return NCBIAssemblyReportReader.getAssemblyEntity(asmDataLines);
}

/**
 * Converts raw report lines into chromosome entities linked to the given assembly,
 * preserving input order. Lines that yield {@code null} are dropped.
 *
 * @param assemblyEntity assembly that every produced chromosome is attached to
 * @param chrDataList    raw lines of an NCBI assembly report
 * @return a new mutable list holding one entity per line that produced a non-null result
 */
public List<ChromosomeEntity> getChromosomeEntityList(AssemblyEntity assemblyEntity, List<String> chrDataList) {
    return chrDataList.stream()
            .map(chrData -> getChromosomeEntity(assemblyEntity, chrData))
            .filter(parsed -> parsed != null)
            .collect(Collectors.toCollection(ArrayList::new));
}

/**
 * Parses one report line via {@code NCBIAssemblyReportReader.getChromosomeEntity} and, when
 * the reader produces an entity, attaches it to the given assembly.
 *
 * @param assemblyEntity assembly to set on the parsed chromosome
 * @param chrLine        one raw line of the report
 * @return the parsed chromosome with its assembly set, or {@code null} if the reader returned {@code null}
 */
public ChromosomeEntity getChromosomeEntity(AssemblyEntity assemblyEntity, String chrLine) {
    ChromosomeEntity parsed = NCBIAssemblyReportReader.getChromosomeEntity(chrLine);
    if (parsed == null) {
        return null;
    }
    parsed.setAssembly(assemblyEntity);
    return parsed;
}

/**
 * Downloads the NCBI assembly report for the given accession using a freshly built browser.
 * The browser is disconnected before returning, whether or not the download succeeded;
 * a failure to disconnect is only logged.
 *
 * @param accession assembly accession whose report should be fetched
 * @return the result of the browser-based overload
 * @throws IOException if connecting or downloading fails
 */
public Optional<Path> downloadAssemblyReport(String accession) throws IOException {
    NCBIBrowser ncbiBrowser = factory.build();
    try {
        ncbiBrowser.connect();
        return downloadAssemblyReport(accession, ncbiBrowser);
    } finally {
        try {
            ncbiBrowser.disconnect();
        } catch (IOException disconnectFailure) {
            logger.warn("Error while trying to disconnect - ncbiBrowser (assembly: " + accession + ") : " + disconnectFailure);
        }
    }
}

@Retryable(value = Exception.class, maxAttempts = 5, backoff = @Backoff(delay = 2000, multiplier = 2))
public Optional<Path> downloadAssemblyReport(String accession, NCBIBrowser ncbiBrowser) throws IOException {
Optional<String> directory = ncbiBrowser.getGenomeReportDirectory(accession);
Expand All @@ -106,4 +164,59 @@ public Optional<Path> downloadAssemblyReport(String accession, NCBIBrowser ncbiB
}
}

public void parseFileAndInsertAssembly(String accession, ENAAssemblyDataSource enaDataSource,
apriltuesday marked this conversation as resolved.
Show resolved Hide resolved
AssemblyRepository assemblyRepository, ChromosomeRepository chromosomeRepository) throws IOException {
Optional<Path> downloadNCBIFilePathOpt = downloadAssemblyReport(accession);
Path downloadedNCBIFilePath = downloadNCBIFilePathOpt.orElseThrow(() -> new AssemblyNotFoundException(accession));
Optional<Path> downloadENAFilePathOpt = enaDataSource.downloadAssemblyReport(accession);
Path downloadedENAFilePath = downloadENAFilePathOpt.orElse(null);

long numberOfChromosomesInFile = Files.lines(downloadedNCBIFilePath).filter(line -> !line.startsWith("#")).count();
logger.info("Number of chromosomes in assembly (" + accession + "): " + numberOfChromosomesInFile);

AssemblyEntity assemblyEntity = getAssemblyEntity(downloadedNCBIFilePath);
assemblyRepository.save(assemblyEntity);

try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedNCBIFilePath.toFile()))) {
long chromosomesSavedTillNow = 0l;
List<String> chrLines = new ArrayList<>();
String line;
while ((line = bufferedReader.readLine()) != null) {
if (line.startsWith("#")) {
continue;
}
chrLines.add(line);
if (chrLines.size() == BATCH_SIZE) {
// add ena sequence name and save
addENASequenceNameAndSave(assemblyEntity, chrLines, enaDataSource, downloadedENAFilePath, chromosomeRepository);
apriltuesday marked this conversation as resolved.
Show resolved Hide resolved
chromosomesSavedTillNow += chrLines.size();
logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow);

chrLines = new ArrayList<>();
}
}
if (!chrLines.isEmpty()) {
// add ena sequence name and save
addENASequenceNameAndSave(assemblyEntity, chrLines, enaDataSource, downloadedENAFilePath, chromosomeRepository);
chromosomesSavedTillNow += chrLines.size();
logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow);
}
}

// delete the files after assembly insertion
Files.deleteIfExists(downloadedNCBIFilePath);
if (downloadedENAFilePath != null) {
Files.deleteIfExists(downloadedENAFilePath);
}
}

/**
 * Turns one batch of report lines into chromosome entities, applies ENA sequence names when
 * an ENA report path is supplied, and saves the batch.
 *
 * @param assemblyEntity        assembly the chromosomes belong to
 * @param chrLines              raw report lines of this batch
 * @param enaDataSource         data source used to apply the ENA sequence names
 * @param downloadedENAFilePath path of the ENA report, or {@code null} to skip the ENA step
 * @param chromosomeRepository  repository the batch is saved to
 * @throws IOException if reading the ENA report fails
 */
public void addENASequenceNameAndSave(AssemblyEntity assemblyEntity, List<String> chrLines,
                                      ENAAssemblyDataSource enaDataSource, Path downloadedENAFilePath,
                                      ChromosomeRepository chromosomeRepository) throws IOException {
    List<ChromosomeEntity> batch = getChromosomeEntityList(assemblyEntity, chrLines);
    boolean enaReportAvailable = downloadedENAFilePath != null;
    if (enaReportAvailable) {
        enaDataSource.addENASequenceNameToChromosomes(batch, downloadedENAFilePath, BATCH_SIZE);
    }
    chromosomeRepository.saveAll(batch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ protected void parseReport() throws IOException, NullPointerException {
}

// Not present in ENA assembly reports
protected void parseAssemblyData(String line) {}
protected void parseAssemblyData(String line) {
}

protected void parseChromosomeLine(String[] columns) {
ChromosomeEntity chromosomeEntity = new ChromosomeEntity();

chromosomeEntity.setInsdcAccession(columns[0]);
chromosomeEntity.setEnaSequenceName(columns[1]);
ChromosomeEntity chromosomeEntity = getChromosome(columns);
if (chromosomeEntity == null) {
return;
}

if (assemblyEntity == null) {
assemblyEntity = new AssemblyEntity();
Expand All @@ -82,10 +83,10 @@ protected void parseChromosomeLine(String[] columns) {
}

protected void parseScaffoldLine(String[] columns) {
ChromosomeEntity scaffoldEntity = new ChromosomeEntity();

scaffoldEntity.setInsdcAccession(columns[0]);
scaffoldEntity.setEnaSequenceName(columns[1]);
ChromosomeEntity scaffoldEntity = getScaffold(columns);
if (scaffoldEntity == null) {
return;
}

if (assemblyEntity == null) {
assemblyEntity = new AssemblyEntity();
Expand All @@ -101,4 +102,37 @@ protected void parseScaffoldLine(String[] columns) {
scaffolds.add(scaffoldEntity);
}

/**
 * Parses one tab-separated report line into a chromosome or scaffold entity.
 * <p>
 * A line is classified as a chromosome when column 5 equals {@code "Chromosome"} and column 3
 * equals {@code "assembled-molecule"} (zero-based indices); every other line with at least six
 * columns is classified as a scaffold.
 *
 * @param line one raw line of the report
 * @return the parsed entity, or {@code null} when the line starts with {@code "accession"} or
 *         has fewer than six columns
 */
public static ChromosomeEntity getChromosomeEntity(String line) {
    if (line.startsWith("accession")) {
        return null;
    }
    // The -1 limit keeps trailing empty columns so the column count stays reliable.
    String[] fields = line.split("\t", -1);
    if (fields.length < 6) {
        return null;
    }
    boolean assembledChromosome = fields[5].equals("Chromosome") && fields[3].equals("assembled-molecule");
    return assembledChromosome ? getChromosome(fields) : getScaffold(fields);
}

/**
 * Creates a chromosome-typed entity from the columns of a report line.
 *
 * @param columns split report line; index 0 is used as the INSDC accession and index 1 as the
 *                ENA sequence name
 * @return a new entity with contig type {@code CHROMOSOME}
 */
public static ChromosomeEntity getChromosome(String[] columns) {
    final ChromosomeEntity chromosome = new ChromosomeEntity();
    chromosome.setInsdcAccession(columns[0]);
    chromosome.setEnaSequenceName(columns[1]);
    chromosome.setContigType(SequenceEntity.ContigType.CHROMOSOME);
    return chromosome;
}

/**
 * Creates a scaffold-typed entity from the columns of a report line.
 *
 * @param columns split report line; index 0 is used as the INSDC accession and index 1 as the
 *                ENA sequence name
 * @return a new entity with contig type {@code SCAFFOLD}
 */
public static ChromosomeEntity getScaffold(String[] columns) {
    final ChromosomeEntity scaffold = new ChromosomeEntity();
    scaffold.setInsdcAccession(columns[0]);
    scaffold.setEnaSequenceName(columns[1]);
    scaffold.setContigType(SequenceEntity.ContigType.SCAFFOLD);
    return scaffold;
}

}
Loading
Loading