Skip to content

Commit

Permalink
Fixes serde logic for proxy scheduled jobrunner/parser requests/respo…
Browse files Browse the repository at this point in the history
…nses (#349)

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis authored Mar 27, 2023
1 parent 00bd2d4 commit 9d6e976
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
package org.opensearch.jobscheduler.transport.request;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.jobscheduler.utils.JobDetailsService;

/**
* Request to extensions to invoke a job action, converts request params to a byte array
*
*/
public class ExtensionJobActionRequest<T extends Writeable> extends ExtensionActionRequest {

public static final byte UNIT_SEPARATOR = (byte) '\u001F';

/**
* Instantiates a new ExtensionJobActionRequest
*
Expand All @@ -27,7 +34,63 @@ public class ExtensionJobActionRequest<T extends Writeable> extends ExtensionAct
* @throws IOException if serialization fails
*/
public ExtensionJobActionRequest(String extensionActionName, T actionParams) throws IOException {
super(extensionActionName, JobDetailsService.convertParamsToBytes(actionParams));
super(extensionActionName, convertParamsToBytes(actionParams));
}

/**
* Finds the index of the specified byte value within the given byte array
*
* @param bytes the byte array to process
* @param value the byte to identify index of
* @return the index of the byte value
*/
private static int indexOf(byte[] bytes, byte value) {
for (int offset = 0; offset < bytes.length; ++offset) {
if (bytes[offset] == value) {
return offset;
}
}
return -1;
}

/**
* Trims off the fully qualified request class name bytes and null byte from the ExtensionActionRequest requestBytes
*
* @param requestBytes the request bytes of an ExtensionActionRequest
* @return the trimmed array of bytes
*/
public static byte[] trimRequestBytes(byte[] requestBytes) {
int nullPos = indexOf(requestBytes, ExtensionJobActionRequest.UNIT_SEPARATOR);
return Arrays.copyOfRange(requestBytes, nullPos + 1, requestBytes.length);
}

/**
* Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified class name bytes
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
* @throws IOException if serialization fails
* @return the byte array of the parameters
*/
private static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {

// Write inner request to output stream and convert to byte array
BytesStreamOutput out = new BytesStreamOutput();
actionParams.writeTo(out);
out.flush();
byte[] requestBytes = BytesReference.toBytes(out.bytes());

// Convert fully qualifed class name to byte array
byte[] requestClassBytes = ExtensionActionRequest.class.getName().getBytes(StandardCharsets.UTF_8);

// Generate ExtensionActionRequest responseByte array
byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + requestBytes.length)
.put(requestClassBytes)
.put(ExtensionJobActionRequest.UNIT_SEPARATOR)
.put(requestBytes)
.array();

return proxyRequestBytes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public JobParameterRequest(StreamInput in) throws IOException {
* @throws IOException when message de-serialization fails.
*/
public JobParameterRequest(byte[] requestParams) throws IOException {
this(StreamInput.wrap(requestParams));
this(StreamInput.wrap(ExtensionJobActionRequest.trimRequestBytes(requestParams)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JobRunnerRequest(StreamInput in) throws IOException {
* @throws IOException when message de-serialization fails.
*/
public JobRunnerRequest(byte[] requestParams) throws IOException {
this(StreamInput.wrap(requestParams));
this(StreamInput.wrap(ExtensionJobActionRequest.trimRequestBytes(requestParams)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.opensearch.jobscheduler.transport.response;

import java.io.IOException;

import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.jobscheduler.utils.JobDetailsService;

/**
* Response from extension job action, converts response params to a byte array
Expand All @@ -26,7 +28,25 @@ public class ExtensionJobActionResponse<T extends Writeable> extends ExtensionAc
* @throws IOException if serialization fails
*/
public ExtensionJobActionResponse(T actionResponse) throws IOException {
super(JobDetailsService.convertParamsToBytes(actionResponse));
super(convertParamsToBytes(actionResponse));
}

/**
* Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
* @throws IOException if serialization fails
* @return the byte array of the parameters
*/
private static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {
// Write all to output stream
BytesStreamOutput out = new BytesStreamOutput();
actionParams.writeTo(out);
out.flush();

// convert bytes stream to byte array
return BytesReference.toBytes(out.bytes());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.DocumentMissingException;
Expand Down Expand Up @@ -503,24 +500,6 @@ private void updateJobDetails(final String documentId, final JobDetails updateJo
}
}

/**
* Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
* @throws IOException if serialization fails
* @return the byte array of the parameters
*/
public static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {
// Write all to output stream
BytesStreamOutput out = new BytesStreamOutput();
actionParams.writeTo(out);
out.flush();

// convert bytes stream to byte array
return BytesReference.toBytes(out.bytes());
}

private String jobDetailsMapping() {
try {
InputStream in = JobDetailsService.class.getResourceAsStream(PLUGINS_JOB_DETAILS_MAPPING_FILE);
Expand Down

0 comments on commit 9d6e976

Please sign in to comment.