diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index d8bc6765d..202f3cc7d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -7,10 +7,7 @@ import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -18,20 +15,10 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import java.util.stream.Collectors; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestClient; -import org.opensearch.client.RestClientBuilder; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; @@ -45,13 +32,10 @@ import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; -import org.opensearch.flint.core.auth.ResourceBasedAWSRequestSigningApacheInterceptor; -import org.opensearch.flint.core.http.RetryableHttpAsyncClient; import org.opensearch.flint.core.metadata.FlintMetadata; import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; -import org.opensearch.flint.core.RestHighLevelClientWrapper; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -67,8 +51,6 @@ public class FlintOpenSearchClient implements FlintClient { private static final Logger LOG = Logger.getLogger(FlintOpenSearchClient.class.getName()); - private static final String SERVICE_NAME = "es"; - /** * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string. @@ -117,7 +99,7 @@ public OptimisticTransaction startTransaction(String indexName, boolean f } } return new DefaultOptimisticTransaction<>(dataSourceName, - new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName)); + new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName)); } catch (IOException e) { throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e); } @@ -256,73 +238,7 @@ public FlintWriter createWriter(String indexName) { @Override public IRestHighLevelClient createClient() { - RestClientBuilder - restClientBuilder = - RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); - - // SigV4 support - if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) { - // Use DefaultAWSCredentialsProviderChain by default. - final AtomicReference customAWSCredentialsProvider = - new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); - String customProviderClass = options.getCustomAwsCredentialsProvider(); - if (!Strings.isNullOrEmpty(customProviderClass)) { - instantiateProvider(customProviderClass, customAWSCredentialsProvider); - } - - // Set metadataAccessAWSCredentialsProvider to customAWSCredentialsProvider by default for backwards compatibility - // unless a specific metadata access provider class name is provided - String metadataAccessProviderClass = options.getMetadataAccessAwsCredentialsProvider(); - final AtomicReference metadataAccessAWSCredentialsProvider = - new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); - - String systemIndexName = Strings.isNullOrEmpty(options.getSystemIndexName()) ? metaLogIndexName : options.getSystemIndexName(); - - if (Strings.isNullOrEmpty(metadataAccessProviderClass)) { - metadataAccessAWSCredentialsProvider.set(customAWSCredentialsProvider.get()); - } else { - instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider); - } - - restClientBuilder.setHttpClientConfigCallback(builder -> { - HttpAsyncClientBuilder delegate = builder.addInterceptorLast( - new ResourceBasedAWSRequestSigningApacheInterceptor( - SERVICE_NAME, options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName)); - return RetryableHttpAsyncClient.builder(delegate, options); - } - ); - } else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(options.getUsername(), options.getPassword())); - restClientBuilder.setHttpClientConfigCallback(builder -> { - HttpAsyncClientBuilder delegate = builder.setDefaultCredentialsProvider(credentialsProvider); - return RetryableHttpAsyncClient.builder(delegate, options); - }); - } else { - restClientBuilder.setHttpClientConfigCallback(delegate -> - RetryableHttpAsyncClient.builder(delegate, options)); - } - - final RequestConfigurator callback = new RequestConfigurator(options); - restClientBuilder.setRequestConfigCallback(callback); - - return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder)); - } - - /** - * Attempts to instantiate the AWS credential provider using reflection. - */ - private void instantiateProvider(String providerClass, AtomicReference provider) { - try { - Class awsCredentialsProviderClass = Class.forName(providerClass); - Constructor ctor = awsCredentialsProviderClass.getDeclaredConstructor(); - ctor.setAccessible(true); - provider.set((AWSCredentialsProvider) ctor.newInstance()); - } catch (Exception e) { - throw new RuntimeException("Failed to instantiate AWSCredentialsProvider: " + providerClass, e); - } + return OpenSearchClientUtils.createClient(options); } /* @@ -339,7 +255,7 @@ private FlintMetadata constructFlintMetadata(String indexName, String mapping, S if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); FlintOpenSearchMetadataLog metadataLog = - new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName); + new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName); latest = metadataLog.getLatest(); } } catch (IOException e) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 7195ae177..30c711e9a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -23,7 +23,7 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; @@ -37,9 +37,9 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog getLatest() { LOG.info("Fetching latest log entry with id " + latestId); - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { GetResponse response = client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -102,7 +102,7 @@ public Optional getLatest() { @Override public void purge() { LOG.info("Purging log entry with id " + latestId); - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { DeleteResponse response = client.delete( new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -151,7 +151,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { private FlintMetadataLogEntry writeLogEntry( FlintMetadataLogEntry logEntry, CheckedFunction write) { - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { // Write (create or update) the doc DocWriteResponse response = write.apply(client); @@ -174,13 +174,17 @@ private FlintMetadataLogEntry writeLogEntry( private boolean exists() { LOG.info("Checking if Flint index exists " + metaLogIndexName); - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { return client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e); } } + private IRestHighLevelClient createOpenSearchClient() { + return OpenSearchClientUtils.createClient(options); + } + @FunctionalInterface public interface CheckedFunction { R apply(T t) throws IOException; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java new file mode 100644 index 000000000..c047ced51 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import java.lang.reflect.Constructor; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.common.Strings; +import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.IRestHighLevelClient; +import org.opensearch.flint.core.RestHighLevelClientWrapper; +import org.opensearch.flint.core.auth.ResourceBasedAWSRequestSigningApacheInterceptor; +import org.opensearch.flint.core.http.RetryableHttpAsyncClient; + +/** + * Utility functions to create {@link IRestHighLevelClient}. + */ +public class OpenSearchClientUtils { + + private static final String SERVICE_NAME = "es"; + + /** + * Metadata log index name prefix + */ + public final static String META_LOG_NAME_PREFIX = ".query_execution_request"; + + public static IRestHighLevelClient createClient(FlintOptions options) { + RestClientBuilder + restClientBuilder = + RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); + + if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) { + restClientBuilder = configureSigV4Auth(restClientBuilder, options); + } else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) { + restClientBuilder = configureBasicAuth(restClientBuilder, options); + } else { + restClientBuilder = configureDefaultAuth(restClientBuilder, options); + } + + final RequestConfigurator callback = new RequestConfigurator(options); + restClientBuilder.setRequestConfigCallback(callback); + + return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder)); + } + + private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClientBuilder, FlintOptions options) { + // Use DefaultAWSCredentialsProviderChain by default. + final AtomicReference customAWSCredentialsProvider = + new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); + String customProviderClass = options.getCustomAwsCredentialsProvider(); + if (!Strings.isNullOrEmpty(customProviderClass)) { + instantiateProvider(customProviderClass, customAWSCredentialsProvider); + } + + // Set metadataAccessAWSCredentialsProvider to customAWSCredentialsProvider by default for backwards compatibility + // unless a specific metadata access provider class name is provided + String metadataAccessProviderClass = options.getMetadataAccessAwsCredentialsProvider(); + final AtomicReference metadataAccessAWSCredentialsProvider = + new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); + + String metaLogIndexName = constructMetaLogIndexName(options.getDataSourceName()); + String systemIndexName = Strings.isNullOrEmpty(options.getSystemIndexName()) ? metaLogIndexName : options.getSystemIndexName(); + + if (Strings.isNullOrEmpty(metadataAccessProviderClass)) { + metadataAccessAWSCredentialsProvider.set(customAWSCredentialsProvider.get()); + } else { + instantiateProvider(metadataAccessProviderClass, metadataAccessAWSCredentialsProvider); + } + + restClientBuilder.setHttpClientConfigCallback(builder -> { + HttpAsyncClientBuilder delegate = builder.addInterceptorLast( + new ResourceBasedAWSRequestSigningApacheInterceptor( + SERVICE_NAME, options.getRegion(), customAWSCredentialsProvider.get(), metadataAccessAWSCredentialsProvider.get(), systemIndexName)); + return RetryableHttpAsyncClient.builder(delegate, options); + } + ); + + return restClientBuilder; + } + + private static RestClientBuilder configureBasicAuth(RestClientBuilder restClientBuilder, FlintOptions options) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(options.getUsername(), options.getPassword())); + restClientBuilder.setHttpClientConfigCallback(builder -> { + HttpAsyncClientBuilder delegate = builder.setDefaultCredentialsProvider(credentialsProvider); + return RetryableHttpAsyncClient.builder(delegate, options); + }); + + return restClientBuilder; + } + + private static RestClientBuilder configureDefaultAuth(RestClientBuilder restClientBuilder, FlintOptions options) { + // No auth + restClientBuilder.setHttpClientConfigCallback(delegate -> + RetryableHttpAsyncClient.builder(delegate, options)); + return restClientBuilder; + } + + /** + * Attempts to instantiate the AWS credential provider using reflection. + */ + private static void instantiateProvider(String providerClass, AtomicReference provider) { + try { + Class awsCredentialsProviderClass = Class.forName(providerClass); + Constructor ctor = awsCredentialsProviderClass.getDeclaredConstructor(); + ctor.setAccessible(true); + provider.set((AWSCredentialsProvider) ctor.newInstance()); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate AWSCredentialsProvider: " + providerClass, e); + } + } + + private static String constructMetaLogIndexName(String dataSourceName) { + return dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; + } +}