LocusWalkerSpark drops intervals with no reads (#5222)
* Spark version of CollectAllelicCounts.

* Add ExampleLocusWalkerSpark.java and test
tomwhite authored Oct 8, 2018
1 parent 82d1d82 commit 158f7f7
Showing 6 changed files with 382 additions and 1 deletion.
src/main/java/org/broadinstitute/hellbender/tools/copynumber/CollectAllelicCountsSpark.java
@@ -0,0 +1,107 @@
package org.broadinstitute.hellbender.tools.copynumber;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.cmdline.programgroups.CopyNumberProgramGroup;
import org.broadinstitute.hellbender.engine.filters.MappingQualityReadFilter;
import org.broadinstitute.hellbender.engine.filters.ReadFilter;
import org.broadinstitute.hellbender.engine.spark.LocusWalkerContext;
import org.broadinstitute.hellbender.engine.spark.LocusWalkerSpark;
import org.broadinstitute.hellbender.tools.copynumber.datacollection.AllelicCountCollector;
import org.broadinstitute.hellbender.tools.copynumber.formats.metadata.*;
import org.broadinstitute.hellbender.utils.Nucleotide;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
* See {@link CollectAllelicCounts}. This behaves the same, except that it supports Spark.
*/
@CommandLineProgramProperties(
summary = "Collects ref/alt counts at sites.",
oneLineSummary = "Collects ref/alt counts at sites.",
programGroup = CopyNumberProgramGroup.class
)
public class CollectAllelicCountsSpark extends LocusWalkerSpark {

private static final long serialVersionUID = 1L;

private static final Logger logger = LogManager.getLogger(CollectAllelicCountsSpark.class);

@Argument(
doc = "Output allelic-counts file.",
fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME,
shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME
)
private File outputAllelicCountsFile;

@Argument(
doc = "Minimum base quality; base calls with lower quality will be filtered out of the pileup.",
fullName = "minimumBaseQuality",
shortName = "minBQ",
minValue = 0,
optional = true
)
private int minimumBaseQuality = 20;

private static final int DEFAULT_MINIMUM_MAPPING_QUALITY = 30;

@Override
public boolean emitEmptyLoci() { return true; }

@Override
public boolean requiresReference() { return true; }

@Override
public boolean requiresIntervals() { return true; }

@Override
protected void processAlignments(JavaRDD<LocusWalkerContext> rdd, JavaSparkContext ctx) {
final SampleLocatableMetadata metadata = MetadataUtils.fromHeader(getHeaderForReads(), Metadata.Type.SAMPLE_LOCATABLE);
final Broadcast<SampleLocatableMetadata> sampleMetadataBroadcast = ctx.broadcast(metadata);

final AllelicCountCollector finalAllelicCountCollector =
rdd.mapPartitions(distributedCount(sampleMetadataBroadcast, minimumBaseQuality))
.reduce((a1, a2) -> combineAllelicCountCollectors(a1, a2, sampleMetadataBroadcast.getValue()));
finalAllelicCountCollector.getAllelicCounts().write(outputAllelicCountsFile);
}

private static FlatMapFunction<Iterator<LocusWalkerContext>, AllelicCountCollector> distributedCount(final Broadcast<SampleLocatableMetadata> sampleMetadataBroadcast,
final int minimumBaseQuality) {
return (FlatMapFunction<Iterator<LocusWalkerContext>, AllelicCountCollector>) contextIterator -> {
final AllelicCountCollector result = new AllelicCountCollector(sampleMetadataBroadcast.getValue());

contextIterator.forEachRemaining(ctx -> {
final byte refAsByte = ctx.getReferenceContext().getBase();
result.collectAtLocus(Nucleotide.decode(refAsByte), ctx.getAlignmentContext().getBasePileup(),
ctx.getAlignmentContext().getLocation(), minimumBaseQuality);
});
return Collections.singletonList(result).iterator();
};
}

private static AllelicCountCollector combineAllelicCountCollectors(final AllelicCountCollector allelicCountCollector1,
final AllelicCountCollector allelicCountCollector2,
final SampleLocatableMetadata sampleMetadata) {
return AllelicCountCollector.combine(allelicCountCollector1, allelicCountCollector2, sampleMetadata);
}

@Override
public List<ReadFilter> getDefaultReadFilters() {
final List<ReadFilter> initialReadFilters = new ArrayList<>(super.getDefaultReadFilters());
initialReadFilters.add(new MappingQualityReadFilter(DEFAULT_MINIMUM_MAPPING_QUALITY));

return initialReadFilters;
}
}
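
The processAlignments implementation above follows a common Spark aggregation shape: mapPartitions builds one accumulator per partition and emits it as a single-element iterator, then reduce merges the per-partition accumulators pairwise. Below is a minimal, self-contained sketch of the same pattern with a plain long sum standing in for AllelicCountCollector; the class and variable names are illustrative, not part of GATK.

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class PartitionAggregateSketch {
public static void main(final String[] args) {
final SparkConf conf = new SparkConf().setAppName("PartitionAggregateSketch").setMaster("local[*]");
try (final JavaSparkContext ctx = new JavaSparkContext(conf)) {
final JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

// One accumulator per partition, emitted as a single-element iterator
// (the role distributedCount plays above)...
final JavaRDD<Long> perPartition = rdd.mapPartitions(
(FlatMapFunction<Iterator<Integer>, Long>) it -> {
long sum = 0;
while (it.hasNext()) {
sum += it.next();
}
return Collections.singletonList(sum).iterator();
});

// ...then merged pairwise, as combineAllelicCountCollectors is in reduce.
final long total = perPartition.reduce(Long::sum);
System.out.println(total); // 15
}
}
}

Building one collector per partition rather than one per locus keeps object churn low and means only a handful of accumulators ever cross the network in the final reduce.
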
src/main/java/org/broadinstitute/hellbender/tools/copynumber/datacollection/AllelicCountCollector.java
@@ -94,4 +94,29 @@ private static Nucleotide inferAltFromPileupBaseCounts(final Nucleotide.Counter
.sorted((b1, b2) -> Long.compare(baseCounts.get(b2), baseCounts.get(b1)))
.findFirst().get();
}

/**
* Appends all records from the given collector to this one. Note that records added through this
* method cannot be re-filtered against this collector's minimum base quality; any such filtering
* must already have been applied when they were first collected.
*
* @param allelicCountCollector input data to combine with this; may be null, in which case nothing is added
*/
public void collectFromCollector(final AllelicCountCollector allelicCountCollector) {
if (allelicCountCollector != null) {
this.allelicCounts.addAll(allelicCountCollector.getAllelicCounts().getRecords());
}
}

/**
* Creates a new collector containing the combined contents of the two input collectors.
* TODO: input parameter checking
*
* @param allelicCountCollector1 the first collector to combine
* @param allelicCountCollector2 the second collector to combine
* @param sampleMetadata metadata describing the sample; used to construct the combined collector
* @return a new allelic count collector with the combined contents of the two inputs
*/
public static AllelicCountCollector combine(final AllelicCountCollector allelicCountCollector1, final AllelicCountCollector allelicCountCollector2,
final SampleLocatableMetadata sampleMetadata) {
final AllelicCountCollector result = new AllelicCountCollector(sampleMetadata);
result.collectFromCollector(allelicCountCollector1);
result.collectFromCollector(allelicCountCollector2);
return result;
}
}
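
Spark documents reduce as requiring a commutative and associative operator, so combine must behave associatively (and commutatively up to record ordering). A toy list-based stand-in, illustrative rather than a GATK class, makes the associativity property easy to check:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AssociativityCheck {
// Fresh result each time; the inputs are left untouched, mirroring combine above.
static List<String> combine(final List<String> a, final List<String> b) {
final List<String> result = new ArrayList<>(a);
result.addAll(b);
return result;
}

public static void main(final String[] args) {
final List<String> x = Arrays.asList("rec1");
final List<String> y = Arrays.asList("rec2");
final List<String> z = Arrays.asList("rec3");
// (x + y) + z must equal x + (y + z) for a deterministic reduce.
System.out.println(combine(combine(x, y), z).equals(combine(x, combine(y, z)))); // true
}
}
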
src/main/java/org/broadinstitute/hellbender/tools/copynumber/formats/metadata/SimpleSampleMetadata.java
@@ -4,12 +4,17 @@
import htsjdk.samtools.SAMReadGroupRecord;
import org.broadinstitute.hellbender.utils.Utils;

import java.io.Serializable;

/**
* Metadata associated with a single sample.
*
* @author Samuel Lee <[email protected]>
*/
public class SimpleSampleMetadata implements SampleMetadata {
public class SimpleSampleMetadata implements SampleMetadata, Serializable {

private static final long serialVersionUID = 0L;

private final String sampleName;

public SimpleSampleMetadata(final String sampleName) {
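
Making SimpleSampleMetadata Serializable matters because CollectAllelicCountsSpark broadcasts the sample metadata: Spark serializes broadcast values before shipping them to executors, and with Java serialization that requires the type to implement Serializable. A minimal round-trip sketch of the mechanism, using an illustrative stand-in class rather than the real one:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {
// Illustrative stand-in for SimpleSampleMetadata.
static class Metadata implements Serializable {
private static final long serialVersionUID = 0L;
final String sampleName;

Metadata(final String sampleName) {
this.sampleName = sampleName;
}
}

public static void main(final String[] args) throws Exception {
// Serialize, as the driver does when broadcasting...
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (final ObjectOutputStream out = new ObjectOutputStream(bytes)) {
out.writeObject(new Metadata("sample1"));
}
// ...and deserialize, as each executor does on first use.
try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
final Metadata copy = (Metadata) in.readObject();
System.out.println(copy.sampleName); // sample1
}
}
}
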
src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleLocusWalkerSpark.java
@@ -0,0 +1,82 @@
package org.broadinstitute.hellbender.tools.examples;

import htsjdk.variant.variantcontext.VariantContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.cmdline.programgroups.ExampleProgramGroup;
import org.broadinstitute.hellbender.engine.AlignmentContext;
import org.broadinstitute.hellbender.engine.FeatureContext;
import org.broadinstitute.hellbender.engine.FeatureInput;
import org.broadinstitute.hellbender.engine.ReferenceContext;
import org.broadinstitute.hellbender.engine.spark.LocusWalkerContext;
import org.broadinstitute.hellbender.engine.spark.LocusWalkerSpark;
import org.broadinstitute.hellbender.utils.pileup.ReadPileup;

import java.io.PrintStream;
import java.util.List;

/**
* Example/toy program that shows how to write a LocusWalkerSpark tool. Prints locus-based coverage from supplied
* reads, and reference bases/overlapping variants if provided.
*/
@CommandLineProgramProperties(
summary = "Example tool that prints locus-based coverage from supplied reads to the specified output file (stdout if none provided), along with overlapping reference bases/features (if provided)",
oneLineSummary = "Example tool that prints locus-based coverage with optional contextual data",
programGroup = ExampleProgramGroup.class,
omitFromCommandLine = true
)
public final class ExampleLocusWalkerSpark extends LocusWalkerSpark {
private static final long serialVersionUID = 1L;

@Argument(fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME, shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME, doc = "Output file (if not provided, defaults to STDOUT)", common = false, optional = true)
private String outputFile = null;

@Argument(fullName = StandardArgumentDefinitions.VARIANT_LONG_NAME, shortName = StandardArgumentDefinitions.VARIANT_SHORT_NAME, doc = "One or more VCF files", optional = true)
private List<FeatureInput<VariantContext>> variants;

private PrintStream outputStream = null;

@Override
protected void processAlignments(JavaRDD<LocusWalkerContext> rdd, JavaSparkContext ctx) {
rdd.map(intervalFunction(variants)).saveAsTextFile(outputFile);
}

private static Function<LocusWalkerContext, String> intervalFunction(List<FeatureInput<VariantContext>> variants) {
return (Function<LocusWalkerContext, String>) context -> {
AlignmentContext alignmentContext = context.getAlignmentContext();
ReferenceContext referenceContext = context.getReferenceContext();
FeatureContext featureContext = context.getFeatureContext();

StringBuilder sb = new StringBuilder();

// Get pileup and counts
ReadPileup pileup = alignmentContext.getBasePileup();
// print the locus and coverage
sb.append(String.format("Current locus %s:%d (coverage=%s)\n", alignmentContext.getContig(),
alignmentContext.getPosition(), pileup.size()));
// print the reference context if available
if (referenceContext.hasBackingDataSource()) {
sb.append("\tReference base(s): " + new String(referenceContext.getBases()));
sb.append("\n");
}
// print the overlapping variants if there are some
if (featureContext.hasBackingDataSource()) {
List<VariantContext> vars = featureContext.getValues(variants);
if (!vars.isEmpty()) {
sb.append("\tOverlapping variant(s):\n");
for (VariantContext variant : vars) {
sb.append(String.format("\t\t%s:%d-%d, Ref:%s, Alt(s):%s\n", variant.getContig(), variant.getStart(),
variant.getEnd(), variant.getReference(), variant.getAlternateAlleles()));
}
}
}

return sb.toString();
};
}
}
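
Note that saveAsTextFile writes a directory of part files (one per partition) rather than a single file. For reference, a single record built by intervalFunction has roughly the following shape; the locus, coverage, bases, and alleles are illustrative placeholders inferred from the format strings above:

Current locus 1:100 (coverage=3)
	Reference base(s): A
	Overlapping variant(s):
		1:100-100, Ref:A*, Alt(s):[T]
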