From 73bd6e87e9006bceae473c9a55008f4ea428b3d5 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 29 Sep 2023 15:47:26 -0700 Subject: [PATCH] add basic auth (#49) * add basic auth support when access OpenSearch Signed-off-by: Peng Huo * address comments Signed-off-by: Peng Huo --------- Signed-off-by: Peng Huo --- docs/index.md | 12 +++++++++++- .../org/opensearch/flint/core/FlintOptions.java | 16 +++++++++++++++- .../core/storage/FlintOpenSearchClient.java | 11 +++++++++++ .../spark/sql/flint/config/FlintSparkConf.scala | 17 +++++++++++++++-- 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/docs/index.md b/docs/index.md index 8f7a79a2e..bd593548c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -271,7 +271,9 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.host`: default is localhost. - `spark.datasource.flint.port`: default is 9200. - `spark.datasource.flint.scheme`: default is http. valid values [http, https] -- `spark.datasource.flint.auth`: default is false. valid values [false, sigv4] +- `spark.datasource.flint.auth`: default is noauth. valid values [noauth, sigv4, basic] +- `spark.datasource.flint.auth.username`: basic auth username. default is flint. only used when auth=basic +- `spark.datasource.flint.auth.password`: basic auth password. default is flint. only used when auth=basic - `spark.datasource.flint.region`: default is us-west-2. only been used when auth=sigv4 - `spark.datasource.flint.customAWSCredentialsProvider`: default is empty. - `spark.datasource.flint.write.id_name`: no default value. @@ -455,3 +457,11 @@ Flint use [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJa --conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::AccountB:role/CrossAccountRoleB \ --conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::AccountBB:role/CrossAccountRoleB ``` + +### Basic Auth +Add the Basic Auth settings to the Spark configuration. Replace `username` and `password` with your actual credentials. 
+``` +--conf spark.datasource.flint.auth=basic +--conf spark.datasource.flint.auth.username=username +--conf spark.datasource.flint.auth.password=password +``` diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index c9d4eb1cf..60b4bfc8c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -27,10 +27,16 @@ public class FlintOptions implements Serializable { public static final String AUTH = "auth"; - public static final String NONE_AUTH = "false"; + public static final String NONE_AUTH = "noauth"; public static final String SIGV4_AUTH = "sigv4"; + public static final String BASIC_AUTH = "basic"; + + public static final String USERNAME = "auth.username"; + + public static final String PASSWORD = "auth.password"; + public static final String CUSTOM_AWS_CREDENTIALS_PROVIDER = "customAWSCredentialsProvider"; /** @@ -87,4 +93,12 @@ public String getAuth() { public String getCustomAwsCredentialsProvider() { return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, ""); } + + public String getUsername() { + return options.getOrDefault(USERNAME, "flint"); + } + + public String getPassword() { + return options.getOrDefault(PASSWORD, "flint"); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 4e3c5a76c..8ad13c881 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -17,6 +17,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import 
org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; @@ -172,6 +176,13 @@ private RestHighLevelClient createClient() { restClientBuilder.setHttpClientConfigCallback(cb -> cb.addInterceptorLast(new AWSRequestSigningApacheInterceptor(signer.getServiceName(), signer, awsCredentialsProvider.get()))); + } else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(options.getUsername(), options.getPassword())); + restClientBuilder.setHttpClientConfigCallback( + httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } return new RestHighLevelClient(restClientBuilder); } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index e794867a5..45d80202a 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -51,9 +51,20 @@ object FlintSparkConf { val AUTH = FlintConfig("spark.datasource.flint.auth") .datasourceOption() - .doc("authentication type. supported value: NONE_AUTH(false), SIGV4_AUTH(sigv4)") + .doc("authentication type. 
supported value: " + + "noauth(no auth), sigv4(sigv4 auth), basic(basic auth)") .createWithDefault(FlintOptions.NONE_AUTH) + val USERNAME = FlintConfig("spark.datasource.flint.auth.username") + .datasourceOption() + .doc("basic auth username") + .createWithDefault("flint") + + val PASSWORD = FlintConfig("spark.datasource.flint.auth.password") + .datasourceOption() + .doc("basic auth password") + .createWithDefault("flint") + val REGION = FlintConfig("spark.datasource.flint.region") .datasourceOption() .doc("AWS service region") @@ -144,7 +155,9 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable SCHEME, AUTH, REGION, - CUSTOM_AWS_CREDENTIALS_PROVIDER) + CUSTOM_AWS_CREDENTIALS_PROVIDER, + USERNAME, + PASSWORD) .map(conf => (conf.optionKey, conf.readFrom(reader))) .toMap .asJava)