Skip to content

Commit

Permalink
Fix wrong 503 error response code
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Feb 2, 2024
1 parent 3d17b63 commit 82e6129
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
package org.opensearch.sql.datasources.rest;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.rest.RestStatus.NOT_FOUND;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.*;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -293,7 +293,7 @@ private void handleException(Exception e, RestChannel restChannel) {
reportError(restChannel, e, BAD_REQUEST);
} else {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS);
reportError(restChannel, e, SERVICE_UNAVAILABLE);
reportError(restChannel, e, INTERNAL_SERVER_ERROR);

Check warning on line 296 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L296

Added line #L296 was not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void queryExceedResourceLimitShouldFail() throws IOException {
String query = String.format("search source=%s age=20", TEST_INDEX_DOG);

ResponseException exception = expectThrows(ResponseException.class, () -> executeQuery(query));
assertEquals(503, exception.getResponse().getStatusLine().getStatusCode());
assertEquals(500, exception.getResponse().getStatusLine().getStatusCode());
assertThat(
exception.getMessage(),
Matchers.containsString("resource is not enough to run the" + " query, quit."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.opensearch.sql.legacy.plugin;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;

import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.collect.ImmutableList;
Expand All @@ -23,6 +23,7 @@
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchSecurityException;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.inject.Injector;
Expand Down Expand Up @@ -171,21 +172,23 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
QueryAction queryAction = explainRequest(client, sqlRequest, format);
executeSqlRequest(request, queryAction, client, restChannel);
} catch (Exception e) {
logAndPublishMetrics(e);
reportError(restChannel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
handleException(restChannel, e);
}
},
(restChannel, exception) -> {
logAndPublishMetrics(exception);
reportError(
restChannel,
exception,
isClientError(exception) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
});
this::handleException);
} catch (Exception e) {
logAndPublishMetrics(e);
return channel ->
reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
return channel -> handleException(channel, e);
}
}

private void handleException(RestChannel restChannel, Exception exception) {
logAndPublishMetrics(exception);
if (exception instanceof OpenSearchSecurityException) {
OpenSearchSecurityException securityException = (OpenSearchSecurityException) exception;
reportError(restChannel, exception, securityException.status());
} else {
reportError(
restChannel, exception, isClientError(exception) ? BAD_REQUEST : INTERNAL_SERVER_ERROR);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.sql.legacy.plugin;

import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
Expand Down Expand Up @@ -84,8 +84,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel ->
channel.sendResponse(
new BytesRestResponse(
SERVICE_UNAVAILABLE,
ErrorMessageFactory.createErrorMessage(e, SERVICE_UNAVAILABLE.getStatus())
INTERNAL_SERVER_ERROR,
ErrorMessageFactory.createErrorMessage(e, INTERNAL_SERVER_ERROR.getStatus())
.toString()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,15 @@ public void collect(
indexToType.put(tableName, null);
} else if (sqlExprTableSource.getExpr() instanceof SQLBinaryOpExpr) {
SQLBinaryOpExpr sqlBinaryOpExpr = (SQLBinaryOpExpr) sqlExprTableSource.getExpr();
tableName = ((SQLIdentifierExpr) sqlBinaryOpExpr.getLeft()).getName();
SQLExpr leftSideOfExpression = sqlBinaryOpExpr.getLeft();
if (leftSideOfExpression instanceof SQLIdentifierExpr) {
tableName = ((SQLIdentifierExpr) sqlBinaryOpExpr.getLeft()).getName();
} else {
throw new ParserException(
"Left side of the expression ["
+ leftSideOfExpression.toString()
+ "] is expected to be an identifier");
}
SQLExpr rightSideOfExpression = sqlBinaryOpExpr.getRight();

// This assumes that right side of the expression is different name in query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.parser.ParserException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -100,6 +101,15 @@ public void testSelectTheFieldWithConflictMappingShouldThrowException() {
rewriteTerm(sql);
}

@Test
public void testIssue2391_WithWrongBinaryOperation() {
String sql = "SELECT * from I_THINK/IM/A_URL";
exception.expect(ParserException.class);
exception.expectMessage(
"Left side of the expression [I_THINK / IM] is expected to be an identifier");
rewriteTerm(sql);
}

private String rewriteTerm(String sql) {
SQLQueryExpr sqlQueryExpr = SqlParserUtils.parse(sql);
sqlQueryExpr.accept(new TermFieldRewriter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
Expand Down Expand Up @@ -130,7 +129,7 @@ public void onFailure(Exception e) {
Metrics.getInstance()
.getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS)
.increment();
reportError(channel, e, SERVICE_UNAVAILABLE);
reportError(channel, e, INTERNAL_SERVER_ERROR);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.sql.plugin.rest;

import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
Expand Down Expand Up @@ -79,8 +79,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel ->
channel.sendResponse(
new BytesRestResponse(
SERVICE_UNAVAILABLE,
ErrorMessageFactory.createErrorMessage(e, SERVICE_UNAVAILABLE.getStatus())
INTERNAL_SERVER_ERROR,
ErrorMessageFactory.createErrorMessage(e, INTERNAL_SERVER_ERROR.getStatus())
.toString()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.StateStore;
Expand All @@ -22,6 +25,9 @@ public class OpensearchAsyncQueryJobMetadataStorageService

private final StateStore stateStore;

private static final Logger LOGGER =
LogManager.getLogger(OpensearchAsyncQueryJobMetadataStorageService.class);

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
Expand All @@ -30,8 +36,13 @@ public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
AsyncQueryId queryId = new AsyncQueryId(qid);
return StateStore.getJobMetaData(stateStore, queryId.getDataSourceName())
.apply(queryId.docId());
try {
AsyncQueryId queryId = new AsyncQueryId(qid);
return StateStore.getJobMetaData(stateStore, queryId.getDataSourceName())
.apply(queryId.docId());
} catch (Exception e) {
LOGGER.error("Error while fetching the job metadata.", e);
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", qid));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ public static AsyncQueryId newAsyncQueryId(String datasourceName) {
return new AsyncQueryId(encode(datasourceName));
}

public static boolean isValidQueryId(String id) {
try {
decode(id);
return true;
} catch (Throwable e) {
return false;

Check warning on line 27 in spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryId.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryId.java#L24-L27

Added lines #L24 - L27 were not covered by tests
}
}

public String getDataSourceName() {
return decode(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.sql.spark.rest;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.rest.RestStatus.TOO_MANY_REQUESTS;
import static org.opensearch.rest.RestRequest.Method.DELETE;
import static org.opensearch.rest.RestRequest.Method.GET;
Expand All @@ -26,10 +26,12 @@
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
Expand Down Expand Up @@ -112,12 +114,12 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
}
}

private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_CREATE_API_REQUEST_COUNT);
CreateAsyncQueryRequest submitJobRequest =
CreateAsyncQueryRequest.fromXContentParser(restRequest.contentParser());
return restChannel ->
private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient) {
return restChannel -> {

Check warning on line 118 in spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java#L118

Added line #L118 was not covered by tests
try {
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_CREATE_API_REQUEST_COUNT);
CreateAsyncQueryRequest submitJobRequest =
CreateAsyncQueryRequest.fromXContentParser(restRequest.contentParser());

Check warning on line 122 in spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java#L120-L122

Added lines #L120 - L122 were not covered by tests
Scheduler.schedule(
nodeClient,
() ->
Expand All @@ -140,6 +142,10 @@ public void onFailure(Exception e) {
handleException(e, restChannel, restRequest.method());
}
}));
} catch (Exception e) {
handleException(e, restChannel, restRequest.method());
}
};

Check warning on line 148 in spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java#L145-L148

Added lines #L145 - L148 were not covered by tests
}

private RestChannelConsumer executeGetAsyncQueryResultRequest(
Expand Down Expand Up @@ -187,7 +193,7 @@ private void handleException(
reportError(restChannel, e, BAD_REQUEST);
addCustomerErrorMetric(requestMethod);
} else {
reportError(restChannel, e, SERVICE_UNAVAILABLE);
reportError(restChannel, e, INTERNAL_SERVER_ERROR);

Check warning on line 196 in spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java#L196

Added line #L196 was not covered by tests
addSystemErrorMetric(requestMethod);
}
}
Expand Down Expand Up @@ -227,7 +233,10 @@ private void reportError(final RestChannel channel, final Exception e, final Res
}

private static boolean isClientError(Exception e) {
return e instanceof IllegalArgumentException || e instanceof IllegalStateException;
return e instanceof IllegalArgumentException
|| e instanceof IllegalStateException
|| e instanceof DataSourceNotFoundException
|| e instanceof AsyncQueryNotFoundException;
}

private void addSystemErrorMetric(RestRequest.Method requestMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,28 @@ public static CreateAsyncQueryRequest fromXContentParser(XContentParser parser)
LangType lang = null;
String datasource = null;
String sessionId = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (fieldName.equals("query")) {
query = parser.textOrNull();
} else if (fieldName.equals("lang")) {
String langString = parser.textOrNull();
lang = LangType.fromString(langString);
} else if (fieldName.equals("datasource")) {
datasource = parser.textOrNull();
} else if (fieldName.equals(SESSION_ID)) {
sessionId = parser.textOrNull();
} else {
throw new IllegalArgumentException("Unknown field: " + fieldName);
try {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (fieldName.equals("query")) {
query = parser.textOrNull();
} else if (fieldName.equals("lang")) {
String langString = parser.textOrNull();
lang = LangType.fromString(langString);
} else if (fieldName.equals("datasource")) {
datasource = parser.textOrNull();
} else if (fieldName.equals(SESSION_ID)) {
sessionId = parser.textOrNull();
} else {
throw new IllegalArgumentException("Unknown field: " + fieldName);

Check warning on line 59 in spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java#L59

Added line #L59 was not covered by tests
}
}
return new CreateAsyncQueryRequest(query, datasource, lang, sessionId);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Error while parsing the request body: %s", e.getMessage()));

Check warning on line 65 in spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java#L63-L65

Added lines #L63 - L65 were not covered by tests
}
return new CreateAsyncQueryRequest(query, datasource, lang, sessionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.StateStore;
Expand All @@ -22,6 +24,7 @@ public class OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest
public static final String DS_NAME = "mys3";
private static final String MOCK_SESSION_ID = "sessionId";
private static final String MOCK_RESULT_INDEX = "resultIndex";
private static final String MOCK_QUERY_ID = "00fdo6u94n7abo0q";
private OpensearchAsyncQueryJobMetadataStorageService opensearchJobMetadataStorageService;

@Before
Expand Down Expand Up @@ -69,4 +72,25 @@ public void testStoreJobMetadataWithResultExtraData() {
assertEquals("resultIndex", actual.get().getResultIndex());
assertEquals(MOCK_SESSION_ID, actual.get().getSessionId());
}

@Test
public void testGetJobMetadataWithMalformedQueryId() {
AsyncQueryNotFoundException asyncQueryNotFoundException =
Assertions.assertThrows(
AsyncQueryNotFoundException.class,
() -> opensearchJobMetadataStorageService.getJobMetadata(MOCK_QUERY_ID));
Assertions.assertEquals(
String.format("Invalid QueryId: %s", MOCK_QUERY_ID),
asyncQueryNotFoundException.getMessage());
}

@Test
public void testGetJobMetadataWithEmptyQueryId() {
AsyncQueryNotFoundException asyncQueryNotFoundException =
Assertions.assertThrows(
AsyncQueryNotFoundException.class,
() -> opensearchJobMetadataStorageService.getJobMetadata(""));
Assertions.assertEquals("Invalid QueryId: ", asyncQueryNotFoundException.getMessage());
}

}

0 comments on commit 82e6129

Please sign in to comment.