Skip to content

Commit

Permalink
Refactor to follow the principle of programming to an interface
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jan 26, 2024
1 parent b47386e commit ece16a0
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;

import java.io.Closeable;
import java.io.IOException;

/**
* Interface for wrapping the OpenSearch High Level REST Client with additional functionality,
* such as metrics tracking.
*/
public interface IRestHighLevelClient {
public interface IRestHighLevelClient extends Closeable {

BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations
* with integrated metrics tracking.
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient, Closeable {
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourc
*/
FlintWriter createWriter(String indexName);
/**
* Create {@link RestHighLevelClientWrapper}.
* @return {@link RestHighLevelClientWrapper}
* Create {@link IRestHighLevelClient}.
* @return {@link IRestHighLevelClient}
*/
public RestHighLevelClientWrapper createClient();
public IRestHighLevelClient createClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor;
import org.opensearch.flint.core.http.RetryableHttpAsyncClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
Expand Down Expand Up @@ -97,7 +98,7 @@ public <T> OptimisticTransaction<T> startTransaction(String indexName, String da
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
try (RestHighLevelClientWrapper client = createClient()) {
try (IRestHighLevelClient client = createClient()) {
if (client.isIndexExists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
} else {
Expand Down Expand Up @@ -131,7 +132,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClientWrapper client = createClient()) {
try (IRestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(mapping, XContentType.JSON);
if (settings.isDefined()) {
Expand All @@ -147,7 +148,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClientWrapper client = createClient()) {
try (IRestHighLevelClient client = createClient()) {
return client.isIndexExists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e);
Expand All @@ -158,7 +159,7 @@ public boolean exists(String indexName) {
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (RestHighLevelClientWrapper client = createClient()) {
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

Expand All @@ -176,7 +177,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClientWrapper client = createClient()) {
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

Expand All @@ -192,7 +193,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClientWrapper client = createClient()) {
try (IRestHighLevelClient client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);
client.deleteIndex(request, RequestOptions.DEFAULT);
} catch (Exception e) {
Expand Down Expand Up @@ -233,7 +234,7 @@ public FlintWriter createWriter(String indexName) {
}

@Override
public RestHighLevelClientWrapper createClient() {
public IRestHighLevelClient createClient() {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.RestHighLevelClientWrapper;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
Expand Down Expand Up @@ -77,7 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
@Override
public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
try (IRestHighLevelClient client = flintClient.createClient()) {
GetResponse response =
client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

Expand All @@ -102,7 +102,7 @@ public Optional<FlintMetadataLogEntry> getLatest() {
@Override
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
try (IRestHighLevelClient client = flintClient.createClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
Expand Down Expand Up @@ -150,8 +150,8 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {

private FlintMetadataLogEntry writeLogEntry(
FlintMetadataLogEntry logEntry,
CheckedFunction<RestHighLevelClientWrapper, DocWriteResponse> write) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
CheckedFunction<IRestHighLevelClient, DocWriteResponse> write) {
try (IRestHighLevelClient client = flintClient.createClient()) {
// Write (create or update) the doc
DocWriteResponse response = write.apply(client);

Expand All @@ -174,7 +174,7 @@ private FlintMetadataLogEntry writeLogEntry(

private boolean exists() {
LOG.info("Checking if Flint index exists " + metaLogIndexName);
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
try (IRestHighLevelClient client = flintClient.createClient()) {
return client.isIndexExists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.flint.core.RestHighLevelClientWrapper;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.search.SearchHit;

import java.io.IOException;
Expand All @@ -24,14 +24,14 @@ public abstract class OpenSearchReader implements FlintReader {
/** Search request source builder. */
private final SearchRequest searchRequest;

protected final RestHighLevelClientWrapper client;
protected final IRestHighLevelClient client;

/**
* iterator of one-shot search result.
*/
private Iterator<SearchHit> iterator = null;

public OpenSearchReader(RestHighLevelClientWrapper client, SearchRequest searchRequest) {
public OpenSearchReader(IRestHighLevelClient client, SearchRequest searchRequest) {
this.client = client;
this.searchRequest = searchRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.RestHighLevelClientWrapper;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
Expand All @@ -35,7 +35,7 @@ public class OpenSearchScrollReader extends OpenSearchReader {

private String scrollId = null;

public OpenSearchScrollReader(RestHighLevelClientWrapper client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
public OpenSearchScrollReader(IRestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize())));
this.options = options;
this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.RestHighLevelClientWrapper;
import org.opensearch.flint.core.IRestHighLevelClient;

import java.io.IOException;
import java.util.logging.Level;
Expand All @@ -30,7 +30,7 @@ public void upsert(String id, String doc) {
// credentials may expire.
// also, failure to close the client causes the job to be stuck in the running state as the client resource
// is not released.
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
try (IRestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest
updateRequest =
Expand All @@ -47,7 +47,7 @@ public void upsert(String id, String doc) {
}

public void update(String id, String doc) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
try (IRestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest
updateRequest =
Expand All @@ -63,7 +63,7 @@ public void update(String id, String doc) {
}

public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
try (IRestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest
updateRequest =
Expand All @@ -80,7 +80,7 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
}
}

private void assertIndexExist(RestHighLevelClientWrapper client, String indexName) throws IOException {
private void assertIndexExist(IRestHighLevelClient client, String indexName) throws IOException {
LOG.info("Checking if index exists " + indexName);
if (!client.isIndexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
String errorMsg = "Index not found " + indexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.RestHighLevelClientWrapper;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
Expand All @@ -29,9 +29,9 @@ public class OpenSearchWriter extends FlintWriter {

private StringBuilder sb;

private RestHighLevelClientWrapper client;
private IRestHighLevelClient client;

public OpenSearchWriter(RestHighLevelClientWrapper client, String indexName, String refreshPolicy) {
public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy) {
this.client = client;
this.indexName = indexName;
this.sb = new StringBuilder();
Expand Down

0 comments on commit ece16a0

Please sign in to comment.