-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Speed up streamed-proto query output by distributing work to multiple threads #24305
base: master
Are you sure you want to change the base?
Changes from 1 commit
6bd312b
41fbacf
0c9a1d2
913d4a3
9843a5e
9a0efa0
1852be0
89e8b3b
5fc8b13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,12 @@ | |
import com.google.devtools.build.lib.packages.LabelPrinter; | ||
import com.google.devtools.build.lib.packages.Target; | ||
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; | ||
import com.google.devtools.build.lib.query2.proto.proto2api.Build; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.util.stream.StreamSupport; | ||
|
||
/** | ||
* An output formatter that outputs a protocol buffer representation of a query result and outputs | ||
|
@@ -34,13 +38,70 @@ public String getName() { | |
public OutputFormatterCallback<Target> createPostFactoStreamCallback( | ||
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) { | ||
return new OutputFormatterCallback<Target>() { | ||
private final LabelPrinter ourLabelPrinter = labelPrinter; | ||
|
||
@Override | ||
public void processOutput(Iterable<Target> partialResult) | ||
throws IOException, InterruptedException { | ||
for (Target target : partialResult) { | ||
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out); | ||
try { | ||
StreamSupport.stream(partialResult.spliterator(), /* parallel= */true) | ||
.map(this::toProto) | ||
.map(StreamedProtoOutputFormatter::writeDelimited) | ||
.forEach(this::writeToOutputStreamThreadSafe); | ||
} catch (WrappedIOException e) { | ||
throw e.getCause(); | ||
} catch (WrappedInterruptedException e) { | ||
throw e.getCause(); | ||
} | ||
} | ||
|
||
private Build.Target toProto(Target target) { | ||
try { | ||
return toTargetProtoBuffer(target, ourLabelPrinter); | ||
} catch (InterruptedException e) { | ||
throw new WrappedInterruptedException(e); | ||
} | ||
} | ||
|
||
private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bout) { | ||
try { | ||
bout.writeTo(out); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
}; | ||
} | ||
|
||
private static ByteArrayOutputStream writeDelimited(Build.Target targetProtoBuffer) { | ||
try { | ||
var bout = new ByteArrayOutputStream(targetProtoBuffer.getSerializedSize() + 10); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you leave a comment on why |
||
targetProtoBuffer.writeDelimitedTo(bout); | ||
return bout; | ||
} catch (IOException e) { | ||
throw new WrappedIOException(e); | ||
} | ||
} | ||
|
||
private static class WrappedIOException extends RuntimeException { | ||
private WrappedIOException(IOException cause) { | ||
super(cause); | ||
} | ||
|
||
@Override | ||
public synchronized IOException getCause() { | ||
return (IOException) super.getCause(); | ||
} | ||
} | ||
|
||
private static class WrappedInterruptedException extends RuntimeException { | ||
private WrappedInterruptedException(InterruptedException cause) { | ||
super(cause); | ||
} | ||
|
||
@Override | ||
public synchronized InterruptedException getCause() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to be synchronized? Same for the other wrapper. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks; good catch! |
||
return (InterruptedException) super.getCause(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch!! fixed.