diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index a089e4088..602e90917 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -98,5 +98,5 @@ OptimisticTransaction startTransaction(String indexName, String dataSourc * Create {@link IRestHighLevelClient}. * @return {@link IRestHighLevelClient} */ - public IRestHighLevelClient createClient(); + IRestHighLevelClient createClient(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 1282e1c94..521e52788 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -46,11 +46,15 @@ public class FlintOptions implements Serializable { public static final String CUSTOM_AWS_CREDENTIALS_PROVIDER = "customAWSCredentialsProvider"; + public static final String SUPER_ADMIN_CREDENTIALS_PROVIDER = "superAdminAWSCredentialsProvider"; + /** * By default, customAWSCredentialsProvider is empty. use DefaultAWSCredentialsProviderChain. */ public static final String DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER = ""; + public static final String SYSTEM_INDEX= "spark.flint.job.requestIndex"; + /** * Used by {@link org.opensearch.flint.core.storage.OpenSearchScrollReader} */ @@ -121,7 +125,11 @@ public String getAuth() { } public String getCustomAwsCredentialsProvider() { - return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, ""); + return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER); + } + + public String getSuperAdminAwsCredentialsProvider() { + return options.getOrDefault(SUPER_ADMIN_CREDENTIALS_PROVIDER, DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER); } public String getUsername() { @@ -139,4 +147,8 @@ public int getSocketTimeoutMillis() { public String getDataSourceName() { return options.getOrDefault(DATA_SOURCE_NAME, ""); } + + public String getSystemIndex() { + return options.get(SYSTEM_INDEX); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java index c11677c3f..4776e616c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/auth/AWSRequestSigningApacheInterceptor.java @@ -18,6 +18,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.logging.Logger; + import org.apache.http.Header; import org.apache.http.HttpEntityEnclosingRequest; import org.apache.http.HttpException; @@ -32,10 +34,12 @@ /** * From https://github.com/opensearch-project/sql-jdbc/blob/main/src/main/java/org/opensearch/jdbc/transport/http/auth/aws/AWSRequestSigningApacheInterceptor.java - * An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer} + * An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer} for SIGV4_AUTH * and {@link AWSCredentialsProvider}. */ public class AWSRequestSigningApacheInterceptor implements HttpRequestInterceptor { + private static final Logger LOG = Logger.getLogger(AWSRequestSigningApacheInterceptor.class.getName()); + /** * The service that we're connecting to. Technically not necessary. * Could be used by a future Signer, though. @@ -47,11 +51,15 @@ public class AWSRequestSigningApacheInterceptor implements HttpRequestIntercepto */ private final Signer signer; + private final String systemIndexName; + /** * The source of AWS credentials for signing. */ private final AWSCredentialsProvider awsCredentialsProvider; + private final AWSCredentialsProvider superAdminAWSCredentialsProvider; + /** * * @param service service that we're connecting to @@ -59,11 +67,15 @@ public class AWSRequestSigningApacheInterceptor implements HttpRequestIntercepto * @param awsCredentialsProvider source of AWS credentials for signing */ public AWSRequestSigningApacheInterceptor(final String service, - final Signer signer, - final AWSCredentialsProvider awsCredentialsProvider) { + final Signer signer, + final String systemIndexName, + final AWSCredentialsProvider awsCredentialsProvider, + final AWSCredentialsProvider superAdminAWSCredentialsProvider) { this.service = service; this.signer = signer; + this.systemIndexName = systemIndexName; this.awsCredentialsProvider = awsCredentialsProvider; + this.superAdminAWSCredentialsProvider = superAdminAWSCredentialsProvider; } /** @@ -106,7 +118,13 @@ public void process(final HttpRequest request, final HttpContext context) signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders())); // Sign it - signer.sign(signableRequest, awsCredentialsProvider.getCredentials()); + if (this.service.equals("es") && isSystemIndexRequest(signableRequest.getResourcePath())) { + LOG.info(String.format("POC - sign - use superAdminAWSCredentialsProvider")); + signer.sign(signableRequest, superAdminAWSCredentialsProvider.getCredentials()); + } else { + LOG.info(String.format("POC - sign - use awsCredentialsProvider")); + signer.sign(signableRequest, awsCredentialsProvider.getCredentials()); + } // Now copy everything back request.setHeaders(mapToHeaderArray(signableRequest.getHeaders())); @@ -136,6 +154,10 @@ private static Map> nvpToMapParams(final List awsCredentialsProvider = - new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); + new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); String providerClass = options.getCustomAwsCredentialsProvider(); if (!Strings.isNullOrEmpty(providerClass)) { try { @@ -261,11 +260,26 @@ public IRestHighLevelClient createClient() { throw new RuntimeException(e); } } + + final AtomicReference superAdminAWSCredentialsProvider = + new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); + String superAdminproviderClass = options.getSuperAdminAwsCredentialsProvider(); + if (!Strings.isNullOrEmpty(superAdminproviderClass)) { + try { + Class superAdminAWSCredentialsProviderClass = Class.forName(superAdminproviderClass); + Constructor ctor = superAdminAWSCredentialsProviderClass.getDeclaredConstructor(); + ctor.setAccessible(true); + superAdminAWSCredentialsProvider.set((AWSCredentialsProvider) ctor.newInstance()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + restClientBuilder.setHttpClientConfigCallback(builder -> { HttpAsyncClientBuilder delegate = builder.addInterceptorLast( new AWSRequestSigningApacheInterceptor( - signer.getServiceName(), signer, awsCredentialsProvider.get())); + signer.getServiceName(), signer, options.getSystemIndex(), awsCredentialsProvider.get(), superAdminAWSCredentialsProvider.get())); return RetryableHttpAsyncClient.builder(delegate, options); } ); diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index fbbea9176..c2909ad93 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -77,6 +77,12 @@ object FlintSparkConf { .doc("AWS customAWSCredentialsProvider") .createWithDefault(FlintOptions.DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER) + val SUPER_ADMIN_CREDENTIALS_PROVIDER = + FlintConfig("spark.datasource.flint.superAdminAWSCredentialsProvider") + .datasourceOption() + .doc("AWS customAWSCredentialsProvider for super admin permission") + .createWithDefault(FlintOptions.DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER) + val DOC_ID_COLUMN_NAME = FlintConfig("spark.datasource.flint.write.id_name") .datasourceOption() .doc(