diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
index 17358b9a9..c1f5d78c1 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
@@ -23,10 +23,10 @@ public interface FlintClient {
    * Start a new optimistic transaction.
    *
    * @param indexName index name
-   * @param metaLogIndexName metadata log index name
+   * @param dataSourceName TODO: read from elsewhere in future
    * @return transaction handle
    */
-  <T> OptimisticTransaction<T> startTransaction(String indexName, String metaLogIndexName);
+  <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName);
 
   /**
    * Create a Flint index with the metadata given.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
index ea0fb0f98..5e2baceab 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
@@ -32,6 +32,8 @@ case class FlintMetadata(
     properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
     /** Flint index schema */
     schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
+    /** Optional latest metadata log entry */
+    latestId: Option[String] = None,
     /** Optional Flint index settings. TODO: move elsewhere? */
     indexSettings: Option[String]) {
@@ -58,6 +60,9 @@ case class FlintMetadata(
         .field("source", source)
         .field("indexedColumns", indexedColumns)
 
+      if (latestId.isDefined) {
+        builder.field("latestId", latestId.get)
+      }
       optionalObjectField(builder, "options", options)
       optionalObjectField(builder, "properties", properties)
     }
@@ -219,14 +224,14 @@ object FlintMetadata {
     def build(): FlintMetadata = {
       FlintMetadata(
         if (version == null) current() else version,
-        name,
-        kind,
-        source,
-        indexedColumns,
-        options,
-        properties,
-        schema,
-        indexSettings)
+        name = name,
+        kind = kind,
+        source = source,
+        indexedColumns = indexedColumns,
+        options = options,
+        properties = properties,
+        schema = schema,
+        indexSettings = indexSettings)
     }
   }
 }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
index ffb948b2f..2019d8812 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
@@ -24,6 +24,11 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T> {
 
   private static final Logger LOG = Logger.getLogger(DefaultOptimisticTransaction.class.getName());
 
+  /**
+   * Data source name. TODO: remove this in future.
+   */
+  private final String dataSourceName;
+
   /**
    * Flint metadata log
    */
@@ -34,7 +39,9 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T> {
   private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> finalAction = null;
 
   public DefaultOptimisticTransaction(
+      String dataSourceName,
       FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
+    this.dataSourceName = dataSourceName;
     this.metadataLog = metadataLog;
   }
@@ -95,7 +102,7 @@ private FlintMetadataLogEntry emptyLogEntry() {
         UNASSIGNED_SEQ_NO,
         UNASSIGNED_PRIMARY_TERM,
         IndexState$.MODULE$.EMPTY(),
-        "mys3", // TODO: get it from spark conf
+        dataSourceName,
         "");
   }
 }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index a456757b7..43f5af85d 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -68,17 +68,26 @@ public class FlintOpenSearchClient implements FlintClient {
       new NamedXContentRegistry(
           new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents());
 
+  /**
+   * Metadata log index name prefix
+   */
+  public final static String META_LOG_NAME_PREFIX = ".query_execution_request";
+
   private final FlintOptions options;
 
   public FlintOpenSearchClient(FlintOptions options) {
     this.options = options;
   }
 
-  @Override public <T> OptimisticTransaction<T> startTransaction(String indexName, String metaLogIndexName) {
+  @Override
+  public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName) {
+    String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
+        : META_LOG_NAME_PREFIX + "_" + dataSourceName;
     LOG.info("Starting transaction on index " + indexName + " and metadata log index " + metaLogIndexName);
+
     try (RestHighLevelClient client = createClient()) {
       if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
-        return new DefaultOptimisticTransaction<>(
+        return new DefaultOptimisticTransaction<>(dataSourceName,
             new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName));
       } else {
         return new NoOptimisticTransaction<>();
@@ -88,7 +97,8 @@ public FlintOpenSearchClient(FlintOptions options) {
     }
   }
 
-  @Override public void createIndex(String indexName, FlintMetadata metadata) {
+  @Override
+  public void createIndex(String indexName, FlintMetadata metadata) {
     LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
@@ -105,7 +115,8 @@ public FlintOpenSearchClient(FlintOptions options) {
     }
   }
 
-  @Override public boolean exists(String indexName) {
+  @Override
+  public boolean exists(String indexName) {
     LOG.info("Checking if Flint index exists " + indexName);
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
@@ -115,7 +126,8 @@ public FlintOpenSearchClient(FlintOptions options) {
     }
   }
 
-  @Override public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
+  @Override
+  public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
     LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
     String osIndexNamePattern = toLowercase(indexNamePattern);
     try (RestHighLevelClient client = createClient()) {
@@ -132,7 +144,8 @@ public FlintOpenSearchClient(FlintOptions options) {
     }
   }
 
-  @Override public FlintMetadata getIndexMetadata(String indexName) {
+  @Override
+  public FlintMetadata getIndexMetadata(String indexName) {
     LOG.info("Fetching Flint index metadata for " + indexName);
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
@@ -147,7 +160,8 @@ public FlintOpenSearchClient(FlintOptions options) {
     }
   }
 
-  @Override public void deleteIndex(String indexName) {
+  @Override
+  public void deleteIndex(String indexName) {
     LOG.info("Deleting Flint index " + indexName);
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
@@ -163,10 +177,11 @@ public FlintOpenSearchClient(FlintOptions options) {
    * Create {@link FlintReader}.
    *
    * @param indexName index name.
-   * @param query DSL query. DSL query is null means match_all.
+   * @param query     DSL query. DSL query is null means match_all.
    * @return {@link FlintReader}.
    */
-  @Override public FlintReader createReader(String indexName, String query) {
+  @Override
+  public FlintReader createReader(String indexName, String query) {
     LOG.info("Creating Flint index reader for " + indexName + " with query " + query);
     try {
       QueryBuilder queryBuilder = new MatchAllQueryBuilder();
@@ -190,7 +205,8 @@ public FlintWriter createWriter(String indexName) {
     return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
   }
 
-  @Override public RestHighLevelClient createClient() {
+  @Override
+  public RestHighLevelClient createClient() {
     RestClientBuilder restClientBuilder =
         RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
index 43c20de91..dc2efc595 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
@@ -40,16 +40,16 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadataLogEntry> {
   public Optional<FlintMetadataLogEntry> getLatest() {
     LOG.info("Fetching latest log entry with id " + latestId);
     try (RestHighLevelClient client = flintClient.createClient()) {
       GetResponse response =
-          client.get(new GetRequest(indexName, latestId), RequestOptions.DEFAULT);
+          client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
       if (response.isExists()) {
         FlintMetadataLogEntry latest =
             new FlintMetadataLogEntry(
@@ -105,7 +105,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
     return writeLogEntry(logEntryWithId,
         client -> client.index(
             new IndexRequest()
-                .index(indexName)
+                .index(metaLogIndexName)
                 .id(logEntryWithId.id())
                 .source(logEntryWithId.toJson(), XContentType.JSON),
             RequestOptions.DEFAULT));
@@ -115,7 +115,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
     LOG.info("Updating log entry " + logEntry);
     return writeLogEntry(logEntry,
         client -> client.update(
-            new UpdateRequest(indexName, logEntry.id())
+            new UpdateRequest(metaLogIndexName, logEntry.id())
                 .doc(logEntry.toJson(), XContentType.JSON)
                 .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
                 .setIfSeqNo(logEntry.seqNo())
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 332b9618e..fd93ff26b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -50,13 +50,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
   private val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
 
   /**
-   * Metadata log index name with a default name for backward compatibility. If the index doesn't
-   * exist, the transaction support will be disabled in FlintClient.
+   * Data source name. Assign empty string in case of backward compatibility. TODO: remove this in
+   * future
    */
-  private val metaLogIndexName: String = {
-    val indexName = spark.conf.getOption("spark.flint.job.requestIndex")
-    indexName.getOrElse(".query_execution_request")
-  }
+  private val dataSourceName: String =
+    spark.conf.getOption("spark.flint.datasource.name").getOrElse("")
 
   /**
    * Create index builder for creating index with fluent API.
@@ -107,7 +105,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     val metadata = index.metadata()
     try {
       flintClient
-        .startTransaction(indexName, metaLogIndexName)
+        .startTransaction(indexName, dataSourceName)
         .initialLog(latest => latest.state == EMPTY)
         .transientLog(latest => latest.copy(state = CREATING))
         .finalLog(latest => latest.copy(state = ACTIVE))
@@ -137,7 +135,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
 
     try {
       flintClient
-        .startTransaction(indexName, metaLogIndexName)
+        .startTransaction(indexName, dataSourceName)
         .initialLog(latest => latest.state == ACTIVE)
         .transientLog(latest => latest.copy(state = REFRESHING))
         .finalLog(latest => {
@@ -210,7 +208,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     if (flintClient.exists(indexName)) {
       try {
         flintClient
-          .startTransaction(indexName, metaLogIndexName)
+          .startTransaction(indexName, dataSourceName)
           .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING)
           .transientLog(latest => latest.copy(state = DELETING))
          .finalLog(latest => latest.copy(state = DELETED))
@@ -251,7 +249,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       logInfo("Scheduler triggers index log entry update")
       try {
         flintClient
-          .startTransaction(indexName, metaLogIndexName)
+          .startTransaction(indexName, dataSourceName)
          .initialLog(latest => latest.state == REFRESHING)
          .finalLog(latest => latest) // timestamp will update automatically
          .commit(latest => logInfo("Updating log entry to " + latest))
diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala
index f5ae10b5e..1e7077799 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala
@@ -18,6 +18,7 @@ import org.opensearch.client.indices.CreateIndexRequest
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
+import org.opensearch.flint.core.storage.FlintOpenSearchClient._
 import org.opensearch.flint.spark.FlintSparkSuite
 
 /**
@@ -26,11 +27,12 @@ import org.opensearch.flint.spark.FlintSparkSuite
  */
 trait OpenSearchTransactionSuite extends FlintSparkSuite {
 
-  val testMetaLogIndex = ".query_execution_request_mys3"
+  val testDataSourceName = "myglue"
+  lazy val testMetaLogIndex: String = META_LOG_NAME_PREFIX + "_" + testDataSourceName
 
   override def beforeAll(): Unit = {
     super.beforeAll()
spark.conf.set("spark.flint.job.requestIndex", testMetaLogIndex) + spark.conf.set("spark.flint.datasource.name", testDataSourceName) } override def beforeEach(): Unit = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 29eae82f4..a6a1dd889 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -29,7 +29,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should transit from initial to final log if initial log is empty") { flintClient - .startTransaction(testFlintIndex, testMetaLogIndex) + .startTransaction(testFlintIndex, testDataSourceName) .initialLog(latest => { latest.state shouldBe EMPTY true @@ -43,7 +43,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should transit from initial to final log directly if no transient log") { flintClient - .startTransaction(testFlintIndex, testMetaLogIndex) + .startTransaction(testFlintIndex, testDataSourceName) .initialLog(_ => true) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(_ => latestLogEntry(testLatestId) should contain("state" -> "empty")) @@ -64,7 +64,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { error = "")) flintClient - .startTransaction(testFlintIndex, testMetaLogIndex) + .startTransaction(testFlintIndex, testDataSourceName) .initialLog(latest => { latest.state shouldBe ACTIVE true @@ -79,7 +79,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should exit if initial log entry doesn't meet precondition") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testMetaLogIndex) + .startTransaction(testFlintIndex, testDataSourceName) .initialLog(_ => false) .transientLog(latest => latest) .finalLog(latest => latest) @@ -90,7 +90,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if initial log entry updated by others when updating transient log entry") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testMetaLogIndex) + .startTransaction(testFlintIndex, testDataSourceName) .initialLog(_ => true) .transientLog(latest => { // This update will happen first and thus cause version conflict as expected @@ -106,7 +106,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if transient log entry updated by others when updating final log entry") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testMetaLogIndex) + .startTransaction(testFlintIndex, testDataSourceName) .initialLog(_ => true) .transientLog(latest => {
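Not part of the patch above, just a usage sketch for review: the revised contract makes the caller pass a data source name instead of a metadata log index name, and FlintOpenSearchClient derives ".query_execution_request_<datasource>" internally. The snippet below mirrors the call chain used by FlintSpark.createIndex and the integration tests; the connection option keys ("host"/"port"), the Flint index name, and the "myglue" data source are illustrative assumptions, not values from this change.

// Sketch only: drive the new startTransaction(indexName, dataSourceName) API directly.
import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.storage.FlintOpenSearchClient

object StartTransactionSketch extends App {
  // Assumed: FlintOptions accepts a java.util.Map of connection settings as in the IT setup.
  val flintClient = new FlintOpenSearchClient(
    new FlintOptions(Map("host" -> "localhost", "port" -> "9200").asJava))

  // Second argument is now the data source name; the client resolves the metadata log
  // index to ".query_execution_request_myglue" (or the bare prefix if the name is empty).
  flintClient
    .startTransaction("flint_myglue_default_test_skipping_index", "myglue")
    .initialLog(latest => latest.state == EMPTY) // proceed only from an empty log entry
    .transientLog(latest => latest.copy(state = CREATING))
    .finalLog(latest => latest.copy(state = ACTIVE))
    .commit(_ => println("create Flint index here"))
}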