Skip to content

Commit

Permalink
rm flint os client dep on metadata log service
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Jun 14, 2024
1 parent 8bad367 commit 32f581b
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

package org.opensearch.flint.core;

import java.util.List;
import java.util.Map;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

Expand Down Expand Up @@ -38,9 +37,10 @@ public interface FlintClient {
* Retrieve all metadata for Flint index whose name matches the given pattern.
*
* @param indexNamePattern index name pattern
* @return all matched index metadata
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
List<FlintMetadata> getAllIndexMetadata(String indexNamePattern);
Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern);

/**
* Retrieve metadata in a Flint index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
package org.opensearch.flint.core;

import org.opensearch.flint.core.storage.FlintOpenSearchClient;
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService;

/**
* {@link FlintClient} builder.
*/
public class FlintClientBuilder {

public static FlintClient build(FlintOptions options) {
return new FlintOpenSearchClient(options, new FlintOpenSearchMetadataLogService(options));
return new FlintOpenSearchClient(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -33,10 +32,6 @@
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -68,15 +63,9 @@ public class FlintOpenSearchClient implements FlintClient {
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

private final FlintOptions options;
private final FlintMetadataLogService metadataLogService;

public FlintOpenSearchClient(FlintOptions options, FlintMetadataLogService metadataLogService) {
this.options = options;
this.metadataLogService = metadataLogService;
}

public FlintOpenSearchClient(FlintOptions options) {
this(options, new FlintOpenSearchMetadataLogService(options));
this.options = options;
}

@Override
Expand Down Expand Up @@ -112,19 +101,21 @@ public boolean exists(String indexName) {
}

@Override
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
.map(index -> constructFlintMetadata(
index,
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()))
.collect(Collectors.toList());
.collect(Collectors.toMap(
index -> index,
index -> FlintMetadata.apply(
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()
)
));
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
}
Expand All @@ -140,7 +131,7 @@ public FlintMetadata getIndexMetadata(String indexName) {

MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return constructFlintMetadata(indexName, mapping.source().string(), settings.toString());
return FlintMetadata.apply(mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
Expand Down Expand Up @@ -210,17 +201,6 @@ public IRestHighLevelClient createClient() {
return OpenSearchClientUtils.createClient(options);
}

/*
* Constructs Flint metadata with latest metadata log entry attached if it's available.
*/
private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) {
Optional<FlintMetadataLogEntry> latest = metadataLogService.getIndexMetadataLog(indexName)
.flatMap(FlintMetadataLog::getLatest);
return latest
.map(entry -> FlintMetadata.apply(mapping, settings, entry))
.orElseGet(() -> FlintMetadata.apply(mapping, settings));
}

/*
* Because OpenSearch requires all lowercase letters in index name, we have to
* lowercase all letters in the given Flint index name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import scala.collection.JavaConverters._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.{FlintMetadataLogService, FlintMetadataLogServiceBuilder}
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, FlintMetadataLogServiceBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
Expand Down Expand Up @@ -176,6 +177,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map { case (indexName, metadata) =>
attachLatestLogEntry(indexName, metadata)
}
.toList
.flatMap(FlintSparkIndexFactory.create)
} else {
Seq.empty
Expand All @@ -194,7 +199,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Describing index name $indexName")
if (flintClient.exists(indexName)) {
val metadata = flintClient.getIndexMetadata(indexName)
FlintSparkIndexFactory.create(metadata)
val metadataWithEntry = attachLatestLogEntry(indexName, metadata)
FlintSparkIndexFactory.create(metadataWithEntry)
} else {
Option.empty
}
Expand Down Expand Up @@ -386,6 +392,27 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

/**
* Attaches latest log entry to metadata if available.
*
* @param indexName
* index name
* @param metadata
* base flint metadata
* @return
* flint metadata with latest log entry attached if available
*/
private def attachLatestLogEntry(indexName: String, metadata: FlintMetadata): FlintMetadata = {
val latest = flintMetadataLogService
.getIndexMetadataLog(indexName)
.flatMap(_.getLatest)
if (latest.isPresent) {
metadata.copy(latestLogEntry = Some(latest.get()))
} else {
metadata
}
}

/**
* Validate the index update options are allowed.
* @param originalOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ import java.util.Base64

import scala.collection.JavaConverters._

import org.mockito.Mockito.when
import org.opensearch.flint.OpenSearchTransactionSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService
import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME

Expand Down Expand Up @@ -92,34 +89,4 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers {
val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex, true)
metadataLog.isPresent shouldBe true
}

test("should get index metadata with latest log entry") {
val metadata = mock[FlintMetadata]
when(metadata.getContent).thenReturn("{}")
when(metadata.indexSettings).thenReturn(None)
when(metadata.latestLogEntry).thenReturn(Some(flintMetadataLogEntry))

flintClient.createIndex(testFlintIndex, metadata)
createLatestLogEntry(flintMetadataLogEntry)

val latest = flintClient.getIndexMetadata(testFlintIndex).latestLogEntry
latest.isDefined shouldBe true
latest.get.id shouldBe testLatestId
latest.get.createTime shouldBe testCreateTime
latest.get.dataSource shouldBe testDataSourceName
latest.get.error shouldBe ""

deleteTestIndex(testFlintIndex)
}

test("should get index metadata without log entry") {
val metadata = mock[FlintMetadata]
when(metadata.getContent).thenReturn("{}")
when(metadata.indexSettings).thenReturn(None)
flintClient.createIndex(testFlintIndex, metadata)

flintClient.getIndexMetadata(testFlintIndex).latestLogEntry shouldBe empty

deleteTestIndex(testFlintIndex)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M

val allMetadata = flintClient.getAllIndexMetadata("flint_*_index")
allMetadata should have size 2
allMetadata.forEach(metadata => metadata.getContent should not be empty)
allMetadata.forEach(metadata => metadata.indexSettings should not be empty)
allMetadata.values.forEach(metadata => metadata.getContent should not be empty)
allMetadata.values.forEach(metadata => metadata.indexSettings should not be empty)
}

it should "convert index name to all lowercase" in {
Expand Down

0 comments on commit 32f581b

Please sign in to comment.