Skip to content

Commit

Permalink
Merge branch 'main' into implement-vacuum-operation
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Mar 18, 2024
2 parents 894aecc + d11a268 commit 1555ca9
Show file tree
Hide file tree
Showing 45 changed files with 2,593 additions and 792 deletions.
27 changes: 26 additions & 1 deletion spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| alterSkippingIndexStatement
| dropSkippingIndexStatement
| vacuumSkippingIndexStatement
| analyzeSkippingIndexStatement
;

createSkippingIndexStatement
Expand All @@ -46,6 +48,12 @@ describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

alterSkippingIndexStatement
: ALTER SKIPPING INDEX
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;
Expand All @@ -59,6 +67,7 @@ coveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| alterCoveringIndexStatement
| dropCoveringIndexStatement
| vacuumCoveringIndexStatement
;
Expand All @@ -83,6 +92,12 @@ describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

alterCoveringIndexStatement
: ALTER INDEX indexName
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;
Expand All @@ -91,11 +106,16 @@ vacuumCoveringIndexStatement
: VACUUM INDEX indexName ON tableName
;

analyzeSkippingIndexStatement
: ANALYZE SKIPPING INDEX ON tableName
;

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| alterMaterializedViewStatement
| dropMaterializedViewStatement
| vacuumMaterializedViewStatement
;
Expand All @@ -118,6 +138,11 @@ describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

alterMaterializedViewStatement
: ALTER MATERIALIZED VIEW mvName=multipartIdentifier
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
Expand Down Expand Up @@ -163,7 +188,7 @@ indexColTypeList
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX | BLOOM_FILTER)
(LEFT_PAREN skipParams RIGHT_PAREN)?
;

Expand Down
3 changes: 3 additions & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ nonReserved

// Flint lexical tokens

BLOOM_FILTER: 'BLOOM_FILTER';
MIN_MAX: 'MIN_MAX';
SKIPPING: 'SKIPPING';
VALUE_SET: 'VALUE_SET';
Expand All @@ -155,6 +156,8 @@ DOT: '.';


AS: 'AS';
ALTER: 'ALTER';
ANALYZE: 'ANALYZE';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
Expand Down
14 changes: 12 additions & 2 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ COMMA: ',';
DOT: '.';
LEFT_BRACKET: '[';
RIGHT_BRACKET: ']';
BANG: '!';

// NOTE: If you add a new token in the list below, you should update the list of keywords
// and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and
Expand Down Expand Up @@ -273,7 +274,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NOT: 'NOT' | '!';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
NUMERIC: 'NUMERIC';
Expand Down Expand Up @@ -510,8 +511,13 @@ BIGDECIMAL_LITERAL
| DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
;

// Generalize the identifier to give a sensible INVALID_IDENTIFIER error message:
// * Unicode letters rather than a-z and A-Z only
// * URI paths for table references using paths
// We then narrow down to ANSI rules in exitUnquotedIdentifier() in the parser.
IDENTIFIER
: (LETTER | DIGIT | '_')+
: (UNICODE_LETTER | DIGIT | '_')+
| UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+
;

BACKQUOTED_IDENTIFIER
Expand All @@ -535,6 +541,10 @@ fragment LETTER
: [A-Z]
;

fragment UNICODE_LETTER
: [\p{L}]
;

SIMPLE_COMMENT
: '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
;
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ describeFuncName
| comparisonOperator
| arithmeticOperator
| predicateOperator
| BANG
;

describeColName
Expand Down Expand Up @@ -946,7 +947,7 @@ expressionSeq
;

booleanExpression
: NOT booleanExpression #logicalNot
: (NOT | BANG) booleanExpression #logicalNot
| EXISTS LEFT_PAREN query RIGHT_PAREN #exists
| valueExpression predicate? #predicated
| left=booleanExpression operator=AND right=booleanExpression #logicalBinary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
Expand All @@ -28,79 +30,133 @@
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.utils.SQLQueryUtils;

/** Handle Index DML query. includes * DROP * ALT? */
@RequiredArgsConstructor
public class IndexDMLHandler extends AsyncQueryHandler {
private static final Logger LOG = LogManager.getLogger();

// To be deprecated in 3.0. Still using for backward compatibility.
public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";
public static final String DML_QUERY_JOB_ID = "DMLQueryJobId";

private final EMRServerlessClient emrServerlessClient;

private final JobExecutionResponseReader jobExecutionResponseReader;

private final FlintIndexMetadataReader flintIndexMetadataReader;

private final Client client;
private final FlintIndexMetadataService flintIndexMetadataService;

private final StateStore stateStore;

private final Client client;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId);
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
String error = "";
long startTime = 0L;
String datasource = dispatchQueryRequest.getDatasource();
long startTime = System.currentTimeMillis();
try {
// For drop or vacuum, cancel running job (if refreshing) and delete index logically
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasource, emrServerlessClient);
jobCancelOp.apply(indexMetadata);

FlintIndexOp indexDeleteOp = new FlintIndexOpDelete(stateStore, datasource);
indexDeleteOp.apply(indexMetadata);

// For vacuum, continue to delete index data physically
if (isVacuum(dispatchQueryRequest)) {
FlintIndexOp indexVacuumOp = new FlintIndexOpVacuum(stateStore, datasource, client);
indexVacuumOp.apply(indexMetadata);
}
status = JobRunState.SUCCESS.toString();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails);
executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata);
AsyncQueryId asyncQueryId =
storeIndexDMLResult(
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.SUCCESS.toString(),
StringUtils.EMPTY,
startTime);
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
} catch (Exception e) {
error = e.getMessage();
LOG.error("Failed to perform index DML operations", e);
LOG.error(e.getMessage());
AsyncQueryId asyncQueryId =
storeIndexDMLResult(
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.FAILED.toString(),
e.getMessage(),
startTime);
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
}
}

private AsyncQueryId storeIndexDMLResult(
DispatchQueryRequest dispatchQueryRequest,
DataSourceMetadata dataSourceMetadata,
String status,
String error,
long startTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
new IndexDMLResult(
asyncQueryId.getId(),
status,
error,
datasource,
dispatchQueryRequest.getDatasource(),
System.currentTimeMillis() - startTime,
System.currentTimeMillis());
String resultIndex = dataSourceMetadata.getResultIndex();
createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult);
createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult);
return asyncQueryId;
}

return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null);
private void executeIndexOp(
DispatchQueryRequest dispatchQueryRequest,
IndexQueryDetails indexQueryDetails,
FlintIndexMetadata indexMetadata) {
switch (indexQueryDetails.getIndexQueryActionType()) {
case DROP:
case VACUUM:
FlintIndexOp dropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
dropOp.apply(indexMetadata);

// For vacuum, continue to delete index data physically
if (indexQueryDetails.getIndexQueryActionType() == IndexQueryActionType.VACUUM) {
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
}
break;
case ALTER:
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
indexQueryDetails.getFlintIndexOptions(),
stateStore,
dispatchQueryRequest.getDatasource(),
emrServerlessClient,
flintIndexMetadataService);
flintIndexOpAlter.apply(indexMetadata);
break;
default:
throw new IllegalStateException(
String.format(
"IndexQueryActionType: %s is not supported in IndexDMLHandler.",
indexQueryDetails.getIndexQueryActionType()));
}
}

private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails) {
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(indexDetails.openSearchIndexName());
if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", indexDetails.openSearchIndexName()));
}
return indexMetadataMap.get(indexDetails.openSearchIndexName());
}

@Override
Expand All @@ -123,9 +179,4 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
throw new IllegalArgumentException("can't cancel index DML query");
}

private boolean isVacuum(DispatchQueryRequest request) {
IndexQueryDetails details = SQLQueryUtils.extractIndexDetails(request.getQuery());
return details.getIndexQueryActionType() == IndexQueryActionType.VACUUM;
}
}
Loading

0 comments on commit 1555ca9

Please sign in to comment.