Pass datasource name to metadata log
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Oct 31, 2023
1 parent fe76b4b commit ceb5c4c
Showing 8 changed files with 73 additions and 45 deletions.
FlintClient.java
@@ -23,10 +23,10 @@ public interface FlintClient {
* Start a new optimistic transaction.
*
* @param indexName index name
* @param metaLogIndexName metadata log index name
* @param dataSourceName TODO: read from elsewhere in future
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, String metaLogIndexName);
<T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName);

/**
* Create a Flint index with the metadata given.
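A minimal Scala sketch of the revised call pattern, mirroring how FlintSpark and the integration tests later in this commit use it. It assumes a FlintClient instance named flintClient, a hypothetical index name, and that the IndexState values come from FlintMetadataLogEntry as in the test suite import:

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._

// The second argument is now the data source name; the client derives the
// metadata log index name from it instead of receiving it from the caller.
flintClient
  .startTransaction("flint_myglue_default_test_skipping_index", "myglue")
  .initialLog(latest => latest.state == EMPTY)
  .transientLog(latest => latest.copy(state = CREATING))
  .finalLog(latest => latest.copy(state = ACTIVE))
  .commit(_ => { /* create the physical index here */ })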
FlintMetadata.scala
@@ -32,6 +32,8 @@ case class FlintMetadata(
properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Flint index schema */
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Optional latest metadata log entry */
latestId: Option[String] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {

@@ -58,6 +60,9 @@
.field("source", source)
.field("indexedColumns", indexedColumns)

if (latestId.isDefined) {
builder.field("latestId", latestId.get)
}
optionalObjectField(builder, "options", options)
optionalObjectField(builder, "properties", properties)
}
@@ -219,14 +224,14 @@ object FlintMetadata {
def build(): FlintMetadata = {
FlintMetadata(
if (version == null) current() else version,
name,
kind,
source,
indexedColumns,
options,
properties,
schema,
indexSettings)
name = name,
kind = kind,
source = source,
indexedColumns = indexedColumns,
options = options,
properties = properties,
schema = schema,
indexSettings = indexSettings)
}
}
}
DefaultOptimisticTransaction.java
@@ -24,6 +24,11 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T>

private static final Logger LOG = Logger.getLogger(DefaultOptimisticTransaction.class.getName());

/**
* Data source name. TODO: remove this in future.
*/
private final String dataSourceName;

/**
* Flint metadata log
*/
@@ -34,7 +39,9 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T>
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> finalAction = null;

public DefaultOptimisticTransaction(
String dataSourceName,
FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
this.dataSourceName = dataSourceName;
this.metadataLog = metadataLog;
}

@@ -95,7 +102,7 @@ private FlintMetadataLogEntry emptyLogEntry() {
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
IndexState$.MODULE$.EMPTY(),
"mys3", // TODO: get it from spark conf
dataSourceName,
"");
}
}
FlintOpenSearchClient.java
@@ -68,17 +68,26 @@ public class FlintOpenSearchClient implements FlintClient {
new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
new ArrayList<>()).getNamedXContents());

/**
* Metadata log index name prefix
*/
public final static String META_LOG_NAME_PREFIX = ".query_execution_request";

private final FlintOptions options;

public FlintOpenSearchClient(FlintOptions options) {
this.options = options;
}

@Override public <T> OptimisticTransaction<T> startTransaction(String indexName, String metaLogIndexName) {
@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName) {
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
LOG.info("Starting transaction on index " + indexName + " and metadata log index " + metaLogIndexName);

try (RestHighLevelClient client = createClient()) {
if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
return new DefaultOptimisticTransaction<>(
return new DefaultOptimisticTransaction<>(dataSourceName,
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName));
} else {
return new NoOptimisticTransaction<>();
@@ -88,7 +97,8 @@ public FlintOpenSearchClient(FlintOptions options) {
}
}

@Override public void createIndex(String indexName, FlintMetadata metadata) {
@Override
public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
@@ -105,7 +115,8 @@ public FlintOpenSearchClient(FlintOptions options) {
}
}

@Override public boolean exists(String indexName) {
@Override
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
@@ -115,7 +126,8 @@ public FlintOpenSearchClient(FlintOptions options) {
}
}

@Override public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
@Override
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = toLowercase(indexNamePattern);
try (RestHighLevelClient client = createClient()) {
@@ -132,7 +144,8 @@ public FlintOpenSearchClient(FlintOptions options) {
}
}

@Override public FlintMetadata getIndexMetadata(String indexName) {
@Override
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
@@ -147,7 +160,8 @@ public FlintOpenSearchClient(FlintOptions options) {
}
}

@Override public void deleteIndex(String indexName) {
@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
@@ -163,10 +177,11 @@ public FlintOpenSearchClient(FlintOptions options) {
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param query DSL query. DSL query is null means match_all.
* @param query DSL query. DSL query is null means match_all.
* @return {@link FlintReader}.
*/
@Override public FlintReader createReader(String indexName, String query) {
@Override
public FlintReader createReader(String indexName, String query) {
LOG.info("Creating Flint index reader for " + indexName + " with query " + query);
try {
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
@@ -190,7 +205,8 @@ public FlintWriter createWriter(String indexName) {
return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
}

@Override public RestHighLevelClient createClient() {
@Override
public RestHighLevelClient createClient() {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));
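The startTransaction change above is the core of this commit: callers now pass a data source name and the client derives the metadata log index name. A standalone Scala sketch of that naming rule, using the prefix constant and the test fixture value visible elsewhere in this diff:

val META_LOG_NAME_PREFIX = ".query_execution_request"

def metaLogIndexName(dataSourceName: String): String =
  if (dataSourceName.isEmpty) META_LOG_NAME_PREFIX   // backward-compatible default
  else META_LOG_NAME_PREFIX + "_" + dataSourceName

metaLogIndexName("")       // ".query_execution_request"
metaLogIndexName("myglue") // ".query_execution_request_myglue", as in OpenSearchTransactionSuite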
FlintOpenSearchMetadataLog.java
@@ -40,16 +40,16 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadataLogEntry> {
/**
* Reuse query request index as Flint metadata log store
*/
private final String indexName;
private final String metaLogIndexName;

/**
* Doc id for latest log entry (Naming rule is static so no need to query Flint index metadata)
*/
private final String latestId;

public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName, String indexName) {
public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName, String metaLogIndexName) {
this.flintClient = flintClient;
this.indexName = indexName;
this.metaLogIndexName = metaLogIndexName;
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
}

@@ -70,7 +70,7 @@ public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
GetResponse response =
client.get(new GetRequest(indexName, latestId), RequestOptions.DEFAULT);
client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

if (response.isExists()) {
FlintMetadataLogEntry latest = new FlintMetadataLogEntry(
@@ -105,7 +105,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
return writeLogEntry(logEntryWithId,
client -> client.index(
new IndexRequest()
.index(indexName)
.index(metaLogIndexName)
.id(logEntryWithId.id())
.source(logEntryWithId.toJson(), XContentType.JSON),
RequestOptions.DEFAULT));
@@ -115,7 +115,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Updating log entry " + logEntry);
return writeLogEntry(logEntry,
client -> client.update(
new UpdateRequest(indexName, logEntry.id())
new UpdateRequest(metaLogIndexName, logEntry.id())
.doc(logEntry.toJson(), XContentType.JSON)
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.setIfSeqNo(logEntry.seqNo())
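This class only picks up the indexName-to-metaLogIndexName rename; the doc id rule for the latest log entry is unchanged. A small Scala sketch of that rule, with a hypothetical Flint index name:

import java.util.Base64

// The latest log entry's doc id is the Base64-encoded Flint index name,
// so it can be computed without querying the Flint index metadata.
val flintIndexName = "flint_myglue_default_test_skipping_index" // hypothetical
val latestId = Base64.getEncoder.encodeToString(flintIndexName.getBytes)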
FlintSpark.scala
@@ -50,13 +50,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
private val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

/**
* Metadata log index name with a default name for backward compatibility. If the index doesn't
* exist, the transaction support will be disabled in FlintClient.
* Data source name. Assign empty string in case of backward compatibility. TODO: remove this in
* future
*/
private val metaLogIndexName: String = {
val indexName = spark.conf.getOption("spark.flint.job.requestIndex")
indexName.getOrElse(".query_execution_request")
}
private val dataSourceName: String =
spark.conf.getOption("spark.flint.datasource.name").getOrElse("")

/**
* Create index builder for creating index with fluent API.
@@ -107,7 +105,7 @@
val metadata = index.metadata()
try {
flintClient
.startTransaction(indexName, metaLogIndexName)
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == EMPTY)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
@@ -137,7 +135,7 @@

try {
flintClient
.startTransaction(indexName, metaLogIndexName)
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest => latest.copy(state = REFRESHING))
.finalLog(latest => {
@@ -210,7 +208,7 @@
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, metaLogIndexName)
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING)
.transientLog(latest => latest.copy(state = DELETING))
.finalLog(latest => latest.copy(state = DELETED))
@@ -251,7 +249,7 @@
logInfo("Scheduler triggers index log entry update")
try {
flintClient
.startTransaction(indexName, metaLogIndexName)
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(latest => logInfo("Updating log entry to " + latest))
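FlintSpark now reads the data source name from Spark configuration rather than a request-index name. A hedged sketch of the configuration side, assuming a locally built SparkSession; the property value matches the test suite in this commit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                               // hypothetical local setup
  .appName("flint-datasource-example")              // hypothetical app name
  .config("spark.flint.datasource.name", "myglue")  // read by FlintSpark via spark.conf.getOption
  .getOrCreate()

// When the property is absent, dataSourceName defaults to "" and the client
// falls back to the default ".query_execution_request" metadata log index.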
OpenSearchTransactionSuite.scala
@@ -18,6 +18,7 @@ import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.flint.core.storage.FlintOpenSearchClient._
import org.opensearch.flint.spark.FlintSparkSuite

/**
@@ -26,11 +27,12 @@ import org.opensearch.flint.spark.FlintSparkSuite
*/
trait OpenSearchTransactionSuite extends FlintSparkSuite {

val testMetaLogIndex = ".query_execution_request_mys3"
val testDataSourceName = "myglue"
lazy val testMetaLogIndex: String = META_LOG_NAME_PREFIX + "_" + testDataSourceName

override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set("spark.flint.job.requestIndex", testMetaLogIndex)
spark.conf.set("spark.flint.datasource.name", testDataSourceName)
}

override def beforeEach(): Unit = {
FlintTransactionITSuite.scala
@@ -29,7 +29,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should transit from initial to final log if initial log is empty") {
flintClient
.startTransaction(testFlintIndex, testMetaLogIndex)
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(latest => {
latest.state shouldBe EMPTY
true
@@ -43,7 +43,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should transit from initial to final log directly if no transient log") {
flintClient
.startTransaction(testFlintIndex, testMetaLogIndex)
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => latestLogEntry(testLatestId) should contain("state" -> "empty"))
Expand All @@ -64,7 +64,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
error = ""))

flintClient
.startTransaction(testFlintIndex, testMetaLogIndex)
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(latest => {
latest.state shouldBe ACTIVE
true
@@ -79,7 +79,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should exit if initial log entry doesn't meet precondition") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testMetaLogIndex)
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => false)
.transientLog(latest => latest)
.finalLog(latest => latest)
@@ -90,7 +90,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should fail if initial log entry updated by others when updating transient log entry") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testMetaLogIndex)
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => true)
.transientLog(latest => {
// This update will happen first and thus cause version conflict as expected
@@ -106,7 +106,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should fail if transient log entry updated by others when updating final log entry") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testMetaLogIndex)
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => true)
.transientLog(latest => {

