Skip to content

Commit

Permalink
Abstract FlintClient
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Apr 30, 2024
1 parent a8a376f commit c874a38
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourc
FlintWriter createWriter(String indexName);

/**
* Create {@link IRestHighLevelClient}.
* @return {@link IRestHighLevelClient}
* Create {@link Object}.
* @return {@link Object}
*/
IRestHighLevelClient createClient();
Object createClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public <T> OptimisticTransaction<T> startTransaction(
String indexName, String dataSourceName, boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = constructMetaLogIndexName(dataSourceName);
try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
} else {
Expand Down Expand Up @@ -135,7 +135,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(mapping, XContentType.JSON);
if (settings.isDefined()) {
Expand All @@ -151,7 +151,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
return client.doesIndexExist(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e);
Expand All @@ -162,7 +162,7 @@ public boolean exists(String indexName) {
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

Expand All @@ -181,7 +181,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

Expand All @@ -197,7 +197,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
public void updateIndex(String indexName, FlintMetadata metadata) {
LOG.info("Updating Flint index " + indexName + " with metadata " + metadata);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
PutMappingRequest request = new PutMappingRequest(osIndexName);
request.source(metadata.getContent(), XContentType.JSON);
client.updateIndexMapping(request, RequestOptions.DEFAULT);
Expand All @@ -210,7 +210,7 @@ public void updateIndex(String indexName, FlintMetadata metadata) {
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);
client.deleteIndex(request, RequestOptions.DEFAULT);
} catch (Exception e) {
Expand All @@ -236,7 +236,7 @@ public FlintReader createReader(String indexName, String query) {
XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query);
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return new OpenSearchScrollReader(createClient(),
return new OpenSearchScrollReader(createOpenSearchClient(),
sanitizeIndexName(indexName),
new SearchSourceBuilder().query(queryBuilder),
options);
Expand All @@ -247,11 +247,15 @@ public FlintReader createReader(String indexName, String query) {

public FlintWriter createWriter(String indexName) {
LOG.info("Creating Flint index writer for " + indexName);
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
return new OpenSearchWriter(createOpenSearchClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
}

@Override
public IRestHighLevelClient createClient() {
public Object createClient() {
return createOpenSearchClient();
}

private IRestHighLevelClient createOpenSearchClient() {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));
Expand Down Expand Up @@ -332,7 +336,7 @@ private FlintMetadata constructFlintMetadata(String indexName, String mapping, S
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
Optional<FlintMetadataLogEntry> latest = Optional.empty();

try (IRestHighLevelClient client = createClient()) {
try (IRestHighLevelClient client = createOpenSearchClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
FlintOpenSearchMetadataLog metadataLog =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
@Override
public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) {
GetResponse response =
client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

Expand All @@ -102,7 +102,7 @@ public Optional<FlintMetadataLogEntry> getLatest() {
@Override
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
Expand Down Expand Up @@ -151,7 +151,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
private FlintMetadataLogEntry writeLogEntry(
FlintMetadataLogEntry logEntry,
CheckedFunction<IRestHighLevelClient, DocWriteResponse> write) {
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) {
// Write (create or update) the doc
DocWriteResponse response = write.apply(client);

Expand All @@ -174,7 +174,7 @@ private FlintMetadataLogEntry writeLogEntry(

private boolean exists() {
LOG.info("Checking if Flint index exists " + metaLogIndexName);
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) {
return client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void updateDocument(String id, String doc, boolean upsert, long seqNo, l
// credentials may expire.
// also, failure to close the client causes the job to be stuck in the running state as the client resource
// is not released.
try (IRestHighLevelClient client = flintClient.createClient()) {
try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest updateRequest = new UpdateRequest(indexName, id)
.doc(doc, XContentType.JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
new SearchModule(Settings.builder.build, new ArrayList[SearchPlugin]).getNamedXContents)
def getIndexMetadata(osIndexName: String): String = {

using(flintClient.createClient()) { client =>
using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client =>
val request = new GetIndexRequest(osIndexName)
try {
val response = client.getIndex(request, RequestOptions.DEFAULT)
Expand Down Expand Up @@ -70,7 +70,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
def createIndex(osIndexName: String, mapping: String): Unit = {
logInfo(s"create $osIndexName")

using(flintClient.createClient()) { client =>
using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client =>
val request = new CreateIndexRequest(osIndexName)
request.mapping(mapping, XContentType.JSON)

Expand Down Expand Up @@ -116,7 +116,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
new OpenSearchUpdater(indexName, flintClient)

def getDoc(osIndexName: String, id: String): GetResponse = {
using(flintClient.createClient()) { client =>
using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client =>
val request = new GetRequest(osIndexName, id)
val result = Try(client.get(request, RequestOptions.DEFAULT))
result match {
Expand Down Expand Up @@ -147,7 +147,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser)
}
new OpenSearchScrollReader(
flintClient.createClient(),
flintClient.createClient().asInstanceOf[IRestHighLevelClient],
indexName,
new SearchSourceBuilder().query(queryBuilder).sort(sort, SortOrder.ASC),
flintOptions)
Expand All @@ -157,7 +157,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
}

def doesIndexExist(indexName: String): Boolean = {
using(flintClient.createClient()) { client =>
using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client =>
try {
val request = new GetIndexRequest(indexName)
client.doesIndexExist(request, RequestOptions.DEFAULT)
Expand All @@ -179,8 +179,9 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
XContentType.JSON.xContent.createParser(xContentRegistry, IGNORE_DEPRECATIONS, query)
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser)
}

new OpenSearchQueryReader(
flintClient.createClient(),
flintClient.createClient().asInstanceOf[IRestHighLevelClient],
indexName,
new SearchSourceBuilder().query(queryBuilder).sort(sort, sortOrder))
} catch {
Expand Down

0 comments on commit c874a38

Please sign in to comment.