diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 845a5d77a..9858ffd1e 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -45,10 +45,10 @@ public class FlintOptions implements Serializable { public static final String CUSTOM_AWS_CREDENTIALS_PROVIDER = "customAWSCredentialsProvider"; - public static final String SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER = "superAdminAWSCredentialsProvider"; + public static final String METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER = "spark.metadata.accessAWSCredentialsProvider"; /** - * By default, customAWSCredentialsProvider and superAdminAWSCredentialsProvider are empty. use DefaultAWSCredentialsProviderChain. + * By default, customAWSCredentialsProvider and accessAWSCredentialsProvider are empty. use DefaultAWSCredentialsProviderChain. */ public static final String DEFAULT_AWS_CREDENTIALS_PROVIDER = ""; @@ -127,8 +127,8 @@ public String getCustomAwsCredentialsProvider() { return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, DEFAULT_AWS_CREDENTIALS_PROVIDER); } - public String getSuperAdminAwsCredentialsProvider() { - return options.getOrDefault(SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER, DEFAULT_AWS_CREDENTIALS_PROVIDER); + public String getMetadataAccessAwsCredentialsProvider() { + return options.getOrDefault(METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, DEFAULT_AWS_CREDENTIALS_PROVIDER); } public String getUsername() { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java index 001c909e6..14e2ad924 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java @@ -57,36 +57,38 @@ public class AWSRequestSigningApacheInterceptor implements HttpRequestIntercepto /** * Provides a source of AWS credentials that are used for signing requests requiring elevated permissions. - * This is particularly useful for accessing resources that are restricted to super-administrative operations, + * This is particularly useful for accessing resources that are restricted to metadata operations, * such as certain system indices or administrative APIs. These credentials are expected to have permissions * beyond those of the regular {@link #primaryCredentialsProvider}. */ - private final AWSCredentialsProvider superAdminAWSCredentialsProvider; + private final AWSCredentialsProvider metadataAccessAWSCredentialsProvider; /** - * Identifies data access operations that require super-admin credentials. This identifier can be used to + * Identifies operations that require metadata access credentials. This identifier can be used to * distinguish between regular and elevated data access needs, facilitating the decision to use - * {@link #superAdminAWSCredentialsProvider} over {@link #primaryCredentialsProvider} when accessing sensitive + * {@link #metadataAccessAWSCredentialsProvider} over {@link #primaryCredentialsProvider} when accessing sensitive * or restricted resources. */ - private final String superAdminDataAccessIdentifier; + private final String metadataAccessIdentifier; /** * * @param service service that we're connecting to * @param signer particular signer implementation * @param primaryCredentialsProvider source of AWS credentials for signing + * @param metadataAccessAWSCredentialsProvider source of AWS credentials for metadata access + * @param metadataAccessIdentifier identifier for metadata access */ public AWSRequestSigningApacheInterceptor(final String service, final Signer signer, final AWSCredentialsProvider primaryCredentialsProvider, - final AWSCredentialsProvider superAdminAWSCredentialsProvider, - final String superAdminDataAccessIdentifier) { - this.service = service; + final AWSCredentialsProvider metadataAccessAWSCredentialsProvider, + final String metadataAccessIdentifier) { + this.service = service == null ? "unknown" : service; this.signer = signer; this.primaryCredentialsProvider = primaryCredentialsProvider; - this.superAdminAWSCredentialsProvider = superAdminAWSCredentialsProvider; - this.superAdminDataAccessIdentifier = superAdminDataAccessIdentifier; + this.metadataAccessAWSCredentialsProvider = metadataAccessAWSCredentialsProvider; + this.metadataAccessIdentifier = metadataAccessIdentifier; } /** @@ -129,8 +131,8 @@ public void process(final HttpRequest request, final HttpContext context) signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders())); // Sign it - if (this.service.equals("es") && isSuperAdminDataAccess(signableRequest.getResourcePath())) { - signer.sign(signableRequest, superAdminAWSCredentialsProvider.getCredentials()); + if (this.service.equals("es") && isMetadataAccess(signableRequest.getResourcePath())) { + signer.sign(signableRequest, metadataAccessAWSCredentialsProvider.getCredentials()); } else { signer.sign(signableRequest, primaryCredentialsProvider.getCredentials()); } @@ -165,11 +167,11 @@ private static Map> nvpToMapParams(final List superAdminAWSCredentialsProvider = + // Set metadataAccessAWSCredentialsProvider to customAWSCredentialsProvider by default for backwards compatibility + // unless a specific metadata access provider class name is provided + String metadataAccessProviderClass = options.getMetadataAccessAwsCredentialsProvider(); + final AtomicReference metadataAccessAWSCredentialsProvider = new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); - if (Strings.isNullOrEmpty(superAdminProviderClass)) { - superAdminAWSCredentialsProvider.set(customAWSCredentialsProvider.get()); + if (Strings.isNullOrEmpty(metadataAccessProviderClass)) { + metadataAccessAWSCredentialsProvider.set(customAWSCredentialsProvider.get()); } else { - instantiateProvider(superAdminProviderClass, superAdminAWSCredentialsProvider); + instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider); } restClientBuilder.setHttpClientConfigCallback(builder -> { HttpAsyncClientBuilder delegate = builder.addInterceptorLast( new AWSRequestSigningApacheInterceptor( - signer.getServiceName(), signer, customAWSCredentialsProvider.get(), superAdminAWSCredentialsProvider.get(), options.getSystemIndexName())); + signer.getServiceName(), signer, customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), options.getSystemIndexName())); return RetryableHttpAsyncClient.builder(delegate, options); } ); diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 6f9e2a94d..eb3a29adc 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -77,12 +77,6 @@ object FlintSparkConf { .doc("AWS customAWSCredentialsProvider") .createWithDefault(FlintOptions.DEFAULT_AWS_CREDENTIALS_PROVIDER) - val SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER = - FlintConfig("spark.datasource.flint.superAdminAWSCredentialsProvider") - .datasourceOption() - .doc("AWS credentials provider for super admin permission") - .createWithDefault(FlintOptions.DEFAULT_AWS_CREDENTIALS_PROVIDER) - val DOC_ID_COLUMN_NAME = FlintConfig("spark.datasource.flint.write.id_name") .datasourceOption() .doc( @@ -180,6 +174,10 @@ object FlintSparkConf { FlintConfig(s"spark.flint.job.inactivityLimitMillis") .doc("inactivity timeout") .createWithDefault(String.valueOf(FlintOptions.DEFAULT_INACTIVITY_LIMIT_MILLIS)) + val METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER = + FlintConfig("spark.metadata.accessAWSCredentialsProvider") + .doc("AWS credentials provider for metadata access permission") + .createOptional() } /** @@ -227,7 +225,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable RETRYABLE_HTTP_STATUS_CODES, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, - SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER, USERNAME, PASSWORD, SOCKET_TIMEOUT_MILLIS, @@ -241,6 +238,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable DATA_SOURCE_NAME, SESSION_ID, REQUEST_INDEX, + METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, EXCLUDE_JOB_IDS) .map(conf => (conf.optionKey, conf.readFrom(reader))) .flatMap { diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 9dfcf4482..3d643dde3 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -63,15 +63,15 @@ class FlintSparkConfSuite extends FlintSuite { retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException" } - test("test super admin AWS credentials provider option") { - withSparkConf("spark.datasource.flint.superAdminAWSCredentialsProvider") { + test("test metadata access AWS credentials provider option") { + withSparkConf("spark.metadata.accessAWSCredentialsProvider") { spark.conf.set( - "spark.datasource.flint.superAdminAWSCredentialsProvider", - "com.example.superAdminAWSCredentialsProvider") + "spark.metadata.accessAWSCredentialsProvider", + "com.example.MetadataAccessCredentialsProvider") val flintOptions = FlintSparkConf().flintOptions() assert(flintOptions.getCustomAwsCredentialsProvider == "") assert( - flintOptions.getSuperAdminAwsCredentialsProvider == "com.example.superAdminAWSCredentialsProvider") + flintOptions.getMetadataAccessAwsCredentialsProvider == "com.example.MetadataAccessCredentialsProvider") } }