Skip to content

Commit

Permalink
Refactor static method for OpenSearch client utils (#377)
Browse files Browse the repository at this point in the history
* static factory method for create OS client

Signed-off-by: Sean Kao <[email protected]>

* rm flint os metadata log dependency on FlintClient

Signed-off-by: Sean Kao <[email protected]>

* extract restClientBuilder configuration to methods

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az authored Jun 11, 2024
1 parent 76a9c57 commit 385a344
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,18 @@

import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
Expand All @@ -45,13 +32,10 @@
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.auth.ResourceBasedAWSRequestSigningApacheInterceptor;
import org.opensearch.flint.core.http.RetryableHttpAsyncClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.RestHighLevelClientWrapper;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand All @@ -67,8 +51,6 @@ public class FlintOpenSearchClient implements FlintClient {

private static final Logger LOG = Logger.getLogger(FlintOpenSearchClient.class.getName());

private static final String SERVICE_NAME = "es";


/**
* {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string.
Expand Down Expand Up @@ -117,7 +99,7 @@ public <T> OptimisticTransaction<T> startTransaction(String indexName, boolean f
}
}
return new DefaultOptimisticTransaction<>(dataSourceName,
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName));
new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName));
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}
Expand Down Expand Up @@ -256,73 +238,7 @@ public FlintWriter createWriter(String indexName) {

@Override
public IRestHighLevelClient createClient() {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));

// SigV4 support
if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) {
// Use DefaultAWSCredentialsProviderChain by default.
final AtomicReference<AWSCredentialsProvider> customAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());
String customProviderClass = options.getCustomAwsCredentialsProvider();
if (!Strings.isNullOrEmpty(customProviderClass)) {
instantiateProvider(customProviderClass, customAWSCredentialsProvider);
}

// Set metadataAccessAWSCredentialsProvider to customAWSCredentialsProvider by default for backwards compatibility
// unless a specific metadata access provider class name is provided
String metadataAccessProviderClass = options.getMetadataAccessAwsCredentialsProvider();
final AtomicReference<AWSCredentialsProvider> metadataAccessAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());

String systemIndexName = Strings.isNullOrEmpty(options.getSystemIndexName()) ? metaLogIndexName : options.getSystemIndexName();

if (Strings.isNullOrEmpty(metadataAccessProviderClass)) {
metadataAccessAWSCredentialsProvider.set(customAWSCredentialsProvider.get());
} else {
instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider);
}

restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate = builder.addInterceptorLast(
new ResourceBasedAWSRequestSigningApacheInterceptor(
SERVICE_NAME, options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName));
return RetryableHttpAsyncClient.builder(delegate, options);
}
);
} else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(options.getUsername(), options.getPassword()));
restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate = builder.setDefaultCredentialsProvider(credentialsProvider);
return RetryableHttpAsyncClient.builder(delegate, options);
});
} else {
restClientBuilder.setHttpClientConfigCallback(delegate ->
RetryableHttpAsyncClient.builder(delegate, options));
}

final RequestConfigurator callback = new RequestConfigurator(options);
restClientBuilder.setRequestConfigCallback(callback);

return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder));
}

/**
* Attempts to instantiate the AWS credential provider using reflection.
*/
private void instantiateProvider(String providerClass, AtomicReference<AWSCredentialsProvider> provider) {
try {
Class<?> awsCredentialsProviderClass = Class.forName(providerClass);
Constructor<?> ctor = awsCredentialsProviderClass.getDeclaredConstructor();
ctor.setAccessible(true);
provider.set((AWSCredentialsProvider) ctor.newInstance());
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate AWSCredentialsProvider: " + providerClass, e);
}
return OpenSearchClientUtils.createClient(options);
}

/*
Expand All @@ -339,7 +255,7 @@ private FlintMetadata constructFlintMetadata(String indexName, String mapping, S
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
FlintOpenSearchMetadataLog metadataLog =
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName);
new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName);
latest = metadataLog.getLatest();
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
Expand All @@ -37,9 +37,9 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadat
private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLog.class.getName());

/**
* Flint client to create Rest OpenSearch client (This will be refactored later)
* Flint options to create Rest OpenSearch client
*/
private final FlintClient flintClient;
private final FlintOptions options;

/**
* Reuse query request index as Flint metadata log store
Expand All @@ -51,8 +51,8 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadat
*/
private final String latestId;

public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName, String metaLogIndexName) {
this.flintClient = flintClient;
public FlintOpenSearchMetadataLog(FlintOptions options, String flintIndexName, String metaLogIndexName) {
this.options = options;
this.metaLogIndexName = metaLogIndexName;
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
}
Expand All @@ -77,7 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
@Override
public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
GetResponse response =
client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

Expand All @@ -102,7 +102,7 @@ public Optional<FlintMetadataLogEntry> getLatest() {
@Override
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
Expand Down Expand Up @@ -151,7 +151,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
private FlintMetadataLogEntry writeLogEntry(
FlintMetadataLogEntry logEntry,
CheckedFunction<IRestHighLevelClient, DocWriteResponse> write) {
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
// Write (create or update) the doc
DocWriteResponse response = write.apply(client);

Expand All @@ -174,13 +174,17 @@ private FlintMetadataLogEntry writeLogEntry(

private boolean exists() {
LOG.info("Checking if Flint index exists " + metaLogIndexName);
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
return client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e);
}
}

private IRestHighLevelClient createOpenSearchClient() {
return OpenSearchClientUtils.createClient(options);
}

@FunctionalInterface
public interface CheckedFunction<T, R> {
R apply(T t) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import java.lang.reflect.Constructor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.RestHighLevelClientWrapper;
import org.opensearch.flint.core.auth.ResourceBasedAWSRequestSigningApacheInterceptor;
import org.opensearch.flint.core.http.RetryableHttpAsyncClient;

/**
* Utility functions to create {@link IRestHighLevelClient}.
*/
public class OpenSearchClientUtils {

private static final String SERVICE_NAME = "es";

/**
* Metadata log index name prefix
*/
public final static String META_LOG_NAME_PREFIX = ".query_execution_request";

public static IRestHighLevelClient createClient(FlintOptions options) {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));

if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) {
restClientBuilder = configureSigV4Auth(restClientBuilder, options);
} else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) {
restClientBuilder = configureBasicAuth(restClientBuilder, options);
} else {
restClientBuilder = configureDefaultAuth(restClientBuilder, options);
}

final RequestConfigurator callback = new RequestConfigurator(options);
restClientBuilder.setRequestConfigCallback(callback);

return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder));
}

private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClientBuilder, FlintOptions options) {
// Use DefaultAWSCredentialsProviderChain by default.
final AtomicReference<AWSCredentialsProvider> customAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());
String customProviderClass = options.getCustomAwsCredentialsProvider();
if (!Strings.isNullOrEmpty(customProviderClass)) {
instantiateProvider(customProviderClass, customAWSCredentialsProvider);
}

// Set metadataAccessAWSCredentialsProvider to customAWSCredentialsProvider by default for backwards compatibility
// unless a specific metadata access provider class name is provided
String metadataAccessProviderClass = options.getMetadataAccessAwsCredentialsProvider();
final AtomicReference<AWSCredentialsProvider> metadataAccessAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());

String metaLogIndexName = constructMetaLogIndexName(options.getDataSourceName());
String systemIndexName = Strings.isNullOrEmpty(options.getSystemIndexName()) ? metaLogIndexName : options.getSystemIndexName();

if (Strings.isNullOrEmpty(metadataAccessProviderClass)) {
metadataAccessAWSCredentialsProvider.set(customAWSCredentialsProvider.get());
} else {
instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider);
}

restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate = builder.addInterceptorLast(
new ResourceBasedAWSRequestSigningApacheInterceptor(
SERVICE_NAME, options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName));
return RetryableHttpAsyncClient.builder(delegate, options);
}
);

return restClientBuilder;
}

private static RestClientBuilder configureBasicAuth(RestClientBuilder restClientBuilder, FlintOptions options) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(options.getUsername(), options.getPassword()));
restClientBuilder.setHttpClientConfigCallback(builder -> {
HttpAsyncClientBuilder delegate = builder.setDefaultCredentialsProvider(credentialsProvider);
return RetryableHttpAsyncClient.builder(delegate, options);
});

return restClientBuilder;
}

private static RestClientBuilder configureDefaultAuth(RestClientBuilder restClientBuilder, FlintOptions options) {
// No auth
restClientBuilder.setHttpClientConfigCallback(delegate ->
RetryableHttpAsyncClient.builder(delegate, options));
return restClientBuilder;
}

/**
* Attempts to instantiate the AWS credential provider using reflection.
*/
private static void instantiateProvider(String providerClass, AtomicReference<AWSCredentialsProvider> provider) {
try {
Class<?> awsCredentialsProviderClass = Class.forName(providerClass);
Constructor<?> ctor = awsCredentialsProviderClass.getDeclaredConstructor();
ctor.setAccessible(true);
provider.set((AWSCredentialsProvider) ctor.newInstance());
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate AWSCredentialsProvider: " + providerClass, e);
}
}

private static String constructMetaLogIndexName(String dataSourceName) {
return dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName;
}
}

0 comments on commit 385a344

Please sign in to comment.