Skip to content

Commit

Permalink
Remove dataSourceName from args; read from options (#378)
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az authored Jun 11, 2024
1 parent 5460acc commit 76a9c57
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,19 @@ public interface FlintClient {
* Start a new optimistic transaction.
*
* @param indexName index name
* @param dataSourceName TODO: read from elsewhere in future
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName);
<T> OptimisticTransaction<T> startTransaction(String indexName);

/**
*
* Start a new optimistic transaction.
*
* @param indexName index name
* @param dataSourceName TODO: read from elsewhere in future
* @param forceInit whether to create an empty translog if it does not exist.
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName,
boolean forceInit);
<T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit);

/**
* Create a Flint index with the metadata given.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,18 @@ public class FlintOpenSearchClient implements FlintClient {
public final static String META_LOG_NAME_PREFIX = ".query_execution_request";

private final FlintOptions options;
private final String dataSourceName;
private final String metaLogIndexName;

/**
 * Creates a client from the given Flint options.
 *
 * @param options Flint options; the data source name is read from here
 */
public FlintOpenSearchClient(FlintOptions options) {
this.options = options;
// Must be assigned before constructMetaLogIndexName() is called, which reads this field.
// NOTE(review): that helper calls dataSourceName.isEmpty(), so a null name would throw here
// -- confirm getDataSourceName() never returns null.
this.dataSourceName = options.getDataSourceName();
this.metaLogIndexName = constructMetaLogIndexName();
}

@Override
public <T> OptimisticTransaction<T> startTransaction(
String indexName, String dataSourceName, boolean forceInit) {
public <T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = constructMetaLogIndexName(dataSourceName);
try (IRestHighLevelClient client = createClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
Expand All @@ -122,8 +124,8 @@ public <T> OptimisticTransaction<T> startTransaction(
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName) {
return startTransaction(indexName, dataSourceName, false);
public <T> OptimisticTransaction<T> startTransaction(String indexName) {
return startTransaction(indexName, false);
}

@Override
Expand Down Expand Up @@ -274,7 +276,6 @@ public IRestHighLevelClient createClient() {
final AtomicReference<AWSCredentialsProvider> metadataAccessAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());

String metaLogIndexName = constructMetaLogIndexName(options.getDataSourceName());
String systemIndexName = Strings.isNullOrEmpty(options.getSystemIndexName()) ? metaLogIndexName : options.getSystemIndexName();

if (Strings.isNullOrEmpty(metadataAccessProviderClass)) {
Expand Down Expand Up @@ -389,7 +390,7 @@ private String sanitizeIndexName(String indexName) {
return toLowercase(encoded);
}

private String constructMetaLogIndexName(String dataSourceName) {
private String constructMetaLogIndexName() {
return dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,9 @@ class FlintSpark(val spark: SparkSession) extends Logging {
/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer

/**
* Data source name. Assign empty string in case of backward compatibility. TODO: remove this in
* future
*/
private val dataSourceName: String =
spark.conf.getOption("spark.flint.datasource.name").getOrElse("")

/** Flint Spark index monitor */
val flintIndexMonitor: FlintSparkIndexMonitor =
new FlintSparkIndexMonitor(spark, flintClient, dataSourceName)
new FlintSparkIndexMonitor(spark, flintClient)

/**
* Create index builder for creating index with fluent API.
Expand Down Expand Up @@ -106,7 +99,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
val metadata = index.metadata()
try {
flintClient
.startTransaction(indexName, dataSourceName, true)
.startTransaction(indexName, true)
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
Expand Down Expand Up @@ -142,7 +135,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {

try {
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest =>
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
Expand Down Expand Up @@ -249,7 +242,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest =>
latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
.transientLog(latest => latest.copy(state = DELETING))
Expand Down Expand Up @@ -284,7 +277,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
Expand Down Expand Up @@ -315,7 +308,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
if (index.exists(_.options.autoRefresh())) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
.transientLog(latest =>
latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
Expand Down Expand Up @@ -346,7 +339,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
*/
logWarning("Cleaning up metadata log as index data has been deleted")
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
Expand Down Expand Up @@ -436,7 +429,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest =>
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
Expand All @@ -455,7 +448,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest =>
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
* Spark session
* @param flintClient
* Flint client
* @param dataSourceName
* data source name
*/
class FlintSparkIndexMonitor(
spark: SparkSession,
flintClient: FlintClient,
dataSourceName: String)
extends Logging {
class FlintSparkIndexMonitor(spark: SparkSession, flintClient: FlintClient) extends Logging {

/** Task execution initial delay in seconds */
private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds()
Expand Down Expand Up @@ -160,7 +154,7 @@ class FlintSparkIndexMonitor(
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(_ => {})
Expand Down Expand Up @@ -212,7 +206,7 @@ class FlintSparkIndexMonitor(
logInfo(s"Updating index state to failed for $indexName")
retry {
flintClient
.startTransaction(indexName, dataSourceName)
.startTransaction(indexName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.sql.flint.config.FlintSparkConf.{REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE}
import org.apache.spark.sql.flint.config.FlintSparkConf.{DATA_SOURCE_NAME, REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE}

class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers {

Expand All @@ -31,8 +31,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
behavior of "Flint OpenSearch client"

it should "throw IllegalStateException if metadata log index doesn't exists" in {
val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource")
val flintClient = FlintClientBuilder.build(new FlintOptions(options.asJava))

the[IllegalStateException] thrownBy {
flintClient.startTransaction("test", "non-exist-index")
flintClient.startTransaction("test")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("empty metadata log entry content") {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.id shouldBe testLatestId
latest.state shouldBe EMPTY
Expand Down Expand Up @@ -102,7 +102,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
error = ""))

flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.id shouldBe testLatestId
latest.createTime shouldBe testCreateTime
Expand All @@ -126,7 +126,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should transit from initial to final log if initial log is empty") {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.state shouldBe EMPTY
true
Expand All @@ -140,7 +140,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should transit from initial to final log directly if no transient log") {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => latestLogEntry(testLatestId) should contain("state" -> "empty"))
Expand All @@ -162,7 +162,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
error = ""))

flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.state shouldBe ACTIVE
true
Expand All @@ -177,7 +177,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should exit if initial log entry doesn't meet precondition") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(_ => false)
.transientLog(latest => latest.copy(state = ACTIVE))
.finalLog(latest => latest)
Expand All @@ -191,7 +191,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should fail if initial log entry updated by others when updating transient log entry") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => {
// This update will happen first and thus cause version conflict as expected
Expand All @@ -207,7 +207,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should fail if transient log entry updated by others when updating final log entry") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => {

Expand All @@ -225,7 +225,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
// Use create index scenario in this test case
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
Expand All @@ -250,7 +250,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => latest.copy(state = REFRESHING))
.finalLog(_ => throw new RuntimeException("Mock final log error"))
Expand All @@ -266,7 +266,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
// Use create index scenario in this test case
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => throw new RuntimeException("Mock operation error"))
Expand All @@ -279,7 +279,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("forceInit translog, even index is deleted before startTransaction") {
deleteIndex(testMetaLogIndex)
flintClient
.startTransaction(testFlintIndex, testDataSourceName, true)
.startTransaction(testFlintIndex, true)
.initialLog(latest => {
latest.id shouldBe testLatestId
latest.state shouldBe EMPTY
Expand All @@ -299,7 +299,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should fail if index is deleted before initial operation") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(latest => {
deleteIndex(testMetaLogIndex)
true
Expand All @@ -313,7 +313,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should fail if index is deleted before transient operation") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(latest => true)
.transientLog(latest => {
deleteIndex(testMetaLogIndex)
Expand All @@ -327,7 +327,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should fail if index is deleted before final operation") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.startTransaction(testFlintIndex)
.initialLog(latest => true)
.transientLog(latest => { latest.copy(state = CREATING) })
.finalLog(latest => {
Expand Down

0 comments on commit 76a9c57

Please sign in to comment.