Skip to content

Commit

Permalink
Minor Refactoring
Browse files: browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Oct 17, 2023
1 parent b76a15e commit ed77804
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 124 deletions.
2 changes: 1 addition & 1 deletion spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,6 @@ primaryExpression
| qualifiedName DOT ASTERISK #star
| LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor
| LEFT_PAREN query RIGHT_PAREN #subqueryExpression
| IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN #identifierClause
| functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
(COMMA argument+=functionArgument)*)? RIGHT_PAREN
(FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)?
Expand Down Expand Up @@ -1196,6 +1195,7 @@ qualifiedNameList

functionName
: IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN
| identFunc=IDENTIFIER_KW // IDENTIFIER itself is also a valid function name.
| qualifiedName
| FILTER
| LEFT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* This POJO carries all the fields required for EMR Serverless job submission. It is used as the
* model in the {@link EMRServerlessClient} interface.
*/
@Data
@EqualsAndHashCode
public class StartJobRequest {

public static final Long DEFAULT_JOB_TIMEOUT = 120L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -78,6 +80,8 @@ public class SparkQueryDispatcherTest {

private SparkQueryDispatcher sparkQueryDispatcher;

@Captor ArgumentCaptor<StartJobRequest> startJobRequestArgumentCaptor;

@BeforeEach
void setUp() {
sparkQueryDispatcher =
Expand Down Expand Up @@ -125,23 +129,24 @@ void testDispatchSelectQuery() {
LangType.SQL,
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME));
verify(emrServerlessClient, times(1))
.startJobRun(
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags,
false,
any()));
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
StartJobRequest expected =
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags,
false,
null);
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery());
verifyNoInteractions(flintIndexMetadataReader);
Expand Down Expand Up @@ -183,24 +188,25 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
LangType.SQL,
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME));
verify(emrServerlessClient, times(1))
.startJobRun(
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"basicauth",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AUTH_USERNAME, "username");
put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password");
}
}),
tags,
false,
any()));
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
StartJobRequest expected =
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"basicauth",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AUTH_USERNAME, "username");
put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password");
}
}),
tags,
false,
null);
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery());
verifyNoInteractions(flintIndexMetadataReader);
Expand Down Expand Up @@ -240,22 +246,23 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() {
LangType.SQL,
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME));
verify(emrServerlessClient, times(1))
.startJobRun(
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"noauth",
new HashMap<>() {
{
}
}),
tags,
false,
any()));
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
StartJobRequest expected =
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"noauth",
new HashMap<>() {
{
}
}),
tags,
false,
null);
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery());
verifyNoInteractions(flintIndexMetadataReader);
Expand Down Expand Up @@ -302,24 +309,25 @@ void testDispatchIndexQuery() {
LangType.SQL,
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME));
verify(emrServerlessClient, times(1))
.startJobRun(
new StartJobRequest(
query,
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true,
any()));
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
StartJobRequest expected =
new StartJobRequest(
query,
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true,
null);
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery());
verifyNoInteractions(flintIndexMetadataReader);
Expand Down Expand Up @@ -361,23 +369,24 @@ void testDispatchWithPPLQuery() {
LangType.PPL,
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME));
verify(emrServerlessClient, times(1))
.startJobRun(
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags,
false,
any()));
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
StartJobRequest expected =
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags,
false,
null);
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery());
verifyNoInteractions(flintIndexMetadataReader);
Expand Down Expand Up @@ -419,23 +428,24 @@ void testDispatchQueryWithoutATableAndDataSourceName() {
LangType.SQL,
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME));
verify(emrServerlessClient, times(1))
.startJobRun(
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags,
false,
any()));
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
StartJobRequest expected =
new StartJobRequest(
query,
"TEST_CLUSTER:non-index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags,
false,
null);
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery());
verifyNoInteractions(flintIndexMetadataReader);
Expand Down Expand Up @@ -483,24 +493,25 @@ void testDispatchIndexQueryWithoutADatasourceName() {
LangType.SQL,
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME));
verify(emrServerlessClient, times(1))
.startJobRun(
new StartJobRequest(
query,
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true,
any()));
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
StartJobRequest expected =
new StartJobRequest(
query,
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true,
null);
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery());
verifyNoInteractions(flintIndexMetadataReader);
Expand Down Expand Up @@ -905,8 +916,8 @@ private String constructExpectedSparkSubmitParameterString(
+ " --conf"
+ " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog "
+ authParamConfigBuilder
+ " --conf spark.flint.datasource.name=my_glue ";
+ " --conf spark.flint.datasource.name=my_glue "
+ authParamConfigBuilder;
}

private String withStructuredStreaming(String parameters) {
Expand Down

0 comments on commit ed77804

Please sign in to comment.