-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Eliminate array copies #31
base: main
Are you sure you want to change the base?
Eliminate array copies #31
Conversation
f2917cc
to
1467cbe
Compare
1467cbe
to
f20e57d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decided to provide some feedback while still looking at ByteBufferOutputStream
and its usage
brokerSession.post(this, msgs); | ||
return msgs; | ||
public void flush() throws BMQException { | ||
brokerSession.post(this, putMessages.getAndSet(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should not we check here if there are messages to send? Otherwise we may just replace empty array with another empty array
synchronized (lock) { | ||
putMessages.add(message); | ||
} | ||
putMessages.get().add(message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose .get().add()
is not an atomic operation, so it's possible get the array which is being flushed and add new messages afterward and eventually lost them or modify the collection while it is being iterated in PutPoster
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.get() itself is atomic, and so is the .getAndSet() in flush, so afaik I don't think its possible to get the array that is being flushed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, get()
and getAndSet()
are atomic, but the following sequence seems to be possible:
[t1] .get()
returns an array to flush
[t2] .getAndSet()
returns the same array
[t2] is iterating the array and processing the messages
[t1] calls .add()
which modifies the array
Correct?
@@ -1249,6 +1253,14 @@ public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQExce | |||
} | |||
} | |||
|
|||
public void post(QueueHandle queueHandle, PutMessageImpl msg) throws BMQException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need there there two methods here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collections.singletonList() is more efficient for a single element than Arrays.asList()
pack(msgs); | ||
flush(); | ||
} | ||
|
||
public void post(PutMessageImpl msg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question here
public void writeBytes(String s) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
public void writeBytes(ByteBuffer b) throws IOException { | ||
private boolean bufferIsFresh(ByteBuffer b) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename to bufferIsClear()
?
.toArray(ByteBuffer[]::new); | ||
} | ||
|
||
private ArrayList<ByteBuffer> buffers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah maybe I can get rid of it
*/ | ||
public ByteBuffer[] peek() { | ||
return bbArray.stream() | ||
.map(bb -> (ByteBuffer) (bb.duplicate().flip())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe asReadOnlyBuffer()
instead of duplicate()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried that first, but a lot of unit tests then fail with:
[ERROR] com.bloomberg.bmq.impl.infr.proto.SchemaEventImplBuilderTest.testBuildOpenQueueJson Time elapsed: 0.001 s <<< ERROR!
java.nio.ReadOnlyBufferException
at java.base/java.nio.ByteBuffer.array(ByteBuffer.java:1473)
at com.bloomberg.bmq.util.TestHelpers.buffersContents(TestHelpers.java:65)
at com.bloomberg.bmq.util.TestHelpers.compareWithFileContent(TestHelpers.java:91)
at com.bloomberg.bmq.impl.infr.proto.SchemaEventImplBuilderTest.testBuildOpenQueueJson(SchemaEventImplBuilderTest.java:152)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:345)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the peek()
method is called by gson
. Is it possible to mark the method to be ignored when serializing?
public void writeBytes(String s) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
public void writeBytes(ByteBuffer b) throws IOException { | ||
private boolean bufferIsFresh(ByteBuffer b) { | ||
// a buffer that has never been put() to nor flipped |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not completely sure if this statement is correct. From javadocs:
clear() makes a buffer ready for a new sequence of channel-read or relative put operations: It sets the limit to the capacity and the position to zero.
So it is possible to put some data (up to limit which may be set to capacity value) and then call clear()
in order to be able to read it.
Or my understanding is incorrect and this is a requirement for ByteBuffer
argument to be "fresh"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is one thing that really bugs me about nio.ByteBuffer, there is no way to determine if a buffer that has no data vs one that has data but has already been flipped for reading. isClear()
is probably a better name for this method ...
but in the long term, I think the better thing to do is replace every usage of nio.ByteBuffer with netty.ByteBuf in the entire SDK, which would
- allow to eliminate that initial copy from NettyTcpConnection
- allow to delete ByteBufferOutputStream and ByteBufferInputStream entirely since netty.ByteBufOutputStream can be used instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is one thing that really bugs me about nio.ByteBuffer, there is no way to determine if a buffer that has no data vs one that has data but has already been flipped for reading.
isClear()
is probably a better name for this method ...
Agree, but maybe we do not need to determine this and just take what is between 0 and limit regardless of current position?
but in the long term, I think the better thing to do is replace every usage of nio.ByteBuffer with netty.ByteBuf in the entire SDK, which would
- allow to eliminate that initial copy from NettyTcpConnection
- allow to delete ByteBufferOutputStream and ByteBufferInputStream entirely since netty.ByteBufOutputStream can be used instead
I would agree but AFAIK the limited use of netty and its components was intentional. The reason is that in some day SDK may switch to another network library and in case of wide use of its components in the code e.g. netty.ByteBuf
instead of nio.ByteBuffer
etc. it would require a lot of work to do. And having netty.ByteBuf
in API class (Queue
) will make it even more difficult.
I suggest to have a quick call tomorrow to discuss this
continue; | ||
} | ||
getNewBuffer(); | ||
if (bufferIsFresh(b)) return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So taking into account the comment above, is this statement correct?
// TODO - can this copy be eliminated? | ||
ByteBuffer nioBuf = ByteBuffer.allocate(byteBuf.readableBytes()); | ||
byteBuf.readBytes(nioBuf); | ||
readBuffer.writeBuffer(nioBuf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So here we make a temporary buffer which gets bytes from netty one and then is added to the output stream as is without any copies?
Maybe it makes sense to add a comment saying this.
And maybe I would preserve a part of the removed comment in order to explain why we need to make a copy in general
I was able to run the JMH benchmarks with this latest commit, here are the results Before:
After:
except for the individual 512B messages, looks like 2x - 4x higher throughput for uncompressed. |
*Issue number of the reported bug or feature request: #30
Describe your changes
Testing performed
Unit and integration tests have been updated to reflect the changes.
Additional context
As discussed with @sgalichkin