diff --git a/src/main/java/org/broadinstitute/hellbender/tools/copynumber/CollectAllelicCountsSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/copynumber/CollectAllelicCountsSpark.java
new file mode 100644
index 00000000000..52d2641ab49
--- /dev/null
+++ b/src/main/java/org/broadinstitute/hellbender/tools/copynumber/CollectAllelicCountsSpark.java
@@ -0,0 +1,107 @@
+package org.broadinstitute.hellbender.tools.copynumber;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.broadinstitute.barclay.argparser.Argument;
+import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
+import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
+import org.broadinstitute.hellbender.cmdline.programgroups.CopyNumberProgramGroup;
+import org.broadinstitute.hellbender.engine.filters.MappingQualityReadFilter;
+import org.broadinstitute.hellbender.engine.filters.ReadFilter;
+import org.broadinstitute.hellbender.engine.spark.LocusWalkerContext;
+import org.broadinstitute.hellbender.engine.spark.LocusWalkerSpark;
+import org.broadinstitute.hellbender.tools.copynumber.datacollection.AllelicCountCollector;
+import org.broadinstitute.hellbender.tools.copynumber.formats.metadata.*;
+import org.broadinstitute.hellbender.utils.Nucleotide;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * See {@link CollectAllelicCounts}. This behaves the same, except that it supports Spark.
+ */
+@CommandLineProgramProperties(
+        summary = "Collects ref/alt counts at sites.",
+        oneLineSummary = "Collects ref/alt counts at sites.",
+        programGroup = CopyNumberProgramGroup.class
+)
+public class CollectAllelicCountsSpark extends LocusWalkerSpark {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger logger = LogManager.getLogger(CollectAllelicCountsSpark.class);
+
+    @Argument(
+            doc = "Output allelic-counts file.",
+            fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME,
+            shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME
+    )
+    private File outputAllelicCountsFile;
+
+    @Argument(
+            doc = "Minimum base quality; base calls with lower quality will be filtered out of pileup.",
+            fullName = "minimumBaseQuality",
+            shortName = "minBQ",
+            minValue = 0,
+            optional = true
+    )
+    private int minimumBaseQuality = 20;
+
+    private static final int DEFAULT_MINIMUM_MAPPING_QUALITY = 30;
+
+    @Override
+    public boolean emitEmptyLoci() {return true;}
+
+    @Override
+    public boolean requiresReference() {return true;}
+
+    @Override
+    public boolean requiresIntervals() {return true;}
+
+    @Override
+    protected void processAlignments(JavaRDD<LocusWalkerContext> rdd, JavaSparkContext ctx) {
+        final SampleLocatableMetadata metadata = MetadataUtils.fromHeader(getHeaderForReads(), Metadata.Type.SAMPLE_LOCATABLE);
+        final Broadcast<SampleLocatableMetadata> sampleMetadataBroadcast = ctx.broadcast(metadata);
+
+        final AllelicCountCollector finalAllelicCountCollector =
+                rdd.mapPartitions(distributedCount(sampleMetadataBroadcast, minimumBaseQuality))
+                        .reduce((a1, a2) -> combineAllelicCountCollectors(a1, a2, sampleMetadataBroadcast.getValue()));
+        finalAllelicCountCollector.getAllelicCounts().write(outputAllelicCountsFile);
+    }
+
+    private static FlatMapFunction<Iterator<LocusWalkerContext>, AllelicCountCollector> distributedCount(final Broadcast<SampleLocatableMetadata> sampleMetadataBroadcast,
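The heart of processAlignments above is a per-partition collection followed by a single reduce: each Spark partition fills its own AllelicCountCollector, and the partial collectors are then merged pairwise via combineAllelicCountCollectors. The same pattern is sketched below in isolation, with a plain integer sum standing in for the per-partition collector. This is an illustration only; the class and variable names are hypothetical and not part of this change.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;
    import java.util.Collections;

    // Sketch of the mapPartitions -> reduce aggregation used by CollectAllelicCountsSpark,
    // with an int sum standing in for the per-partition AllelicCountCollector.
    public class PartitionAggregationSketch {
        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setAppName("partition-aggregation-sketch").setMaster("local[2]");
            try (final JavaSparkContext ctx = new JavaSparkContext(conf)) {
                final JavaRDD<Integer> loci = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);

                final int total = loci
                        // one accumulator per partition (analogous to one AllelicCountCollector per partition)
                        .mapPartitions(iter -> {
                            int partitionSum = 0;
                            while (iter.hasNext()) {
                                partitionSum += iter.next();
                            }
                            return Collections.singletonList(partitionSum).iterator();
                        })
                        // merge the per-partition results (analogous to AllelicCountCollector.combine)
                        .reduce(Integer::sum);

                System.out.println("total = " + total);   // 36
            }
        }
    }
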
+                                                                                                          final int minimumBaseQuality) {
+        return (FlatMapFunction<Iterator<LocusWalkerContext>, AllelicCountCollector>) contextIterator -> {
+            final AllelicCountCollector result = new AllelicCountCollector(sampleMetadataBroadcast.getValue());
+
+            contextIterator.forEachRemaining(ctx -> {
+                final byte refAsByte = ctx.getReferenceContext().getBase();
+                result.collectAtLocus(Nucleotide.decode(refAsByte), ctx.getAlignmentContext().getBasePileup(),
+                        ctx.getAlignmentContext().getLocation(), minimumBaseQuality);
+                }
+            );
+            return Collections.singletonList(result).iterator();
+        };
+    }
+
+    private static AllelicCountCollector combineAllelicCountCollectors(final AllelicCountCollector allelicCountCollector1,
+                                                                       final AllelicCountCollector allelicCountCollector2,
+                                                                       final SampleLocatableMetadata sampleMetadata) {
+        return AllelicCountCollector.combine(allelicCountCollector1, allelicCountCollector2, sampleMetadata);
+    }
+
+    @Override
+    public List<ReadFilter> getDefaultReadFilters() {
+        final List<ReadFilter> initialReadFilters = new ArrayList<>(super.getDefaultReadFilters());
+        initialReadFilters.add(new MappingQualityReadFilter(DEFAULT_MINIMUM_MAPPING_QUALITY));
+
+        return initialReadFilters;
+    }
+}
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/copynumber/datacollection/AllelicCountCollector.java b/src/main/java/org/broadinstitute/hellbender/tools/copynumber/datacollection/AllelicCountCollector.java
index 2f92e778795..efa0b94899f 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/copynumber/datacollection/AllelicCountCollector.java
+++ b/src/main/java/org/broadinstitute/hellbender/tools/copynumber/datacollection/AllelicCountCollector.java
@@ -94,4 +94,29 @@ private static Nucleotide inferAltFromPileupBaseCounts(final Nucleotide.Counter
                 .sorted((b1, b2) -> Long.compare(baseCounts.get(b2), baseCounts.get(b1)))
                 .findFirst().get();
     }
+
+    /**
+     * Note that counts added through this method were collected elsewhere, so the minimum base quality cannot be enforced here; records are added as-is.
+     *
+     * @param allelicCountCollector input data to combine with this.
+     */
+    public void collectFromCollector(final AllelicCountCollector allelicCountCollector) {
+        if (allelicCountCollector != null) {
+            this.allelicCounts.addAll(allelicCountCollector.getAllelicCounts().getRecords());
+        }
+    }
+
+    /** TODO: input parameter checking
+     *
+     * @param allelicCountCollector1 first set of counts to include in the result
+     * @param allelicCountCollector2 second set of counts to include in the result
+     * @return a new allelic count collector with the combined contents of the two inputs
+     */
+    public static AllelicCountCollector combine(final AllelicCountCollector allelicCountCollector1, final AllelicCountCollector allelicCountCollector2,
+                                                final SampleLocatableMetadata sampleMetadata) {
+        final AllelicCountCollector result = new AllelicCountCollector(sampleMetadata);
+        result.collectFromCollector(allelicCountCollector1);
+        result.collectFromCollector(allelicCountCollector2);
+        return result;
+    }
 }
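The TODO above asks for input parameter checking on combine. One possible shape for that check is sketched here as a drop-in replacement for the method body, assuming the Utils.nonNull helper used throughout the codebase; this is hypothetical and not part of the change as submitted.

    // Hypothetical validation for AllelicCountCollector.combine (sketch only).
    public static AllelicCountCollector combine(final AllelicCountCollector allelicCountCollector1,
                                                final AllelicCountCollector allelicCountCollector2,
                                                final SampleLocatableMetadata sampleMetadata) {
        Utils.nonNull(allelicCountCollector1, "First collector must not be null.");
        Utils.nonNull(allelicCountCollector2, "Second collector must not be null.");
        Utils.nonNull(sampleMetadata, "Sample metadata must not be null.");

        final AllelicCountCollector result = new AllelicCountCollector(sampleMetadata);
        result.collectFromCollector(allelicCountCollector1);
        result.collectFromCollector(allelicCountCollector2);
        return result;
    }
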
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/copynumber/formats/metadata/SimpleSampleMetadata.java b/src/main/java/org/broadinstitute/hellbender/tools/copynumber/formats/metadata/SimpleSampleMetadata.java
index 831748b537a..46db2b12d08 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/copynumber/formats/metadata/SimpleSampleMetadata.java
+++ b/src/main/java/org/broadinstitute/hellbender/tools/copynumber/formats/metadata/SimpleSampleMetadata.java
@@ -4,12 +4,17 @@
 import htsjdk.samtools.SAMReadGroupRecord;
 import org.broadinstitute.hellbender.utils.Utils;
 
+import java.io.Serializable;
+
 /**
  * Metadata associated with a single sample.
  *
  * @author Samuel Lee <slee@broadinstitute.org>
  */
-public class SimpleSampleMetadata implements SampleMetadata {
+public class SimpleSampleMetadata implements SampleMetadata, Serializable {
+
+    private static final long serialVersionUID = 0L;
+
     private final String sampleName;
 
     public SimpleSampleMetadata(final String sampleName) {
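SimpleSampleMetadata picks up Serializable (and a serialVersionUID) because the sample metadata is broadcast to Spark executors in CollectAllelicCountsSpark; any value sent through a broadcast or captured in a task closure must be serializable. A minimal, self-contained illustration of that requirement follows; the class names here are hypothetical stand-ins, not part of this change.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    import java.io.Serializable;
    import java.util.Arrays;

    // Sketch: a value must be serializable to be broadcast to Spark executors.
    public class BroadcastSerializableSketch {
        // Stand-in for SimpleSampleMetadata; without "implements Serializable",
        // shipping the broadcast value to executors fails with a serialization error.
        static class Metadata implements Serializable {
            private static final long serialVersionUID = 1L;
            final String sampleName;
            Metadata(final String sampleName) { this.sampleName = sampleName; }
        }

        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]");
            try (final JavaSparkContext ctx = new JavaSparkContext(conf)) {
                final Broadcast<Metadata> metadata = ctx.broadcast(new Metadata("sample-1"));
                final long count = ctx.parallelize(Arrays.asList(1, 2, 3))
                        .filter(i -> metadata.getValue().sampleName != null)   // executors read the broadcast value
                        .count();
                System.out.println(count);   // 3
            }
        }
    }
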
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleLocusWalkerSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleLocusWalkerSpark.java
new file mode 100644
index 00000000000..4edc6883c50
--- /dev/null
+++ b/src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleLocusWalkerSpark.java
@@ -0,0 +1,82 @@
+package org.broadinstitute.hellbender.tools.examples;
+
+import htsjdk.variant.variantcontext.VariantContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.broadinstitute.barclay.argparser.Argument;
+import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
+import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
+import org.broadinstitute.hellbender.cmdline.programgroups.ExampleProgramGroup;
+import org.broadinstitute.hellbender.engine.AlignmentContext;
+import org.broadinstitute.hellbender.engine.FeatureContext;
+import org.broadinstitute.hellbender.engine.FeatureInput;
+import org.broadinstitute.hellbender.engine.ReferenceContext;
+import org.broadinstitute.hellbender.engine.spark.LocusWalkerContext;
+import org.broadinstitute.hellbender.engine.spark.LocusWalkerSpark;
+import org.broadinstitute.hellbender.utils.pileup.ReadPileup;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Example/toy program that shows how to implement the LocusWalker interface. Prints locus-based coverage from supplied
+ * reads, and reference bases/overlapping variants if provided.
+ */
+@CommandLineProgramProperties(
+        summary = "Example tool that prints locus-based coverage from supplied reads to the specified output file (stdout if none provided), along with overlapping reference bases/features (if provided)",
+        oneLineSummary = "Example tool that prints locus-based coverage with optional contextual data",
+        programGroup = ExampleProgramGroup.class,
+        omitFromCommandLine = true
+)
+public final class ExampleLocusWalkerSpark extends LocusWalkerSpark {
+    private static final long serialVersionUID = 1L;
+
+    @Argument(fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME, shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME, doc = "Output file (if not provided, defaults to STDOUT)", common = false, optional = true)
+    private String outputFile = null;
+
+    @Argument(fullName = StandardArgumentDefinitions.VARIANT_LONG_NAME, shortName = StandardArgumentDefinitions.VARIANT_SHORT_NAME, doc = "One or more VCF files", optional = true)
+    private List<FeatureInput<VariantContext>> variants;
+
+    private PrintStream outputStream = null;
+
+
+    @Override
+    protected void processAlignments(JavaRDD<LocusWalkerContext> rdd, JavaSparkContext ctx) {
+        rdd.map(intervalFunction(variants)).saveAsTextFile(outputFile);
+    }
+
+    private static Function<LocusWalkerContext, String> intervalFunction(List<FeatureInput<VariantContext>> variants) {
+        return (Function<LocusWalkerContext, String>) context -> {
+            AlignmentContext alignmentContext = context.getAlignmentContext();
+            ReferenceContext referenceContext = context.getReferenceContext();
+            FeatureContext featureContext = context.getFeatureContext();
+
+            StringBuilder sb = new StringBuilder();
+
+            // Get pileup and counts
+            ReadPileup pileup = alignmentContext.getBasePileup();
+            // print the locus and coverage
+            sb.append(String.format("Current locus %s:%d (coverage=%s)\n", alignmentContext.getContig(),
+                    alignmentContext.getPosition(), pileup.size()));
+            // print the reference context if available
+            if ( referenceContext.hasBackingDataSource() ) {
+                sb.append("\tReference base(s): " + new String(referenceContext.getBases()));
+                sb.append("\n");
+            }
+            // print the overlapping variants if there are some
+            if(featureContext.hasBackingDataSource()) {
+                List<VariantContext> vars = featureContext.getValues(variants);
+                if(!vars.isEmpty()) {
+                    sb.append("\tOverlapping variant(s):\n");
+                    for (VariantContext variant : vars) {
+                        sb.append(String.format("\t\t%s:%d-%d, Ref:%s, Alt(s):%s\n", variant.getContig(), variant.getStart(),
+                                variant.getEnd(), variant.getReference(), variant.getAlternateAlleles()));
+                    }
+                }
+            }
+
+            return sb.toString();
+        };
+    }
+}
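Note that saveAsTextFile writes a directory of part files rather than a single file, which is why the integration test for this tool (below) compares against part-00000 inside the output path. A small self-contained illustration of that behavior follows; the class name and output path are hypothetical, not part of this change.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.io.File;
    import java.util.Arrays;

    // Sketch: RDD.saveAsTextFile(path) creates a directory containing part-NNNNN files.
    public class SaveAsTextFileSketch {
        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setAppName("save-as-text-file-sketch").setMaster("local[1]");
            try (final JavaSparkContext ctx = new JavaSparkContext(conf)) {
                // The output directory must not already exist, or saveAsTextFile will fail.
                final File outputDir = new File(System.getProperty("java.io.tmpdir"), "example-locus-walker-output");
                ctx.parallelize(Arrays.asList("locus 1", "locus 2", "locus 3"), 1)
                        .saveAsTextFile(outputDir.getAbsolutePath());
                // With a single partition, all lines end up in one part file:
                System.out.println(new File(outputDir, "part-00000").exists());   // true
            }
        }
    }
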
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/copynumber/CollectAllelicCountsSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/copynumber/CollectAllelicCountsSparkIntegrationTest.java
new file mode 100644
index 00000000000..dfd118582a8
--- /dev/null
+++ b/src/test/java/org/broadinstitute/hellbender/tools/copynumber/CollectAllelicCountsSparkIntegrationTest.java
@@ -0,0 +1,128 @@
+package org.broadinstitute.hellbender.tools.copynumber;
+
+import htsjdk.samtools.SAMSequenceDictionary;
+import org.broadinstitute.hellbender.CommandLineProgramTest;
+import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
+import org.broadinstitute.hellbender.engine.ReferenceDataSource;
+import org.broadinstitute.hellbender.tools.copynumber.formats.collections.AllelicCountCollection;
+import org.broadinstitute.hellbender.tools.copynumber.formats.metadata.SampleLocatableMetadata;
+import org.broadinstitute.hellbender.tools.copynumber.formats.metadata.SimpleSampleLocatableMetadata;
+import org.broadinstitute.hellbender.tools.copynumber.formats.records.AllelicCount;
+import org.broadinstitute.hellbender.utils.Nucleotide;
+import org.broadinstitute.hellbender.utils.SimpleInterval;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Arrays;
+
+/**
+ * Integration test for {@link CollectAllelicCountsSpark}. Uses a BAM with sites generated from hg19mini using wgsim.
+ *
+ * These tests should be identical to those for {@link CollectAllelicCounts}.
+ *
+ */
+public final class CollectAllelicCountsSparkIntegrationTest extends CommandLineProgramTest {
+    private static final File TEST_SUB_DIR = new File(toolsTestDir, "copynumber");
+    private static final File NORMAL_BAM_FILE = new File(TEST_SUB_DIR, "collect-allelic-counts-normal.bam");
+    private static final File TUMOR_BAM_FILE = new File(TEST_SUB_DIR, "collect-allelic-counts-tumor.bam");
+    private static final File SITES_FILE = new File(TEST_SUB_DIR, "collect-allelic-counts-sites.interval_list");
+    private static final File REFERENCE_FILE = new File(hg19MiniReference);
+
+    private static final String NORMAL_SAMPLE_NAME_EXPECTED = "20";
+    private static final String TUMOR_SAMPLE_NAME_EXPECTED = "20";
+    private static final SAMSequenceDictionary SEQUENCE_DICTIONARY = ReferenceDataSource.of(REFERENCE_FILE.toPath()).getSequenceDictionary();
+    private static final SampleLocatableMetadata NORMAL_METADATA_EXPECTED = new SimpleSampleLocatableMetadata(
+            NORMAL_SAMPLE_NAME_EXPECTED, SEQUENCE_DICTIONARY);
+
+    private static final SampleLocatableMetadata TUMOR_METADATA_EXPECTED = new SimpleSampleLocatableMetadata(
+            TUMOR_SAMPLE_NAME_EXPECTED, SEQUENCE_DICTIONARY);
+
+    @DataProvider(name = "testData")
+    public Object[][] testData() {
+        //counts from IGV with minMQ = 30 and minBQ = 20
+        final AllelicCountCollection normalCountsExpected = new AllelicCountCollection(
+                NORMAL_METADATA_EXPECTED,
+                Arrays.asList(
+                        new AllelicCount(new SimpleInterval("1", 10736, 10736), 0, 0, Nucleotide.G, Nucleotide.N),
+                        new AllelicCount(new SimpleInterval("1", 11522, 11522), 7, 4, Nucleotide.G, Nucleotide.A),
+                        new AllelicCount(new SimpleInterval("1", 12098, 12098), 8, 6, Nucleotide.G, Nucleotide.T),
+                        new AllelicCount(new SimpleInterval("1", 12444, 12444), 0, 18, Nucleotide.T, Nucleotide.C),
+                        new AllelicCount(new SimpleInterval("1", 13059, 13059), 0, 8, Nucleotide.C, Nucleotide.A),
+                        new AllelicCount(new SimpleInterval("1", 14630, 14630), 9, 8, Nucleotide.T, Nucleotide.G),
+                        new AllelicCount(new SimpleInterval("1", 15204, 15204), 4, 4, Nucleotide.C, Nucleotide.A),
+                        new AllelicCount(new SimpleInterval("2", 14689, 14689), 6, 9, Nucleotide.T, Nucleotide.G),
+                        new AllelicCount(new SimpleInterval("2", 14982, 14982), 6, 5, Nucleotide.G, Nucleotide.C),
+                        new AllelicCount(new SimpleInterval("2", 15110, 15110), 6, 0, Nucleotide.G, Nucleotide.N),
+                        new AllelicCount(new SimpleInterval("2", 15629, 15629), 5, 3, Nucleotide.T, Nucleotide.A)));
+
+        final AllelicCountCollection tumorCountsExpected = new AllelicCountCollection(
+                TUMOR_METADATA_EXPECTED,
+                Arrays.asList(
+                        new AllelicCount(new SimpleInterval("1", 10736, 10736), 0, 0, Nucleotide.G, Nucleotide.N),
+                        new AllelicCount(new SimpleInterval("1", 11522, 11522), 7, 4, Nucleotide.G, Nucleotide.A),
+                        new AllelicCount(new SimpleInterval("1", 12098, 12098), 8, 6, Nucleotide.G, Nucleotide.T),
+                        new AllelicCount(new SimpleInterval("1", 12444, 12444), 0, 17, Nucleotide.T, Nucleotide.C),
+                        new AllelicCount(new SimpleInterval("1", 13059, 13059), 0, 8, Nucleotide.C, Nucleotide.A),
+                        new AllelicCount(new SimpleInterval("1", 14630, 14630), 9, 8, Nucleotide.T, Nucleotide.G),
+                        new AllelicCount(new SimpleInterval("1", 15204, 15204), 4, 3, Nucleotide.C, Nucleotide.A),
+                        new AllelicCount(new SimpleInterval("2", 14689, 14689), 6, 9, Nucleotide.T, Nucleotide.G),
+                        new AllelicCount(new SimpleInterval("2", 14982, 14982), 6, 5, Nucleotide.G, Nucleotide.C),
+                        new AllelicCount(new SimpleInterval("2", 15110, 15110), 6, 0, Nucleotide.G, Nucleotide.N),
+                        new AllelicCount(new SimpleInterval("2", 15629, 15629), 5, 3, Nucleotide.T, Nucleotide.A)));
+
+        //counts from IGV with minMQ = 30 and minBQ = 20, without nucleotides
+        final AllelicCountCollection normalCountsExpectedWithoutNucleotides = new AllelicCountCollection(
+                NORMAL_METADATA_EXPECTED,
+                Arrays.asList(
+                        new AllelicCount(new SimpleInterval("1", 10736, 10736), 0, 0),
+                        new AllelicCount(new SimpleInterval("1", 11522, 11522), 7, 4),
+                        new AllelicCount(new SimpleInterval("1", 12098, 12098), 8, 6),
+                        new AllelicCount(new SimpleInterval("1", 12444, 12444), 0, 18),
+                        new AllelicCount(new SimpleInterval("1", 13059, 13059), 0, 8),
+                        new AllelicCount(new SimpleInterval("1", 14630, 14630), 9, 8),
+                        new AllelicCount(new SimpleInterval("1", 15204, 15204), 4, 4),
+                        new AllelicCount(new SimpleInterval("2", 14689, 14689), 6, 9),
+                        new AllelicCount(new SimpleInterval("2", 14982, 14982), 6, 5),
+                        new AllelicCount(new SimpleInterval("2", 15110, 15110), 6, 0),
+                        new AllelicCount(new SimpleInterval("2", 15629, 15629), 5, 3)));
+
+        final AllelicCountCollection tumorCountsExpectedWithoutNucleotides = new AllelicCountCollection(
+                TUMOR_METADATA_EXPECTED,
+                Arrays.asList(
+                        new AllelicCount(new SimpleInterval("1", 10736, 10736), 0, 0),
+                        new AllelicCount(new SimpleInterval("1", 11522, 11522), 7, 4),
+                        new AllelicCount(new SimpleInterval("1", 12098, 12098), 8, 6),
+                        new AllelicCount(new SimpleInterval("1", 12444, 12444), 0, 17),
+                        new AllelicCount(new SimpleInterval("1", 13059, 13059), 0, 8),
+                        new AllelicCount(new SimpleInterval("1", 14630, 14630), 9, 8),
+                        new AllelicCount(new SimpleInterval("1", 15204, 15204), 4, 3),
+                        new AllelicCount(new SimpleInterval("2", 14689, 14689), 6, 9),
+                        new AllelicCount(new SimpleInterval("2", 14982, 14982), 6, 5),
+                        new AllelicCount(new SimpleInterval("2", 15110, 15110), 6, 0),
+                        new AllelicCount(new SimpleInterval("2", 15629, 15629), 5, 3)));
+
+        return new Object[][]{
+                {NORMAL_BAM_FILE, normalCountsExpected},
+                {TUMOR_BAM_FILE, tumorCountsExpected},
+                {NORMAL_BAM_FILE, normalCountsExpectedWithoutNucleotides},
+                {TUMOR_BAM_FILE, tumorCountsExpectedWithoutNucleotides}
+        };
+    }
+
+    @Test(dataProvider = "testData")
+    public void test(final File inputBAMFile,
+                     final AllelicCountCollection countsExpected) {
+        final File outputFile = createTempFile("collect-allelic-counts-test-output", ".tsv");
+        final String[] arguments = {
+                "-" + StandardArgumentDefinitions.INPUT_SHORT_NAME, inputBAMFile.getAbsolutePath(),
+                "-L", SITES_FILE.getAbsolutePath(),
+                "-" + StandardArgumentDefinitions.REFERENCE_SHORT_NAME, REFERENCE_FILE.getAbsolutePath(),
+                "-" + StandardArgumentDefinitions.OUTPUT_SHORT_NAME, outputFile.getAbsolutePath()
+        };
+        runCommandLine(arguments);
+        final AllelicCountCollection countsResult = new AllelicCountCollection(outputFile);
+        Assert.assertEquals(countsResult.getRecords(), countsExpected.getRecords());
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/examples/ExampleLocusWalkerSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/examples/ExampleLocusWalkerSparkIntegrationTest.java
new file mode 100644
index 00000000000..ac1082d41d6
--- /dev/null
+++ b/src/test/java/org/broadinstitute/hellbender/tools/examples/ExampleLocusWalkerSparkIntegrationTest.java
@@ -0,0 +1,34 @@
+package org.broadinstitute.hellbender.tools.examples;
+
+import org.broadinstitute.hellbender.CommandLineProgramTest;
+import org.broadinstitute.hellbender.testutils.ArgumentsBuilder;
+import org.broadinstitute.hellbender.testutils.IntegrationTestSpec;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public final class ExampleLocusWalkerSparkIntegrationTest extends CommandLineProgramTest {
+    private static final String TEST_DATA_DIRECTORY = publicTestDir + "org/broadinstitute/hellbender/engine/";
+    private static final String TEST_OUTPUT_DIRECTORY = exampleTestDir;
+
+    @Test
+    public void testExampleLocusWalker() throws IOException {
+        final File out = File.createTempFile("out", ".txt");
+        out.delete();
+        out.deleteOnExit();
+        final ArgumentsBuilder args = new ArgumentsBuilder();
+        args.add("-L 1");
+        args.add("--input");
+        args.add(TEST_DATA_DIRECTORY + "reads_data_source_test1.bam");
+        args.add("-V");
+        args.add(TEST_DATA_DIRECTORY + "feature_data_source_test.vcf");
+        args.add("--output");
+        args.add(out.getAbsolutePath());
+        args.add("--reference");
+        args.add(hg19MiniReference);
+        this.runCommandLine(args.getArgsArray());
+        File expected = new File(TEST_OUTPUT_DIRECTORY, "expected_ExampleLocusWalkerIntegrationTest_output.txt");
+        IntegrationTestSpec.assertEqualTextFiles(new File(out, "part-00000"), expected);
+    }
+}