Skip to content

Commit

Permalink
Merge overlapped paired end reads
Browse files Browse the repository at this point in the history
  • Loading branch information
rhinempi committed Sep 13, 2023
1 parent 72e8c55 commit a214966
Show file tree
Hide file tree
Showing 28 changed files with 5,923 additions and 6,257 deletions.
28 changes: 17 additions & 11 deletions bin/reflexiv
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#-----------------------------------------------------------------------------

name="Reflexiv"
version="0.8"
spark_version="2.0.0" # only for auto downloading Spark package
version="1.0"
spark_version="3.2.1" # only for auto downloading Spark package

readlink -f 1>/dev/null 2>/dev/null
readlinkReturn=$?
Expand All @@ -48,7 +48,7 @@ SH_SBIN=${SH_SBIN:-$SH_HOME/sbin}
SH_OPT=""
SH_URL="https://github.com/rhinempi/reflexiv/archive/latest.zip"

SPARK_URL="http://d3kbcqa49mib13.cloudfront.net/spark-$spark_version-bin-hadoop2.6.tgz" # 1.6.0 version
SPARK_URL="https://archive.apache.org/dist/spark/spark-$spark_version/spark-$spark_version-bin-hadoop3.2.tgz" # 1.6.0 version
SPARK_CMD="" # in case no spark home was found, please manually paste the "spark-submit" file path
SPARK_OPT=""

Expand Down Expand Up @@ -80,6 +80,8 @@ function die() {

function get_spark() {
if command -v wget &>/dev/null; then
echo $1
echo $2
GET="wget -q $1 -O $2"
elif command -v curl &>/dev/null; then
GET="curl $1 -o $2"
Expand Down Expand Up @@ -158,12 +160,12 @@ fi
SPARK_CMD="$(which spark-submit)"
elif [ -x "$SPARK_CMD" ]; then
continue
elif [ -x "$SH_PACKAGE/spark-$spark_version-bin-hadoop2.6/bin/spark-submit" ]; then
SPARK_CMD="$SH_PACKAGE/spark-$spark_version-bin-hadoop2.6/bin/spark-submit"
elif [ -x "$SH_PACKAGE/spark-$spark_version-bin-hadoop3.2/bin/spark-submit" ]; then
SPARK_CMD="$SH_PACKAGE/spark-$spark_version-bin-hadoop3.2/bin/spark-submit"
else
get_spark "$SPARK_URL" "$SH_PACKAGE/spark-$spark_version-bin-hadoop2.6.tgz"
untar_spark "$SH_PACKAGE/spark-$spark_version-bin-hadoop2.6.tgz" "$SH_PACKAGE"
SPARK_CMD= "$SH_PACKAGE/spark-$spark_version-bin-hadoop2.6/bin/spark-submit"
get_spark "$SPARK_URL" "$SH_PACKAGE/spark-$spark_version-bin-hadoop3.2.tgz"
untar_spark "$SH_PACKAGE/spark-$spark_version-bin-hadoop3.2.tgz" "$SH_PACKAGE"
SPARK_CMD= "$SH_PACKAGE/spark-$spark_version-bin-hadoop3.2/bin/spark-submit"
fi

# Verify reflexiv jar is available
Expand All @@ -182,6 +184,8 @@ function dump_help() {
echo " run Run the entire assembly pipeline"
echo " counter counting Kmer frequency"
echo " reassembler re-assemble and extend genome fragments"
echo " meta assemble metagenomes"
echo " reduce Dynamic reduction of k-mers"
echo ""
echo "Type each command to view its options, eg. Usage: ./reflexiv run"
echo ""
Expand Down Expand Up @@ -218,7 +222,7 @@ function parse_param() {
if [[ ${args[$i+1]} == -* ]]; then
SPARK_OPT+="${args[$i]} "
else
SPARK_OPT+="${args[$i]} ${args[$i+1]} "
SPARK_OPT+="${args[$i]} \"${args[$i+1]}\" "
fi
elif [[ ${args[$i]} == -* ]]; then
if [[ ${args[$i+1]} == -* ]]; then
Expand Down Expand Up @@ -259,17 +263,19 @@ elif [[ ${MODULE} == "merger" ]]; then
mainClass="uni.bielefeld.cmg.reflexiv.main.MainOfMerger"
elif [[ ${MODULE} == "mercy" ]]; then
mainClass="uni.bielefeld.cmg.reflexiv.main.MainOfMercy"
elif [[ ${MODULE} == "preprocess" ]]; then
mainClass="uni.bielefeld.cmg.reflexiv.main.MainOfPreProcessing"
else
dump_help
exit 1;
fi

# Assemble the command line
cmdline="$SPARK_CMD $SPARK_OPT --class $mainClass $SH_JAR $SH_OPT"
cmdline="$SPARK_CMD $SPARK_OPT --jars $SH_LIB/hadoop-4mc-3.0.0.jar --files $SH_SBIN/flash --class $mainClass $SH_JAR $SH_OPT"

# launch command
# Hand control over to the assembled Spark command line.
# The outer `exec` replaces this wrapper shell with a fresh bash, and the
# inner `exec` replaces that bash with spark-submit, so no intermediate
# shell process lingers while the job runs.
launch_reflexiv() {
    local full_cmd="exec $cmdline"
    exec bash -c "$full_cmd"
}

launch_reflexiv
launch_reflexiv
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MainMeta {
*/
public static void main(String[] args) throws IOException {
InfoDumper info = new InfoDumper();
info.readParagraphedMessages("Reflexiv main initiating ... \ninterpreting parameters.");
info.readParagraphedMessages("Reflexiv assembly initiating ... \ninterpreting parameters.");
info.screenDump();

Parameter parameter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MainOfMercy {
*/
public static void main(String[] args){
InfoDumper info = new InfoDumper();
info.readParagraphedMessages("Reflexiv main initiating ... \ninterpreting parameters.");
info.readParagraphedMessages("Reflexiv mercy-kmer initiating ... \ninterpreting parameters.");
info.screenDump();

Parameter parameter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MainOfMerger {
*/
public static void main(String[] args){
InfoDumper info = new InfoDumper();
info.readParagraphedMessages("Reflexiv main initiating ... \ninterpreting parameters.");
info.readParagraphedMessages("Reflexiv contig merger initiating ... \ninterpreting parameters.");
info.screenDump();

Parameter parameter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class MainOfPreProcessing {
*/
public static void main(String[] args) throws IOException {
InfoDumper info = new InfoDumper();
info.readParagraphedMessages("Reflexiv main initiating ... \ninterpreting parameters.");
info.readParagraphedMessages("Reflexiv reads preprocessing initiating ... \ninterpreting parameters.");
info.screenDump();

Parameter parameter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MainOfReduce {
*/
public static void main(String[] args) throws IOException {
InfoDumper info = new InfoDumper();
info.readParagraphedMessages("Reflexiv main initiating ... \ninterpreting parameters.");
info.readParagraphedMessages("Reflexiv k-mer reduction initiating ... \ninterpreting parameters.");
info.screenDump();

Parameter parameter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MainOfStitch {
*/
public static void main(String[] args) throws IOException {
InfoDumper info = new InfoDumper();
info.readParagraphedMessages("Reflexiv main initiating ... \ninterpreting parameters.");
info.readParagraphedMessages("Reflexiv contig stitching initiating ... \ninterpreting parameters.");
info.screenDump();

Parameter parameter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ public void assembly(){
JavaPairRDD<Long, Tuple4<Integer, Long[], Integer, Integer>> ReflexivLongSubKmerRDD;

JavaPairRDD<String, Tuple4<Integer, String, Integer, Integer>> ReflexivSubKmerStringRDD; // Generates strings, for testing
// JavaPairRDD<String, Tuple4<Integer, String, Integer, Integer>> ForwardSubKmerRDD;
// JavaPairRDD<String, Tuple4<Integer, String, Integer, Integer>> ReflectedSubKmerRDD;

JavaPairRDD<String, String> ContigTuple2RDD;
JavaPairRDD<Tuple2<String, String>, Long> ContigTuple2IndexRDD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,7 @@ public void assembly() {
kmerCountTupleStruct = kmerCountTupleStruct.add("kmerBlocks", DataTypes.createArrayType(DataTypes.LongType), false);
kmerCountTupleStruct = kmerCountTupleStruct.add("count", DataTypes.IntegerType, false);
ExpressionEncoder<Row> KmerBinaryCountEncoder = RowEncoder.apply(kmerCountTupleStruct);
/*
StructType kmerBinaryStruct = new StructType();
kmerBinaryStruct = kmerBinaryStruct.add("kmerBlocks", DataTypes.createArrayType(DataTypes.LongType), false);
kmerBinaryStruct = kmerBinaryStruct.add("count", DataTypes.IntegerType, false);
ExpressionEncoder<Row> kmerBinaryEncoder = RowEncoder.apply(kmerBinaryStruct);
*/

Dataset<Row> ReflexivSubKmerDS;
StructType ReflexivKmerStruct = new StructType();
ReflexivKmerStruct = ReflexivKmerStruct.add("k-1", DataTypes.createArrayType(DataTypes.LongType), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ public void assemblyFromKmer() {
ReflexivFixingKmerStruct= ReflexivFixingKmerStruct.add("extension", DataTypes.createArrayType(DataTypes.LongType), false);
ExpressionEncoder<Row> ReflexivFixingKmerEndocer = RowEncoder.apply(ReflexivFixingKmerStruct);

StructType ReflexivLongKmerStructCompressed = new StructType();
ReflexivLongKmerStructCompressed= ReflexivLongKmerStructCompressed.add("k-1", DataTypes.createArrayType(DataTypes.LongType), false);
ReflexivLongKmerStructCompressed= ReflexivLongKmerStructCompressed.add("attribute", DataTypes.LongType, false);
ReflexivLongKmerStructCompressed= ReflexivLongKmerStructCompressed.add("extension", DataTypes.createArrayType(DataTypes.LongType), false);
ExpressionEncoder<Row> ReflexivLongSubKmerEncoderCompressed = RowEncoder.apply(ReflexivLongKmerStructCompressed);

StructType ContigLongKmerStringStruct = new StructType();
ContigLongKmerStringStruct = ContigLongKmerStringStruct.add("ID", DataTypes.StringType, false);
ContigLongKmerStringStruct = ContigLongKmerStringStruct.add("contig", DataTypes.StringType, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ public void assemblyFromKmer() {
DynamicKmerBinarizerFromReducedToSubKmer ReducedKmerToSubKmer= new DynamicKmerBinarizerFromReducedToSubKmer();
ReflexivLongSubKmerDS = KmerCountDS.mapPartitions(ReducedKmerToSubKmer, ReflexivLongSubKmerEncoderCompressed);

// DSkmerRandomReflection DSrandomizeSubKmer = new DSkmerRandomReflection();
// ReflexivSubKmerDS = ReflexivSubKmerDS.mapPartitions(DSrandomizeSubKmer, ReflexivSubKmerEncoderCompressed);

DSExtendReflexivKmerToArrayLoop DSKmerExtenstionArrayToArray = new DSExtendReflexivKmerToArrayLoop();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,6 @@ private Hashtable<List<Long>, Integer> SubKmerProbRowToHash(List<Row> s){
*
*/
public void assemblyFromKmer() throws IOException {
/* SparkConf conf = setSparkConfiguration();
info.readMessage("Initiating Spark context ...");
info.screenDump();
info.readMessage("Start Spark framework");
info.screenDump();
JavaSparkContext sc = new JavaSparkContext(conf);
*/

SparkSession spark = setSparkSessionConfiguration(param.shufflePartition);

info.readMessage("Initiating Spark SQL context ...");
Expand Down Expand Up @@ -233,12 +225,6 @@ public void assemblyFromKmer() throws IOException {
FastqDSTuple = spark.createDataset(FastqIndex.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.LONG()));

FastqDSTuple.persist(StorageLevel.DISK_ONLY());
/*
FastqDSTuple.write().
mode(SaveMode.Overwrite).
format("csv").
option("compression", "gzip").save(param.outputPath + "/Assembly_intermediate/ZippedFastqForDebug");
*/

ReverseComplementKmerBinaryExtractionFromDataset DSExtractRCKmerBinaryFromFastq = new ReverseComplementKmerBinaryExtractionFromDataset();

Expand Down Expand Up @@ -271,9 +257,6 @@ public void assemblyFromKmer() throws IOException {
ContigSeedDS = ContigSeedDS.union(ReadSeedDS);

ContigSeedDS = ContigSeedDS.sort("seed");
// long contigSeedPartition = ContigSeedDS.javaRDD().getNumPartitions();
// long contigSeedSize = ContigSeedDS.javaRDD().count();
// System.out.println("ContigSeed partition: " + contigSeedPartition + " and count: " + contigSeedSize);

Dataset<Row> RACpairDS;
StructType RACPairStruct = new StructType();
Expand All @@ -287,10 +270,6 @@ public void assemblyFromKmer() throws IOException {

RACpairDS= RACpairDS.sort("read", "contig");

// long RACpairPartition = RACpairDS.javaRDD().getNumPartitions();
// long RACpairSize = RACpairDS.javaRDD().count();
// System.out.println("RACpair partitions: " + RACpairPartition + " and count: " + RACpairSize);

Dataset<Row> CCPairDS;
StructType CCPairStruct = new StructType();
CCPairStruct = CCPairStruct.add("left", DataTypes.LongType, false);
Expand All @@ -301,9 +280,6 @@ public void assemblyFromKmer() throws IOException {

CreatCCPairs matchContigToContig = new CreatCCPairs();
CCPairDS = RACpairDS.mapPartitions(matchContigToContig, CCPairEncoder);
// long ccpairPartition1 = CCPairDS.javaRDD().getNumPartitions();
// long ccpairSize1 = CCPairDS.javaRDD().count();
// System.out.println("ccpair partitions 1: " + ccpairPartition1 + " and count: " + ccpairSize1);

CCPairDS = CCPairDS.sort("left", "right");

Expand All @@ -319,9 +295,6 @@ public void assemblyFromKmer() throws IOException {
CCPairDS= CCPairDS.mapPartitions(filterForCCpair,CCPairEncoderCount);

CCPairDS=CCPairDS.sort(col("right").asc(), col("count").desc());
// long ccpairPartition = CCPairDS.javaRDD().getNumPartitions();
// long ccpairSize = CCPairDS.javaRDD().count();
// System.out.println("ccpair partitions 2: " + ccpairPartition + " and count: " + ccpairSize);

Dataset<Row> MarkedReads;
StructType CCNetStruct = new StructType();
Expand All @@ -340,10 +313,6 @@ public void assemblyFromKmer() throws IOException {

MarkedReads = MarkedReads.sort("read");

// long readPartitions= MarkedReads.javaRDD().getNumPartitions();
// long readSize = MarkedReads.javaRDD().count();
// System.out.println("read partitions:" + readPartitions + " and count: " + readSize);

Dataset<Row> CCNetWithSeq;
StructType ContigSeqStruct = new StructType();
ContigSeqStruct = ContigSeqStruct.add("ID", DataTypes.LongType, false);
Expand All @@ -356,10 +325,6 @@ public void assemblyFromKmer() throws IOException {
CCNetWithSeq = CCNetWithSeq.union(markerTupleRow);
CCNetWithSeq= CCNetWithSeq.sort("ID");

// long ccnetParitions = CCNetWithSeq.javaRDD().getNumPartitions();
// long ccnetSize = CCNetWithSeq.javaRDD().getNumPartitions();
// System.out.println("ccnet partitions:" + ccnetParitions + " and count: " + ccnetSize);

Dataset<Row> reflexivKmer;
StructType ReflexivLongKmerStructCompressed = new StructType();
ReflexivLongKmerStructCompressed= ReflexivLongKmerStructCompressed.add("k-1", DataTypes.createArrayType(DataTypes.LongType), false);
Expand Down
Loading

0 comments on commit a214966

Please sign in to comment.