diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java index 4cb1305d2..0e525aaf8 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogService.java @@ -6,12 +6,31 @@ package org.opensearch.flint.common.metadata.log; import java.util.Optional; +import org.apache.spark.SparkConf; /** * Flint metadata log service provides API for metadata log related operations on a Flint index * regardless of underlying storage. */ -public interface FlintMetadataLogService { +public abstract class FlintMetadataLogService { + + protected final SparkConf sparkConf; + + /** + * Constructor. + * + * @param sparkConf spark configuration + */ + public FlintMetadataLogService(SparkConf sparkConf) { + this.sparkConf = sparkConf; + } + + /** + * Default constructor. + */ + public FlintMetadataLogService() { + this(null); + } /** * Start a new optimistic transaction. @@ -20,7 +39,7 @@ public interface FlintMetadataLogService { * @param forceInit force init transaction and create empty metadata log if not exist * @return transaction handle */ - OptimisticTransaction startTransaction(String indexName, boolean forceInit); + public abstract OptimisticTransaction startTransaction(String indexName, boolean forceInit); /** * Start a new optimistic transaction. @@ -28,7 +47,7 @@ public interface FlintMetadataLogService { * @param indexName index name * @return transaction handle */ - default OptimisticTransaction startTransaction(String indexName) { + public OptimisticTransaction startTransaction(String indexName) { return startTransaction(indexName, false); } @@ -38,12 +57,12 @@ default OptimisticTransaction startTransaction(String indexName) { * @param indexName index name * @return optional metadata log */ - Optional> getIndexMetadataLog(String indexName); + public abstract Optional> getIndexMetadataLog(String indexName); /** * Record heartbeat timestamp for index streaming job. * * @param indexName index name */ - void recordHeartbeat(String indexName); + public abstract void recordHeartbeat(String indexName); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index bd456d875..18be8efbf 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -24,7 +24,7 @@ /** * Flint metadata log service implementation for OpenSearch storage. */ -public class FlintOpenSearchMetadataLogService implements FlintMetadataLogService { +public class FlintOpenSearchMetadataLogService extends FlintMetadataLogService { private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLogService.class.getName()); diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 877b5ce79..82bea2882 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -119,7 +119,7 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { } } -case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLogService { +case class TestMetadataLogService(sparkConfParam: SparkConf) extends FlintMetadataLogService(sparkConfParam) { override def startTransaction[T]( indexName: String, forceInit: Boolean): OptimisticTransaction[T] = {