Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force flush of nio buffer when threshold is reached #84

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,32 +1,50 @@
package com.zegelin.prometheus.exposition;

import com.google.common.annotations.VisibleForTesting;

import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class FormattedByteChannel implements ReadableByteChannel {
public static final int MIN_CHUNK_SIZE = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = MIN_CHUNK_SIZE * 5;
public static final int DEFAULT_CHUNK_THRESHOLD = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = DEFAULT_CHUNK_THRESHOLD * 5;

private final FormattedExposition formattedExposition;
private final int chunkThreshold;

public FormattedByteChannel(final FormattedExposition formattedExposition) {
this(formattedExposition, DEFAULT_CHUNK_THRESHOLD);
}

public FormattedByteChannel(FormattedExposition formattedExposition) {
@VisibleForTesting
FormattedByteChannel(final FormattedExposition formattedExposition, final int chunkThreshold) {
this.formattedExposition = formattedExposition;
this.chunkThreshold = chunkThreshold;
}

@Override
public int read(ByteBuffer dst) {
public int read(final ByteBuffer dst) {
if (!isOpen()) {
return -1;
}

// Forcing the calling ChunkedNioStream to flush the buffer
if (hasBufferReachedChunkThreshold(dst)) {
return -1;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The strategy used here is somewhat questionable.

Returning -1 while there's still data to read isn't exactly suggested by the ReadableByteChannel API. On the other hand, the ChunkedNioStream class, which is consuming this byte channel in Netty, seem to expect this behavior deliberately. I've verified this to work on both Netty 4.0 and 4.1.

This was the most elegant solution I could come up with that will avoid another staging buffer, and at the same time stay compatible with both Netty 4.0 and 4.1.

}

final NioExpositionSink sink = new NioExpositionSink(dst);
while (sink.getIngestedByteCount() < MIN_CHUNK_SIZE && isOpen()) {
while (!hasBufferReachedChunkThreshold(dst) && isOpen()) {
formattedExposition.nextSlice(sink);
}

return sink.getIngestedByteCount();
}

private boolean hasBufferReachedChunkThreshold(final ByteBuffer dst) {
return dst.position() >= chunkThreshold;
}

@Override
public boolean isOpen() {
return !formattedExposition.isEndOfInput();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.zegelin.prometheus.exposition;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedNioStream;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
Expand All @@ -8,49 +12,102 @@
import java.nio.ByteBuffer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

public class TestFormattedByteChannel {
@Mock
private FormattedExposition formattedExposition;
private ChannelHandlerContext ctx;

private ChunkedNioStream chunkedNioStream;

private TenSliceExposition formattedExposition;

private ByteBuffer buffer;
private FormattedByteChannel channel;

@BeforeMethod
public void before() {
MockitoAnnotations.initMocks(this);

buffer = ByteBuffer.allocate(128);
channel = new FormattedByteChannel(formattedExposition);
formattedExposition = new TenSliceExposition();
channel = new FormattedByteChannel(formattedExposition, 64);

when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
chunkedNioStream = new ChunkedNioStream(channel, 128);
}

@Test
public void testClosed() {
when(formattedExposition.isEndOfInput()).thenReturn(true);
formattedExposition.setSlices(0);

assertThat(channel.read(buffer)).isEqualTo(-1);
assertThat(channel.isOpen()).isEqualTo(false);
}

@Test
public void testOpen() {
when(formattedExposition.isEndOfInput()).thenReturn(false);
formattedExposition.setSlices(1);

assertThat(channel.isOpen()).isEqualTo(true);
}

@Test
public void testOneChunk() {
when(formattedExposition.isEndOfInput()).thenReturn(false).thenReturn(false).thenReturn(true);
doAnswer(invocation -> {
NioExpositionSink sink = invocation.getArgument(0);
public void testOneSlice() throws Exception {
formattedExposition.setSlices(1);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(10);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoSlices() throws Exception {
formattedExposition.setSlices(2);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(20);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoChunks() throws Exception {
formattedExposition.setSlices(10);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(70);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(false);

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(30);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

// A dummy Exposition implementation that will generate a specific number of slices of size 10.
private static class TenSliceExposition implements FormattedExposition {
private int slices = 0;
private int currentSlice = 0;

private void setSlices(final int chunks) {
this.slices = chunks;
}

@Override
public void nextSlice(final ExpositionSink<?> sink) {
if (isEndOfInput()) {
return;
}

currentSlice++;
sink.writeAscii("abcdefghij");
return null;
}).when(formattedExposition).nextSlice(any(NioExpositionSink.class));
}

assertThat(channel.read(buffer)).isEqualTo(10);
assertThat(channel.isOpen()).isEqualTo(false);
@Override
public boolean isEndOfInput() {
return currentSlice >= slices;
}
}
}