Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ByteBuffer field type marshaling support to exporter. #6686

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.opentelemetry.api.internal.ConfigUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* Protobuf wire encoder.
Expand All @@ -56,7 +57,7 @@
//
// Differences
// - No support for Message/Lite
// - No support for ByteString or ByteBuffer
// - No support for ByteString
// - No support for message set extensions
// - No support for Unsafe
// - No support for Java String, only UTF-8 bytes
Expand Down Expand Up @@ -329,6 +330,11 @@
return computeLengthDelimitedFieldSize(value.length);
}

/** Compute the number of bytes that would be needed to encode a {@code bytes} field. */
public static int computeByteBufferSizeNoTag(final ByteBuffer value) {
return computeLengthDelimitedFieldSize(value.capacity());

Check warning on line 335 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L335

Added line #L335 was not covered by tests
}

static int computeLengthDelimitedFieldSize(int fieldLength) {
return computeUInt32SizeNoTag(fieldLength) + fieldLength;
}
Expand Down Expand Up @@ -375,6 +381,8 @@
abstract void writeByteArrayNoTag(final byte[] value, final int offset, final int length)
throws IOException;

abstract void writeByteBufferNoTag(final ByteBuffer value) throws IOException;

// =================================================================

/** Abstract base class for buffered encoders. */
Expand Down Expand Up @@ -487,6 +495,49 @@
write(value, offset, length);
}

@Override
void writeByteBufferNoTag(final ByteBuffer value) throws IOException {
writeUInt32NoTag(value.capacity());

Check warning on line 500 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L500

Added line #L500 was not covered by tests
if (value.hasArray()) {
write(value.array(), value.arrayOffset(), value.capacity());

Check warning on line 502 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L502

Added line #L502 was not covered by tests
} else {
write((ByteBuffer) value.duplicate().clear());

Check warning on line 504 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L504

Added line #L504 was not covered by tests
}
}

Check warning on line 506 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L506

Added line #L506 was not covered by tests

void write(ByteBuffer value) throws IOException {
int length = value.remaining();

Check warning on line 509 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L509

Added line #L509 was not covered by tests
if (limit - position >= length) {
// We have room in the current buffer.
value.get(buffer, position, length);
position += length;
totalBytesWritten += length;

Check warning on line 514 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L512-L514

Added lines #L512 - L514 were not covered by tests
} else {
// Write extends past current buffer. Fill the rest of this buffer and
// flush.
final int bytesWritten = limit - position;
value.get(buffer, position, bytesWritten);
length -= bytesWritten;
position = limit;
totalBytesWritten += bytesWritten;
doFlush();

Check warning on line 523 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L518-L523

Added lines #L518 - L523 were not covered by tests

// Now deal with the rest.
// Since we have an output stream, this is our buffer
// and buffer offset == 0
while (length > limit) {
// Copy data into the buffer before writing it to OutputStream.
value.get(buffer, 0, limit);
out.write(buffer, 0, limit);
length -= limit;
totalBytesWritten += limit;

Check warning on line 533 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L530-L533

Added lines #L530 - L533 were not covered by tests
}
value.get(buffer, 0, length);
position = length;
totalBytesWritten += length;

Check warning on line 537 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L535-L537

Added lines #L535 - L537 were not covered by tests
}
}

Check warning on line 539 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L539

Added line #L539 was not covered by tests

@Override
void write(byte value) throws IOException {
if (position == limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

Expand Down Expand Up @@ -126,6 +127,13 @@
generator.writeBinaryField(field.getJsonName(), value);
}

@Override
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
byte[] data = new byte[value.capacity()];
((ByteBuffer) value.duplicate().clear()).get(data);
generator.writeBinaryField(field.getJsonName(), data);
}

Check warning on line 135 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java#L132-L135

Added lines #L132 - L135 were not covered by tests

@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
generator.writeObjectFieldStart(field.getJsonName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -346,6 +347,14 @@
return field.getTagSize() + CodedOutputStream.computeByteArraySizeNoTag(message);
}

/** Returns the size of a bytes field based on the buffer's capacity. */
public static int sizeByteBuffer(ProtoFieldInfo field, ByteBuffer message) {
if (message.capacity() == 0) {
return 0;

Check warning on line 353 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java#L353

Added line #L353 was not covered by tests
}
return field.getTagSize() + CodedOutputStream.computeByteBufferSizeNoTag(message);

Check warning on line 355 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java#L355

Added line #L355 was not covered by tests
}

/** Returns the size of a enum field. */
// Assumes OTLP always defines the first item in an enum with number 0, which it does and will.
public static int sizeEnum(ProtoFieldInfo field, ProtoEnumInfo enumValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.api.trace.TraceId;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -168,6 +169,12 @@
output.writeByteArrayNoTag(value);
}

@Override
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
output.writeUInt32NoTag(field.getTag());
output.writeByteBufferNoTag(value);
}

Check warning on line 176 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java#L174-L176

Added lines #L174 - L176 were not covered by tests

@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
output.writeUInt32NoTag(field.getTag());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -253,8 +254,22 @@
writeBytes(field, value);
}

/**
* Serializes a protobuf {@code bytes} field. Writes all content of the ByteBuffer regardless of
* the current position and limit. Does not alter the position or limit of the provided
* ByteBuffer.
*/
public void serializeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
if (value.capacity() == 0) {
return;

Check warning on line 264 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java#L264

Added line #L264 was not covered by tests
}
writeByteBuffer(field, value);
}

Check warning on line 267 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java#L266-L267

Added lines #L266 - L267 were not covered by tests

public abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException;

public abstract void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException;

protected abstract void writeStartMessage(ProtoFieldInfo field, int protoMessageSize)
throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.exporter.internal.otlp.KeyValueMarshaler;
import io.opentelemetry.proto.profiles.v1experimental.internal.ProfileContainer;
import java.io.IOException;
import java.nio.ByteBuffer;

final class ProfileContainerMarshaler extends MarshalerWithSize {

Expand All @@ -20,17 +21,19 @@ final class ProfileContainerMarshaler extends MarshalerWithSize {
private final KeyValueMarshaler[] attributeMarshalers;
private final int droppedAttributesCount;
private final byte[] originalPayloadFormatUtf8;
private final byte[] originalPayload;
private final ByteBuffer originalPayload;
private final ProfileMarshaler profileMarshaler;

static ProfileContainerMarshaler create(ProfileContainerData profileContainerData) {
int droppedAttributesCount =
profileContainerData.getTotalAttributeCount() - profileContainerData.getAttributes().size();

// Not ideal, but this will do for now. ByteBuffer support in
// Serialzer/CodedOutputStream/MarshalerUtilwill follow in a separate step.
byte[] originalPayload = new byte[profileContainerData.getOriginalPayload().remaining()];
profileContainerData.getOriginalPayload().get(originalPayload);
ByteBuffer originalPayload = profileContainerData.getOriginalPayload();
if (originalPayload == null) {
originalPayload = ByteBuffer.allocate(0);
} else {
originalPayload = originalPayload.duplicate().asReadOnlyBuffer();
}

return new ProfileContainerMarshaler(
profileContainerData.getProfileIdBytes(),
Expand All @@ -50,7 +53,7 @@ private ProfileContainerMarshaler(
KeyValueMarshaler[] attributeMarshalers,
int droppedAttributesCount,
byte[] originalPayloadFormat,
byte[] originalPayload,
ByteBuffer originalPayload,
ProfileMarshaler profileMarshaler) {
super(
calculateSize(
Expand Down Expand Up @@ -80,7 +83,7 @@ protected void writeTo(Serializer output) throws IOException {
output.serializeRepeatedMessage(ProfileContainer.ATTRIBUTES, attributeMarshalers);
output.serializeUInt32(ProfileContainer.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
output.serializeString(ProfileContainer.ORIGINAL_PAYLOAD_FORMAT, originalPayloadFormatUtf8);
output.serializeBytes(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
output.serializeByteBuffer(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
output.serializeMessage(ProfileContainer.PROFILE, profileMarshaler);
}

Expand All @@ -91,7 +94,7 @@ private static int calculateSize(
KeyValueMarshaler[] attributeMarshalers,
int droppedAttributesCount,
byte[] originalPayloadFormat,
byte[] originalPayload,
ByteBuffer originalPayload,
ProfileMarshaler profileMarshaler) {
int size;
size = 0;
Expand All @@ -103,7 +106,7 @@ private static int calculateSize(
MarshalerUtil.sizeUInt32(ProfileContainer.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
size +=
MarshalerUtil.sizeBytes(ProfileContainer.ORIGINAL_PAYLOAD_FORMAT, originalPayloadFormat);
size += MarshalerUtil.sizeBytes(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
size += MarshalerUtil.sizeByteBuffer(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
size += MarshalerUtil.sizeMessage(ProfileContainer.PROFILE, profileMarshaler);
return size;
}
Expand Down
Loading