Skip to content

Commit

Permalink
Merge branch 'main' into support-covering-index-idempotency
Browse files Browse the repository at this point in the history
  • Loading branch information
dai-chen committed Nov 8, 2023
2 parents e2d3f80 + d5e6738 commit c6a7f17
Show file tree
Hide file tree
Showing 29 changed files with 826 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha
* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha @kaituo @YANG-DB
1 change: 1 addition & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Lior Perry | [yangdb](https://github.com/YANG-DB) | Amazon |
| Sean Kao | [seankao-az](https://github.com/seankao-az) | Amazon |
| Anirudha Jadhav | [anirudha](https://github.com/anirudha) | Amazon |
| Kaituo Li | [kaituo](https://github.com/kaituo) | Amazon |
18 changes: 16 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,20 @@ WITH (
)
```

### Index Job Management

Currently Flint index job ID is same as internal Flint index name in [OpenSearch](./index.md#OpenSearch) section below.

```sql
RECOVER INDEX JOB <id>
```

Example:

```sql
RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`
```

## Index Store

### OpenSearch
Expand Down Expand Up @@ -342,8 +356,8 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: default value is 1000.
- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false),
IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE
(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class FlintOptions implements Serializable {
*
* WAIT_UNTIL("wait_for")
*/
public static final String DEFAULT_REFRESH_POLICY = "false";
public static final String DEFAULT_REFRESH_POLICY = "wait_for";

public FlintOptions(Map<String, String> options) {
this.options = options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.core.metadata.log;

import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -76,23 +78,46 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry()));

// Perform initial log check
if (initialCondition.test(latest)) {
if (!initialCondition.test(latest)) {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
}

// Append optional transient log
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));
}
// Append optional transient log
FlintMetadataLogEntry initialLog = latest;
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));

// Copy latest seqNo and primaryTerm to initialLog for potential rollback use
initialLog = initialLog.copy(
initialLog.id(),
latest.seqNo(),
latest.primaryTerm(),
initialLog.createTime(),
initialLog.state(),
initialLog.dataSource(),
initialLog.error());
}

// Perform operation
// Perform operation
try {
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
return result;
} else {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
} catch (Exception e) {
LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
try {
// Roll back transient log if any
if (transientAction != null) {
metadataLog.add(initialLog);
}
} catch (Exception ex) {
LOG.log(WARNING, "Failed to rollback transient log", ex);
}
throw new IllegalStateException("Failed to commit transaction operation");
}
}

Expand All @@ -101,6 +126,7 @@ private FlintMetadataLogEntry emptyLogEntry() {
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
0L,
IndexState$.MODULE$.EMPTY(),
dataSourceName,
"");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,22 @@ case class FlintMetadataLogEntry(
id: String,
seqNo: Long,
primaryTerm: Long,
/**
* This is currently used as streaming job start time. In future, this should represent the
* create timestamp of the log entry
*/
createTime: Long,
state: IndexState,
dataSource: String, // TODO: get from Spark conf
dataSource: String,
error: String) {

def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
this(
id,
seqNo,
primaryTerm,
/* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */
map.get("jobStartTime").asInstanceOf[Number].longValue(),
IndexState.from(map.get("state").asInstanceOf[String]),
map.get("dataSourceName").asInstanceOf[String],
map.get("error").asInstanceOf[String])
Expand All @@ -48,12 +55,14 @@ case class FlintMetadataLogEntry(
s"""
|{
| "version": "1.0",
| "latestId": "$id",
| "type": "flintindexstate",
| "state": "$state",
| "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}",
| "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}",
| "dataSourceName": "$dataSource",
| "lastUpdateTime": "${System.currentTimeMillis()}",
| "jobStartTime": $createTime,
| "lastUpdateTime": ${System.currentTimeMillis()},
| "error": "$error"
|}
|""".stripMargin
Expand All @@ -74,6 +83,7 @@ object FlintMetadataLogEntry {
val DELETING: IndexState.Value = Value("deleting")
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
val RECOVERING: IndexState.Value = Value("recovering")
val UNKNOWN: IndexState.Value = Value("unknown")

def from(s: String): IndexState.Value = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
latestId,
logEntry.seqNo(),
logEntry.primaryTerm(),
logEntry.createTime(),
logEntry.state(),
logEntry.dataSource(),
logEntry.error());
Expand Down Expand Up @@ -135,6 +136,7 @@ private FlintMetadataLogEntry writeLogEntry(
logEntry.id(),
response.getSeqNo(),
response.getPrimaryTerm(),
logEntry.createTime(),
logEntry.state(),
logEntry.dataSource(),
logEntry.error());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -17,12 +18,16 @@
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* {@link OpenSearchReader} using scroll search. https://opensearch.org/docs/latest/api-reference/scroll/
*/
public class OpenSearchScrollReader extends OpenSearchReader {

private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName());

/** Default scroll context timeout in minutes. */
public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L);

Expand Down Expand Up @@ -54,10 +59,16 @@ SearchResponse search(SearchRequest request) throws IOException {
* clean the scroll context.
*/
void clean() throws IOException {
if (!Strings.isNullOrEmpty(scrollId)) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
try {
if (!Strings.isNullOrEmpty(scrollId)) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}
} catch (OpenSearchStatusException e) {
// OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121
LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" +
".com/opensearch-project/OpenSearch/issues/11121.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
| indexJobManagementStatement
;

skippingIndexStatement
Expand Down Expand Up @@ -109,6 +110,14 @@ dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

indexJobManagementStatement
: recoverIndexJobStatement
;

recoverIndexJobStatement
: RECOVER INDEX JOB identifier
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
Expand Down
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,12 @@ IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
JOB: 'JOB';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
RECOVER: 'RECOVER';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,25 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.util.ShutdownHookManager

/**
* Flint utility methods that rely on access to private code in Spark SQL package.
*/
package object flint {

/**
* Add shutdown hook to SparkContext with default priority.
*
* @param hook
* hook with the code to run during shutdown
* @return
* a handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {
ShutdownHookManager.addShutdownHook(hook)
}

/**
* Convert the given logical plan to Spark data frame.
*
Expand Down
Loading

0 comments on commit c6a7f17

Please sign in to comment.