From 76a9c577076da8c4e07a4438e1a5c191560a0dd3 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 11 Jun 2024 09:26:15 -0700 Subject: [PATCH] Remove dataSourceName from args; read from options (#378) Signed-off-by: Sean Kao --- .../opensearch/flint/core/FlintClient.java | 7 ++--- .../core/storage/FlintOpenSearchClient.java | 15 +++++----- .../opensearch/flint/spark/FlintSpark.scala | 25 ++++++---------- .../flint/spark/FlintSparkIndexMonitor.scala | 12 ++------ .../core/FlintOpenSearchClientSuite.scala | 7 +++-- .../flint/core/FlintTransactionITSuite.scala | 30 +++++++++---------- 6 files changed, 42 insertions(+), 54 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index ee78aa512..debf71a11 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -22,22 +22,19 @@ public interface FlintClient { * Start a new optimistic transaction. * * @param indexName index name - * @param dataSourceName TODO: read from elsewhere in future * @return transaction handle */ - OptimisticTransaction startTransaction(String indexName, String dataSourceName); + OptimisticTransaction startTransaction(String indexName); /** * * Start a new optimistic transaction. * * @param indexName index name - * @param dataSourceName TODO: read from elsewhere in future * @param forceInit forceInit create empty translog if not exist. * @return transaction handle */ - OptimisticTransaction startTransaction(String indexName, String dataSourceName, - boolean forceInit); + OptimisticTransaction startTransaction(String indexName, boolean forceInit); /** * Create a Flint index with the metadata given. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index c1b884241..d8bc6765d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -91,16 +91,18 @@ public class FlintOpenSearchClient implements FlintClient { public final static String META_LOG_NAME_PREFIX = ".query_execution_request"; private final FlintOptions options; + private final String dataSourceName; + private final String metaLogIndexName; public FlintOpenSearchClient(FlintOptions options) { this.options = options; + this.dataSourceName = options.getDataSourceName(); + this.metaLogIndexName = constructMetaLogIndexName(); } @Override - public OptimisticTransaction startTransaction( - String indexName, String dataSourceName, boolean forceInit) { + public OptimisticTransaction startTransaction(String indexName, boolean forceInit) { LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); - String metaLogIndexName = constructMetaLogIndexName(dataSourceName); try (IRestHighLevelClient client = createClient()) { if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); @@ -122,8 +124,8 @@ public OptimisticTransaction startTransaction( } @Override - public OptimisticTransaction startTransaction(String indexName, String dataSourceName) { - return startTransaction(indexName, dataSourceName, false); + public OptimisticTransaction startTransaction(String indexName) { + return startTransaction(indexName, false); } @Override @@ -274,7 +276,6 @@ public IRestHighLevelClient createClient() { final AtomicReference metadataAccessAWSCredentialsProvider = new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); - String metaLogIndexName = constructMetaLogIndexName(options.getDataSourceName()); String systemIndexName = Strings.isNullOrEmpty(options.getSystemIndexName()) ? metaLogIndexName : options.getSystemIndexName(); if (Strings.isNullOrEmpty(metadataAccessProviderClass)) { @@ -389,7 +390,7 @@ private String sanitizeIndexName(String indexName) { return toLowercase(encoded); } - private String constructMetaLogIndexName(String dataSourceName) { + private String constructMetaLogIndexName() { return dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 848bbe61f..ae8a9c064 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -46,16 +46,9 @@ class FlintSpark(val spark: SparkSession) extends Logging { /** Required by json4s parse function */ implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer - /** - * Data source name. Assign empty string in case of backward compatibility. TODO: remove this in - * future - */ - private val dataSourceName: String = - spark.conf.getOption("spark.flint.datasource.name").getOrElse("") - /** Flint Spark index monitor */ val flintIndexMonitor: FlintSparkIndexMonitor = - new FlintSparkIndexMonitor(spark, flintClient, dataSourceName) + new FlintSparkIndexMonitor(spark, flintClient) /** * Create index builder for creating index with fluent API. @@ -106,7 +99,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { val metadata = index.metadata() try { flintClient - .startTransaction(indexName, dataSourceName, true) + .startTransaction(indexName, true) .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) @@ -142,7 +135,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { try { flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => latest.state == ACTIVE) .transientLog(latest => latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) @@ -249,7 +242,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { if (flintClient.exists(indexName)) { try { flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED) .transientLog(latest => latest.copy(state = DELETING)) @@ -284,7 +277,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { if (flintClient.exists(indexName)) { try { flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => latest.state == DELETED) .transientLog(latest => latest.copy(state = VACUUMING)) .finalLog(_ => NO_LOG_ENTRY) @@ -315,7 +308,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { if (index.exists(_.options.autoRefresh())) { try { flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state)) .transientLog(latest => latest.copy(state = RECOVERING, createTime = System.currentTimeMillis())) @@ -346,7 +339,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { */ logWarning("Cleaning up metadata log as index data has been deleted") flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(_ => true) .finalLog(_ => NO_LOG_ENTRY) .commit(_ => {}) @@ -436,7 +429,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { val indexName = index.name val indexLogEntry = index.latestLogEntry.get flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) .transientLog(latest => latest.copy(state = UPDATING)) @@ -455,7 +448,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { val indexLogEntry = index.latestLogEntry.get val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) .transientLog(latest => diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 594f99b02..2ca527f1e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -32,14 +32,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor * Spark session * @param flintClient * Flint client - * @param dataSourceName - * data source name */ -class FlintSparkIndexMonitor( - spark: SparkSession, - flintClient: FlintClient, - dataSourceName: String) - extends Logging { +class FlintSparkIndexMonitor(spark: SparkSession, flintClient: FlintClient) extends Logging { /** Task execution initial delay in seconds */ private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds() @@ -160,7 +154,7 @@ class FlintSparkIndexMonitor( if (isStreamingJobActive(indexName)) { logInfo("Streaming job is still active") flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => latest.state == REFRESHING) .finalLog(latest => latest) // timestamp will update automatically .commit(_ => {}) @@ -212,7 +206,7 @@ class FlintSparkIndexMonitor( logInfo(s"Updating index state to failed for $indexName") retry { flintClient - .startTransaction(indexName, dataSourceName) + .startTransaction(indexName) .initialLog(latest => latest.state == REFRESHING) .finalLog(latest => latest.copy(state = FAILED)) .commit(_ => {}) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 6eab292e2..f2d1a1b60 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar.mock -import org.apache.spark.sql.flint.config.FlintSparkConf.{REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE} +import org.apache.spark.sql.flint.config.FlintSparkConf.{DATA_SOURCE_NAME, REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE} class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers { @@ -31,8 +31,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M behavior of "Flint OpenSearch client" it should "throw IllegalStateException if metadata log index doesn't exists" in { + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") + val flintClient = FlintClientBuilder.build(new FlintOptions(options.asJava)) + the[IllegalStateException] thrownBy { - flintClient.startTransaction("test", "non-exist-index") + flintClient.startTransaction("test") } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 7dc5c695c..a34f0c7a5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -37,7 +37,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("empty metadata log entry content") { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(latest => { latest.id shouldBe testLatestId latest.state shouldBe EMPTY @@ -102,7 +102,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { error = "")) flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(latest => { latest.id shouldBe testLatestId latest.createTime shouldBe testCreateTime @@ -126,7 +126,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should transit from initial to final log if initial log is empty") { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(latest => { latest.state shouldBe EMPTY true @@ -140,7 +140,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should transit from initial to final log directly if no transient log") { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(_ => true) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(_ => latestLogEntry(testLatestId) should contain("state" -> "empty")) @@ -162,7 +162,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { error = "")) flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(latest => { latest.state shouldBe ACTIVE true @@ -177,7 +177,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should exit if initial log entry doesn't meet precondition") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(_ => false) .transientLog(latest => latest.copy(state = ACTIVE)) .finalLog(latest => latest) @@ -191,7 +191,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if initial log entry updated by others when updating transient log entry") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => { // This update will happen first and thus cause version conflict as expected @@ -207,7 +207,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if transient log entry updated by others when updating final log entry") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => { @@ -225,7 +225,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { // Use create index scenario in this test case the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) @@ -250,7 +250,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => latest.copy(state = REFRESHING)) .finalLog(_ => throw new RuntimeException("Mock final log error")) @@ -266,7 +266,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { // Use create index scenario in this test case the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(_ => true) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(_ => throw new RuntimeException("Mock operation error")) @@ -279,7 +279,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("forceInit translog, even index is deleted before startTransaction") { deleteIndex(testMetaLogIndex) flintClient - .startTransaction(testFlintIndex, testDataSourceName, true) + .startTransaction(testFlintIndex, true) .initialLog(latest => { latest.id shouldBe testLatestId latest.state shouldBe EMPTY @@ -299,7 +299,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if index is deleted before initial operation") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(latest => { deleteIndex(testMetaLogIndex) true @@ -313,7 +313,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if index is deleted before transient operation") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(latest => true) .transientLog(latest => { deleteIndex(testMetaLogIndex) @@ -327,7 +327,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if index is deleted before final operation") { the[IllegalStateException] thrownBy { flintClient - .startTransaction(testFlintIndex, testDataSourceName) + .startTransaction(testFlintIndex) .initialLog(latest => true) .transientLog(latest => { latest.copy(state = CREATING) }) .finalLog(latest => {