Skip to content

Commit

Permalink
[Extensions] Add all fields of LockModel to RestGetLockAction response (
Browse files Browse the repository at this point in the history
#342)

* Modifies the RestGetLockAction response to include all fields of the lock model object, rather than the lock model object itself

Signed-off-by: Joshua Palis <[email protected]>

* Removing unused fields

Signed-off-by: Joshua Palis <[email protected]>

* reverting field removal

Signed-off-by: Joshua Palis <[email protected]>

* Adding lockID to back to response

Signed-off-by: Joshua Palis <[email protected]>

* Adding checks to multinode get lock rest integration tests to ensure that all response fields are populated

Signed-off-by: Joshua Palis <[email protected]>

* fixing lock time parsing

Signed-off-by: Joshua Palis <[email protected]>

* Adds a new Response class to house serde logic for RestGetLockAction response

Signed-off-by: Joshua Palis <[email protected]>

* Moving parser.nextToken() within AcquireLockResponse.parse()

Signed-off-by: Joshua Palis <[email protected]>

* Implementing toXContentObject for AcquireLockRequest

Signed-off-by: Joshua Palis <[email protected]>

* Moving AcquireLockResponse to transpor package

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis authored Mar 10, 2023
1 parent 90d25f0 commit 4649c3d
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.jobscheduler.JobSchedulerPlugin;

import org.opensearch.jobscheduler.transport.AcquireLockResponse;
import org.opensearch.jobscheduler.transport.AcquireLockRequest;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.jobscheduler.spi.LockModel;
Expand All @@ -38,10 +39,6 @@
import static org.opensearch.rest.RestRequest.Method.GET;

import static org.opensearch.jobscheduler.spi.LockModel.GET_LOCK_ACTION;
import static org.opensearch.jobscheduler.spi.LockModel.SEQUENCE_NUMBER;
import static org.opensearch.jobscheduler.spi.LockModel.PRIMARY_TERM;
import static org.opensearch.jobscheduler.spi.LockModel.LOCK_ID;
import static org.opensearch.jobscheduler.spi.LockModel.LOCK_MODEL;

/**
* This class consists of the REST handler to GET a lock model for extensions
Expand Down Expand Up @@ -116,23 +113,19 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
// Prepare response
RestStatus restStatus = RestStatus.OK;
String restResponseString = lockModelResponseHolder != null ? "success" : "failed";

builder.startObject();
builder.field("response", restResponseString);
if (restResponseString.equals("success")) {
// Create Response
AcquireLockResponse acquireLockResponse = new AcquireLockResponse(
lockModelResponseHolder,
LockModel.generateLockId(jobIndexName, jobId),
lockModelResponseHolder.getSeqNo(),
lockModelResponseHolder.getPrimaryTerm()
);
acquireLockResponse.toXContent(builder, ToXContent.EMPTY_PARAMS);

// Prepare response fields
long seqNo = lockModelResponseHolder.getSeqNo();
long primaryTerm = lockModelResponseHolder.getPrimaryTerm();

builder.field(LOCK_ID, LockModel.generateLockId(jobIndexName, jobId));
builder.field(LOCK_MODEL, lockModelResponseHolder);
builder.field(SEQUENCE_NUMBER, seqNo);
builder.field(PRIMARY_TERM, primaryTerm);
} else {
restStatus = RestStatus.INTERNAL_SERVER_ERROR;
}
builder.endObject();
bytesRestResponse = new BytesRestResponse(restStatus, builder);
channel.sendResponse(bytesRestResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentParserUtils;

/**
* Request from extensions to acquire a lock for scheduled job execution
*/
public class AcquireLockRequest extends ActionRequest {
public class AcquireLockRequest extends ActionRequest implements ToXContentObject {

/**
* the id of the job
Expand Down Expand Up @@ -122,4 +124,14 @@ public static AcquireLockRequest parse(XContentParser parser) throws IOException
return new AcquireLockRequest(jobId, jobIndexName, lockDurationSeconds);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(JOB_ID, jobId);
builder.field(JOB_INDEX_NAME, jobIndexName);
builder.field(LOCK_DURATION_SECONDS, lockDurationSeconds);
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.transport;

import java.io.IOException;

import org.opensearch.common.xcontent.XContentParserUtils;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.jobscheduler.spi.LockModel;

import static java.util.Objects.requireNonNull;

/**
* Response class used to facilitate serialization/deserialization of the GetLock response
*/
public class AcquireLockResponse implements ToXContentObject {
private final LockModel lock;
private final String lockId;
private final long seqNo;
private final long primaryTerm;

public AcquireLockResponse(final LockModel lock, final String lockId, final long seqNo, final long primaryTerm) {
this.lock = lock;
this.lockId = lockId;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

public LockModel getLock() {
return this.lock;
}

public String getLockId() {
return this.lockId;
}

public long getSeqNo() {
return this.seqNo;
}

public long getPrimaryTerm() {
return this.primaryTerm;
}

public static AcquireLockResponse parse(final XContentParser parser) throws IOException {
LockModel lock = null;
String lockId = null;
Long seqNo = null;
Long primaryTerm = null;

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case LockModel.LOCK_ID:
lockId = parser.text();
break;
case LockModel.SEQUENCE_NUMBER:
seqNo = parser.longValue();
break;
case LockModel.PRIMARY_TERM:
primaryTerm = parser.longValue();
break;
case LockModel.LOCK_MODEL:
lock = LockModel.parse(parser, seqNo, primaryTerm);
break;
default:
throw new IllegalArgumentException("Unknown field " + fieldName);
}
}
return new AcquireLockResponse(
requireNonNull(lock, "LockModel cannot be null"),
requireNonNull(lockId, "LockId cannot be null"),
requireNonNull(seqNo, "Sequence Number cannot be null"),
requireNonNull(primaryTerm, "Primary Term cannot be null")
);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(LockModel.LOCK_ID, lockId);
builder.field(LockModel.SEQUENCE_NUMBER, seqNo);
builder.field(LockModel.PRIMARY_TERM, primaryTerm);
builder.field(LockModel.LOCK_MODEL, lock);
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@

import com.google.common.collect.ImmutableMap;

import static org.junit.Assert.assertEquals;

import java.util.Map;
import java.io.IOException;

import org.junit.Before;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.jobscheduler.ODFERestTestCase;
import org.opensearch.jobscheduler.TestHelpers;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.transport.AcquireLockResponse;
import org.opensearch.test.OpenSearchIntegTestCase;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2)
Expand Down Expand Up @@ -46,7 +48,7 @@ public void setUp() throws Exception {

public void testGetLockRestAPI() throws Exception {

String initialLockId = validateResponseAndGetLockId(entityAsMap(this.initialGetLockResponse));
String initialLockId = validateResponseAndGetLockId(initialGetLockResponse);
assertEquals(TestHelpers.generateExpectedLockId(initialJobIndexName, initialJobId), initialLockId);

// Submit 10 requests to generate new lock models for different job indexes
Expand All @@ -60,13 +62,25 @@ public void testGetLockRestAPI() throws Exception {
null
);

String lockId = validateResponseAndGetLockId(entityAsMap(getLockResponse));
String lockId = validateResponseAndGetLockId(getLockResponse);

assertEquals(TestHelpers.generateExpectedLockId(String.valueOf(i), String.valueOf(i)), lockId);
}
}

private String validateResponseAndGetLockId(Map<String, Object> responseMap) {
assertEquals("success", responseMap.get("response"));
return (String) responseMap.get(LockModel.LOCK_ID);
private String validateResponseAndGetLockId(Response response) throws IOException {

XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getEntity().getContent());

AcquireLockResponse acquireLockResponse = AcquireLockResponse.parse(parser);

// Validate response map fields
assertNotNull(acquireLockResponse.getLockId());
assertNotNull(acquireLockResponse.getSeqNo());
assertNotNull(acquireLockResponse.getPrimaryTerm());
assertNotNull(acquireLockResponse.getLock());

return acquireLockResponse.getLockId();
}
}

0 comments on commit 4649c3d

Please sign in to comment.