From 0287f62cb04162c71deb4985a814f1f292feb5a3 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Fri, 26 Apr 2024 16:26:28 +0000 Subject: [PATCH] Delete Spark datasource Signed-off-by: Tomoyuki Morita --- .../org/opensearch/sql/plugin/SQLPlugin.java | 2 - .../SparkSqlFunctionImplementation.java | 106 ------- .../SparkSqlTableFunctionResolver.java | 81 ----- .../SparkSqlFunctionTableScanBuilder.java | 32 -- .../SparkSqlFunctionTableScanOperator.java | 69 ----- .../sql/spark/storage/SparkScan.java | 50 --- .../sql/spark/storage/SparkStorageEngine.java | 32 -- .../spark/storage/SparkStorageFactory.java | 132 -------- .../sql/spark/storage/SparkTable.java | 62 ---- .../SparkSqlFunctionImplementationTest.java | 78 ----- .../SparkSqlFunctionTableScanBuilderTest.java | 46 --- ...SparkSqlFunctionTableScanOperatorTest.java | 292 ------------------ .../SparkSqlTableFunctionResolverTest.java | 140 --------- .../sql/spark/storage/SparkScanTest.java | 40 --- .../spark/storage/SparkStorageEngineTest.java | 46 --- .../storage/SparkStorageFactoryTest.java | 182 ----------- .../sql/spark/storage/SparkTableTest.java | 77 ----- spark/src/test/resources/all_data_type.json | 22 -- spark/src/test/resources/issue2210.json | 17 - spark/src/test/resources/spark_data_type.json | 13 - 20 files changed, 1519 deletions(-) delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java delete mode 100644 spark/src/test/resources/all_data_type.json delete mode 100644 spark/src/test/resources/issue2210.json delete mode 100644 spark/src/test/resources/spark_data_type.json diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 08386b797e..bc0a084f8c 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -84,7 +84,6 @@ import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; -import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportGetAsyncQueryResultAction; @@ -285,7 +284,6 @@ private DataSourceServiceImpl createDataSourceService() { new OpenSearchDataSourceFactory( new OpenSearchNodeClient(this.client), pluginSettings)) .add(new PrometheusStorageFactory(pluginSettings)) - .add(new SparkStorageFactory(this.client, pluginSettings)) .add(new GlueDataSourceFactory(pluginSettings)) .build(), dataSourceMetadataStorage, diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java deleted file mode 100644 index 914aa80085..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.implementation; - -import static org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver.QUERY; - -import java.util.List; -import java.util.stream.Collectors; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.data.type.ExprType; -import org.opensearch.sql.exception.ExpressionEvaluationException; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.FunctionExpression; -import org.opensearch.sql.expression.NamedArgumentExpression; -import org.opensearch.sql.expression.env.Environment; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.expression.function.TableFunctionImplementation; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkTable; -import org.opensearch.sql.storage.Table; - -/** Spark SQL function implementation. */ -public class SparkSqlFunctionImplementation extends FunctionExpression - implements TableFunctionImplementation { - - private final FunctionName functionName; - private final List arguments; - private final SparkClient sparkClient; - - /** - * Constructor for spark sql function. - * - * @param functionName name of the function - * @param arguments a list of expressions - * @param sparkClient spark client - */ - public SparkSqlFunctionImplementation( - FunctionName functionName, List arguments, SparkClient sparkClient) { - super(functionName, arguments); - this.functionName = functionName; - this.arguments = arguments; - this.sparkClient = sparkClient; - } - - @Override - public ExprValue valueOf(Environment valueEnv) { - throw new UnsupportedOperationException( - String.format( - "Spark defined function [%s] is only " - + "supported in SOURCE clause with spark connector catalog", - functionName)); - } - - @Override - public ExprType type() { - return ExprCoreType.STRUCT; - } - - @Override - public String toString() { - List args = - arguments.stream() - .map( - arg -> - String.format( - "%s=%s", - ((NamedArgumentExpression) arg).getArgName(), - ((NamedArgumentExpression) arg).getValue().toString())) - .collect(Collectors.toList()); - return String.format("%s(%s)", functionName, String.join(", ", args)); - } - - @Override - public Table applyArguments() { - return new SparkTable(sparkClient, buildQueryFromSqlFunction(arguments)); - } - - /** - * This method builds a spark query request. - * - * @param arguments spark sql function arguments - * @return spark query request - */ - private SparkQueryRequest buildQueryFromSqlFunction(List arguments) { - - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - arguments.forEach( - arg -> { - String argName = ((NamedArgumentExpression) arg).getArgName(); - Expression argValue = ((NamedArgumentExpression) arg).getValue(); - ExprValue literalValue = argValue.valueOf(); - if (argName.equals(QUERY)) { - sparkQueryRequest.setSql((String) literalValue.value()); - } else { - throw new ExpressionEvaluationException( - String.format("Invalid Function Argument:%s", argName)); - } - }); - return sparkQueryRequest; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java deleted file mode 100644 index a4f2a6c0fe..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.resolver; - -import static org.opensearch.sql.data.type.ExprCoreType.STRING; - -import java.util.ArrayList; -import java.util.List; -import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.sql.exception.SemanticCheckException; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.NamedArgumentExpression; -import org.opensearch.sql.expression.function.FunctionBuilder; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.expression.function.FunctionSignature; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; - -/** Function resolver for sql function of spark connector. */ -@RequiredArgsConstructor -public class SparkSqlTableFunctionResolver implements FunctionResolver { - private final SparkClient sparkClient; - - public static final String SQL = "sql"; - public static final String QUERY = "query"; - - @Override - public Pair resolve(FunctionSignature unresolvedSignature) { - FunctionName functionName = FunctionName.of(SQL); - FunctionSignature functionSignature = new FunctionSignature(functionName, List.of(STRING)); - final List argumentNames = List.of(QUERY); - - FunctionBuilder functionBuilder = - (functionProperties, arguments) -> { - Boolean argumentsPassedByName = - arguments.stream() - .noneMatch( - arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - Boolean argumentsPassedByPosition = - arguments.stream() - .allMatch( - arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - if (!(argumentsPassedByName || argumentsPassedByPosition)) { - throw new SemanticCheckException( - "Arguments should be either passed by name or position"); - } - - if (arguments.size() != argumentNames.size()) { - throw new SemanticCheckException( - String.format( - "Missing arguments:[%s]", - String.join( - ",", argumentNames.subList(arguments.size(), argumentNames.size())))); - } - - if (argumentsPassedByPosition) { - List namedArguments = new ArrayList<>(); - for (int i = 0; i < arguments.size(); i++) { - namedArguments.add( - new NamedArgumentExpression( - argumentNames.get(i), - ((NamedArgumentExpression) arguments.get(i)).getValue())); - } - return new SparkSqlFunctionImplementation(functionName, namedArguments, sparkClient); - } - return new SparkSqlFunctionImplementation(functionName, arguments, sparkClient); - }; - return Pair.of(functionSignature, functionBuilder); - } - - @Override - public FunctionName getFunctionName() { - return FunctionName.of(SQL); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java deleted file mode 100644 index aea8f72f36..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.scan; - -import lombok.AllArgsConstructor; -import org.opensearch.sql.planner.logical.LogicalProject; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; -import org.opensearch.sql.storage.read.TableScanBuilder; - -/** TableScanBuilder for sql function of spark connector. */ -@AllArgsConstructor -public class SparkSqlFunctionTableScanBuilder extends TableScanBuilder { - - private final SparkClient sparkClient; - - private final SparkQueryRequest sparkQueryRequest; - - @Override - public TableScanOperator build() { - return new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - } - - @Override - public boolean pushDownProject(LogicalProject project) { - return true; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java deleted file mode 100644 index a2e44affd5..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.scan; - -import java.io.IOException; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Locale; -import lombok.RequiredArgsConstructor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.response.DefaultSparkSqlFunctionResponseHandle; -import org.opensearch.sql.spark.functions.response.SparkSqlFunctionResponseHandle; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; - -/** This a table scan operator to handle sql table function. */ -@RequiredArgsConstructor -public class SparkSqlFunctionTableScanOperator extends TableScanOperator { - private final SparkClient sparkClient; - private final SparkQueryRequest request; - private SparkSqlFunctionResponseHandle sparkResponseHandle; - private static final Logger LOG = LogManager.getLogger(); - - @Override - public void open() { - super.open(); - this.sparkResponseHandle = - AccessController.doPrivileged( - (PrivilegedAction) - () -> { - try { - JSONObject responseObject = sparkClient.sql(request.getSql()); - return new DefaultSparkSqlFunctionResponseHandle(responseObject); - } catch (IOException e) { - LOG.error(e.getMessage()); - throw new RuntimeException( - String.format("Error fetching data from spark server: %s", e.getMessage())); - } - }); - } - - @Override - public boolean hasNext() { - return this.sparkResponseHandle.hasNext(); - } - - @Override - public ExprValue next() { - return this.sparkResponseHandle.next(); - } - - @Override - public String explain() { - return String.format(Locale.ROOT, "sql(%s)", request.getSql()); - } - - @Override - public ExecutionEngine.Schema schema() { - return this.sparkResponseHandle.schema(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java deleted file mode 100644 index 395e1685a6..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; - -/** Spark scan operator. */ -@EqualsAndHashCode(onlyExplicitlyIncluded = true, callSuper = false) -@ToString(onlyExplicitlyIncluded = true) -public class SparkScan extends TableScanOperator { - - private final SparkClient sparkClient; - - @EqualsAndHashCode.Include @Getter @Setter @ToString.Include private SparkQueryRequest request; - - /** - * Constructor. - * - * @param sparkClient sparkClient. - */ - public SparkScan(SparkClient sparkClient) { - this.sparkClient = sparkClient; - this.request = new SparkQueryRequest(); - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public ExprValue next() { - return null; - } - - @Override - public String explain() { - return getRequest().toString(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java deleted file mode 100644 index 84c9c05e79..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import java.util.Collection; -import java.util.Collections; -import lombok.RequiredArgsConstructor; -import org.opensearch.sql.DataSourceSchemaName; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.Table; - -/** Spark storage engine implementation. */ -@RequiredArgsConstructor -public class SparkStorageEngine implements StorageEngine { - private final SparkClient sparkClient; - - @Override - public Collection getFunctions() { - return Collections.singletonList(new SparkSqlTableFunctionResolver(sparkClient)); - } - - @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { - throw new RuntimeException("Unable to get table from storage engine."); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java deleted file mode 100644 index 467bacbaea..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR; -import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD; - -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; -import java.security.AccessController; -import java.security.InvalidParameterException; -import java.security.PrivilegedAction; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.opensearch.client.Client; -import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.datasources.auth.AuthenticationType; -import org.opensearch.sql.spark.client.EmrClientImpl; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.helper.FlintHelper; -import org.opensearch.sql.spark.response.SparkResponse; -import org.opensearch.sql.storage.DataSourceFactory; -import org.opensearch.sql.storage.StorageEngine; - -/** Storage factory implementation for spark connector. */ -@RequiredArgsConstructor -public class SparkStorageFactory implements DataSourceFactory { - private final Client client; - private final Settings settings; - - // Spark datasource configuration properties - public static final String CONNECTOR_TYPE = "spark.connector"; - public static final String SPARK_SQL_APPLICATION = "spark.sql.application"; - - // EMR configuration properties - public static final String EMR_CLUSTER = "emr.cluster"; - public static final String EMR_AUTH_TYPE = "emr.auth.type"; - public static final String EMR_REGION = "emr.auth.region"; - public static final String EMR_ROLE_ARN = "emr.auth.role_arn"; - public static final String EMR_ACCESS_KEY = "emr.auth.access_key"; - public static final String EMR_SECRET_KEY = "emr.auth.secret_key"; - - // Flint integration jar configuration properties - public static final String FLINT_INTEGRATION = "spark.datasource.flint.integration"; - public static final String FLINT_HOST = "spark.datasource.flint.host"; - public static final String FLINT_PORT = "spark.datasource.flint.port"; - public static final String FLINT_SCHEME = "spark.datasource.flint.scheme"; - public static final String FLINT_AUTH = "spark.datasource.flint.auth"; - public static final String FLINT_REGION = "spark.datasource.flint.region"; - - @Override - public DataSourceType getDataSourceType() { - return DataSourceType.SPARK; - } - - @Override - public DataSource createDataSource(DataSourceMetadata metadata) { - return new DataSource( - metadata.getName(), DataSourceType.SPARK, getStorageEngine(metadata.getProperties())); - } - - /** - * This function gets spark storage engine. - * - * @param requiredConfig spark config options - * @return spark storage engine object - */ - StorageEngine getStorageEngine(Map requiredConfig) { - SparkClient sparkClient; - if (requiredConfig.get(CONNECTOR_TYPE).equals(EMR)) { - sparkClient = - AccessController.doPrivileged( - (PrivilegedAction) - () -> { - validateEMRConfigProperties(requiredConfig); - return new EmrClientImpl( - getEMRClient( - requiredConfig.get(EMR_ACCESS_KEY), - requiredConfig.get(EMR_SECRET_KEY), - requiredConfig.get(EMR_REGION)), - requiredConfig.get(EMR_CLUSTER), - new FlintHelper( - requiredConfig.get(FLINT_INTEGRATION), - requiredConfig.get(FLINT_HOST), - requiredConfig.get(FLINT_PORT), - requiredConfig.get(FLINT_SCHEME), - requiredConfig.get(FLINT_AUTH), - requiredConfig.get(FLINT_REGION)), - new SparkResponse(client, null, STEP_ID_FIELD), - requiredConfig.get(SPARK_SQL_APPLICATION)); - }); - } else { - throw new InvalidParameterException("Spark connector type is invalid."); - } - return new SparkStorageEngine(sparkClient); - } - - private void validateEMRConfigProperties(Map dataSourceMetadataConfig) - throws IllegalArgumentException { - if (dataSourceMetadataConfig.get(EMR_CLUSTER) == null - || dataSourceMetadataConfig.get(EMR_AUTH_TYPE) == null) { - throw new IllegalArgumentException("EMR config properties are missing."); - } else if (dataSourceMetadataConfig - .get(EMR_AUTH_TYPE) - .equals(AuthenticationType.AWSSIGV4AUTH.getName()) - && (dataSourceMetadataConfig.get(EMR_ACCESS_KEY) == null - || dataSourceMetadataConfig.get(EMR_SECRET_KEY) == null)) { - throw new IllegalArgumentException("EMR auth keys are missing."); - } else if (!dataSourceMetadataConfig - .get(EMR_AUTH_TYPE) - .equals(AuthenticationType.AWSSIGV4AUTH.getName())) { - throw new IllegalArgumentException("Invalid auth type."); - } - } - - private AmazonElasticMapReduce getEMRClient( - String emrAccessKey, String emrSecretKey, String emrRegion) { - return AmazonElasticMapReduceClientBuilder.standard() - .withCredentials( - new AWSStaticCredentialsProvider(new BasicAWSCredentials(emrAccessKey, emrSecretKey))) - .withRegion(emrRegion) - .build(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java deleted file mode 100644 index 731c3df672..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import java.util.HashMap; -import java.util.Map; -import lombok.Getter; -import org.opensearch.sql.data.type.ExprType; -import org.opensearch.sql.planner.DefaultImplementor; -import org.opensearch.sql.planner.logical.LogicalPlan; -import org.opensearch.sql.planner.physical.PhysicalPlan; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.Table; -import org.opensearch.sql.storage.read.TableScanBuilder; - -/** Spark table implementation. This can be constructed from SparkQueryRequest. */ -public class SparkTable implements Table { - - private final SparkClient sparkClient; - - @Getter private final SparkQueryRequest sparkQueryRequest; - - /** Constructor for entire Sql Request. */ - public SparkTable(SparkClient sparkService, SparkQueryRequest sparkQueryRequest) { - this.sparkClient = sparkService; - this.sparkQueryRequest = sparkQueryRequest; - } - - @Override - public boolean exists() { - throw new UnsupportedOperationException( - "Exists operation is not supported in spark datasource"); - } - - @Override - public void create(Map schema) { - throw new UnsupportedOperationException( - "Create operation is not supported in spark datasource"); - } - - @Override - public Map getFieldTypes() { - return new HashMap<>(); - } - - @Override - public PhysicalPlan implement(LogicalPlan plan) { - SparkScan metricScan = new SparkScan(sparkClient); - metricScan.setRequest(sparkQueryRequest); - return plan.accept(new DefaultImplementor(), metricScan); - } - - @Override - public TableScanBuilder createScanBuilder() { - return new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java deleted file mode 100644 index 120747e0d3..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import java.util.List; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.exception.ExpressionEvaluationException; -import org.opensearch.sql.expression.DSL; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkTable; - -@ExtendWith(MockitoExtension.class) -public class SparkSqlFunctionImplementationTest { - @Mock private SparkClient client; - - @Test - void testValueOfAndTypeToString() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList = - List.of(DSL.namedArgument("query", DSL.literal(QUERY))); - SparkSqlFunctionImplementation sparkSqlFunctionImplementation = - new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - UnsupportedOperationException exception = - assertThrows( - UnsupportedOperationException.class, () -> sparkSqlFunctionImplementation.valueOf()); - assertEquals( - "Spark defined function [sql] is only " - + "supported in SOURCE clause with spark connector catalog", - exception.getMessage()); - assertEquals("sql(query=\"select 1\")", sparkSqlFunctionImplementation.toString()); - assertEquals(ExprCoreType.STRUCT, sparkSqlFunctionImplementation.type()); - } - - @Test - void testApplyArguments() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList = - List.of(DSL.namedArgument("query", DSL.literal(QUERY))); - SparkSqlFunctionImplementation sparkSqlFunctionImplementation = - new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - SparkTable sparkTable = (SparkTable) sparkSqlFunctionImplementation.applyArguments(); - assertNotNull(sparkTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = sparkTable.getSparkQueryRequest(); - assertEquals(QUERY, sparkQueryRequest.getSql()); - } - - @Test - void testApplyArgumentsException() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList = - List.of( - DSL.namedArgument("query", DSL.literal(QUERY)), - DSL.namedArgument("tmp", DSL.literal(12345))); - SparkSqlFunctionImplementation sparkSqlFunctionImplementation = - new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - ExpressionEvaluationException exception = - assertThrows( - ExpressionEvaluationException.class, - () -> sparkSqlFunctionImplementation.applyArguments()); - assertEquals("Invalid Function Argument:tmp", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java deleted file mode 100644 index 212056eb15..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.opensearch.sql.planner.logical.LogicalProject; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanOperator; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; - -public class SparkSqlFunctionTableScanBuilderTest { - @Mock private SparkClient sparkClient; - - @Mock private LogicalProject logicalProject; - - @Test - void testBuild() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanBuilder sparkSqlFunctionTableScanBuilder = - new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - TableScanOperator sqlFunctionTableScanOperator = sparkSqlFunctionTableScanBuilder.build(); - Assertions.assertTrue( - sqlFunctionTableScanOperator instanceof SparkSqlFunctionTableScanOperator); - } - - @Test - void testPushProject() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanBuilder sparkSqlFunctionTableScanBuilder = - new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - Assertions.assertTrue(sparkSqlFunctionTableScanBuilder.pushDownProject(logicalProject)); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java deleted file mode 100644 index d44e3d271a..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; -import static org.opensearch.sql.data.model.ExprValueUtils.nullValue; -import static org.opensearch.sql.data.model.ExprValueUtils.stringValue; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; -import static org.opensearch.sql.spark.utils.TestUtils.getJson; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import lombok.SneakyThrows; -import org.json.JSONArray; -import org.json.JSONObject; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.data.model.ExprBooleanValue; -import org.opensearch.sql.data.model.ExprByteValue; -import org.opensearch.sql.data.model.ExprDateValue; -import org.opensearch.sql.data.model.ExprDoubleValue; -import org.opensearch.sql.data.model.ExprFloatValue; -import org.opensearch.sql.data.model.ExprIntegerValue; -import org.opensearch.sql.data.model.ExprLongValue; -import org.opensearch.sql.data.model.ExprNullValue; -import org.opensearch.sql.data.model.ExprShortValue; -import org.opensearch.sql.data.model.ExprStringValue; -import org.opensearch.sql.data.model.ExprTimestampValue; -import org.opensearch.sql.data.model.ExprTupleValue; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.data.type.SparkDataType; -import org.opensearch.sql.spark.data.value.SparkExprValue; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanOperator; -import org.opensearch.sql.spark.request.SparkQueryRequest; - -@ExtendWith(MockitoExtension.class) -public class SparkSqlFunctionTableScanOperatorTest { - - @Mock private SparkClient sparkClient; - - @Test - @SneakyThrows - void testEmptyQueryWithException() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenThrow(new IOException("Error Message")); - RuntimeException runtimeException = - assertThrows(RuntimeException.class, sparkSqlFunctionTableScanOperator::open); - assertEquals( - "Error fetching data from spark server: Error Message", runtimeException.getMessage()); - } - - @Test - @SneakyThrows - void testClose() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - sparkSqlFunctionTableScanOperator.close(); - } - - @Test - @SneakyThrows - void testExplain() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - Assertions.assertEquals("sql(select 1)", sparkSqlFunctionTableScanOperator.explain()); - } - - @Test - @SneakyThrows - void testQueryResponseIterator() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("select_query_response.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertTrue(sparkSqlFunctionTableScanOperator.hasNext()); - ExprTupleValue firstRow = - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("1", new ExprIntegerValue(1)); - } - }); - assertEquals(firstRow, sparkSqlFunctionTableScanOperator.next()); - Assertions.assertFalse(sparkSqlFunctionTableScanOperator.hasNext()); - } - - @Test - @SneakyThrows - void testQueryResponseAllTypes() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("all_data_type.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertTrue(sparkSqlFunctionTableScanOperator.hasNext()); - ExprTupleValue firstRow = - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("boolean", ExprBooleanValue.of(true)); - put("long", new ExprLongValue(922337203)); - put("integer", new ExprIntegerValue(2147483647)); - put("short", new ExprShortValue(32767)); - put("byte", new ExprByteValue(127)); - put("double", new ExprDoubleValue(9223372036854.775807)); - put("float", new ExprFloatValue(21474.83647)); - put("timestamp", new ExprDateValue("2023-07-01 10:31:30")); - put("date", new ExprTimestampValue("2023-07-01 10:31:30")); - put("string", new ExprStringValue("ABC")); - put("char", new SparkExprValue(new SparkDataType("char"), "A")); - } - }); - assertEquals(firstRow, sparkSqlFunctionTableScanOperator.next()); - Assertions.assertFalse(sparkSqlFunctionTableScanOperator.hasNext()); - } - - @Test - @SneakyThrows - void testQueryResponseSparkDataType() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("spark_data_type.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put( - "struct_column", - new SparkExprValue( - new SparkDataType("struct"), - new JSONObject("{\"struct_value\":\"value\"}}").toMap())); - put( - "array_column", - new SparkExprValue( - new SparkDataType("array"), new JSONArray("[1,2]").toList())); - } - }), - sparkSqlFunctionTableScanOperator.next()); - } - - @Test - @SneakyThrows - void testQuerySchema() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("select_query_response.json"))); - sparkSqlFunctionTableScanOperator.open(); - ArrayList columns = new ArrayList<>(); - columns.add(new ExecutionEngine.Schema.Column("1", "1", ExprCoreType.INTEGER)); - ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns); - assertEquals(expectedSchema, sparkSqlFunctionTableScanOperator.schema()); - } - - /** https://github.com/opensearch-project/sql/issues/2210. */ - @Test - @SneakyThrows - void issue2210() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("issue2210.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertTrue(sparkSqlFunctionTableScanOperator.hasNext()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("day")); - put("data_type", stringValue("int")); - put("comment", nullValue()); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("# Partition Information")); - put("data_type", stringValue("")); - put("comment", stringValue("")); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("# col_name")); - put("data_type", stringValue("data_type")); - put("comment", stringValue("comment")); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("day")); - put("data_type", stringValue("int")); - put("comment", nullValue()); - } - }), - sparkSqlFunctionTableScanOperator.next()); - Assertions.assertFalse(sparkSqlFunctionTableScanOperator.hasNext()); - } - - @Test - @SneakyThrows - public void issue2367MissingFields() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())) - .thenReturn( - new JSONObject( - "{\n" - + " \"data\": {\n" - + " \"result\": [\n" - + " \"{}\",\n" - + " \"{'srcPort':20641}\"\n" - + " ],\n" - + " \"schema\": [\n" - + " \"{'column_name':'srcPort','data_type':'long'}\"\n" - + " ]\n" - + " }\n" - + "}")); - sparkSqlFunctionTableScanOperator.open(); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("srcPort", ExprNullValue.of()); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("srcPort", new ExprLongValue(20641L)); - } - }), - sparkSqlFunctionTableScanOperator.next()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java deleted file mode 100644 index a828ac76c4..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.opensearch.sql.data.type.ExprCoreType.STRING; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import java.util.List; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.exception.SemanticCheckException; -import org.opensearch.sql.expression.DSL; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.function.FunctionBuilder; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.expression.function.FunctionProperties; -import org.opensearch.sql.expression.function.FunctionSignature; -import org.opensearch.sql.expression.function.TableFunctionImplementation; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; -import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkTable; - -@ExtendWith(MockitoExtension.class) -public class SparkSqlTableFunctionResolverTest { - @Mock private SparkClient client; - - @Mock private FunctionProperties functionProperties; - - @Test - void testResolve() { - SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = List.of(DSL.namedArgument("query", DSL.literal(QUERY))); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - FunctionBuilder functionBuilder = resolution.getValue(); - TableFunctionImplementation functionImplementation = - (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SparkSqlFunctionImplementation); - SparkTable sparkTable = (SparkTable) functionImplementation.applyArguments(); - assertNotNull(sparkTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = sparkTable.getSparkQueryRequest(); - assertEquals(QUERY, sparkQueryRequest.getSql()); - } - - @Test - void testArgumentsPassedByPosition() { - SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = List.of(DSL.namedArgument(null, DSL.literal(QUERY))); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - FunctionBuilder functionBuilder = resolution.getValue(); - TableFunctionImplementation functionImplementation = - (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SparkSqlFunctionImplementation); - SparkTable sparkTable = (SparkTable) functionImplementation.applyArguments(); - assertNotNull(sparkTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = sparkTable.getSparkQueryRequest(); - assertEquals(QUERY, sparkQueryRequest.getSql()); - } - - @Test - void testMixedArgumentTypes() { - SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = - List.of( - DSL.namedArgument("query", DSL.literal(QUERY)), - DSL.namedArgument(null, DSL.literal(12345))); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - SemanticCheckException exception = - assertThrows( - SemanticCheckException.class, - () -> resolution.getValue().apply(functionProperties, expressions)); - - assertEquals("Arguments should be either passed by name or position", exception.getMessage()); - } - - @Test - void testWrongArgumentsSizeWhenPassedByName() { - SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = List.of(); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - SemanticCheckException exception = - assertThrows( - SemanticCheckException.class, - () -> resolution.getValue().apply(functionProperties, expressions)); - - assertEquals("Missing arguments:[query]", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java deleted file mode 100644 index 971db3c33c..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import lombok.SneakyThrows; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.spark.client.SparkClient; - -@ExtendWith(MockitoExtension.class) -public class SparkScanTest { - @Mock private SparkClient sparkClient; - - @Test - @SneakyThrows - void testQueryResponseIteratorForQueryRangeFunction() { - SparkScan sparkScan = new SparkScan(sparkClient); - sparkScan.getRequest().setSql(QUERY); - Assertions.assertFalse(sparkScan.hasNext()); - assertNull(sparkScan.next()); - } - - @Test - @SneakyThrows - void testExplain() { - SparkScan sparkScan = new SparkScan(sparkClient); - sparkScan.getRequest().setSql(QUERY); - assertEquals("SparkQueryRequest(sql=select 1)", sparkScan.explain()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java deleted file mode 100644 index 5e7ec76cdb..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Collection; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.DataSourceSchemaName; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; - -@ExtendWith(MockitoExtension.class) -public class SparkStorageEngineTest { - @Mock private SparkClient client; - - @Test - public void getFunctions() { - SparkStorageEngine engine = new SparkStorageEngine(client); - Collection functionResolverCollection = engine.getFunctions(); - assertNotNull(functionResolverCollection); - assertEquals(1, functionResolverCollection.size()); - assertTrue( - functionResolverCollection.iterator().next() instanceof SparkSqlTableFunctionResolver); - } - - @Test - public void getTable() { - SparkStorageEngine engine = new SparkStorageEngine(client); - RuntimeException exception = - assertThrows( - RuntimeException.class, - () -> engine.getTable(new DataSourceSchemaName("spark", "default"), "")); - assertEquals("Unable to get table from storage engine.", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java deleted file mode 100644 index ebe3c8f3a9..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.opensearch.sql.spark.constants.TestConstants.EMR_CLUSTER_ID; - -import java.security.InvalidParameterException; -import java.util.HashMap; -import lombok.SneakyThrows; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.client.Client; -import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.storage.StorageEngine; - -@ExtendWith(MockitoExtension.class) -public class SparkStorageFactoryTest { - @Mock private Settings settings; - - @Mock private Client client; - - @Test - void testGetConnectorType() { - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - Assertions.assertEquals(DataSourceType.SPARK, sparkStorageFactory.getDataSourceType()); - } - - @Test - @SneakyThrows - void testGetStorageEngine() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - properties.put("emr.auth.type", "awssigv4"); - properties.put("emr.auth.access_key", "access_key"); - properties.put("emr.auth.secret_key", "secret_key"); - properties.put("emr.auth.region", "region"); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - StorageEngine storageEngine = sparkStorageFactory.getStorageEngine(properties); - Assertions.assertTrue(storageEngine instanceof SparkStorageEngine); - } - - @Test - @SneakyThrows - void testInvalidConnectorType() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "random"); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - InvalidParameterException exception = - Assertions.assertThrows( - InvalidParameterException.class, - () -> sparkStorageFactory.getStorageEngine(properties)); - Assertions.assertEquals("Spark connector type is invalid.", exception.getMessage()); - } - - @Test - @SneakyThrows - void testMissingAuth() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties)); - Assertions.assertEquals("EMR config properties are missing.", exception.getMessage()); - } - - @Test - @SneakyThrows - void testUnsupportedEmrAuth() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - properties.put("emr.auth.type", "basic"); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties)); - Assertions.assertEquals("Invalid auth type.", exception.getMessage()); - } - - @Test - @SneakyThrows - void testMissingCluster() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.auth.type", "awssigv4"); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties)); - Assertions.assertEquals("EMR config properties are missing.", exception.getMessage()); - } - - @Test - @SneakyThrows - void testMissingAuthKeys() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - properties.put("emr.auth.type", "awssigv4"); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties)); - Assertions.assertEquals("EMR auth keys are missing.", exception.getMessage()); - } - - @Test - @SneakyThrows - void testMissingAuthSecretKey() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - properties.put("emr.auth.type", "awssigv4"); - properties.put("emr.auth.access_key", "test"); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties)); - Assertions.assertEquals("EMR auth keys are missing.", exception.getMessage()); - } - - @Test - void testCreateDataSourceSuccess() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - properties.put("emr.auth.type", "awssigv4"); - properties.put("emr.auth.access_key", "access_key"); - properties.put("emr.auth.secret_key", "secret_key"); - properties.put("emr.auth.region", "region"); - properties.put("spark.datasource.flint.host", "localhost"); - properties.put("spark.datasource.flint.port", "9200"); - properties.put("spark.datasource.flint.scheme", "http"); - properties.put("spark.datasource.flint.auth", "false"); - properties.put("spark.datasource.flint.region", "us-west-2"); - - DataSourceMetadata metadata = - new DataSourceMetadata.Builder() - .setName("spark") - .setConnector(DataSourceType.SPARK) - .setProperties(properties) - .build(); - - DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata); - Assertions.assertTrue(dataSource.getStorageEngine() instanceof SparkStorageEngine); - } - - @Test - void testSetSparkJars() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("spark.sql.application", "s3://spark/spark-sql-job.jar"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - properties.put("emr.auth.type", "awssigv4"); - properties.put("emr.auth.access_key", "access_key"); - properties.put("emr.auth.secret_key", "secret_key"); - properties.put("emr.auth.region", "region"); - properties.put("spark.datasource.flint.integration", "s3://spark/flint-spark-integration.jar"); - - DataSourceMetadata metadata = - new DataSourceMetadata.Builder() - .setName("spark") - .setConnector(DataSourceType.SPARK) - .setProperties(properties) - .build(); - - DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata); - Assertions.assertTrue(dataSource.getStorageEngine() instanceof SparkStorageEngine); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java deleted file mode 100644 index a70d4ba69e..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import lombok.SneakyThrows; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.data.type.ExprType; -import org.opensearch.sql.planner.physical.PhysicalPlan; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanOperator; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.read.TableScanBuilder; - -@ExtendWith(MockitoExtension.class) -public class SparkTableTest { - @Mock private SparkClient client; - - @Test - void testUnsupportedOperation() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - SparkTable sparkTable = new SparkTable(client, sparkQueryRequest); - - assertThrows(UnsupportedOperationException.class, sparkTable::exists); - assertThrows( - UnsupportedOperationException.class, () -> sparkTable.create(Collections.emptyMap())); - } - - @Test - void testCreateScanBuilderWithSqlTableFunction() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - SparkTable sparkTable = new SparkTable(client, sparkQueryRequest); - TableScanBuilder tableScanBuilder = sparkTable.createScanBuilder(); - Assertions.assertNotNull(tableScanBuilder); - Assertions.assertTrue(tableScanBuilder instanceof SparkSqlFunctionTableScanBuilder); - } - - @Test - @SneakyThrows - void testGetFieldTypesFromSparkQueryRequest() { - SparkTable sparkTable = new SparkTable(client, new SparkQueryRequest()); - Map expectedFieldTypes = new HashMap<>(); - Map fieldTypes = sparkTable.getFieldTypes(); - - assertEquals(expectedFieldTypes, fieldTypes); - verifyNoMoreInteractions(client); - assertNotNull(sparkTable.getSparkQueryRequest()); - } - - @Test - void testImplementWithSqlFunction() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - SparkTable sparkMetricTable = new SparkTable(client, sparkQueryRequest); - PhysicalPlan plan = - sparkMetricTable.implement(new SparkSqlFunctionTableScanBuilder(client, sparkQueryRequest)); - assertTrue(plan instanceof SparkSqlFunctionTableScanOperator); - } -} diff --git a/spark/src/test/resources/all_data_type.json b/spark/src/test/resources/all_data_type.json deleted file mode 100644 index a046912319..0000000000 --- a/spark/src/test/resources/all_data_type.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "data": { - "result": [ - "{'boolean':true,'long':922337203,'integer':2147483647,'short':32767,'byte':127,'double':9223372036854.775807,'float':21474.83647,'timestamp':'2023-07-01 10:31:30','date':'2023-07-01 10:31:30','string':'ABC','char':'A'}" - ], - "schema": [ - "{'column_name':'boolean','data_type':'boolean'}", - "{'column_name':'long','data_type':'long'}", - "{'column_name':'integer','data_type':'integer'}", - "{'column_name':'short','data_type':'short'}", - "{'column_name':'byte','data_type':'byte'}", - "{'column_name':'double','data_type':'double'}", - "{'column_name':'float','data_type':'float'}", - "{'column_name':'timestamp','data_type':'timestamp'}", - "{'column_name':'date','data_type':'date'}", - "{'column_name':'string','data_type':'string'}", - "{'column_name':'char','data_type':'char'}" - ], - "stepId": "s-123456789", - "applicationId": "application-abc" - } -} diff --git a/spark/src/test/resources/issue2210.json b/spark/src/test/resources/issue2210.json deleted file mode 100644 index dec24efdc2..0000000000 --- a/spark/src/test/resources/issue2210.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "data": { - "result": [ - "{'col_name':'day','data_type':'int'}", - "{'col_name':'# Partition Information','data_type':'','comment':''}", - "{'col_name':'# col_name','data_type':'data_type','comment':'comment'}", - "{'col_name':'day','data_type':'int'}" - ], - "schema": [ - "{'column_name':'col_name','data_type':'string'}", - "{'column_name':'data_type','data_type':'string'}", - "{'column_name':'comment','data_type':'string'}" - ], - "stepId": "s-123456789", - "applicationId": "application-abc" - } -} diff --git a/spark/src/test/resources/spark_data_type.json b/spark/src/test/resources/spark_data_type.json deleted file mode 100644 index 79bd047f27..0000000000 --- a/spark/src/test/resources/spark_data_type.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "data": { - "result": [ - "{'struct_column':{'struct_value':'value'},'array_column':[1,2]}" - ], - "schema": [ - "{'column_name':'struct_column','data_type':'struct'}", - "{'column_name':'array_column','data_type':'array'}" - ], - "stepId": "s-123456789", - "applicationId": "application-abc" - } -}