Skip to content

Commit

Permalink
Support custom metadata log service implementation (#389)
Browse files Browse the repository at this point in the history
* reflection to build metadata log service

Signed-off-by: Sean Kao <[email protected]>

* move common interface to flint-commons

Signed-off-by: Sean Kao <[email protected]>

* remove unused failLogEntry

Signed-off-by: Sean Kao <[email protected]>

* fix test

Signed-off-by: Sean Kao <[email protected]>

* scalafmtAll

Signed-off-by: Sean Kao <[email protected]>

* change service from interface to abstract class

Signed-off-by: Sean Kao <[email protected]>

* Revert "change service to abstract class"

This reverts commit 95b7a9d.

Signed-off-by: Sean Kao <[email protected]>

* add javadoc for metadata log service constructor

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az authored Jun 23, 2024
1 parent 85fec7e commit 0f53448
Show file tree
Hide file tree
Showing 26 changed files with 132 additions and 59 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ lazy val root = (project in file("."))

lazy val flintCore = (project in file("flint-core"))
.disablePlugins(AssemblyPlugin)
.dependsOn(flintCommons)
.settings(
commonSettings,
name := "flint-core",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log
package org.opensearch.flint.common.metadata.log

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState

/**
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move
Expand Down Expand Up @@ -161,14 +160,4 @@ object FlintMetadataLogEntry {
| "number_of_replicas": "0"
| }
|}""".stripMargin

/**
 * Builds a log entry in the FAILED index state for the given data source, carrying the given
 * error message.
 *
 * The remaining positional arguments are fixed placeholders: an empty string, the
 * UNASSIGNED_SEQ_NO / UNASSIGNED_PRIMARY_TERM sentinels and 0L.
 * NOTE(review): presumably these map to id, seqNo, primaryTerm and createTime of the case
 * class -- confirm against the FlintMetadataLogEntry definition.
 *
 * @param dataSourceName
 *   data source name recorded in the entry
 * @param error
 *   error message recorded in the entry
 * @return
 *   log entry with state IndexState.FAILED
 */
def failLogEntry(dataSourceName: String, error: String): FlintMetadataLogEntry =
FlintMetadataLogEntry(
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
0L,
IndexState.FAILED,
dataSourceName,
error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.Optional;

/**
* Flint metadata log service provides API for metadata log related operations on a Flint index
* regardless of underlying storage.
* <p>
* Custom implementations of this interface are expected to provide a public constructor with
* the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by
* the FlintMetadataLogServiceBuilder.
*/
public interface FlintMetadataLogService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.function.Function;
import java.util.function.Predicate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class FlintOptions implements Serializable {

public static final String DEFAULT_BATCH_BYTES = "1mb";

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand Down Expand Up @@ -162,4 +164,8 @@ public int getBatchBytes() {
return (int) org.apache.spark.network.util.JavaUtils
.byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
}

/**
 * Looks up the class name configured under {@link #CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS}.
 *
 * @return the configured class name, or an empty string when the option is not set
 */
public String getCustomFlintMetadataLogServiceClass() {
  final String configuredClassName =
      options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
  return configuredClassName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ package org.opensearch.flint.core.metadata

import java.util

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.core.metadata.FlintJsonHelper._
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

/**
* Flint metadata follows Flint index specification and defines metadata for a Flint index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

Expand All @@ -16,6 +16,10 @@
import java.util.function.Predicate;
import java.util.logging.Logger;

import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.common.metadata.log.OptimisticTransaction;

/**
* Default optimistic transaction implementation that captures the basic workflow for
* transaction support by optimistic locking.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,33 @@

package org.opensearch.flint.core.metadata.log;

import java.lang.reflect.Constructor;
import org.apache.spark.SparkConf;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService;

/**
* {@link FlintMetadataLogService} builder.
* <p>
* Custom implementations of {@link FlintMetadataLogService} are expected to provide a public
* constructor with the signature {@code public MyCustomService(SparkConf sparkConf)} to be
* instantiated by this builder.
*/
public class FlintMetadataLogServiceBuilder {
public static FlintMetadataLogService build(FlintOptions options) {
return new FlintOpenSearchMetadataLogService(options);
/**
 * Builds the {@link FlintMetadataLogService} to use for the given options.
 * <p>
 * When no custom service class is configured (the option is absent or empty), the default
 * {@link FlintOpenSearchMetadataLogService} is returned. Otherwise the configured class is
 * loaded by name and instantiated reflectively through its public {@code (SparkConf)}
 * constructor.
 *
 * @param options   Flint options carrying the optional custom service class name
 * @param sparkConf Spark configuration handed to the custom service constructor
 * @return the metadata log service instance
 * @throws RuntimeException if the custom class cannot be loaded, is not a
 *                          {@link FlintMetadataLogService}, lacks the expected constructor,
 *                          or fails during construction; the original failure is the cause
 */
public static FlintMetadataLogService build(FlintOptions options, SparkConf sparkConf) {
  String className = options.getCustomFlintMetadataLogServiceClass();
  if (className == null || className.isEmpty()) {
    return new FlintOpenSearchMetadataLogService(options);
  }

  // Attempts to instantiate Flint metadata log service with sparkConf using reflection
  try {
    // Verify the type up front so a misconfigured class is rejected before its
    // constructor (and any side effects it has) is ever invoked.
    Class<? extends FlintMetadataLogService> serviceClass =
        Class.forName(className).asSubclass(FlintMetadataLogService.class);
    Constructor<? extends FlintMetadataLogService> constructor =
        serviceClass.getConstructor(SparkConf.class);
    return constructor.newInstance(sparkConf);
  } catch (Exception e) {
    throw new RuntimeException("Failed to instantiate FlintMetadataLogService: " + className, e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.common.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;

/**
* Flint metadata log service implementation for OpenSearch storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ object FlintSparkConf {
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
.createOptional()
// Optional config naming a custom Flint metadata log service class. The key is shared with
// FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS so both sides read the same setting.
// NOTE(review): datasourceOption() presumably exposes this as a datasource option as well --
// confirm against FlintConfig.
val CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS =
FlintConfig(FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS)
.datasourceOption()
.doc("custom Flint metadata log service class")
.createOptional()
val QUERY =
FlintConfig("spark.flint.job.query")
.doc("Flint query for batch and streaming job")
Expand Down Expand Up @@ -274,6 +279,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
val optionsWithoutDefault = Seq(
RETRYABLE_EXCEPTION_CLASS_NAMES,
DATA_SOURCE_NAME,
CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS,
SESSION_ID,
REQUEST_INDEX,
METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import scala.collection.JavaConverters._

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.{FlintMetadataLogService, FlintMetadataLogServiceBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
Expand Down Expand Up @@ -45,8 +46,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())

private val flintMetadataLogService: FlintMetadataLogService =
FlintMetadataLogServiceBuilder.build(flintSparkConf.flintOptions())
/**
 * Flint metadata log service, built from the Flint options together with the Spark conf of the
 * current context
 */
private val flintMetadataLogService: FlintMetadataLogService = {
FlintMetadataLogServiceBuilder.build(
flintSparkConf.flintOptions(),
spark.sparkContext.getConf)
}

/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package org.opensearch.flint.spark

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.flint.datatype.FlintDataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import scala.sys.addShutdownHook
import dev.failsafe.{Failsafe, RetryPolicy}
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedRunnable
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

import org.apache.spark.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.covering

import java.util

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.covering

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import java.util.Locale
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping

import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.skipping

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex._
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ package org.opensearch.flint.spark.covering

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.scalatest.matchers.{Matcher, MatchResult}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package org.opensearch.flint.spark.skipping
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import org.opensearch.action.update.UpdateRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest}
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService.METADATA_LOG_INDEX_NAME_PREFIX
import org.opensearch.flint.spark.FlintSparkSuite

Expand Down
Loading

0 comments on commit 0f53448

Please sign in to comment.