Skip to content

Commit

Permalink
fix bug on serialization when passing task resource usage to coordinator
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jul 9, 2024
1 parent d1a6038 commit bfccfa5
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@

package org.opensearch.plugin.insights.rules.model;

import org.apache.lucene.util.ArrayUtil;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
* Valid attributes for a search query record
Expand Down Expand Up @@ -75,6 +82,69 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO
out.writeString(attribute.toString());
}

/**
* Write Attribute value to a StreamOutput
* @param out the StreamOutput to write
* @param attributeValue the Attribute value to write
*/
@SuppressWarnings("unchecked")
public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException {
if (attributeValue instanceof List) {
out.writeList((List<? extends Writeable>) attributeValue);

Check warning on line 93 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java#L93

Added line #L93 was not covered by tests
} else {
out.writeGenericValue(attributeValue);
}
}

/**
* Read attribute value from the input stream given the Attribute type
*
* @param in the {@link StreamInput} input to read
* @param attribute attribute type to differentiate between Source and others
* @return parse value
* @throws IOException IOException
*/
public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException {
if (attribute == Attribute.TASK_RESOURCE_USAGES) {
return in.readList(TaskResourceInfo::readFromStream);

Check warning on line 109 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java#L109

Added line #L109 was not covered by tests
} else {
return in.readGenericValue();
}
}

/**
* Read attribute map from the input stream
*
* @param in the {@link StreamInput} to read
* @return parsed attribute map
* @throws IOException IOException
*/
public static Map<Attribute, Object> readAttributeMap(StreamInput in) throws IOException {
int size = readArraySize(in);
if (size == 0) {
return Collections.emptyMap();

Check warning on line 125 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java#L125

Added line #L125 was not covered by tests
}
Map<Attribute, Object> map = new HashMap<>(size);

for (int i = 0; i < size; i++) {
Attribute key = readFromStream(in);
Object value = readAttributeValue(in, key);
map.put(key, value);
}
return map;
}

private static int readArraySize(StreamInput in) throws IOException {
final int arraySize = in.readVInt();
if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) {
throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize);

Check warning on line 140 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java#L140

Added line #L140 was not covered by tests
}
if (arraySize < 0) {
throw new NegativeArraySizeException("array size must be positive but was: " + arraySize);

Check warning on line 143 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java#L143

Added line #L143 was not covered by tests
}
return arraySize;
}

@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce
measurements = new HashMap<>();
in.readMap(MetricType::readFromStream, StreamInput::readGenericValue)
.forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o))));
this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue);
this.attributes = Attribute.readAttributeMap(in);
}

/**
Expand Down Expand Up @@ -134,7 +134,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten
public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(timestamp);
out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue);
out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue);
out.writeMap(
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
);
}

/**
Expand Down

0 comments on commit bfccfa5

Please sign in to comment.