diff --git a/build.sbt b/build.sbt index 9d419decb..b04aad2ee 100644 --- a/build.sbt +++ b/build.sbt @@ -54,6 +54,7 @@ lazy val root = (project in file(".")) lazy val flintCore = (project in file("flint-core")) .disablePlugins(AssemblyPlugin) + .dependsOn(flintCommons) .settings( commonSettings, name := "flint-core", diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java similarity index 92% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java index bbbfd86b2..8cd9bd6ea 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log; +package org.opensearch.flint.common.metadata.log; import java.util.Optional; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala similarity index 89% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala index 5f229d412..f9ae8297d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala @@ -3,11 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log +package org.opensearch.flint.common.metadata.log -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState -import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState /** * Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move @@ -161,14 +160,4 @@ object FlintMetadataLogEntry { | "number_of_replicas": "0" | } |}""".stripMargin - - def failLogEntry(dataSourceName: String, error: String): FlintMetadataLogEntry = - FlintMetadataLogEntry( - "", - UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - 0L, - IndexState.FAILED, - dataSourceName, - error) } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java similarity index 81% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java index a356a456f..352f4ecce 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java @@ -3,13 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log; +package org.opensearch.flint.common.metadata.log; import java.util.Optional; /** * Flint metadata log service provides API for metadata log related operations on a Flint index * regardless of underlying storage. + *

+ * Custom implementations of this interface are expected to provide a public constructor with + * the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by + * the FlintMetadataLogServiceBuilder. */ public interface FlintMetadataLogService { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/OptimisticTransaction.java similarity index 96% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/OptimisticTransaction.java index d1992959c..29d3b6135 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/OptimisticTransaction.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata.log; +package org.opensearch.flint.common.metadata.log; import java.util.function.Function; import java.util.function.Predicate; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 0cf643791..c49247f37 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -90,6 +90,8 @@ public class FlintOptions implements Serializable { public static final String DEFAULT_BATCH_BYTES = "1mb"; + public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass"; + public FlintOptions(Map options) { this.options = options; this.retryOptions = new FlintRetryOptions(options); @@ -162,4 +164,8 @@ public int getBatchBytes() { return (int) org.apache.spark.network.util.JavaUtils .byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE); } + + public String getCustomFlintMetadataLogServiceClass() { + return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, ""); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index f62731643..f432af0d0 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -7,10 +7,10 @@ package org.opensearch.flint.core.metadata import java.util +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.FlintVersion import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.core.metadata.FlintJsonHelper._ -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry /** * Flint metadata follows Flint index specification and defines metadata for a Flint index diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index f3ef364b3..1992e173d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -7,7 +7,7 @@ import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; -import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; +import static org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -16,6 +16,10 @@ import java.util.function.Predicate; import java.util.logging.Logger; +import org.opensearch.flint.common.metadata.log.FlintMetadataLog; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.common.metadata.log.OptimisticTransaction; + /** * Default optimistic transaction implementation that captures the basic workflow for * transaction support by optimistic locking. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java index 3e2556f57..9ec4ac2c4 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java @@ -5,14 +5,33 @@ package org.opensearch.flint.core.metadata.log; +import java.lang.reflect.Constructor; +import org.apache.spark.SparkConf; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService; /** * {@link FlintMetadataLogService} builder. + *

+ * Custom implementations of {@link FlintMetadataLogService} are expected to provide a public + * constructor with the signature {@code public MyCustomService(SparkConf sparkConf)} to be + * instantiated by this builder. */ public class FlintMetadataLogServiceBuilder { - public static FlintMetadataLogService build(FlintOptions options) { - return new FlintOpenSearchMetadataLogService(options); + public static FlintMetadataLogService build(FlintOptions options, SparkConf sparkConf) { + String className = options.getCustomFlintMetadataLogServiceClass(); + if (className.isEmpty()) { + return new FlintOpenSearchMetadataLogService(options); + } + + // Attempts to instantiate Flint metadata log service with sparkConf using reflection + try { + Class flintMetadataLogServiceClass = Class.forName(className); + Constructor constructor = flintMetadataLogServiceClass.getConstructor(SparkConf.class); + return (FlintMetadataLogService) constructor.newInstance(sparkConf); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate FlintMetadataLogService: " + className, e); + } } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 6aea13436..ebd719380 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -23,10 +23,10 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.common.metadata.log.FlintMetadataLog; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; -import org.opensearch.flint.core.metadata.log.FlintMetadataLog; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index f04a3bc67..bd456d875 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -12,14 +12,14 @@ import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.common.metadata.log.FlintMetadataLog; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService; +import org.opensearch.flint.common.metadata.log.OptimisticTransaction; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; -import org.opensearch.flint.core.metadata.log.FlintMetadataLog; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService; -import org.opensearch.flint.core.metadata.log.OptimisticTransaction; /** * Flint metadata log service implementation for OpenSearch storage. diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index c6638c0b2..f2f680281 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -175,6 +175,11 @@ object FlintSparkConf { FlintConfig(s"spark.flint.datasource.name") .doc("data source name") .createOptional() + val CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = + FlintConfig(FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS) + .datasourceOption() + .doc("custom Flint metadata log service class") + .createOptional() val QUERY = FlintConfig("spark.flint.job.query") .doc("Flint query for batch and streaming job") @@ -274,6 +279,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable val optionsWithoutDefault = Seq( RETRYABLE_EXCEPTION_CLASS_NAMES, DATA_SOURCE_NAME, + CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, SESSION_ID, REQUEST_INDEX, METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index df7c92636..0ab24032d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -9,11 +9,12 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService +import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.{FlintMetadataLogService, FlintMetadataLogServiceBuilder} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY +import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex @@ -45,8 +46,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { /** Flint client for low-level index operation */ private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions()) - private val flintMetadataLogService: FlintMetadataLogService = - FlintMetadataLogServiceBuilder.build(flintSparkConf.flintOptions()) + private val flintMetadataLogService: FlintMetadataLogService = { + FlintMetadataLogServiceBuilder.build( + flintSparkConf.flintOptions(), + spark.sparkContext.getConf) + } /** Required by json4s parse function */ implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 702b1475e..34c2ae452 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -7,8 +7,8 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.flint.datatype.FlintDataType diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 815dfa71a..343299a8c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -16,8 +16,8 @@ import scala.sys.addShutdownHook import dev.failsafe.{Failsafe, RetryPolicy} import dev.failsafe.event.ExecutionAttemptedEvent import dev.failsafe.function.CheckedRunnable -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogService import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.apache.spark.internal.Logging diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala index 8c2620d0f..0234ec35a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.covering import java.util -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index edbab78b6..0fade2ee7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.covering import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index d7c6ddf81..48dfee50a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -10,8 +10,8 @@ import java.util.Locale import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.convert.ImplicitConversions.`map AsScala` +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index 51d6cc802..8ce458055 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -6,7 +6,7 @@ package org.opensearch.flint.spark.skipping import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index db56386f1..da73ea01e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.skipping import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex._ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index bef9118c7..5231bdfa6 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -7,9 +7,9 @@ package org.opensearch.flint.spark.covering import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.{Matcher, MatchResult} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala index 38c91cf4a..c099a1a86 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala @@ -8,8 +8,8 @@ package org.opensearch.flint.spark.skipping import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index f37bb53f7..f18c4d3d1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -16,9 +16,9 @@ import org.opensearch.action.update.UpdateRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest} import org.opensearch.common.xcontent.XContentType -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService.METADATA_LOG_INDEX_NAME_PREFIX import org.opensearch.flint.spark.FlintSparkSuite diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index f8a8c2164..877b5ce79 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -5,19 +5,22 @@ package org.opensearch.flint.core -import java.util.Base64 +import java.util.{Base64, Optional} import scala.collection.JavaConverters._ import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.common.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, OptimisticTransaction} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction +import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder +import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLog import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers -import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME +import org.apache.spark.SparkConf +import org.apache.spark.sql.flint.config.FlintSparkConf.{CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, DATA_SOURCE_NAME} class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { @@ -42,7 +45,24 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { flintMetadataLogService = new FlintOpenSearchMetadataLogService(flintOptions) } - test("should fail if metadata log index doesn't exists") { + test("should build metadata log service") { + val customOptions = + openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "org.opensearch.flint.core.TestMetadataLogService") + val customFlintOptions = new FlintOptions(customOptions.asJava) + val customFlintMetadataLogService = + FlintMetadataLogServiceBuilder.build(customFlintOptions, sparkConf) + customFlintMetadataLogService shouldBe a[TestMetadataLogService] + } + + test("should fail to build metadata log service if class name doesn't exist") { + val options = openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "dummy") + val flintOptions = new FlintOptions(options.asJava) + the[RuntimeException] thrownBy { + FlintMetadataLogServiceBuilder.build(flintOptions, sparkConf) + } + } + + test("should fail to start transaction if metadata log index doesn't exists") { val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") val flintMetadataLogService = new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) @@ -98,3 +118,24 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { } } } + +case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLogService { + override def startTransaction[T]( + indexName: String, + forceInit: Boolean): OptimisticTransaction[T] = { + val flintOptions = new FlintOptions(Map[String, String]().asJava) + val metadataLog = new FlintOpenSearchMetadataLog(flintOptions, "", "") + new DefaultOptimisticTransaction("", metadataLog) + } + + override def startTransaction[T](indexName: String): OptimisticTransaction[T] = { + startTransaction(indexName, false) + } + + override def getIndexMetadataLog( + indexName: String): Optional[FlintMetadataLog[FlintMetadataLogEntry]] = { + Optional.empty() + } + + override def recordHeartbeat(indexName: String): Unit = {} +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 6da232389..d0bb7fa81 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -12,9 +12,8 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.{Formats, NoTypeHints} import org.json4s.native.{JsonMethods, Serialization} import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.common.metadata.log.{FlintMetadataLogEntry, FlintMetadataLogService} +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 7b9624045..0525a2896 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -10,8 +10,8 @@ import java.util.Base64 import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers