Skip to content

Commit

Permalink
Submit long running job only when auto_refresh = false (#2208)
Browse files Browse the repository at this point in the history
* Bug Fix, Submit long running job only when auto_refresh = false

Signed-off-by: Peng Huo <[email protected]>

* update

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
(cherry picked from commit 49ea48c)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 4, 2023
1 parent 610a1b6 commit 2b3695e
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ private URI parseUri(String opensearchUri, String datasourceName) {
}
}

public Builder structuredStreaming() {
config.put("spark.flint.job.type", "streaming");

public Builder structuredStreaming(Boolean isStructuredStreaming) {
if (isStructuredStreaming) {
config.put("spark.flint.job.type", "streaming");
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ private StartJobRequest getStartJobRequestForIndexRequest(
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming()
.structuredStreaming(indexDetails.getAutoRefresh())
.build()
.toString(),
tags,
true);
indexDetails.getAutoRefresh());
return startJobRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@
public class IndexDetails {
private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
private Boolean autoRefresh = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.utils;

import java.util.Locale;
import lombok.Getter;
import lombok.experimental.UtilityClass;
import org.antlr.v4.runtime.CommonTokenStream;
Expand Down Expand Up @@ -132,5 +133,56 @@ public Void visitTableName(FlintSparkSqlExtensionsParser.TableNameContext ctx) {
indexDetails.setFullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText()));
return super.visitTableName(ctx);
}

@Override
public Void visitCreateSkippingIndexStatement(
FlintSparkSqlExtensionsParser.CreateSkippingIndexStatementContext ctx) {
visitPropertyList(ctx.propertyList());
return super.visitCreateSkippingIndexStatement(ctx);
}

@Override
public Void visitCreateCoveringIndexStatement(
FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext ctx) {
visitPropertyList(ctx.propertyList());
return super.visitCreateCoveringIndexStatement(ctx);
}

@Override
public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext ctx) {
if (ctx != null) {
ctx.property()
.forEach(
property -> {
// todo. Currently, we use contains() api to avoid unescape string. In future, we
// should leverage
// https://github.com/apache/spark/blob/v3.5.0/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala#L35 to unescape string literal
if (propertyKey(property.key).toLowerCase(Locale.ROOT).contains("auto_refresh")) {
if (propertyValue(property.value).toLowerCase(Locale.ROOT).contains("true")) {
indexDetails.setAutoRefresh(true);
}
}
});
}
return null;
}

private String propertyKey(FlintSparkSqlExtensionsParser.PropertyKeyContext key) {
if (key.STRING() != null) {
return key.STRING().getText();
} else {
return key.getText();
}
}

private String propertyValue(FlintSparkSqlExtensionsParser.PropertyValueContext value) {
if (value.STRING() != null) {
return value.STRING().getText();
} else if (value.booleanValue() != null) {
return value.getText();
} else {
return value.getText();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.sql.spark.utils;

import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.index;
import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.skippingIndex;

import lombok.Getter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -107,4 +111,74 @@ void testExtractionFromFlintIndexQueries() {
Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName());
Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName());
}

/** https://github.com/opensearch-project/sql/issues/2206 */
@Test
void testAutoRefresh() {
Assertions.assertFalse(
SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()).getAutoRefresh());

Assertions.assertFalse(
SQLQueryUtils.extractIndexDetails(
skippingIndex().withProperty("auto_refresh", "false").getQuery())
.getAutoRefresh());

Assertions.assertTrue(
SQLQueryUtils.extractIndexDetails(
skippingIndex().withProperty("auto_refresh", "true").getQuery())
.getAutoRefresh());

Assertions.assertTrue(
SQLQueryUtils.extractIndexDetails(
skippingIndex().withProperty("\"auto_refresh\"", "true").getQuery())
.getAutoRefresh());

Assertions.assertTrue(
SQLQueryUtils.extractIndexDetails(
skippingIndex().withProperty("\"auto_refresh\"", "\"true\"").getQuery())
.getAutoRefresh());

Assertions.assertFalse(
SQLQueryUtils.extractIndexDetails(
skippingIndex().withProperty("auto_refresh", "1").getQuery())
.getAutoRefresh());

Assertions.assertFalse(
SQLQueryUtils.extractIndexDetails(skippingIndex().withProperty("interval", "1").getQuery())
.getAutoRefresh());

Assertions.assertFalse(SQLQueryUtils.extractIndexDetails(index().getQuery()).getAutoRefresh());

Assertions.assertFalse(
SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "false").getQuery())
.getAutoRefresh());

Assertions.assertTrue(
SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "true").getQuery())
.getAutoRefresh());
}

@Getter
protected static class IndexQuery {
private String query;

private IndexQuery(String query) {
this.query = query;
}

public static IndexQuery skippingIndex() {
return new IndexQuery(
"CREATE SKIPPING INDEX ON myS3.default.alb_logs" + "(l_orderkey VALUE_SET)");
}

public static IndexQuery index() {
return new IndexQuery(
"CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, " + "l_quantity)");
}

public IndexQuery withProperty(String key, String value) {
query = String.format("%s with (%s = %s)", query, key, value);
return this;
}
}
}

0 comments on commit 2b3695e

Please sign in to comment.