Skip to content

Commit

Permalink
POC super admin auth provider
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Mar 14, 2024
1 parent 7b4614a commit 49a88fa
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,5 @@ <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourc
* Create {@link IRestHighLevelClient}.
* @return {@link IRestHighLevelClient}
*/
public IRestHighLevelClient createClient();
IRestHighLevelClient createClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ public class FlintOptions implements Serializable {

public static final String CUSTOM_AWS_CREDENTIALS_PROVIDER = "customAWSCredentialsProvider";

public static final String SUPER_ADMIN_CREDENTIALS_PROVIDER = "superAdminAWSCredentialsProvider";

/**
* By default, customAWSCredentialsProvider is empty. use DefaultAWSCredentialsProviderChain.
*/
public static final String DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER = "";

public static final String SYSTEM_INDEX= "spark.flint.job.requestIndex";

/**
* Used by {@link org.opensearch.flint.core.storage.OpenSearchScrollReader}
*/
Expand Down Expand Up @@ -119,7 +123,11 @@ public String getAuth() {
}

public String getCustomAwsCredentialsProvider() {
return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, "");
return options.getOrDefault(CUSTOM_AWS_CREDENTIALS_PROVIDER, DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER);
}

public String getSuperAdminAwsCredentialsProvider() {
return options.getOrDefault(SUPER_ADMIN_CREDENTIALS_PROVIDER, DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER);
}

public String getUsername() {
Expand All @@ -133,4 +141,8 @@ public String getPassword() {
public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}

public String getSystemIndex() {
return options.get(SYSTEM_INDEX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
Expand All @@ -32,10 +34,12 @@

/**
* From https://github.com/opensearch-project/sql-jdbc/blob/main/src/main/java/org/opensearch/jdbc/transport/http/auth/aws/AWSRequestSigningApacheInterceptor.java
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer} for SIGV4_AUTH
* and {@link AWSCredentialsProvider}.
*/
public class AWSRequestSigningApacheInterceptor implements HttpRequestInterceptor {
private static final Logger LOG = Logger.getLogger(AWSRequestSigningApacheInterceptor.class.getName());

/**
* The service that we're connecting to. Technically not necessary.
* Could be used by a future Signer, though.
Expand All @@ -47,23 +51,31 @@ public class AWSRequestSigningApacheInterceptor implements HttpRequestIntercepto
*/
private final Signer signer;

private final String systemIndexName;

/**
* The source of AWS credentials for signing.
*/
private final AWSCredentialsProvider awsCredentialsProvider;

private final AWSCredentialsProvider superAdminAWSCredentialsProvider;

/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
*/
public AWSRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AWSCredentialsProvider awsCredentialsProvider) {
final Signer signer,
final String systemIndexName,
final AWSCredentialsProvider awsCredentialsProvider,
final AWSCredentialsProvider superAdminAWSCredentialsProvider) {
this.service = service;
this.signer = signer;
this.systemIndexName = systemIndexName;
this.awsCredentialsProvider = awsCredentialsProvider;
this.superAdminAWSCredentialsProvider = superAdminAWSCredentialsProvider;
}

/**
Expand Down Expand Up @@ -106,7 +118,11 @@ public void process(final HttpRequest request, final HttpContext context)
signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));

// Sign it
signer.sign(signableRequest, awsCredentialsProvider.getCredentials());
if (this.service.equals("es") && isSystemIndexRequest(signableRequest.getResourcePath())) {
signer.sign(signableRequest, superAdminAWSCredentialsProvider.getCredentials());
} else {
signer.sign(signableRequest, awsCredentialsProvider.getCredentials());
}

// Now copy everything back
request.setHeaders(mapToHeaderArray(signableRequest.getHeaders()));
Expand Down Expand Up @@ -136,6 +152,12 @@ private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair
return parameterMap;
}

private boolean isSystemIndexRequest(String resourcePath) {
LOG.info(String.format("clingzhi - resourcePath: %s", resourcePath));
LOG.info(String.format("clingzhi - systemIndexName: %s", systemIndexName));
return resourcePath.contains(systemIndexName);
}

/**
* @param headers modeled Header objects
* @return a Map of header entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ public IRestHighLevelClient createClient() {
AWS4Signer signer = new AWS4Signer();
signer.setServiceName("es");
signer.setRegionName(options.getRegion());

// Use DefaultAWSCredentialsProviderChain by default.
final AtomicReference<AWSCredentialsProvider> awsCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());
String providerClass = options.getCustomAwsCredentialsProvider();
if (!Strings.isNullOrEmpty(providerClass)) {
try {
Expand All @@ -259,11 +258,26 @@ public IRestHighLevelClient createClient() {
throw new RuntimeException(e);
}
}

final AtomicReference<AWSCredentialsProvider> superAdminAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());
String superAdminproviderClass = options.getSuperAdminAwsCredentialsProvider();
if (!Strings.isNullOrEmpty(superAdminproviderClass)) {
try {
Class<?> superAdminAWSCredentialsProviderClass = Class.forName(superAdminproviderClass);
Constructor<?> ctor = superAdminAWSCredentialsProviderClass.getDeclaredConstructor();
ctor.setAccessible(true);
superAdminAWSCredentialsProvider.set((AWSCredentialsProvider) ctor.newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate =
builder.addInterceptorLast(
new AWSRequestSigningApacheInterceptor(
signer.getServiceName(), signer, awsCredentialsProvider.get()));
signer.getServiceName(), signer, options.getSystemIndex(), awsCredentialsProvider.get(), superAdminAWSCredentialsProvider.get()));
return RetryableHttpAsyncClient.builder(delegate, options);
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ object FlintSparkConf {
.doc("AWS customAWSCredentialsProvider")
.createWithDefault(FlintOptions.DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER)

val SUPER_ADMIN_CREDENTIALS_PROVIDER =
FlintConfig("spark.datasource.flint.superAdminAWSCredentialsProvider")
.datasourceOption()
.doc("AWS customAWSCredentialsProvider for super admin permission")
.createWithDefault(FlintOptions.DEFAULT_CUSTOM_AWS_CREDENTIALS_PROVIDER)

val DOC_ID_COLUMN_NAME = FlintConfig("spark.datasource.flint.write.id_name")
.datasourceOption()
.doc(
Expand Down

0 comments on commit 49a88fa

Please sign in to comment.