Skip to content

Commit

Permalink
Move classes from spark to async-query-core and async-query
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Jun 11, 2024
1 parent b959039 commit e36f894
Show file tree
Hide file tree
Showing 219 changed files with 265 additions and 202 deletions.
21 changes: 19 additions & 2 deletions async-query-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ tasks.register('downloadG4Files', Exec) {
}

generateGrammarSource {
arguments += ['-visitor', '-package', 'org.opensearch.sql.asyncquery.antlr.parser']
arguments += ['-visitor', '-package', 'org.opensearch.sql.spark.antlr.parser']
source = sourceSets.main.antlr
outputDirectory = file("build/generated-src/antlr/main/org/opensearch/sql/asyncquery/antlr/parser")
}
Expand All @@ -44,6 +44,11 @@ generateGrammarSource.dependsOn downloadG4Files
dependencies {
antlr "org.antlr:antlr4:4.7.1"

implementation project(':core')
implementation project(':spark') // TODO: dependency to spark should be eliminated
implementation project(':datasources') // TODO: dependency to datasources should be eliminated
implementation project(':legacy') // TODO: dependency to legacy should be eliminated
implementation group: 'org.json', name: 'json', version: '20231013'
implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}"
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}"
Expand Down Expand Up @@ -108,7 +113,19 @@ jacocoTestCoverageVerification {
violationRules {
rule {
element = 'CLASS'
excludes = []
// TODO: Add unit tests in async-query-core and remove exclusions
excludes = [
'org.opensearch.sql.spark.asyncquery.model.*',
'org.opensearch.sql.spark.data.constants.*',
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.dispatcher.*',
'org.opensearch.sql.spark.execution.session.*',
'org.opensearch.sql.spark.execution.statement.*',
'org.opensearch.sql.spark.flint.*',
'org.opensearch.sql.spark.flint.operation.*',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.utils.SQLQueryUtils.*'
]
limit {
counter = 'LINE'
minimum = 1.0
Expand Down
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.opensearch.sql.spark.data.constants;

public class SparkConstants {
public static final String EMR = "emr";
public static final String STEP_ID_FIELD = "stepId.keyword";

public static final String JOB_ID_FIELD = "jobRunId";

Expand All @@ -21,16 +19,11 @@ public class SparkConstants {
public static final String SPARK_SQL_APPLICATION_JAR =
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.3.0-SNAPSHOT.jar";
public static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request";
// TODO should be replaced with mvn jar.
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.3.0-SNAPSHOT.jar";
// TODO should be replaced with mvn jar.
public static final String FLINT_DEFAULT_CLUSTER_NAME = "opensearch-cluster";
public static final String FLINT_DEFAULT_HOST = "localhost";
public static final String FLINT_DEFAULT_PORT = "9200";
public static final String FLINT_DEFAULT_SCHEME = "http";
public static final String FLINT_DEFAULT_AUTH = "noauth";
public static final String FLINT_DEFAULT_REGION = "us-west-2";
public static final String DEFAULT_CLASS_NAME = "org.apache.spark.sql.FlintJob";
public static final String S3_AWS_CREDENTIALS_PROVIDER_KEY =
"spark.hadoop.fs.s3.customAWSCredentialsProvider";
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.config.OpenSearchSparkSubmitParameterModifier;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkSubmitParameterModifier;
Expand Down Expand Up @@ -114,8 +114,10 @@ void testCreateAsyncQuery() {

@Test
void testCreateAsyncQueryWithExtraSparkSubmitParameter() {
OpenSearchSparkSubmitParameterModifier modifier =
new OpenSearchSparkSubmitParameterModifier("--conf spark.dynamicAllocation.enabled=false");
SparkSubmitParameterModifier modifier =
(SparkSubmitParameters parameters) -> {
parameters.setExtraParameters("--conf spark.dynamicAllocation.enabled=false");
};
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(
SparkExecutionEngineConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.constants;

public class TestConstants {
public static final String QUERY = "select 1";
public static final String EMR_JOB_ID = "job-123xxx";
public static final String EMRS_APPLICATION_ID = "app-xxxxx";
public static final String EMRS_EXECUTION_ROLE = "execution_role";
public static final String EMRS_JOB_NAME = "job_name";
public static final String SPARK_SUBMIT_PARAMETERS = "--conf org.flint.sql.SQLJob";
public static final String TEST_CLUSTER_NAME = "TEST_CLUSTER";
public static final String MOCK_SESSION_ID = "s-0123456";
public static final String MOCK_STATEMENT_ID = "st-0123456";
public static final String ENTRY_POINT_START_JAR =
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.3.0-SNAPSHOT.jar";
public static final String DEFAULT_RESULT_INDEX = "query_execution_result_ds1";
public static final String US_EAST_REGION = "us-east-1";
public static final String US_WEST_REGION = "us-west-1";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.leasemanager;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class ConcurrencyLimitExceededExceptionTest {
@Test
public void test() {
ConcurrencyLimitExceededException e = new ConcurrencyLimitExceededException("Test");

assertEquals("Test", e.getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class IDUtilsTest {
public static final String DATASOURCE_NAME = "DATASOURCE_NAME";

@Test
public void encodeAndDecode() {
String id = IDUtils.encode(DATASOURCE_NAME);
String decoded = IDUtils.decode(id);

assertTrue(id.length() > IDUtils.PREFIX_LEN);
assertEquals(DATASOURCE_NAME, decoded);
}

@Test
public void generateUniqueIds() {
String id1 = IDUtils.encode(DATASOURCE_NAME);
String id2 = IDUtils.encode(DATASOURCE_NAME);

assertNotEquals(id1, id2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class RealTimeProviderTest {
@Test
public void testCurrentEpochMillis() {
RealTimeProvider realTimeProvider = new RealTimeProvider();

assertTrue(realTimeProvider.currentEpochMillis() > 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import java.io.IOException;
import java.net.URL;
import java.util.Objects;
import lombok.SneakyThrows;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;

public class TestUtils {
public static String getJson(String filename) throws IOException {
ClassLoader classLoader = TestUtils.class.getClassLoader();
return new String(
Objects.requireNonNull(classLoader.getResourceAsStream(filename)).readAllBytes());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline
12 changes: 12 additions & 0 deletions async-query-core/src/test/resources/select_query_response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"data": {
"result": [
"{'1':1}"
],
"schema": [
"{'column_name':'1','data_type':'integer'}"
],
"stepId": "s-123456789",
"applicationId": "application-abc"
}
}
2 changes: 1 addition & 1 deletion async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ repositories {

dependencies {
api project(':core')
implementation project(':async-query-core')
api project(':async-query-core')
implementation project(':protocol')
implementation project(':datasources')
implementation project(':legacy')
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import lombok.AllArgsConstructor;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec {
@Before
public void doSetUp() {
mockIndexState =
new MockFlintSparkJob(flintIndexStateModelService, mockIndex.latestId, MYS3_DATASOURCE);
new MockFlintSparkJob(
flintIndexStateModelService,
mockIndex.latestId,
MYS3_DATASOURCE);
}

@Test
Expand Down Expand Up @@ -439,7 +442,8 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) {
});
this.createQueryResponse =
queryService.createAsyncQuery(
new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null),
new CreateAsyncQueryRequest(
query, MYS3_DATASOURCE, LangType.SQL, null),
asyncQueryRequestContext);
}

Expand Down Expand Up @@ -515,7 +519,10 @@ void emrJobWriteResultDoc(Map<String, Object> resultDoc) {

/** Simulate EMR-S updates query_execution_request with state */
void emrJobUpdateStatementState(StatementState newState) {
StatementModel stmt = statementStorageService.getStatement(queryId, MYS3_DATASOURCE).get();
StatementModel stmt =
statementStorageService
.getStatement(queryId, MYS3_DATASOURCE)
.get();
statementStorageService.updateStatementState(stmt, newState);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.spark.cluster;

import static org.opensearch.sql.datasource.model.DataSourceStatus.DISABLED;
import static org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec.MYGLUE_DATASOURCE;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.constants;

public class TestConstants {
public static final String TEST_DATASOURCE_NAME = "test_datasource_name";
public static final String EMR_JOB_ID = "job-123xxx";
public static final String EMRS_APPLICATION_ID = "app-xxxxx";
public static final String EMRS_EXECUTION_ROLE = "execution_role";
public static final String SPARK_SUBMIT_PARAMETERS = "--conf org.flint.sql.SQLJob";
public static final String TEST_CLUSTER_NAME = "TEST_CLUSTER";
public static final String MOCK_SESSION_ID = "s-0123456";
public static final String US_WEST_REGION = "us-west-1";
}
Loading

0 comments on commit e36f894

Please sign in to comment.