From 538b5d0b8c3c05bd2777b4fac73402b44054791c Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 19 Jun 2024 13:55:59 -0700 Subject: [PATCH 1/8] reflection to build metadata log service Signed-off-by: Sean Kao --- .../opensearch/flint/core/FlintOptions.java | 6 +++ .../log/FlintMetadataLogServiceBuilder.java | 18 ++++++- .../sql/flint/config/FlintSparkConf.scala | 6 +++ .../opensearch/flint/spark/FlintSpark.scala | 7 ++- .../flint/core/FlintMetadataLogITSuite.scala | 49 +++++++++++++++++-- 5 files changed, 79 insertions(+), 7 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 0cf643791..c49247f37 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -90,6 +90,8 @@ public class FlintOptions implements Serializable { public static final String DEFAULT_BATCH_BYTES = "1mb"; + public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass"; + public FlintOptions(Map options) { this.options = options; this.retryOptions = new FlintRetryOptions(options); @@ -162,4 +164,8 @@ public int getBatchBytes() { return (int) org.apache.spark.network.util.JavaUtils .byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE); } + + public String getCustomFlintMetadataLogServiceClass() { + return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, ""); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java index 3e2556f57..00dc657cb 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java @@ -5,6 +5,8 @@ package org.opensearch.flint.core.metadata.log; +import java.lang.reflect.Constructor; +import org.apache.spark.SparkConf; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService; @@ -12,7 +14,19 @@ * {@link FlintMetadataLogService} builder. */ public class FlintMetadataLogServiceBuilder { - public static FlintMetadataLogService build(FlintOptions options) { - return new FlintOpenSearchMetadataLogService(options); + public static FlintMetadataLogService build(FlintOptions options, SparkConf sparkConf) { + String className = options.getCustomFlintMetadataLogServiceClass(); + if (className.isEmpty()) { + return new FlintOpenSearchMetadataLogService(options); + } + + // Attempts to instantiate Flint metadata log service with sparkConf using reflection + try { + Class flintMetadataLogServiceClass = Class.forName(className); + Constructor constructor = flintMetadataLogServiceClass.getConstructor(SparkConf.class); + return (FlintMetadataLogService) constructor.newInstance(sparkConf); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate FlintMetadataLogService: " + className, e); + } } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index c6638c0b2..f2f680281 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -175,6 +175,11 @@ object FlintSparkConf { FlintConfig(s"spark.flint.datasource.name") .doc("data source name") .createOptional() + val CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = + FlintConfig(FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS) + .datasourceOption() + .doc("custom Flint metadata log service class") + .createOptional() val QUERY = FlintConfig("spark.flint.job.query") .doc("Flint query for batch and streaming job") @@ -274,6 +279,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable val optionsWithoutDefault = Seq( RETRYABLE_EXCEPTION_CLASS_NAMES, DATA_SOURCE_NAME, + CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, SESSION_ID, REQUEST_INDEX, METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index df7c92636..9c108023a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -45,8 +45,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { /** Flint client for low-level index operation */ private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions()) - private val flintMetadataLogService: FlintMetadataLogService = - FlintMetadataLogServiceBuilder.build(flintSparkConf.flintOptions()) + private val flintMetadataLogService: FlintMetadataLogService = { + FlintMetadataLogServiceBuilder.build( + flintSparkConf.flintOptions(), + spark.sparkContext.getConf) + } /** Required by json4s parse function */ implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index f8a8c2164..81645428a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -5,19 +5,25 @@ package org.opensearch.flint.core -import java.util.Base64 +import java.util.{Base64, Optional} import scala.collection.JavaConverters._ import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction +import org.opensearch.flint.core.metadata.log.FlintMetadataLog import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder +import org.opensearch.flint.core.metadata.log.OptimisticTransaction +import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLog import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers -import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME +import org.apache.spark.SparkConf +import org.apache.spark.sql.flint.config.FlintSparkConf.{CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, DATA_SOURCE_NAME} class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { @@ -42,7 +48,23 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { flintMetadataLogService = new FlintOpenSearchMetadataLogService(flintOptions) } - test("should fail if metadata log index doesn't exists") { + test("should build metadata log service") { + val options = + openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "org.opensearch.flint.core.TestMetadataLogService") + val flintOptions = new FlintOptions(options.asJava) + flintMetadataLogService = FlintMetadataLogServiceBuilder.build(flintOptions, sparkConf) + flintMetadataLogService shouldBe a[TestMetadataLogService] + } + + test("should fail to build metadata log service if class name doesn't exist") { + val options = openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "dummy") + val flintOptions = new FlintOptions(options.asJava) + the[RuntimeException] thrownBy { + FlintMetadataLogServiceBuilder.build(flintOptions, sparkConf) + } + } + + test("should fail to start transaction if metadata log index doesn't exists") { val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") val flintMetadataLogService = new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) @@ -98,3 +120,24 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { } } } + +case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLogService { + override def startTransaction[T]( + indexName: String, + forceInit: Boolean): OptimisticTransaction[T] = { + new DefaultOptimisticTransaction( + "", + new FlintOpenSearchMetadataLog(new FlintOptions(Map[String, String]().asJava), "", "")) + } + + override def startTransaction[T](indexName: String): OptimisticTransaction[T] = { + startTransaction(indexName, false) + } + + override def getIndexMetadataLog( + indexName: String): Optional[FlintMetadataLog[FlintMetadataLogEntry]] = { + Optional.empty() + } + + override def recordHeartbeat(indexName: String): Unit = {} +} From b9ce3ee486d06f695150e1677c9356174d2d9685 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 19 Jun 2024 18:13:52 -0700 Subject: [PATCH 2/8] move common interface to flint-commons Signed-off-by: Sean Kao --- build.sbt | 1 + .../flint/common}/metadata/log/FlintMetadataLog.java | 2 +- .../common}/metadata/log/FlintMetadataLogEntry.scala | 7 +++---- .../common}/metadata/log/FlintMetadataLogService.java | 2 +- .../common}/metadata/log/OptimisticTransaction.java | 2 +- .../opensearch/flint/core/metadata/FlintMetadata.scala | 2 +- .../metadata/log/DefaultOptimisticTransaction.java | 6 +++++- .../metadata/log/FlintMetadataLogServiceBuilder.java | 1 + .../flint/core/storage/FlintOpenSearchMetadataLog.java | 4 ++-- .../storage/FlintOpenSearchMetadataLogService.java | 10 +++++----- .../scala/org/opensearch/flint/spark/FlintSpark.scala | 7 ++++--- .../org/opensearch/flint/spark/FlintSparkIndex.scala | 2 +- .../flint/spark/FlintSparkIndexMonitor.scala | 4 ++-- .../spark/covering/ApplyFlintSparkCoveringIndex.scala | 2 +- .../flint/spark/covering/FlintSparkCoveringIndex.scala | 2 +- .../flint/spark/mv/FlintSparkMaterializedView.scala | 2 +- .../spark/skipping/ApplyFlintSparkSkippingIndex.scala | 2 +- .../flint/spark/skipping/FlintSparkSkippingIndex.scala | 2 +- .../covering/ApplyFlintSparkCoveringIndexSuite.scala | 4 ++-- .../skipping/ApplyFlintSparkSkippingIndexSuite.scala | 4 ++-- .../opensearch/flint/OpenSearchTransactionSuite.scala | 6 +++--- .../flint/core/FlintMetadataLogITSuite.scala | 7 ++----- .../flint/core/FlintTransactionITSuite.scala | 5 ++--- .../flint/spark/FlintSparkIndexJobITSuite.scala | 4 ++-- 24 files changed, 46 insertions(+), 44 deletions(-) rename {flint-core/src/main/scala/org/opensearch/flint/core => flint-commons/src/main/scala/org/opensearch/flint/common}/metadata/log/FlintMetadataLog.java (92%) rename {flint-core/src/main/scala/org/opensearch/flint/core => flint-commons/src/main/scala/org/opensearch/flint/common}/metadata/log/FlintMetadataLogEntry.scala (93%) rename {flint-core/src/main/scala/org/opensearch/flint/core => flint-commons/src/main/scala/org/opensearch/flint/common}/metadata/log/FlintMetadataLogService.java (96%) rename {flint-core/src/main/scala/org/opensearch/flint/core => flint-commons/src/main/scala/org/opensearch/flint/common}/metadata/log/OptimisticTransaction.java (96%) diff --git a/build.sbt b/build.sbt index 9d419decb..b04aad2ee 100644 --- a/build.sbt +++ b/build.sbt @@ -54,6 +54,7 @@ lazy val root = (project in file(".")) lazy val flintCore = (project in file("flint-core")) .disablePlugins(AssemblyPlugin) + .dependsOn(flintCommons) .settings( commonSettings, name := "flint-core", diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java similarity index 92% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java index bbbfd86b2..8cd9bd6ea 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log; +package org.opensearch.flint.common.metadata.log; import java.util.Optional; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala similarity index 93% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala index 5f229d412..21ed7aac6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala @@ -3,11 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log +package org.opensearch.flint.common.metadata.log -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState -import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState /** * Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java similarity index 96% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java index a356a456f..4cb1305d2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log; +package org.opensearch.flint.common.metadata.log; import java.util.Optional; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/OptimisticTransaction.java similarity index 96% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/OptimisticTransaction.java index d1992959c..29d3b6135 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/OptimisticTransaction.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log; +package org.opensearch.flint.common.metadata.log; import java.util.function.Function; import java.util.function.Predicate; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index f62731643..f432af0d0 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -7,10 +7,10 @@ package org.opensearch.flint.core.metadata import java.util +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.FlintVersion import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.core.metadata.FlintJsonHelper._ -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry /** * Flint metadata follows Flint index specification and defines metadata for a Flint index diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index f3ef364b3..1992e173d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -7,7 +7,7 @@ import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; -import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; +import static org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -16,6 +16,10 @@ import java.util.function.Predicate; import java.util.logging.Logger; +import org.opensearch.flint.common.metadata.log.FlintMetadataLog; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.common.metadata.log.OptimisticTransaction; + /** * Default optimistic transaction implementation that captures the basic workflow for * transaction support by optimistic locking. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java index 00dc657cb..51d049050 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java @@ -7,6 +7,7 @@ import java.lang.reflect.Constructor; import org.apache.spark.SparkConf; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 6aea13436..ebd719380 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -23,10 +23,10 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.common.metadata.log.FlintMetadataLog; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; -import org.opensearch.flint.core.metadata.log.FlintMetadataLog; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index f04a3bc67..bd456d875 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -12,14 +12,14 @@ import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.common.metadata.log.FlintMetadataLog; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService; +import org.opensearch.flint.common.metadata.log.OptimisticTransaction; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; -import org.opensearch.flint.core.metadata.log.FlintMetadataLog; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService; -import org.opensearch.flint.core.metadata.log.OptimisticTransaction; /** * Flint metadata log service implementation for OpenSearch storage. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 9c108023a..0ab24032d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -9,11 +9,12 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService +import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.{FlintMetadataLogService, FlintMetadataLogServiceBuilder} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY +import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 702b1475e..34c2ae452 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -7,8 +7,8 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.flint.datatype.FlintDataType diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 815dfa71a..343299a8c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -16,8 +16,8 @@ import scala.sys.addShutdownHook import dev.failsafe.{Failsafe, RetryPolicy} import dev.failsafe.event.ExecutionAttemptedEvent import dev.failsafe.function.CheckedRunnable -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.apache.spark.internal.Logging diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala index 8c2620d0f..0234ec35a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.covering import java.util -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index edbab78b6..0fade2ee7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.covering import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index d7c6ddf81..48dfee50a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -10,8 +10,8 @@ import java.util.Locale import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.convert.ImplicitConversions.`map AsScala` +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index 51d6cc802..8ce458055 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -6,7 +6,7 @@ package org.opensearch.flint.spark.skipping import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index db56386f1..da73ea01e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.skipping import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex._ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index bef9118c7..5231bdfa6 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -7,9 +7,9 @@ package org.opensearch.flint.spark.covering import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.{Matcher, MatchResult} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala index 38c91cf4a..c099a1a86 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala @@ -8,8 +8,8 @@ package org.opensearch.flint.spark.skipping import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index f37bb53f7..f18c4d3d1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -16,9 +16,9 @@ import org.opensearch.action.update.UpdateRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest} import org.opensearch.common.xcontent.XContentType -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService.METADATA_LOG_INDEX_NAME_PREFIX import org.opensearch.flint.spark.FlintSparkSuite diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 81645428a..0adb45393 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -10,13 +10,10 @@ import java.util.{Base64, Optional} import scala.collection.JavaConverters._ import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.common.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, OptimisticTransaction} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction -import org.opensearch.flint.core.metadata.log.FlintMetadataLog -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder -import org.opensearch.flint.core.metadata.log.OptimisticTransaction import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLog import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 6da232389..d0bb7fa81 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -12,9 +12,8 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.{Formats, NoTypeHints} import org.json4s.native.{JsonMethods, Serialization} import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.common.metadata.log.{FlintMetadataLogEntry, FlintMetadataLogService} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 7b9624045..0525a2896 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -10,8 +10,8 @@ import java.util.Base64 import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers From 23b0b07beea8522f9ac80dd028f7ffec884f7384 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 19 Jun 2024 18:36:52 -0700 Subject: [PATCH 3/8] remove unused failLogEntry Signed-off-by: Sean Kao --- .../common/metadata/log/FlintMetadataLogEntry.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala index 21ed7aac6..f9ae8297d 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala @@ -160,14 +160,4 @@ object FlintMetadataLogEntry { | "number_of_replicas": "0" | } |}""".stripMargin - - def failLogEntry(dataSourceName: String, error: String): FlintMetadataLogEntry = - FlintMetadataLogEntry( - "", - UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - 0L, - IndexState.FAILED, - dataSourceName, - error) } From 3929ca42b30022b8e2f5657d7defe293eac19e6e Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 19 Jun 2024 21:06:15 -0700 Subject: [PATCH 4/8] fix test Signed-off-by: Sean Kao --- .../flint/core/FlintMetadataLogITSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 0adb45393..444bb14ed 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -46,11 +46,11 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { } test("should build metadata log service") { - val options = + val customOptions = openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "org.opensearch.flint.core.TestMetadataLogService") - val flintOptions = new FlintOptions(options.asJava) - flintMetadataLogService = FlintMetadataLogServiceBuilder.build(flintOptions, sparkConf) - flintMetadataLogService shouldBe a[TestMetadataLogService] + val customFlintOptions = new FlintOptions(customOptions.asJava) + val customFlintMetadataLogService = FlintMetadataLogServiceBuilder.build(customFlintOptions, sparkConf) + customFlintMetadataLogService shouldBe a[TestMetadataLogService] } test("should fail to build metadata log service if class name doesn't exist") { @@ -122,9 +122,9 @@ case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLog override def startTransaction[T]( indexName: String, forceInit: Boolean): OptimisticTransaction[T] = { - new DefaultOptimisticTransaction( - "", - new FlintOpenSearchMetadataLog(new FlintOptions(Map[String, String]().asJava), "", "")) + val flintOptions = new FlintOptions(Map[String, String]().asJava) + val metadataLog = new FlintOpenSearchMetadataLog(flintOptions, "", "") + new DefaultOptimisticTransaction("", metadataLog) } override def startTransaction[T](indexName: String): OptimisticTransaction[T] = { From 8929ddbf4e6398c99cbe48fc22a519faa66c1d7e Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 19 Jun 2024 22:35:00 -0700 Subject: [PATCH 5/8] scalafmtAll Signed-off-by: Sean Kao --- .../org/opensearch/flint/core/FlintMetadataLogITSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 444bb14ed..877b5ce79 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -49,7 +49,8 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { val customOptions = openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "org.opensearch.flint.core.TestMetadataLogService") val customFlintOptions = new FlintOptions(customOptions.asJava) - val customFlintMetadataLogService = FlintMetadataLogServiceBuilder.build(customFlintOptions, sparkConf) + val customFlintMetadataLogService = + FlintMetadataLogServiceBuilder.build(customFlintOptions, sparkConf) customFlintMetadataLogService shouldBe a[TestMetadataLogService] } From 95b7a9d59de0effaa68f67bf35f562477c33d671 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Fri, 21 Jun 2024 15:01:22 -0700 Subject: [PATCH 6/8] change service from interface to abstract class Signed-off-by: Sean Kao --- .../metadata/log/FlintMetadataLogService.java | 29 +++++++++++++++---- .../FlintOpenSearchMetadataLogService.java | 2 +- .../flint/core/FlintMetadataLogITSuite.scala | 3 +- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java index 4cb1305d2..0e525aaf8 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java @@ -6,12 +6,31 @@ package org.opensearch.flint.common.metadata.log; import java.util.Optional; +import org.apache.spark.SparkConf; /** * Flint metadata log service provides API for metadata log related operations on a Flint index * regardless of underlying storage. */ -public interface FlintMetadataLogService { +public abstract class FlintMetadataLogService { + + protected final SparkConf sparkConf; + + /** + * Constructor. + * + * @param sparkConf spark configuration + */ + public FlintMetadataLogService(SparkConf sparkConf) { + this.sparkConf = sparkConf; + } + + /** + * Default constructor. + */ + public FlintMetadataLogService() { + this(null); + } /** * Start a new optimistic transaction. @@ -20,7 +39,7 @@ public interface FlintMetadataLogService { * @param forceInit force init transaction and create empty metadata log if not exist * @return transaction handle */ - OptimisticTransaction startTransaction(String indexName, boolean forceInit); + public abstract OptimisticTransaction startTransaction(String indexName, boolean forceInit); /** * Start a new optimistic transaction. @@ -28,7 +47,7 @@ public interface FlintMetadataLogService { * @param indexName index name * @return transaction handle */ - default OptimisticTransaction startTransaction(String indexName) { + public OptimisticTransaction startTransaction(String indexName) { return startTransaction(indexName, false); } @@ -38,12 +57,12 @@ default OptimisticTransaction startTransaction(String indexName) { * @param indexName index name * @return optional metadata log */ - Optional> getIndexMetadataLog(String indexName); + public abstract Optional> getIndexMetadataLog(String indexName); /** * Record heartbeat timestamp for index streaming job. * * @param indexName index name */ - void recordHeartbeat(String indexName); + public abstract void recordHeartbeat(String indexName); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index bd456d875..18be8efbf 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -24,7 +24,7 @@ /** * Flint metadata log service implementation for OpenSearch storage. */ -public class FlintOpenSearchMetadataLogService implements FlintMetadataLogService { +public class FlintOpenSearchMetadataLogService extends FlintMetadataLogService { private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLogService.class.getName()); diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 877b5ce79..968244734 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -119,7 +119,8 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { } } -case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLogService { +case class TestMetadataLogService(sparkConfParam: SparkConf) + extends FlintMetadataLogService(sparkConfParam) { override def startTransaction[T]( indexName: String, forceInit: Boolean): OptimisticTransaction[T] = { From 84c483a05114d68a55327dacddba41c4dbcceef7 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Sat, 22 Jun 2024 10:40:22 -0700 Subject: [PATCH 7/8] Revert "change service to abstract class" This reverts commit 95b7a9d59de0effaa68f67bf35f562477c33d671. Signed-off-by: Sean Kao --- .../metadata/log/FlintMetadataLogService.java | 29 ++++--------------- .../FlintOpenSearchMetadataLogService.java | 2 +- .../flint/core/FlintMetadataLogITSuite.scala | 3 +- 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java index 0e525aaf8..4cb1305d2 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java @@ -6,31 +6,12 @@ package org.opensearch.flint.common.metadata.log; import java.util.Optional; -import org.apache.spark.SparkConf; /** * Flint metadata log service provides API for metadata log related operations on a Flint index * regardless of underlying storage. */ -public abstract class FlintMetadataLogService { - - protected final SparkConf sparkConf; - - /** - * Constructor. - * - * @param sparkConf spark configuration - */ - public FlintMetadataLogService(SparkConf sparkConf) { - this.sparkConf = sparkConf; - } - - /** - * Default constructor. - */ - public FlintMetadataLogService() { - this(null); - } +public interface FlintMetadataLogService { /** * Start a new optimistic transaction. @@ -39,7 +20,7 @@ public FlintMetadataLogService() { * @param forceInit force init transaction and create empty metadata log if not exist * @return transaction handle */ - public abstract OptimisticTransaction startTransaction(String indexName, boolean forceInit); + OptimisticTransaction startTransaction(String indexName, boolean forceInit); /** * Start a new optimistic transaction. @@ -47,7 +28,7 @@ public FlintMetadataLogService() { * @param indexName index name * @return transaction handle */ - public OptimisticTransaction startTransaction(String indexName) { + default OptimisticTransaction startTransaction(String indexName) { return startTransaction(indexName, false); } @@ -57,12 +38,12 @@ public OptimisticTransaction startTransaction(String indexName) { * @param indexName index name * @return optional metadata log */ - public abstract Optional> getIndexMetadataLog(String indexName); + Optional> getIndexMetadataLog(String indexName); /** * Record heartbeat timestamp for index streaming job. * * @param indexName index name */ - public abstract void recordHeartbeat(String indexName); + void recordHeartbeat(String indexName); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index 18be8efbf..bd456d875 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -24,7 +24,7 @@ /** * Flint metadata log service implementation for OpenSearch storage. */ -public class FlintOpenSearchMetadataLogService extends FlintMetadataLogService { +public class FlintOpenSearchMetadataLogService implements FlintMetadataLogService { private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLogService.class.getName()); diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 968244734..877b5ce79 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -119,8 +119,7 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { } } -case class TestMetadataLogService(sparkConfParam: SparkConf) - extends FlintMetadataLogService(sparkConfParam) { +case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLogService { override def startTransaction[T]( indexName: String, forceInit: Boolean): OptimisticTransaction[T] = { From 68f68444a99a0808338780091d0e00e9f1ab6d06 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Sat, 22 Jun 2024 10:58:00 -0700 Subject: [PATCH 8/8] add javadoc for metadata log service constructor Signed-off-by: Sean Kao --- .../flint/common/metadata/log/FlintMetadataLogService.java | 4 ++++ .../core/metadata/log/FlintMetadataLogServiceBuilder.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java index 4cb1305d2..352f4ecce 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java @@ -10,6 +10,10 @@ /** * Flint metadata log service provides API for metadata log related operations on a Flint index * regardless of underlying storage. + *

+ * Custom implementations of this interface are expected to provide a public constructor with + * the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by + * the FlintMetadataLogServiceBuilder. */ public interface FlintMetadataLogService { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java index 51d049050..9ec4ac2c4 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java @@ -13,6 +13,10 @@ /** * {@link FlintMetadataLogService} builder. + *

+ * Custom implementations of {@link FlintMetadataLogService} are expected to provide a public + * constructor with the signature {@code public MyCustomService(SparkConf sparkConf)} to be + * instantiated by this builder. */ public class FlintMetadataLogServiceBuilder { public static FlintMetadataLogService build(FlintOptions options, SparkConf sparkConf) {