From 9d6e9765d83c443faaf5c751d0f6f340a63a88ae Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Mon, 27 Mar 2023 12:18:10 -0700 Subject: [PATCH] Fixes serde logic for proxy scheduled jobrunner/parser requests/responses (#349) Signed-off-by: Joshua Palis --- .../request/ExtensionJobActionRequest.java | 67 ++++++++++++++++++- .../request/JobParameterRequest.java | 2 +- .../transport/request/JobRunnerRequest.java | 2 +- .../response/ExtensionJobActionResponse.java | 24 ++++++- .../jobscheduler/utils/JobDetailsService.java | 21 ------ 5 files changed, 89 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java index 711c2341..51b72de9 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java @@ -9,9 +9,14 @@ package org.opensearch.jobscheduler.transport.request; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.extensions.action.ExtensionActionRequest; -import org.opensearch.jobscheduler.utils.JobDetailsService; /** * Request to extensions to invoke a job action, converts request params to a byte array @@ -19,6 +24,8 @@ */ public class ExtensionJobActionRequest extends ExtensionActionRequest { + public static final byte UNIT_SEPARATOR = (byte) '\u001F'; + /** * Instantiates a new ExtensionJobActionRequest * @@ -27,7 +34,63 @@ public class ExtensionJobActionRequest extends ExtensionAct * @throws IOException if serialization fails */ public ExtensionJobActionRequest(String extensionActionName, T actionParams) throws IOException { - super(extensionActionName, JobDetailsService.convertParamsToBytes(actionParams)); + super(extensionActionName, convertParamsToBytes(actionParams)); + } + + /** + * Finds the index of the specified byte value within the given byte array + * + * @param bytes the byte array to process + * @param value the byte to identify index of + * @return the index of the byte value + */ + private static int indexOf(byte[] bytes, byte value) { + for (int offset = 0; offset < bytes.length; ++offset) { + if (bytes[offset] == value) { + return offset; + } + } + return -1; + } + + /** + * Trims off the fully qualified request class name bytes and null byte from the ExtensionActionRequest requestBytes + * + * @param requestBytes the request bytes of an ExtensionActionRequest + * @return the trimmed array of bytes + */ + public static byte[] trimRequestBytes(byte[] requestBytes) { + int nullPos = indexOf(requestBytes, ExtensionJobActionRequest.UNIT_SEPARATOR); + return Arrays.copyOfRange(requestBytes, nullPos + 1, requestBytes.length); + } + + /** + * Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified class name bytes + * + * @param a class that extends writeable + * @param actionParams the action parameters to be serialized + * @throws IOException if serialization fails + * @return the byte array of the parameters + */ + private static byte[] convertParamsToBytes(T actionParams) throws IOException { + + // Write inner request to output stream and convert to byte array + BytesStreamOutput out = new BytesStreamOutput(); + actionParams.writeTo(out); + out.flush(); + byte[] requestBytes = BytesReference.toBytes(out.bytes()); + + // Convert fully qualifed class name to byte array + byte[] requestClassBytes = ExtensionActionRequest.class.getName().getBytes(StandardCharsets.UTF_8); + + // Generate ExtensionActionRequest responseByte array + byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + requestBytes.length) + .put(requestClassBytes) + .put(ExtensionJobActionRequest.UNIT_SEPARATOR) + .put(requestBytes) + .array(); + + return proxyRequestBytes; } } diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java index 245423a5..adadbaa7 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java @@ -83,7 +83,7 @@ public JobParameterRequest(StreamInput in) throws IOException { * @throws IOException when message de-serialization fails. */ public JobParameterRequest(byte[] requestParams) throws IOException { - this(StreamInput.wrap(requestParams)); + this(StreamInput.wrap(ExtensionJobActionRequest.trimRequestBytes(requestParams))); } @Override diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java index 29eef927..0e279e5c 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java @@ -68,7 +68,7 @@ public JobRunnerRequest(StreamInput in) throws IOException { * @throws IOException when message de-serialization fails. */ public JobRunnerRequest(byte[] requestParams) throws IOException { - this(StreamInput.wrap(requestParams)); + this(StreamInput.wrap(ExtensionJobActionRequest.trimRequestBytes(requestParams))); } @Override diff --git a/src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java b/src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java index 50f37b9e..c7f89046 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java @@ -9,9 +9,11 @@ package org.opensearch.jobscheduler.transport.response; import java.io.IOException; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.extensions.action.ExtensionActionResponse; -import org.opensearch.jobscheduler.utils.JobDetailsService; /** * Response from extension job action, converts response params to a byte array @@ -26,7 +28,25 @@ public class ExtensionJobActionResponse extends ExtensionAc * @throws IOException if serialization fails */ public ExtensionJobActionResponse(T actionResponse) throws IOException { - super(JobDetailsService.convertParamsToBytes(actionResponse)); + super(convertParamsToBytes(actionResponse)); + } + + /** + * Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array + * + * @param a class that extends writeable + * @param actionParams the action parameters to be serialized + * @throws IOException if serialization fails + * @return the byte array of the parameters + */ + private static byte[] convertParamsToBytes(T actionParams) throws IOException { + // Write all to output stream + BytesStreamOutput out = new BytesStreamOutput(); + actionParams.writeTo(out); + out.flush(); + + // convert bytes stream to byte array + return BytesReference.toBytes(out.bytes()); } } diff --git a/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java index f380cdde..c1ed8269 100644 --- a/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java +++ b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java @@ -31,9 +31,6 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.extensions.action.ExtensionProxyAction; import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.DocumentMissingException; @@ -503,24 +500,6 @@ private void updateJobDetails(final String documentId, final JobDetails updateJo } } - /** - * Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array - * - * @param a class that extends writeable - * @param actionParams the action parameters to be serialized - * @throws IOException if serialization fails - * @return the byte array of the parameters - */ - public static byte[] convertParamsToBytes(T actionParams) throws IOException { - // Write all to output stream - BytesStreamOutput out = new BytesStreamOutput(); - actionParams.writeTo(out); - out.flush(); - - // convert bytes stream to byte array - return BytesReference.toBytes(out.bytes()); - } - private String jobDetailsMapping() { try { InputStream in = JobDetailsService.class.getResourceAsStream(PLUGINS_JOB_DETAILS_MAPPING_FILE);