Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate array copies #31

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1219,10 +1221,12 @@ public QueueHandle lookupQueue(QueueId queueId) {
return queueStateManager.findByQueueId(queueId);
}

public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException {
public void post(QueueHandle queueHandle, Collection<PutMessageImpl> msgs) throws BMQException {
Argument.expectNonNull(queueHandle, "queueHandle");
Argument.expectNonNull(msgs, "msgs");
Argument.expectPositive(msgs.length, "message array length");
if (msgs.isEmpty()) {
return;
}

// Queue state guard
QueueState state = queueHandle.getState();
Expand All @@ -1249,6 +1253,14 @@ public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQExce
}
}

public void post(QueueHandle queueHandle, PutMessageImpl msg) throws BMQException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these two methods here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.singletonList() is more efficient for a single element than Arrays.asList()

post(queueHandle, Collections.singletonList(msg));
}

public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException {
post(queueHandle, Arrays.asList(msgs));
}

public GenericResult confirm(QueueHandle queueHandle, PushMessageImpl... messages) {
Argument.expectNonNull(queueHandle, "queueHandle");
Argument.expectNonNull(messages, "messages");
Expand Down
16 changes: 13 additions & 3 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/PutPoster.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -68,9 +70,9 @@ void setMaxEventSize(int val) {
maxEventSize = Argument.expectNotGreater(val, EventHeader.MAX_SIZE_SOFT, "max event size");
}

public void pack(PutMessageImpl... msgs) {
public void pack(Collection<PutMessageImpl> msgs) {
Argument.expectNonNull(msgs, "msgs");
Argument.expectPositive(msgs.length, "message array length");
Argument.expectPositive(msgs.size(), "message array length");
for (PutMessageImpl m : msgs) {
Argument.expectNonNull(m, "put message");

Expand All @@ -87,11 +89,19 @@ public void flush() {
}
}

public void post(PutMessageImpl... msgs) {
public void post(Collection<PutMessageImpl> msgs) {
pack(msgs);
flush();
}

public void post(PutMessageImpl msg) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here

post(Collections.singletonList(msg));
}

public void post(PutMessageImpl... msgs) {
post(Arrays.asList(msgs));
}

private void sendEvent() {
PutEventBuilder putBuilder = new PutEventBuilder();
putBuilder.setMaxEventSize(maxEventSize);
Expand Down
21 changes: 8 additions & 13 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/QueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +50,7 @@ public class QueueImpl implements QueueHandle {
static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

static final int INVALID_QUEUE_ID = -1;
private static final int INITIAL_PUTMESSAGES_SIZE = 100;

// Immutable fields
private final BrokerSession brokerSession;
Expand All @@ -61,7 +64,8 @@ public class QueueImpl implements QueueHandle {
// Fields exposed to user thread
private final QueueHandleParameters parameters; // mutable object and final field
private volatile QueueState state;
private final ArrayList<PutMessageImpl> putMessages = new ArrayList<>();
private final AtomicReference<Collection<PutMessageImpl>> putMessages =
new AtomicReference<>(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE));
private volatile boolean isSuspended = false;
// Whether the queue is suspended.
// While suspended, a queue receives no
Expand Down Expand Up @@ -261,20 +265,11 @@ public BmqFuture<CloseQueueCode> closeAsync(Duration timeout) {
}

public void pack(PutMessageImpl message) throws BMQException {
synchronized (lock) {
putMessages.add(message);
}
putMessages.get().add(message);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose .get().add() is not an atomic operation, so is it possible to get the array which is being flushed and add new messages to it afterward, and eventually lose them, or to modify the collection while it is being iterated in PutPoster?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.get() itself is atomic, and so is the .getAndSet() in flush, so afaik I don't think it's possible to get the array that is being flushed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, get() and getAndSet() are atomic, but the following sequence seems to be possible:
[t1] .get() returns an array to flush
[t2] .getAndSet() returns the same array
[t2] is iterating the array and processing the messages
[t1] calls .add() which modifies the array

Correct?

}

public PutMessageImpl[] flush() throws BMQException {
PutMessageImpl[] msgs;
synchronized (lock) {
msgs = new PutMessageImpl[putMessages.size()];
msgs = putMessages.toArray(msgs);
putMessages.clear();
}
brokerSession.post(this, msgs);
return msgs;
public void flush() throws BMQException {
brokerSession.post(this, putMessages.getAndSet(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE)));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we check here whether there are messages to send? Otherwise we may just replace an empty array with another empty array.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,27 @@ public void write(byte[] ba, int offset, int length) throws IOException {
}
}

/**
* Make a readable view of the underlying data without copying it.
*
* <p>The bbos can continue to be written to.
*/
public ByteBuffer[] peek() {
return bbArray.stream()
.map(bb -> (ByteBuffer) (bb.duplicate().flip()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe asReadOnlyBuffer() instead of duplicate()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that first, but a lot of unit tests then fail with:

[ERROR] com.bloomberg.bmq.impl.infr.proto.SchemaEventImplBuilderTest.testBuildOpenQueueJson  Time elapsed: 0.001 s  <<< ERROR!
java.nio.ReadOnlyBufferException
        at java.base/java.nio.ByteBuffer.array(ByteBuffer.java:1473)
        at com.bloomberg.bmq.util.TestHelpers.buffersContents(TestHelpers.java:65)
        at com.bloomberg.bmq.util.TestHelpers.compareWithFileContent(TestHelpers.java:91)
        at com.bloomberg.bmq.impl.infr.proto.SchemaEventImplBuilderTest.testBuildOpenQueueJson(SchemaEventImplBuilderTest.java:152)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:345)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
        ```

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the peek() method is called by gson. Is it possible to mark the method to be ignored when serializing?

.toArray(ByteBuffer[]::new);
}

private ArrayList<ByteBuffer> buffers() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method used?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah maybe I can get rid of it

return bbArray;
}

@Override
public void writeBoolean(boolean v) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void writeByte(int v) throws IOException {
if (!isOpen) throw new IOException("Stream closed");

Expand All @@ -104,26 +121,42 @@ public void writeByte(int v) throws IOException {
currentBuffer.put((byte) v);
}

@Override
public void writeBytes(String s) throws IOException {
throw new UnsupportedOperationException();
}

public void writeBytes(ByteBuffer b) throws IOException {
private boolean bufferIsFresh(ByteBuffer b) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename to bufferIsClear()?

// a buffer that has never been put() to nor flipped
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not completely sure if this statement is correct. From javadocs:
clear() makes a buffer ready for a new sequence of channel-read or relative put operations: It sets the limit to the capacity and the position to zero.

So it is possible to put some data (up to limit which may be set to capacity value) and then call clear() in order to be able to read it.

Or is my understanding incorrect, and is it a requirement that the ByteBuffer argument be "fresh"?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one thing that really bugs me about nio.ByteBuffer: there is no way to distinguish a buffer that has no data from one that has data but has already been flipped for reading. isClear() is probably a better name for this method ...

but in the long term, I think the better thing to do is replace every usage of nio.ByteBuffer with netty.ByteBuf in the entire SDK, which would

  1. allow to eliminate that initial copy from NettyTcpConnection
  2. allow to delete ByteBufferOutputStream and ByteBufferInputStream entirely since netty.ByteBufOutputStream can be used instead

Copy link
Collaborator

@sgalichkin sgalichkin Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one thing that really bugs me about nio.ByteBuffer: there is no way to distinguish a buffer that has no data from one that has data but has already been flipped for reading. isClear() is probably a better name for this method ...

Agreed, but maybe we do not need to determine this and can just take what is between 0 and limit, regardless of the current position?

but in the long term, I think the better thing to do is replace every usage of nio.ByteBuffer with netty.ByteBuf in the entire SDK, which would

  1. allow to eliminate that initial copy from NettyTcpConnection
  2. allow to delete ByteBufferOutputStream and ByteBufferInputStream entirely since netty.ByteBufOutputStream can be used instead

I would agree, but AFAIK the limited use of netty and its components was intentional. The reason is that some day the SDK may switch to another network library, and if netty components were widely used in the code (e.g. netty.ByteBuf instead of nio.ByteBuffer), that switch would require a lot of work. Having netty.ByteBuf in an API class (Queue) would make it even more difficult.

I suggest we have a quick call tomorrow to discuss this.

return b.position() == 0 && b.limit() == b.capacity();
}

public void writeBuffer(ByteBuffer b) throws IOException {
if (!isOpen) throw new IOException("Stream closed");
b.rewind();
while (b.hasRemaining()) {
if (currentBuffer.hasRemaining()) {
if (b.remaining() > currentBuffer.remaining()) {
// Read one byte
currentBuffer.put(b.get());
} else {
// Read the whole buffer
currentBuffer.put(b);
}
continue;
}
getNewBuffer();
if (bufferIsFresh(b)) return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So taking into account the comment above, is this statement correct?


boolean currentIsFresh = bufferIsFresh(currentBuffer);
// remove the currentBuffer if it is fresh
// we'll put it back at the end after adding other's buffers
// to avoid allocating a new one in that case
if (currentIsFresh) {
bbArray.remove(currentBufferIndex);
}
ByteBuffer buf = b.duplicate();
if (buf.limit() != buf.capacity()) {
// it has already been flipped - unflip it
int newPosition = buf.limit();
buf.limit(buf.capacity());
buf.position(newPosition);
}
prevBuffersNumBytes += buf.position();
bbArray.add(buf);
if (currentIsFresh) {
bbArray.add(currentBuffer);
} else {
addBuffer();
}
currentBufferIndex = bbArray.size() - 1;
}

/**
Expand All @@ -133,17 +166,14 @@ public void writeBytes(ByteBuffer b) throws IOException {
* @param bbos stream to write bytes to
* @throws IOException if the stream is not open
*/
public void writeBytes(ByteBufferOutputStream bbos) throws IOException {

if (!isOpen || !bbos.isOpen) throw new IOException("Stream closed");

for (int i = 0; i < bbos.bbArray.size(); i++) {
ByteBuffer data = bbos.bbArray.get(i);
data.flip();
writeBytes(data);
public void writeBuffers(ByteBufferOutputStream other) throws IOException {
if (!isOpen || !other.isOpen) throw new IOException("Stream closed");
for (ByteBuffer b : other.buffers()) {
writeBuffer(b);
}
}

@Override
public void writeChar(int v) throws IOException {
if (!isOpen) throw new IOException("Stream closed");

Expand All @@ -152,10 +182,12 @@ public void writeChar(int v) throws IOException {
currentBuffer.putChar((char) v);
}

@Override
public void writeChars(String s) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void writeDouble(double v) throws IOException {
if (!isOpen) throw new IOException("Stream closed");

Expand All @@ -164,6 +196,7 @@ public void writeDouble(double v) throws IOException {
currentBuffer.putDouble(v);
}

@Override
public void writeFloat(float v) throws IOException {
if (!isOpen) throw new IOException("Stream closed");

Expand All @@ -172,6 +205,7 @@ public void writeFloat(float v) throws IOException {
currentBuffer.putFloat(v);
}

@Override
public void writeInt(int v) throws IOException {
if (!isOpen) throw new IOException("Stream closed");

Expand All @@ -180,6 +214,7 @@ public void writeInt(int v) throws IOException {
currentBuffer.putInt(v);
}

@Override
public void writeLong(long v) throws IOException {
if (!isOpen) throw new IOException("Stream closed");

Expand All @@ -188,6 +223,7 @@ public void writeLong(long v) throws IOException {
currentBuffer.putLong(v);
}

@Override
public void writeShort(int v) throws IOException {
if (!isOpen) throw new IOException("Stream closed");

Expand All @@ -196,6 +232,7 @@ public void writeShort(int v) throws IOException {
currentBuffer.putShort((short) v);
}

@Override
public void writeUTF(String str) throws IOException {
write(str.getBytes(StandardCharsets.UTF_8));
}
Expand All @@ -210,12 +247,7 @@ public void writeAscii(String str) throws IOException {
* @return ByteBuffer[] previously allocated buffers flipped to the read mode.
*/
public ByteBuffer[] reset() {
ByteBuffer[] bbArrayCopy = new ByteBuffer[currentBufferIndex + 1];

for (int i = 0; i <= currentBufferIndex; ++i) {
bbArrayCopy[i] = bbArray.get(i);
bbArrayCopy[i].flip();
}
ByteBuffer[] bbArrayCopy = peek();

bbArray.clear();
prevBuffersNumBytes = 0;
Expand All @@ -227,22 +259,6 @@ public ByteBuffer[] reset() {
return bbArrayCopy;
}

/**
* Clears the Stream for reuse. Clears the previously stored data but leaves the stream in its
* previous state. If the stream is still open, then it can be reused to insert new data. Reuses
* previously allocated Buffers if possible.
*/
public void clear() {
if (!isOpen) return;

for (int i = 0; i <= currentBufferIndex; i++) {
bbArray.get(i).clear();
}
prevBuffersNumBytes = 0;
currentBuffer = bbArray.get(0);
currentBufferIndex = 0;
}

public int numByteBuffers() {
return bbArray.size();
}
Expand All @@ -252,13 +268,7 @@ public int size() {
}

private void getNewBuffer() {
// Try to reuse a previously allocated buffer before allocating new.
if (currentBufferIndex < bbArray.size() - 1) {
prevBuffersNumBytes += currentBuffer.position();
currentBuffer = bbArray.get(++currentBufferIndex);
} else {
addBuffer();
}
addBuffer();
}

private void addBuffer() {
Expand All @@ -272,6 +282,6 @@ private void addBuffer(int size) {
int allocationSize = Math.max(size, bufSize);
currentBuffer = ByteBuffer.allocate(allocationSize);
bbArray.add(currentBuffer);
currentBufferIndex++;
currentBufferIndex = bbArray.size() - 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -745,19 +745,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

ByteBuf byteBuf = (ByteBuf) msg;
try {
// Make a copy of bytes present in 'byteBuf'. This is needed
// so that the lifetime of 'readBuffer' (and ByteBuffer[]
// obtained from it via 'readBuffer.reset()') can be
// decoupled with the associated 'byteBuf'. As per netty docs,
// a ByteBuf object is refcounted, and user must invoke
// 'ByteBuf.release' to decrement the counter when done. Since
// there is no place in the logic where we can confidently
// invoke 'ByteBuf.release' (BlazingMQ user may keep a PUSH message
// for ever), we make a copy of bytes present in 'ByteBuf' and
// invoke 'ByteBuf.release' right away. This copying is
// unfortunate, and we will investigate ways to avoid this
// copy later.
readBuffer.writeBytes(byteBuf.nioBuffer());
// This still makes a copy of the underlying bytes, but lets netty do it
// instead of doing it ourselves.
//
// TODO - can this copy be eliminated?
ByteBuffer nioBuf = ByteBuffer.allocateDirect(byteBuf.readableBytes());
byteBuf.readBytes(nioBuf);
readBuffer.writeBuffer(nioBuf);
Copy link
Collaborator

@sgalichkin sgalichkin Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here we make a temporary buffer which gets its bytes from the netty one and is then added to the output stream as is, without any copies?

Maybe it makes sense to add a comment saying this.
And maybe I would preserve a part of the removed comment in order to explain why we need to make a copy in general

} catch (IOException e) {
logger.error("Failed to write data: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ public void streamIn(ByteBufferInputStream bbis) throws IOException {

public void streamOut(ByteBufferOutputStream bbos) throws IOException {
bbos.writeInt(statusAndCorrelationId);
for (int i = 0; i < MessageGUID.SIZE_BINARY; i++) {
bbos.writeByte(messageGUID[i]);
}
bbos.write(messageGUID);
bbos.writeInt(queueId);
}

Expand Down
Loading