Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Jul 30, 2024
1 parent 41e89e5 commit 2c53f50
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ public class FlintOptions implements Serializable {

public static final String SCHEME = "scheme";

public static final String INDEX_TYPE = "index.type";
public static final String INDEX_TYPE_AOS = "aos";
public static final String INDEX_TYPE_AOSS = "aoss";
/**
* Service name used for SigV4 signature.
* `es`: Amazon OpenSearch Service
* `aoss`: Amazon OpenSearch Serverless
*/
public static final String SERVICE_NAME = "auth.servicename";
public static final String SERVICE_NAME_ES = "es";
public static final String SERVICE_NAME_AOSS = "aoss";

public static final String AUTH = "auth";
public static final String NONE_AUTH = "noauth";
Expand Down Expand Up @@ -130,8 +135,8 @@ public String getAuth() {
return options.getOrDefault(AUTH, NONE_AUTH);
}

public String getIndexType() {
return options.getOrDefault(INDEX_TYPE, INDEX_TYPE_AOS);
public String getServiceName() {
return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES);
}

public String getCustomAwsCredentialsProvider() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static com.amazonaws.auth.internal.SignerConstants.X_AMZ_CONTENT_SHA256;
import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS;

import com.amazonaws.DefaultRequest;
import com.amazonaws.auth.AWSCredentialsProvider;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void process(final HttpRequest request, final HttpContext context)
signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams()));
signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));

if (OpenSearchClientUtils.AOSS_SIGV4_SERVICE_NAME.equals(service)) {
if (SERVICE_NAME_AOSS.equals(service)) {
enableContentBodySignature(signableRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
Expand Down Expand Up @@ -39,14 +35,6 @@ public class OpenSearchClientUtils {
*/
public final static String META_LOG_NAME_PREFIX = ".query_execution_request";

public static final String AOS_SIGV4_SERVICE_NAME = "es";
public static final String AOSS_SIGV4_SERVICE_NAME = "aoss";

private static final Map<String, String> INDEX_TYPE_SERVICE_NAME_MAPPING = ImmutableMap.<String, String>builder()
.put(FlintOptions.INDEX_TYPE_AOS, AOS_SIGV4_SERVICE_NAME)
.put(FlintOptions.INDEX_TYPE_AOSS, AOSS_SIGV4_SERVICE_NAME)
.build();

/**
* Used in IT.
*/
Expand Down Expand Up @@ -97,24 +85,17 @@ private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClient
instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider);
}

String serviceName = getServiceNameForSigV4(options);
restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate = builder.addInterceptorLast(
new ResourceBasedAWSRequestSigningApacheInterceptor(
serviceName, options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName));
options.getServiceName(), options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName));
return RetryableHttpAsyncClient.builder(delegate, options);
}
);

return restClientBuilder;
}

@VisibleForTesting
static String getServiceNameForSigV4(FlintOptions options) {
String indexType = options.getIndexType();
return Objects.requireNonNull(INDEX_TYPE_SERVICE_NAME_MAPPING.get(indexType), "Unknown index type was specified: " + indexType);
}

private static RestClientBuilder configureBasicAuth(RestClientBuilder restClientBuilder, FlintOptions options) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ object FlintSparkConf {
"noauth(no auth), sigv4(sigv4 auth), basic(basic auth)")
.createWithDefault(FlintOptions.NONE_AUTH)

val INDEX_TYPE = FlintConfig("spark.datasource.flint.index.type")
val SERVICE_NAME = FlintConfig("spark.datasource.flint.auth.servicename")
.datasourceOption()
.doc("type of index storage. supported value: " +
"aos (AWS OpenSearch Service), aoss (Amazon OpenSearch Serverless)")
.createWithDefault(FlintOptions.INDEX_TYPE_AOS)
.doc("service name used for SigV4 signature. " +
"es (AWS OpenSearch Service), aoss (Amazon OpenSearch Serverless)")
.createWithDefault(FlintOptions.SERVICE_NAME_ES)

val USERNAME = FlintConfig("spark.datasource.flint.auth.username")
.datasourceOption()
Expand Down Expand Up @@ -273,7 +273,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
RETRYABLE_HTTP_STATUS_CODES,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
INDEX_TYPE,
SERVICE_NAME,
USERNAME,
PASSWORD,
SOCKET_TIMEOUT_MILLIS,
Expand Down

0 comments on commit 2c53f50

Please sign in to comment.