diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
new file mode 100644
index 00000000..1288bcad
--- /dev/null
+++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
@@ -0,0 +1,30 @@
+package au.csiro.variantspark.utils;
+
+import java.io.*;
+import htsjdk.samtools.util.BlockCompressedInputStream;
+
+public class FileUtils {
+
+  /**
+   * Checks whether a file is BGZF (blocked gzip) compressed by inspecting its header bytes.
+   *
+   * @param filePath path to the input file
+   * @return true if the input file is a valid BGZF file
+   */
+  public static boolean isBGZFile(String filePath) {
+    /*
+     * .vcf.bgz is a BGZF file and works with BlockCompressedInputStream.
+     * .vcf.gz is a plain GZIP file: reading it this way fails with
+     * java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184).
+     * .vcf.bz2 is not a GZIP file and fails with the same OutOfMemoryError.
+     * .vcf is uncompressed and fails with htsjdk.samtools.SAMFormatException at
+     * java.io.BufferedReader.readLine(BufferedReader.java:389).
+     */
+    try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(filePath))) {
+      return BlockCompressedInputStream.isValidFile(bufferedInputStream);
+    } catch (IOException e) {
+      // not a readable BGZF file
+      return false;
+    }
+  }
+}
diff --git a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
index 3d9f6641..1eac4ecc 100644
--- a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
+++ b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
@@ -2,7 +2,10 @@ package au.csiro.variantspark.cli.args
 
 import org.kohsuke.args4j.Option
 import au.csiro.pbdava.ssparkle.spark.SparkApp
+import au.csiro.variantspark.utils.FileUtils
 import org.apache.spark.rdd.RDD
+import htsjdk.samtools.util.BlockCompressedInputStream
+import org.apache.hadoop.fs.Path
 
 trait SparkArgs extends SparkApp {
 
@@ -10,7 +13,20 @@ trait SparkArgs extends SparkApp {
     aliases = Array("--spark-par"))
   val sparkPar: Int = 0
 
-  def textFile(inputFile: String): RDD[String] =
-    sc.textFile(inputFile, if (sparkPar > 0) sparkPar else sc.defaultParallelism)
-
+  def textFile(inputFile: String): RDD[String] = {
+    val isBGZ = FileUtils.isBGZFile(inputFile)
+    println(s"Loading $inputFile into a Spark RDD (BGZF: $isBGZ)")
+    if (isBGZ) {
+      // BGZF is gzip compressed in independent blocks; use htsjdk's reader
+      val path = new Path(inputFile)
+      val fs = path.getFileSystem(sc.hadoopConfiguration)
+      val bgzInputStream = new BlockCompressedInputStream(fs.open(path))
+      // read decompressed lines on the driver, then distribute them as an RDD
+      sc.parallelize(Stream.continually(bgzInputStream.readLine()).takeWhile(_ != null).toList)
+    } else {
+      // Hadoop's built-in codecs handle files compressed as a whole,
+      // so .vcf, .vcf.gz and .vcf.bz2 load directly via sc.textFile
+      sc.textFile(inputFile, if (sparkPar > 0) sparkPar else sc.defaultParallelism)
+    }
+  }
 }
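
Note: for reference, a minimal usage sketch of the new loader (not part of this
diff). The object name LoadVcfSketch and the input path are hypothetical, and
SparkApp is assumed to provide the SparkContext sc that textFile uses:

    import au.csiro.variantspark.cli.args.SparkArgs
    import au.csiro.variantspark.utils.FileUtils

    object LoadVcfSketch extends SparkArgs {
      def main(args: Array[String]): Unit = {
        // true only for block-compressed input such as a .vcf.bgz file;
        // false for plain .vcf and for whole-file .vcf.gz / .vcf.bz2
        println(FileUtils.isBGZFile("data/sample.vcf.bgz"))
        // dispatches to the BGZF branch or to sc.textFile as appropriate
        val lines = textFile("data/sample.vcf.bgz")
        println(s"read ${lines.count()} lines")
      }
    }

One caveat of the BGZF branch: Stream.continually(...).toList materialises all
decompressed lines on the driver before parallelize distributes them, so driver
memory bounds the input size, whereas the sc.textFile branch streams splits
directly to executors.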