Skip to content

Commit

Permalink
add basic auth support when accessing OpenSearch
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Sep 27, 2023
1 parent 0305481 commit d6ce91e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 3 deletions.
12 changes: 11 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.host`: default is localhost.
- `spark.datasource.flint.port`: default is 9200.
- `spark.datasource.flint.scheme`: default is http. valid values [http, https]
- `spark.datasource.flint.auth`: default is false. valid values [false, sigv4]
- `spark.datasource.flint.auth`: default is false. valid values [false, sigv4, basic]
- `spark.datasource.flint.auth.username`: basic auth username.
- `spark.datasource.flint.auth.password`: basic auth password.
- `spark.datasource.flint.region`: default is us-west-2. only been used when auth=sigv4
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.write.id_name`: no default value.
Expand Down Expand Up @@ -455,3 +457,11 @@ Flint use [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJa
--conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::AccountB:role/CrossAccountRoleB \
--conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::AccountBB:role/CrossAccountRoleB
```

### Basic Auth
Add the Basic Auth configuration to the Spark configuration. Replace the username and password with the correct values.
```
--conf spark.datasource.flint.auth=basic
--conf spark.datasource.flint.auth.username=username
--conf spark.datasource.flint.auth.password=password
```
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public class FlintOptions implements Serializable {

/** Auth option value selecting AWS SigV4 request signing. */
public static final String SIGV4_AUTH = "sigv4";

/** Auth option value selecting HTTP basic authentication. */
public static final String BASIC_AUTH = "basic";

// NOTE(review): keys appear to be relative to the datasource prefix
// (spark.datasource.flint.auth.username on the Spark side) — confirm against FlintSparkConf.
/** Option key for the basic auth username. */
public static final String USERNAME = "auth.username";

/** Option key for the basic auth password. */
public static final String PASSWORD = "auth.password";

/** Option key naming a custom AWS credentials provider class; defaults to empty. */
public static final String CUSTOM_AWS_CREDENTIALS_PROVIDER = "customAWSCredentialsProvider";

/**
Expand Down Expand Up @@ -87,4 +93,12 @@ public String getAuth() {
/**
 * Returns the configured custom AWS credentials provider class name.
 *
 * @return the value of the {@code customAWSCredentialsProvider} option, or an empty
 *     string when the option is not set
 */
public String getCustomAwsCredentialsProvider() {
  return options.containsKey(CUSTOM_AWS_CREDENTIALS_PROVIDER)
      ? options.get(CUSTOM_AWS_CREDENTIALS_PROVIDER)
      : "";
}

/**
 * Returns the basic auth username.
 *
 * @return the value of the {@code auth.username} option, or the fallback
 *     {@code "flint"} when the option is not set
 */
public String getUsername() {
  return options.containsKey(USERNAME) ? options.get(USERNAME) : "flint";
}

/**
 * Returns the basic auth password.
 *
 * <p>NOTE(review): the hard-coded {@code "flint"} fallback means a cluster with basic
 * auth enabled but no password configured silently uses a well-known credential —
 * consider failing fast instead; confirm the intended behavior with the maintainers.
 *
 * @return the value of the {@code auth.password} option, or the fallback
 *     {@code "flint"} when the option is not set
 */
public String getPassword() {
  return options.containsKey(PASSWORD) ? options.get(PASSWORD) : "flint";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
Expand Down Expand Up @@ -172,6 +176,13 @@ private RestHighLevelClient createClient() {
restClientBuilder.setHttpClientConfigCallback(cb ->
cb.addInterceptorLast(new AWSRequestSigningApacheInterceptor(signer.getServiceName(),
signer, awsCredentialsProvider.get())));
} else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(options.getUsername(), options.getPassword()));
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
return new RestHighLevelClient(restClientBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,19 @@ object FlintSparkConf {

/** Authentication type for the Flint OpenSearch connection. */
val AUTH = FlintConfig("spark.datasource.flint.auth")
  .datasourceOption()
  // List each supported constant with its config value, consistently:
  // NONE_AUTH(false), SIGV4_AUTH(sigv4), BASIC_AUTH(basic).
  .doc("authentication type. supported value: NONE_AUTH(false), SIGV4_AUTH(sigv4), BASIC_AUTH(basic)")
  .createWithDefault(FlintOptions.NONE_AUTH)

/** Basic auth username; default "flint" matches the FlintOptions.getUsername fallback. */
val USERNAME = FlintConfig("spark.datasource.flint.auth.username")
  .datasourceOption()
  .doc("basic auth username")
  .createWithDefault("flint")

/**
 * Basic auth password.
 *
 * NOTE(review): defaulting a password to the well-known value "flint" is a security
 * smell — a basic-auth cluster with no password configured will silently authenticate
 * with it. Confirm whether an empty/required default is preferable.
 */
val PASSWORD = FlintConfig("spark.datasource.flint.auth.password")
  .datasourceOption()
  .doc("basic auth password")
  .createWithDefault("flint")

val REGION = FlintConfig("spark.datasource.flint.region")
.datasourceOption()
.doc("AWS service region")
Expand Down Expand Up @@ -144,7 +154,9 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
SCHEME,
AUTH,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER)
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap
.asJava)
Expand Down

0 comments on commit d6ce91e

Please sign in to comment.