Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

https://github.com/opensearch-project/job-scheduler/pull/351 #349

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
package org.opensearch.jobscheduler.transport.request;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.jobscheduler.utils.JobDetailsService;

/**
* Request to extensions to invoke a job action, converts request params to a byte array
*
*/
public class ExtensionJobActionRequest<T extends Writeable> extends ExtensionActionRequest {

public static final byte UNIT_SEPARATOR = (byte) '\u001F';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should make this a constant in the ExtensionTransportAction.


/**
* Instantiates a new ExtensionJobActionRequest
*
Expand All @@ -27,7 +34,63 @@ public class ExtensionJobActionRequest<T extends Writeable> extends ExtensionAct
* @throws IOException if serialization fails
*/
public ExtensionJobActionRequest(String extensionActionName, T actionParams) throws IOException {
super(extensionActionName, JobDetailsService.convertParamsToBytes(actionParams));
super(extensionActionName, convertParamsToBytes(actionParams));
}

/**
* Finds the index of the specified byte value within the given byte array
*
* @param bytes the byte array to process
* @param value the byte to identify index of
* @return the index of the byte value
*/
private static int indexOf(byte[] bytes, byte value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should make this a util method in ExtensionTransportAction

for (int offset = 0; offset < bytes.length; ++offset) {
if (bytes[offset] == value) {
return offset;
}
}
return -1;
}

/**
* Trims off the fully qualified request class name bytes and null byte from the ExtensionActionRequest requestBytes
*
* @param requestBytes the request bytes of an ExtensionActionRequest
* @return the trimmed array of bytes
*/
public static byte[] trimRequestBytes(byte[] requestBytes) {
int nullPos = indexOf(requestBytes, ExtensionJobActionRequest.UNIT_SEPARATOR);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that nullPos was left over from when this was a null byte, and maybe should just be a pos.

return Arrays.copyOfRange(requestBytes, nullPos + 1, requestBytes.length);
}

/**
* Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified class name bytes
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
* @throws IOException if serialization fails
* @return the byte array of the parameters
*/
private static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should be a util method on ExtensionActionRequest or ExtensionTransportAction


// Write inner request to output stream and convert to byte array
BytesStreamOutput out = new BytesStreamOutput();
actionParams.writeTo(out);
out.flush();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used to always do this, but this is the method being invoked:

    @Override
    public void flush() {
        // nothing to do
    }

byte[] requestBytes = BytesReference.toBytes(out.bytes());

// Convert fully qualifed class name to byte array
byte[] requestClassBytes = ExtensionActionRequest.class.getName().getBytes(StandardCharsets.UTF_8);

// Generate ExtensionActionRequest responseByte array
byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + requestBytes.length)
.put(requestClassBytes)
.put(ExtensionJobActionRequest.UNIT_SEPARATOR)
.put(requestBytes)
.array();

return proxyRequestBytes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public JobParameterRequest(StreamInput in) throws IOException {
* @throws IOException when message de-serialization fails.
*/
public JobParameterRequest(byte[] requestParams) throws IOException {
this(StreamInput.wrap(requestParams));
this(StreamInput.wrap(ExtensionJobActionRequest.trimRequestBytes(requestParams)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JobRunnerRequest(StreamInput in) throws IOException {
* @throws IOException when message de-serialization fails.
*/
public JobRunnerRequest(byte[] requestParams) throws IOException {
this(StreamInput.wrap(requestParams));
this(StreamInput.wrap(ExtensionJobActionRequest.trimRequestBytes(requestParams)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.opensearch.jobscheduler.transport.response;

import java.io.IOException;

import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.jobscheduler.utils.JobDetailsService;

/**
* Response from extension job action, converts response params to a byte array
Expand All @@ -26,7 +28,25 @@ public class ExtensionJobActionResponse<T extends Writeable> extends ExtensionAc
* @throws IOException if serialization fails
*/
public ExtensionJobActionResponse(T actionResponse) throws IOException {
super(JobDetailsService.convertParamsToBytes(actionResponse));
super(convertParamsToBytes(actionResponse));
}

/**
* Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
* @throws IOException if serialization fails
* @return the byte array of the parameters
*/
private static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deja vu. We definitely should find a central location for these util classes that both Extensions and Plugins can use.

// Write all to output stream
BytesStreamOutput out = new BytesStreamOutput();
actionParams.writeTo(out);
out.flush();

// convert bytes stream to byte array
return BytesReference.toBytes(out.bytes());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.DocumentMissingException;
Expand Down Expand Up @@ -503,24 +500,6 @@ private void updateJobDetails(final String documentId, final JobDetails updateJo
}
}

/**
* Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
* @throws IOException if serialization fails
* @return the byte array of the parameters
*/
public static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {
// Write all to output stream
BytesStreamOutput out = new BytesStreamOutput();
actionParams.writeTo(out);
out.flush();

// convert bytes stream to byte array
return BytesReference.toBytes(out.bytes());
}

private String jobDetailsMapping() {
try {
InputStream in = JobDetailsService.class.getResourceAsStream(PLUGINS_JOB_DETAILS_MAPPING_FILE);
Expand Down