diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 8a0cd6d2b..9be01737c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -33,9 +33,14 @@ public class FlintOptions implements Serializable { public static final String SCHEME = "scheme"; - public static final String INDEX_TYPE = "index.type"; - public static final String INDEX_TYPE_AOS = "aos"; - public static final String INDEX_TYPE_AOSS = "aoss"; + /** + * Service name used for SigV4 signature. + * `es`: Amazon OpenSearch Service + * `aoss`: Amazon OpenSearch Serverless + */ + public static final String SERVICE_NAME = "auth.servicename"; + public static final String SERVICE_NAME_ES = "es"; + public static final String SERVICE_NAME_AOSS = "aoss"; public static final String AUTH = "auth"; public static final String NONE_AUTH = "noauth"; @@ -130,8 +135,8 @@ public String getAuth() { return options.getOrDefault(AUTH, NONE_AUTH); } - public String getIndexType() { - return options.getOrDefault(INDEX_TYPE, INDEX_TYPE_AOS); + public String getServiceName() { + return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES); } public String getCustomAwsCredentialsProvider() { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java index 97da8354b..172ac5ceb 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java @@ -7,6 +7,7 @@ import static com.amazonaws.auth.internal.SignerConstants.X_AMZ_CONTENT_SHA256; import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS; import com.amazonaws.DefaultRequest; import com.amazonaws.auth.AWSCredentialsProvider; @@ -108,7 +109,7 @@ public void process(final HttpRequest request, final HttpContext context) signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams())); signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders())); - if (OpenSearchClientUtils.AOSS_SIGV4_SERVICE_NAME.equals(service)) { + if (SERVICE_NAME_AOSS.equals(service)) { enableContentBodySignature(signableRequest); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java index 488bd1347..21241d7ab 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -7,11 +7,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import java.lang.reflect.Constructor; -import java.util.Map; -import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -39,14 +35,6 @@ public class OpenSearchClientUtils { */ public final static String META_LOG_NAME_PREFIX = ".query_execution_request"; - public static final String AOS_SIGV4_SERVICE_NAME = "es"; - public static final String AOSS_SIGV4_SERVICE_NAME = "aoss"; - - private static final Map INDEX_TYPE_SERVICE_NAME_MAPPING = ImmutableMap.builder() - .put(FlintOptions.INDEX_TYPE_AOS, AOS_SIGV4_SERVICE_NAME) - .put(FlintOptions.INDEX_TYPE_AOSS, AOSS_SIGV4_SERVICE_NAME) - .build(); - /** * Used in IT. */ @@ -97,11 +85,10 @@ private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClient instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider); } - String serviceName = getServiceNameForSigV4(options); restClientBuilder.setHttpClientConfigCallback(builder -> { HttpAsyncClientBuilder delegate = builder.addInterceptorLast( new ResourceBasedAWSRequestSigningApacheInterceptor( - serviceName, options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName)); + options.getServiceName(), options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName)); return RetryableHttpAsyncClient.builder(delegate, options); } ); @@ -109,12 +96,6 @@ private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClient return restClientBuilder; } - @VisibleForTesting - static String getServiceNameForSigV4(FlintOptions options) { - String indexType = options.getIndexType(); - return Objects.requireNonNull(INDEX_TYPE_SERVICE_NAME_MAPPING.get(indexType), "Unknown index type was specified: " + indexType); - } - private static RestClientBuilder configureBasicAuth(RestClientBuilder restClientBuilder, FlintOptions options) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials( diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchClientUtilsTest.java b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchClientUtilsTest.java deleted file mode 100644 index ac484f648..000000000 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchClientUtilsTest.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.opensearch.flint.core.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import com.google.common.collect.ImmutableMap; -import org.junit.jupiter.api.Test; -import org.opensearch.flint.core.FlintOptions; - -class OpenSearchClientUtilsTest { - - @Test - public void testGetServiceNameForSigV4() { - assertEquals("es", - OpenSearchClientUtils.getServiceNameForSigV4(getFlintOptions(FlintOptions.INDEX_TYPE_AOS))); - - assertEquals("aoss", OpenSearchClientUtils.getServiceNameForSigV4( - getFlintOptions(FlintOptions.INDEX_TYPE_AOSS))); - - assertThrows(NullPointerException.class, () -> OpenSearchClientUtils.getServiceNameForSigV4( - getFlintOptions("INVALID_INDEX_TYPE"))); - } - - private FlintOptions getFlintOptions(String indexType) { - return new FlintOptions(ImmutableMap.of(FlintOptions.INDEX_TYPE, indexType)); - } -} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index f74748e65..7ea284959 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -57,11 +57,11 @@ object FlintSparkConf { "noauth(no auth), sigv4(sigv4 auth), basic(basic auth)") .createWithDefault(FlintOptions.NONE_AUTH) - val INDEX_TYPE = FlintConfig("spark.datasource.flint.index.type") + val SERVICE_NAME = FlintConfig("spark.datasource.flint.auth.servicename") .datasourceOption() - .doc("type of index storage. supported value: " + - "aos (AWS OpenSearch Service), aoss (Amazon OpenSearch Serverless)") - .createWithDefault(FlintOptions.INDEX_TYPE_AOS) + .doc("service name used for SigV4 signature. " + + "es (AWS OpenSearch Service), aoss (Amazon OpenSearch Serverless)") + .createWithDefault(FlintOptions.SERVICE_NAME_ES) val USERNAME = FlintConfig("spark.datasource.flint.auth.username") .datasourceOption() @@ -273,7 +273,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable RETRYABLE_HTTP_STATUS_CODES, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, - INDEX_TYPE, + SERVICE_NAME, USERNAME, PASSWORD, SOCKET_TIMEOUT_MILLIS,