Skip to content

Commit

Permalink
Rename vars
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Mar 29, 2024
1 parent ecdfa33 commit edcf989
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public class FlintOptions implements Serializable {

public static final String CUSTOM_AWS_CREDENTIALS_PROVIDER = "customAWSCredentialsProvider";

public static final String SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER = "superAdminAWSCredentialsProvider";
public static final String METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER = "spark.metadata.accessAWSCredentialsProvider";

/**
* By default, customAWSCredentialsProvider and superAdminAWSCredentialsProvider are empty. use DefaultAWSCredentialsProviderChain.
* By default, customAWSCredentialsProvider and accessAWSCredentialsProvider are empty. use DefaultAWSCredentialsProviderChain.
*/
public static final String DEFAULT_AWS_CREDENTIALS_PROVIDER = "";

Expand Down Expand Up @@ -127,8 +127,8 @@ public String getCustomAwsCredentialsProvider() {
return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, DEFAULT_AWS_CREDENTIALS_PROVIDER);
}

public String getSuperAdminAwsCredentialsProvider() {
return options.getOrDefault(SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER, DEFAULT_AWS_CREDENTIALS_PROVIDER);
public String getMetadataAccessAwsCredentialsProvider() {
return options.getOrDefault(METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, DEFAULT_AWS_CREDENTIALS_PROVIDER);
}

public String getUsername() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,19 @@ public class AWSRequestSigningApacheInterceptor implements HttpRequestIntercepto

/**
* Provides a source of AWS credentials that are used for signing requests requiring elevated permissions.
* This is particularly useful for accessing resources that are restricted to super-administrative operations,
* This is particularly useful for accessing resources that are restricted to metadata operations,
* such as certain system indices or administrative APIs. These credentials are expected to have permissions
* beyond those of the regular {@link #primaryCredentialsProvider}.
*/
private final AWSCredentialsProvider superAdminAWSCredentialsProvider;
private final AWSCredentialsProvider metadataAccessAWSCredentialsProvider;

/**
* Identifies data access operations that require super-admin credentials. This identifier can be used to
* Identifies operations that require metadata access credentials. This identifier can be used to
* distinguish between regular and elevated data access needs, facilitating the decision to use
* {@link #superAdminAWSCredentialsProvider} over {@link #primaryCredentialsProvider} when accessing sensitive
* {@link #metadataAccessAWSCredentialsProvider} over {@link #primaryCredentialsProvider} when accessing sensitive
* or restricted resources.
*/
private final String superAdminDataAccessIdentifier;
private final String metadataAccessIdentifier;

/**
*
Expand All @@ -80,13 +80,13 @@ public class AWSRequestSigningApacheInterceptor implements HttpRequestIntercepto
public AWSRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AWSCredentialsProvider primaryCredentialsProvider,
final AWSCredentialsProvider superAdminAWSCredentialsProvider,
final String superAdminDataAccessIdentifier) {
this.service = service;
final AWSCredentialsProvider metadataAccessAWSCredentialsProvider,
final String metadataAccessIdentifier) {
this.service = service == null ? "unknown" : service;
this.signer = signer;
this.primaryCredentialsProvider = primaryCredentialsProvider;
this.superAdminAWSCredentialsProvider = superAdminAWSCredentialsProvider;
this.superAdminDataAccessIdentifier = superAdminDataAccessIdentifier;
this.metadataAccessAWSCredentialsProvider = metadataAccessAWSCredentialsProvider;
this.metadataAccessIdentifier = metadataAccessIdentifier;
}

/**
Expand Down Expand Up @@ -129,8 +129,8 @@ public void process(final HttpRequest request, final HttpContext context)
signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));

// Sign it
if (this.service.equals("es") && isSuperAdminDataAccess(signableRequest.getResourcePath())) {
signer.sign(signableRequest, superAdminAWSCredentialsProvider.getCredentials());
if (this.service.equals("es") && isMetadataAccess(signableRequest.getResourcePath())) {
signer.sign(signableRequest, metadataAccessAWSCredentialsProvider.getCredentials());
} else {
signer.sign(signableRequest, primaryCredentialsProvider.getCredentials());
}
Expand Down Expand Up @@ -165,11 +165,11 @@ private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair

/**
* @param resourcePath The path of the resource being accessed.
* @return true if the resource path contains the super-admin data access identifier, indicating that
* the operation requires super-admin credentials; false otherwise.
* @return true if the resource path contains the metadata access identifier, indicating that
* the operation requires metadata access credentials; false otherwise.
*/
private boolean isSuperAdminDataAccess(String resourcePath) {
return resourcePath.contains(superAdminDataAccessIdentifier);
private boolean isMetadataAccess(String resourcePath) {
return resourcePath.contains(metadataAccessIdentifier);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,21 +269,21 @@ public IRestHighLevelClient createClient() {
instantiateProvider(customProviderClass, customAWSCredentialsProvider);
}

// Use the customAWSCredentialsProvider for superAdminAWSCredentialsProvider by default,
// unless a separate superAdmin provider class name is specified
String superAdminProviderClass = options.getSuperAdminAwsCredentialsProvider();
final AtomicReference<AWSCredentialsProvider> superAdminAWSCredentialsProvider =
// Set metadataAccessAWSCredentialsProvider to customAWSCredentialsProvider by default for backwards compatibility
// unless a specific metadata access provider class name is provided
String metadataAccessProviderClass = options.getMetadataAccessAwsCredentialsProvider();
final AtomicReference<AWSCredentialsProvider> metadataAccessAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());
if (Strings.isNullOrEmpty(superAdminProviderClass)) {
superAdminAWSCredentialsProvider.set(customAWSCredentialsProvider.get());
if (Strings.isNullOrEmpty(metadataAccessProviderClass)) {
metadataAccessAWSCredentialsProvider.set(customAWSCredentialsProvider.get());
} else {
instantiateProvider(superAdminProviderClass, superAdminAWSCredentialsProvider);
instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider);
}

restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate = builder.addInterceptorLast(
new AWSRequestSigningApacheInterceptor(
signer.getServiceName(), signer, customAWSCredentialsProvider.get(), superAdminAWSCredentialsProvider.get(), options.getSystemIndexName()));
signer.getServiceName(), signer, customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), options.getSystemIndexName()));
return RetryableHttpAsyncClient.builder(delegate, options);
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ object FlintSparkConf {
.doc("AWS customAWSCredentialsProvider")
.createWithDefault(FlintOptions.DEFAULT_AWS_CREDENTIALS_PROVIDER)

val SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER =
FlintConfig("spark.datasource.flint.superAdminAWSCredentialsProvider")
.datasourceOption()
.doc("AWS credentials provider for super admin permission")
.createWithDefault(FlintOptions.DEFAULT_AWS_CREDENTIALS_PROVIDER)

val DOC_ID_COLUMN_NAME = FlintConfig("spark.datasource.flint.write.id_name")
.datasourceOption()
.doc(
Expand Down Expand Up @@ -180,6 +174,10 @@ object FlintSparkConf {
FlintConfig(s"spark.flint.job.inactivityLimitMillis")
.doc("inactivity timeout")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_INACTIVITY_LIMIT_MILLIS))
val METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER =
FlintConfig("spark.metadata.accessAWSCredentialsProvider")
.doc("AWS credentials provider for super admin permission")
.createOptional()
}

/**
Expand Down Expand Up @@ -227,7 +225,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
RETRYABLE_HTTP_STATUS_CODES,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
SUPER_ADMIN_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD,
SOCKET_TIMEOUT_MILLIS,
Expand All @@ -241,6 +238,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
DATA_SOURCE_NAME,
SESSION_ID,
REQUEST_INDEX,
METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
EXCLUDE_JOB_IDS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.flatMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ class FlintSparkConfSuite extends FlintSuite {
retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException"
}

test("test super admin AWS credentials provider option") {
withSparkConf("spark.datasource.flint.superAdminAWSCredentialsProvider") {
test("test metadata access AWS credentials provider option") {
withSparkConf("spark.metadata.accessAWSCredentialsProvider") {
spark.conf.set(
"spark.datasource.flint.superAdminAWSCredentialsProvider",
"com.example.superAdminAWSCredentialsProvider")
"spark.metadata.accessAWSCredentialsProvider",
"com.example.MetadataAccessCredentialsProvider")
val flintOptions = FlintSparkConf().flintOptions()
assert(flintOptions.getCustomAwsCredentialsProvider == "")
assert(
flintOptions.getSuperAdminAwsCredentialsProvider == "com.example.superAdminAWSCredentialsProvider")
flintOptions.getMetadataAccessAwsCredentialsProvider == "com.example.MetadataAccessCredentialsProvider")
}
}

Expand Down

0 comments on commit edcf989

Please sign in to comment.