Skip to content

Commit

Permalink
IPROTO Remove additional byte[] allocations for nested readers/writers
Browse files Browse the repository at this point in the history
* Add new subWriter method to implement to allow reusing encoder
  instances
* Add some common default methods to the TagWriter/TagReader interfaces
* Add common way to write a fixed varint of 5 bytes
  • Loading branch information
wburns committed May 8, 2023
1 parent beff6d3 commit 7afea21
Show file tree
Hide file tree
Showing 10 changed files with 481 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Objec
}

public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Object t, int bufferSize) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
ByteArrayOutputStream baos = new ByteArrayOutputStreamEx(bufferSize);
WrappedMessage.write(ctx, TagWriterImpl.newInstanceNoBuffer(ctx, baos), t);
return baos.toByteArray();
}
Expand All @@ -155,7 +155,7 @@ public static ByteBuffer toWrappedByteBuffer(ImmutableSerializationContext ctx,
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
toWrappedStream(ctx, out, t, DEFAULT_STREAM_BUFFER_SIZE);
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, out), t);
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t, int bufferSize) throws IOException {
Expand Down
35 changes: 28 additions & 7 deletions core/src/main/java/org/infinispan/protostream/TagReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public interface TagReader extends RawProtoStreamReader {

boolean readBool() throws IOException;

int readEnum() throws IOException;
default int readEnum() throws IOException {
return readInt32();
}

/**
* Reads a {@code string} value.
Expand All @@ -50,29 +52,48 @@ public interface TagReader extends RawProtoStreamReader {
*/
ByteBuffer readByteBuffer() throws IOException;

double readDouble() throws IOException;
/**
* Similar to {@link #readByteArray()} except that the reader impl may optimize creation of a sub TagReader from
* itself, possibly avoiding byte[] allocations
* @return a new TagReader
*/
TagReader subReaderFromArray() throws IOException;

default double readDouble() throws IOException {
return Double.longBitsToDouble(readFixed64());
}

float readFloat() throws IOException;
default float readFloat() throws IOException {
return Float.intBitsToFloat(readFixed32());
}

long readInt64() throws IOException;

long readUInt64() throws IOException;
default long readUInt64() throws IOException {
return readInt64();
}

long readSInt64() throws IOException;

long readFixed64() throws IOException;

long readSFixed64() throws IOException;
default long readSFixed64() throws IOException {
return readFixed64();
}

int readInt32() throws IOException;

int readUInt32() throws IOException;
default int readUInt32() throws IOException {
return readInt32();
}

int readSInt32() throws IOException;

int readFixed32() throws IOException;

int readSFixed32() throws IOException;
default int readSFixed32() throws IOException {
return readFixed32();
}

/**
* Sets a limit (based on the length of the length delimited value) when entering an embedded message.
Expand Down
64 changes: 53 additions & 11 deletions core/src/main/java/org/infinispan/protostream/TagWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,19 @@ public interface TagWriter extends RawProtoStreamWriter {
// start low level ops
void flush() throws IOException;

void writeTag(int number, int wireType) throws IOException;
/**
* Invoke after done with writer, this implies a flush if necessary
* It is necessary to invoke this on a writer returned from {@link #subWriter(int)} to actually push the data
*/
void close() throws IOException;

void writeTag(int number, WireType wireType) throws IOException;
default void writeTag(int number, int wireType) throws IOException {
writeVarint32(WireType.makeTag(number, wireType));
}

default void writeTag(int number, WireType wireType) throws IOException {
writeVarint32(WireType.makeTag(number, wireType));
}

void writeVarint32(int value) throws IOException;

Expand All @@ -28,38 +38,70 @@ public interface TagWriter extends RawProtoStreamWriter {
// start high level ops
void writeString(int number, String value) throws IOException;

void writeInt32(int number, int value) throws IOException;
default void writeInt32(int number, int value) throws IOException {
if (value >= 0) {
writeUInt32(number, value);
} else {
writeUInt64(number, value);
}
}

void writeUInt32(int number, int value) throws IOException;

void writeSInt32(int number, int value) throws IOException;
default void writeSInt32(int number, int value) throws IOException {
// Roll the bits in order to move the sign bit from position 31 to position 0, to reduce the wire length of negative numbers.
writeUInt32(number, (value << 1) ^ (value >> 31));
}

void writeFixed32(int number, int value) throws IOException;

void writeSFixed32(int number, int value) throws IOException;
default void writeSFixed32(int number, int value) throws IOException {
writeFixed32(number, value);
}

void writeInt64(int number, long value) throws IOException;

void writeUInt64(int number, long value) throws IOException;

void writeSInt64(int number, long value) throws IOException;
default void writeSInt64(int number, long value) throws IOException {
// Roll the bits in order to move the sign bit from position 63 to position 0, to reduce the wire length of negative numbers.
writeUInt64(number, (value << 1) ^ (value >> 63));
}

void writeFixed64(int number, long value) throws IOException;

void writeSFixed64(int number, long value) throws IOException;
default void writeSFixed64(int number, long value) throws IOException {
writeFixed64(number, value);
}

void writeEnum(int number, int value) throws IOException;
default void writeEnum(int number, int value) throws IOException {
writeInt32(number, value);
}

void writeBool(int number, boolean value) throws IOException;

void writeDouble(int number, double value) throws IOException;
default void writeDouble(int number, double value) throws IOException {
writeFixed64(number, Double.doubleToRawLongBits(value));
}

void writeFloat(int number, float value) throws IOException;
default void writeFloat(int number, float value) throws IOException {
writeFixed32(number, Float.floatToRawIntBits(value));
}

void writeBytes(int number, ByteBuffer value) throws IOException;

void writeBytes(int number, byte[] value) throws IOException;
default void writeBytes(int number, byte[] value) throws IOException {
writeBytes(number, value, 0, value.length);
}

void writeBytes(int number, byte[] value, int offset, int length) throws IOException;
// end high level ops

/**
* Used to write a sub message that can be optimized by implementation. When the sub writer is complete, flush
* should be invoked to ensure
* @return
* @throws IOException
*/
TagWriter subWriter(int number, boolean nested) throws IOException;
}
21 changes: 9 additions & 12 deletions core/src/main/java/org/infinispan/protostream/WrappedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,15 +296,13 @@ private static void writeMessage(ImmutableSerializationContext ctx, TagWriter ou
if (t.getClass().isEnum()) {
((EnumMarshallerDelegate) marshallerDelegate).encode(WRAPPED_ENUM, (Enum) t, out);
} else {
ByteArrayOutputStreamEx buffer = new ByteArrayOutputStreamEx();
TagWriterImpl nestedCtx = TagWriterImpl.newInstanceNoBuffer(ctx, buffer);
marshallerDelegate.marshall(nestedCtx, null, t);
nestedCtx.flush();
out.writeBytes(WRAPPED_MESSAGE, buffer.getByteBuffer());
TagWriter nestedWriter = out.subWriter(WRAPPED_MESSAGE, false);
marshallerDelegate.marshall((ProtobufTagMarshaller.WriteContext) nestedWriter, null, t);
nestedWriter.close();
}
}
}
out.flush();
out.close();
}

private static void writeContainer(ImmutableSerializationContext ctx, TagWriter out, BaseMarshallerDelegate marshallerDelegate, Object container) throws IOException {
Expand Down Expand Up @@ -355,7 +353,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
String typeName = null;
Integer typeId = null;
int enumValue = -1;
byte[] messageBytes = null;
TagReader messageReader = null;
Object value = null;
int fieldCount = 0;
int expectedFieldCount = 1;
Expand Down Expand Up @@ -398,7 +396,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
}
case WRAPPED_MESSAGE << WireType.TAG_TYPE_NUM_BITS | WireType.WIRETYPE_LENGTH_DELIMITED: {
expectedFieldCount = 2;
messageBytes = in.readByteArray();
messageReader = in.subReaderFromArray();
break;
}
case WRAPPED_STRING << WireType.TAG_TYPE_NUM_BITS | WireType.WIRETYPE_LENGTH_DELIMITED: {
Expand Down Expand Up @@ -514,7 +512,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
}
}

if (value == null && typeName == null && typeId == null && messageBytes == null) {
if (value == null && typeName == null && typeId == null && messageReader == null) {
return null;
}

Expand All @@ -533,10 +531,9 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
typeName = ctx.getDescriptorByTypeId(typeId).getFullName();
}
BaseMarshallerDelegate marshallerDelegate = ((SerializationContextImpl) ctx).getMarshallerDelegate(typeName);
if (messageBytes != null) {
if (messageReader != null) {
// it's a Message type
TagReaderImpl nestedInput = TagReaderImpl.newInstance(ctx, messageBytes);
return (T) marshallerDelegate.unmarshall(nestedInput, null);
return (T) marshallerDelegate.unmarshall((ProtobufTagMarshaller.ReadContext) messageReader, null);
} else {
// it's an Enum
EnumMarshaller marshaller = (EnumMarshaller) marshallerDelegate.getMarshaller();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;

import org.infinispan.protostream.ProtobufTagMarshaller;
import org.infinispan.protostream.TagWriter;
import org.infinispan.protostream.impl.BaseMarshallerDelegate;
import org.infinispan.protostream.impl.ByteArrayOutputStreamEx;
import org.infinispan.protostream.impl.Log;
Expand Down Expand Up @@ -46,6 +47,17 @@ protected final <T> void writeNestedMessage(BaseMarshallerDelegate<T> marshaller
throw log.maxNestedMessageDepth(maxNestedMessageDepth, message.getClass());
}

if (ctx instanceof TagWriter) {
TagWriter nestedWriter = ((TagWriter) ctx).subWriter(fieldNumber, true);
marshallerDelegate.marshall((ProtobufTagMarshaller.WriteContext) nestedWriter, null, message);
nestedWriter.close();
} else {
handleNonTagWriter(marshallerDelegate, ctx, fieldNumber, message);
}
}

private <T> void handleNonTagWriter(BaseMarshallerDelegate<T> marshallerDelegate, ProtobufTagMarshaller.WriteContext ctx,
int fieldNumber, T message) throws IOException {
ByteArrayOutputStreamEx baos = new ByteArrayOutputStreamEx();
TagWriterImpl nested = TagWriterImpl.newNestedInstance(ctx, baos);
writeMessage(marshallerDelegate, nested, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,14 @@ public ByteArrayOutputStreamEx(int size) {
public synchronized ByteBuffer getByteBuffer() {
return ByteBuffer.wrap(buf, 0, count);
}

public int skipFixedVarint() {
int prev = count;
count += 5;
return prev;
}

public void writePositiveFixedVarint(int pos) {
TagWriterImpl.writePositiveFixedVarint(buf, pos, count - pos - 5);
}
}
Loading

0 comments on commit 7afea21

Please sign in to comment.