Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPROTO-265 Remove additional byte[] allocations for nested writers #192

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions core/src/main/java/org/infinispan/protostream/Decoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.infinispan.protostream;

import java.io.IOException;
import java.nio.ByteBuffer;

public interface Decoder {
int getEnd();

int getPos();

byte[] getBufferArray() throws IOException;

boolean isAtEnd() throws IOException;

int readTag() throws IOException;

void checkLastTagWas(int expectedTag) throws IOException;

boolean skipField(int tag) throws IOException;

void skipVarint() throws IOException;

void skipRawBytes(int length) throws IOException;

String readString() throws IOException;

byte readRawByte() throws IOException;

byte[] readRawByteArray(int length) throws IOException;

ByteBuffer readRawByteBuffer(int length) throws IOException;

int readVarint32() throws IOException;

long readVarint64() throws IOException;

int readFixed32() throws IOException;

long readFixed64() throws IOException;

int pushLimit(int newLimit) throws IOException;

void popLimit(int oldLimit);

Decoder decoderFromLength(int length) throws IOException;

/**
* Sets a hard limit on how many bytes we can continue to read while parsing a message from current position. This is
* useful to prevent corrupted or malicious messages with wrong length values to abuse memory allocation. Initially
* this limit is set to {@code Integer.MAX_INT}, which means the protection mechanism is disabled by default.
* The limit is only useful when processing streams. Setting a limit for a decoder backed by a byte array is useless
* because the memory allocation already happened.
*/
int setGlobalLimit(int globalLimit);
}
50 changes: 50 additions & 0 deletions core/src/main/java/org/infinispan/protostream/Encoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.infinispan.protostream;

import java.io.IOException;
import java.nio.ByteBuffer;

public interface Encoder {
void flush() throws IOException;

void close() throws IOException;

int remainingSpace();

void writeUInt32Field(int fieldNumber, int value) throws IOException;

void writeUInt64Field(int fieldNumber, long value) throws IOException;

void writeFixed32Field(int fieldNumber, int value) throws IOException;

void writeFixed64Field(int fieldNumber, long value) throws IOException;

void writeBoolField(int fieldNumber, boolean value) throws IOException;

void writeLengthDelimitedField(int fieldNumber, int length) throws IOException;

void writeVarint32(int value) throws IOException;

void writeVarint64(long value) throws IOException;

void writeFixed32(int value) throws IOException;

void writeFixed64(long value) throws IOException;

void writeByte(byte value) throws IOException;

void writeBytes(byte[] value, int offset, int length) throws IOException;

void writeBytes(ByteBuffer value) throws IOException;

default int skipFixedVarint() {
throw new UnsupportedOperationException();
}

default void writePositiveFixedVarint(int pos) {
throw new UnsupportedOperationException();
}

default boolean supportsFixedVarint() {
return false;
}
}
20 changes: 18 additions & 2 deletions core/src/main/java/org/infinispan/protostream/ProtobufUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public static void writeTo(ImmutableSerializationContext ctx, OutputStream out,
write(ctx, TagWriterImpl.newInstance(ctx, out), t);
}

public static void writeTo(ImmutableSerializationContext ctx, Encoder encoder, Object t) throws IOException {
write(ctx, TagWriterImpl.newInstance(ctx, encoder), t);
}

public static byte[] toByteArray(ImmutableSerializationContext ctx, Object t) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_ARRAY_BUFFER_SIZE);
writeTo(ctx, baos, t);
Expand Down Expand Up @@ -112,6 +116,10 @@ public static <A> A fromByteBuffer(ImmutableSerializationContext ctx, ByteBuffer
return readFrom(TagReaderImpl.newInstance(ctx, byteBuffer), clazz);
}

public static <A> A fromDecoder(ImmutableSerializationContext ctx, Decoder decoder, Class<A> clazz) throws IOException {
return readFrom(TagReaderImpl.newInstance(ctx, decoder), clazz);
}

/**
* Parses a top-level message that was wrapped according to the org.infinispan.protostream.WrappedMessage proto
* definition.
Expand All @@ -137,13 +145,17 @@ public static <A> A fromWrappedStream(ImmutableSerializationContext ctx, InputSt
return WrappedMessage.read(ctx, TagReaderImpl.newInstance(ctx, in));
}

public static <A> A fromWrappedDecoder(ImmutableSerializationContext ctx, Decoder decoder) throws IOException {
return WrappedMessage.read(ctx, TagReaderImpl.newInstance(ctx, decoder));
}

//todo [anistor] should make it possible to plug in a custom wrapping strategy instead of the default one
public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Object t) throws IOException {
return toWrappedByteArray(ctx, t, DEFAULT_ARRAY_BUFFER_SIZE);
}

public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Object t, int bufferSize) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
ByteArrayOutputStream baos = new ByteArrayOutputStreamEx(bufferSize);
WrappedMessage.write(ctx, TagWriterImpl.newInstanceNoBuffer(ctx, baos), t);
return baos.toByteArray();
}
Expand All @@ -155,13 +167,17 @@ public static ByteBuffer toWrappedByteBuffer(ImmutableSerializationContext ctx,
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
toWrappedStream(ctx, out, t, DEFAULT_STREAM_BUFFER_SIZE);
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, out), t);
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t, int bufferSize) throws IOException {
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, out, bufferSize), t);
}

public static void toWrappedEncoder(ImmutableSerializationContext ctx, Encoder encoder, Object t) throws IOException {
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, encoder), t);
}

/**
* Converts a Protobuf encoded message to its <a href="https://developers.google.com/protocol-buffers/docs/proto3#json">
* canonical JSON representation</a>.
Expand Down
35 changes: 28 additions & 7 deletions core/src/main/java/org/infinispan/protostream/TagReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public interface TagReader extends RawProtoStreamReader {

boolean readBool() throws IOException;

int readEnum() throws IOException;
default int readEnum() throws IOException {
return readInt32();
}

/**
* Reads a {@code string} value.
Expand All @@ -50,29 +52,48 @@ public interface TagReader extends RawProtoStreamReader {
*/
ByteBuffer readByteBuffer() throws IOException;

double readDouble() throws IOException;
/**
* Similar to {@link #readByteArray()} except that the reader impl may optimize creation of a sub TagReader from
* itself, possibly avoiding byte[] allocations
* @return a new TagReader
*/
ProtobufTagMarshaller.ReadContext subReaderFromArray() throws IOException;

default double readDouble() throws IOException {
return Double.longBitsToDouble(readFixed64());
}

float readFloat() throws IOException;
default float readFloat() throws IOException {
return Float.intBitsToFloat(readFixed32());
}

long readInt64() throws IOException;

long readUInt64() throws IOException;
default long readUInt64() throws IOException {
return readInt64();
}

long readSInt64() throws IOException;

long readFixed64() throws IOException;

long readSFixed64() throws IOException;
default long readSFixed64() throws IOException {
return readFixed64();
}

int readInt32() throws IOException;

int readUInt32() throws IOException;
default int readUInt32() throws IOException {
return readInt32();
}

int readSInt32() throws IOException;

int readFixed32() throws IOException;

int readSFixed32() throws IOException;
default int readSFixed32() throws IOException {
return readFixed32();
}

/**
* Sets a limit (based on the length of the length delimited value) when entering an embedded message.
Expand Down
70 changes: 58 additions & 12 deletions core/src/main/java/org/infinispan/protostream/TagWriter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.infinispan.protostream;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;

Expand All @@ -9,14 +10,24 @@
* @author [email protected]
* @since 4.4
*/
public interface TagWriter extends RawProtoStreamWriter {
public interface TagWriter extends RawProtoStreamWriter, Closeable {

// start low level ops
void flush() throws IOException;

void writeTag(int number, int wireType) throws IOException;
/**
* Invoke after done with writer, this implies a flush if necessary
* It is necessary to invoke this on a writer returned from {@link #subWriter(int, boolean)} to actually push the data
*/
void close() throws IOException;

void writeTag(int number, WireType wireType) throws IOException;
default void writeTag(int number, int wireType) throws IOException {
writeVarint32(WireType.makeTag(number, wireType));
}

default void writeTag(int number, WireType wireType) throws IOException {
writeVarint32(WireType.makeTag(number, wireType));
}

void writeVarint32(int value) throws IOException;

Expand All @@ -28,38 +39,73 @@ public interface TagWriter extends RawProtoStreamWriter {
// start high level ops
void writeString(int number, String value) throws IOException;

void writeInt32(int number, int value) throws IOException;
default void writeInt32(int number, int value) throws IOException {
if (value >= 0) {
writeUInt32(number, value);
} else {
writeUInt64(number, value);
}
}

void writeUInt32(int number, int value) throws IOException;

void writeSInt32(int number, int value) throws IOException;
default void writeSInt32(int number, int value) throws IOException {
// Roll the bits in order to move the sign bit from position 31 to position 0, to reduce the wire length of negative numbers.
writeUInt32(number, (value << 1) ^ (value >> 31));
}

void writeFixed32(int number, int value) throws IOException;

void writeSFixed32(int number, int value) throws IOException;
default void writeSFixed32(int number, int value) throws IOException {
writeFixed32(number, value);
}

void writeInt64(int number, long value) throws IOException;

void writeUInt64(int number, long value) throws IOException;

void writeSInt64(int number, long value) throws IOException;
default void writeSInt64(int number, long value) throws IOException {
// Roll the bits in order to move the sign bit from position 63 to position 0, to reduce the wire length of negative numbers.
writeUInt64(number, (value << 1) ^ (value >> 63));
}

void writeFixed64(int number, long value) throws IOException;

void writeSFixed64(int number, long value) throws IOException;
default void writeSFixed64(int number, long value) throws IOException {
writeFixed64(number, value);
}

void writeEnum(int number, int value) throws IOException;
default void writeEnum(int number, int value) throws IOException {
writeInt32(number, value);
}

void writeBool(int number, boolean value) throws IOException;

void writeDouble(int number, double value) throws IOException;
default void writeDouble(int number, double value) throws IOException {
writeFixed64(number, Double.doubleToRawLongBits(value));
}

void writeFloat(int number, float value) throws IOException;
default void writeFloat(int number, float value) throws IOException {
writeFixed32(number, Float.floatToRawIntBits(value));
}

void writeBytes(int number, ByteBuffer value) throws IOException;

void writeBytes(int number, byte[] value) throws IOException;
default void writeBytes(int number, byte[] value) throws IOException {
writeBytes(number, value, 0, value.length);
}

void writeBytes(int number, byte[] value, int offset, int length) throws IOException;
// end high level ops

/**
* Used to write a sub message that can be optimized by implementation. When the sub writer is complete, flush
* should be invoked to ensure bytes are written and close should be invoked to free any resources related to the
* context (note close will flush as well)
* @param number the message number of the sub message
* @param nested whether this is a nested message or a new one
* @return a write context for a sub message
* @throws IOException exception if there is an issue
*/
ProtobufTagMarshaller.WriteContext subWriter(int number, boolean nested) throws IOException;
}
Loading
Loading