Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom metadata log service implementation #389

Merged
merged 8 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ lazy val root = (project in file("."))

lazy val flintCore = (project in file("flint-core"))
.disablePlugins(AssemblyPlugin)
.dependsOn(flintCommons)
.settings(
commonSettings,
name := "flint-core",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log
package org.opensearch.flint.common.metadata.log

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState

/**
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move
Expand Down Expand Up @@ -161,14 +160,4 @@ object FlintMetadataLogEntry {
| "number_of_replicas": "0"
| }
|}""".stripMargin

def failLogEntry(dataSourceName: String, error: String): FlintMetadataLogEntry =
FlintMetadataLogEntry(
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
0L,
IndexState.FAILED,
dataSourceName,
error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.function.Function;
import java.util.function.Predicate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class FlintOptions implements Serializable {

public static final String DEFAULT_BATCH_BYTES = "1mb";

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand Down Expand Up @@ -162,4 +164,8 @@ public int getBatchBytes() {
return (int) org.apache.spark.network.util.JavaUtils
.byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
}

public String getCustomFlintMetadataLogServiceClass() {
return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ package org.opensearch.flint.core.metadata

import java.util

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.core.metadata.FlintJsonHelper._
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

/**
* Flint metadata follows Flint index specification and defines metadata for a Flint index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

Expand All @@ -16,6 +16,10 @@
import java.util.function.Predicate;
import java.util.logging.Logger;

import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.common.metadata.log.OptimisticTransaction;

/**
* Default optimistic transaction implementation that captures the basic workflow for
* transaction support by optimistic locking.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,29 @@

package org.opensearch.flint.core.metadata.log;

import java.lang.reflect.Constructor;
import org.apache.spark.SparkConf;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService;

/**
* {@link FlintMetadataLogService} builder.
*/
public class FlintMetadataLogServiceBuilder {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should FlintMetadataLogServiceBuilder moved to common as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no this needs access to FlintOpenSearchMetadataLogService

public static FlintMetadataLogService build(FlintOptions options) {
return new FlintOpenSearchMetadataLogService(options);
public static FlintMetadataLogService build(FlintOptions options, SparkConf sparkConf) {
String className = options.getCustomFlintMetadataLogServiceClass();
if (className.isEmpty()) {
return new FlintOpenSearchMetadataLogService(options);
}

// Attempts to instantiate Flint metadata log service with sparkConf using reflection
try {
Class<?> flintMetadataLogServiceClass = Class.forName(className);
Constructor<?> constructor = flintMetadataLogServiceClass.getConstructor(SparkConf.class);
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
return (FlintMetadataLogService) constructor.newInstance(sparkConf);
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate FlintMetadataLogService: " + className, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.common.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;

/**
* Flint metadata log service implementation for OpenSearch storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ object FlintSparkConf {
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
.createOptional()
val CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS =
FlintConfig(FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS)
.datasourceOption()
.doc("custom Flint metadata log service class")
.createOptional()
val QUERY =
FlintConfig("spark.flint.job.query")
.doc("Flint query for batch and streaming job")
Expand Down Expand Up @@ -274,6 +279,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
val optionsWithoutDefault = Seq(
RETRYABLE_EXCEPTION_CLASS_NAMES,
DATA_SOURCE_NAME,
CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS,
SESSION_ID,
REQUEST_INDEX,
METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import scala.collection.JavaConverters._

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.{FlintMetadataLogService, FlintMetadataLogServiceBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
Expand Down Expand Up @@ -45,8 +46,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())

private val flintMetadataLogService: FlintMetadataLogService =
FlintMetadataLogServiceBuilder.build(flintSparkConf.flintOptions())
private val flintMetadataLogService: FlintMetadataLogService = {
FlintMetadataLogServiceBuilder.build(
flintSparkConf.flintOptions(),
spark.sparkContext.getConf)
}

/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package org.opensearch.flint.spark

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.flint.datatype.FlintDataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import scala.sys.addShutdownHook
import dev.failsafe.{Failsafe, RetryPolicy}
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedRunnable
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

import org.apache.spark.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.covering

import java.util

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.covering

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import java.util.Locale
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping

import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.skipping

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex._
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ package org.opensearch.flint.spark.covering

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.scalatest.matchers.{Matcher, MatchResult}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package org.opensearch.flint.spark.skipping
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import org.opensearch.action.update.UpdateRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest}
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService.METADATA_LOG_INDEX_NAME_PREFIX
import org.opensearch.flint.spark.FlintSparkSuite

Expand Down
Loading
Loading