Update to handle API reading and optimize sharded writing and indexing #165
Hi Ilia,

Looks really nice, but I don't have time to test it now - maybe a little later. Just a few things in the meantime, in order to get the dependencies to the latest versions:

https://github.com/iliat/dataflow-java/blob/master/pom.xml#L72 -> s/1.1.0/1.4.0
https://github.com/iliat/dataflow-java/blob/master/pom.xml#L117 -> s/v1beta2-rev25-1.19.1/v1-rev56-1.21.0
https://github.com/iliat/dataflow-java/blob/master/pom.xml#L130 -> s/v1beta2-0.36/v1beta2-0.39
https://github.com/iliat/dataflow-java/blob/master/pom.xml#L194 -> s/1.128/2.1.0
https://github.com/iliat/dataflow-java/blob/master/pom.xml#L210 -> s/3.0.0-beta-1/3.0.0-beta-2

A re-test after these changes might not be a bad thing to perform, just to be sure everything passes. Let me know what you think.

Thanks,
  static class DummyMapFn<T> extends DoFn<T, KV<T, Integer>> {
    @Override
    public void processElement(DoFn<T, KV<T, Integer>>.ProcessContext c) throws Exception {
      c.output( KV.of(c.element(), 42));
just a nit, perhaps move 42 to a DUMMY_VALUE constant?
add a link to https://cloud.google.com/dataflow/service/dataflow-service-desc#Optimization
Great idea, DONE.
@iliat this looks really awesome. LGTM. Great speed up! I assume mvn verify passed all integration tests. Nice cleanup on the file names too. Do you think some of this code is useful outside of the context of dataflow? If so, some other time it would be nice to move it elsewhere.
import com.google.cloud.dataflow.sdk.values.PCollection;

/*
 * Breaks DataFlow fusion by doing GroubByKey/Ungroup that forces materialization of the data,
s/GroubByKey/GroupByKey
Done
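For readers unfamiliar with the pattern discussed in this thread, here is a rough plain-Java sketch of the pair/group/ungroup idea, with the literal 42 pulled into a DUMMY_VALUE constant as suggested above. It does not use the Dataflow SDK: the class name BreakFusionSketch and the methods pairAndGroup and ungroup are hypothetical stand-ins, and an in-memory map only imitates what GroupByKey does on the service. It is not the code from this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the pair/group/ungroup pattern; not Dataflow SDK code.
class BreakFusionSketch {
  // Named constant instead of a bare 42, as suggested in the review.
  static final int DUMMY_VALUE = 42;

  // Pairs every element with the dummy value (the role of DummyMapFn)
  // and groups the pairs by element (the role of GroupByKey).
  static <T> Map<T, List<Integer>> pairAndGroup(List<T> input) {
    Map<T, List<Integer>> grouped = new LinkedHashMap<>();
    for (T element : input) {
      grouped.computeIfAbsent(element, k -> new ArrayList<>()).add(DUMMY_VALUE);
    }
    return grouped;
  }

  // Ungroups by emitting each key once per grouped value, so duplicates survive.
  static <T> List<T> ungroup(Map<T, List<Integer>> grouped) {
    List<T> out = new ArrayList<>();
    for (Map.Entry<T, List<Integer>> entry : grouped.entrySet()) {
      for (int i = 0; i < entry.getValue().size(); i++) {
        out.add(entry.getKey());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Hypothetical shard descriptors; the duplicate shows that counts are preserved.
    List<String> shards = List.of("chr1:0-100", "chr2:0-100", "chr1:0-100");
    System.out.println(ungroup(pairAndGroup(shards)));
  }
}
```

The output holds the same elements with the same multiplicities, though not necessarily in the input order. In a real pipeline the point of the round trip is the grouping step itself, which forces the data to be materialized between stages.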
  }

  private void writeNullContent() {
    codec.writeLong(0); // 0 bins , 0 intv
Does intv mean interval? It might be nice to expand on the comment here and explain why this is necessary, even if folks reading through the code will eventually figure it out.
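A possible reading of the comment, assuming the index being written follows the BAI layout in the SAM specification: each reference entry carries a 32-bit bin count (n_bin) and a 32-bit interval count (n_intv), so a reference with no content has both set to zero. Writing a single 64-bit zero then produces the same eight bytes as writing two 32-bit zeros. The sketch below only checks that byte-level equivalence with java.nio; the class name NullContentSketch is hypothetical, and little-endian order is assumed because that is what BAM-family files use.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Checks that one 64-bit zero and two 32-bit zeros serialize to the same bytes.
class NullContentSketch {
  // One long zero, as in codec.writeLong(0).
  static byte[] asOneLong() {
    return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0L).array();
  }

  // Two int zeros: an assumed n_bin = 0 followed by an assumed n_intv = 0.
  static byte[] asTwoInts() {
    return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putInt(0).putInt(0).array();
  }

  public static void main(String[] args) {
    System.out.println(Arrays.equals(asOneLong(), asTwoInts()));
  }
}
```

The equivalence holds only because both values are zero; for non-zero counts the two fields would need to be written separately.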
@iliat Everything I can understand looks good ;-)
@deflaux @pgrosu @jakeakopp Thanks for the review, I addressed most of the comments and will upload the version with these changes once I do a bit more validation beyond basic checks.
9fa7583 to 239b709
@deflaux - I can finally update the PR with a version that:
239b709 to e09a973
Dude, there's some minor error in your JavaDoc - below is the link to the log for JDK8: https://s3.amazonaws.com/archive.travis-ci.org/jobs/110838148/log.txt Here's the start of the errors:
@pgrosu Yeah, on it :)
Ah, cool man :)
e09a973 to 2878b9e
    ret = compareReadGroups(h1, h2) && ret;
    ret = compareProgramRecords(h1, h2) && ret;
    return ret;
  }
Way too cryptic! Try the following instead free of charge :)
private boolean compareHeaders(SAMFileHeader h1, SAMFileHeader h2) throws Exception {
  if ( !compareSequenceDictionaries(h1, h2) ) {
    return false;
  } else if (
      compareValues(h1.getCreator(), h2.getCreator(), "File creator") &&
      compareValues(h1.getAttribute("SO"), h2.getAttribute("SO"), "Sort order") &&
      compareReadGroups(h1, h2) &&
      compareProgramRecords(h1, h2) ) {
    if ( !options.ignoreFileFormatVersion ) {
      return compareValues(h1.getVersion(), h2.getVersion(), "File format version");
    } else {
      return true;
    }
  } else {
    return false;
  }
}
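One behavioral difference between the two styles may be worth keeping in mind. The accumulating form in the diff (`ret = check(...) && ret;`) calls every check even after one has failed, while a chain of `&&` stops at the first failure. If the compare methods report each mismatch as a side effect, which is an assumption here since their bodies are not shown, the chained form would report only the first one. The sketch below illustrates this with a hypothetical check helper that records mismatches in a list; none of these names come from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Contrasts accumulating comparison with short-circuit chaining.
class ShortCircuitSketch {
  // Hypothetical stand-in for a compareXxx method: records a mismatch, returns the result.
  static boolean check(List<String> report, String label, boolean matches) {
    if (!matches) {
      report.add(label);
    }
    return matches;
  }

  // Accumulating style: the call sits on the left of &&, so it always runs.
  static boolean compareAll(List<String> report, boolean groupsMatch, boolean programsMatch) {
    boolean ret = check(report, "read groups", groupsMatch);
    ret = check(report, "program records", programsMatch) && ret;
    return ret;
  }

  // Chained style: the second check is skipped once the first returns false.
  static boolean compareChained(List<String> report, boolean groupsMatch, boolean programsMatch) {
    return check(report, "read groups", groupsMatch)
        && check(report, "program records", programsMatch);
  }

  public static void main(String[] args) {
    List<String> all = new ArrayList<>();
    List<String> chained = new ArrayList<>();
    compareAll(all, false, false);
    compareChained(chained, false, false);
    System.out.println("accumulating reported: " + all);
    System.out.println("chained reported: " + chained);
  }
}
```

Both forms return the same boolean; they differ only in how many mismatches get reported along the way.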
2878b9e to 9df5a43
PTAL @deflaux
LGTM @iliat Again, really nice work here. Please file issues for stuff to be done in future PRs. Thanks!!!
Update to handle API reading and optimize sharded writing and indexing
It implements sharded index writing and removes GBK stage in writing, relying on the read side sharding.
It exports a full genome BAM file of ~60GB in ~25min.