diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index b7c6188c51..f586f0386c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -138,7 +138,7 @@ private URI parseUri(String opensearchUri, String datasourceName) { } public Builder structuredStreaming() { - config.put("spark.flint.job.type", "wait"); + config.put("spark.flint.job.type", "streaming"); return this; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java index 2c551c306e..0fad27bf61 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java @@ -14,6 +14,9 @@ */ @Data public class StartJobRequest { + + public static final Long DEFAULT_JOB_TIMEOUT = 120L; + private final String query; private final String jobName; private final String applicationId; @@ -27,6 +30,6 @@ public class StartJobRequest { private final boolean isStructuredStreaming; public Long executionTimeout() { - return isStructuredStreaming ? 0L : 60L; + return isStructuredStreaming ? 0L : DEFAULT_JOB_TIMEOUT; } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java index 29c9b5d3e9..783ce8466e 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java @@ -6,6 +6,7 @@ package org.opensearch.sql.spark.client; import static org.junit.jupiter.api.Assertions.*; +import static org.opensearch.sql.spark.client.StartJobRequest.DEFAULT_JOB_TIMEOUT; import java.util.Map; import org.junit.jupiter.api.Test; @@ -14,15 +15,15 @@ class StartJobRequestTest { @Test void executionTimeout() { - assertEquals(60L, onDemandJob().executionTimeout()); + assertEquals(DEFAULT_JOB_TIMEOUT, onDemandJob().executionTimeout()); assertEquals(0L, streamingJob().executionTimeout()); } private StartJobRequest onDemandJob() { - return new StartJobRequest("","","","","", Map.of(), false); + return new StartJobRequest("", "", "", "", "", Map.of(), false); } private StartJobRequest streamingJob() { - return new StartJobRequest("","","","","", Map.of(), true); + return new StartJobRequest("", "", "", "", "", Map.of(), true); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 7275947730..0980c72472 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -390,7 +390,7 @@ private String constructExpectedSparkSubmitParameterString() { } private String withStructuredStreaming(String parameters) { - return parameters + " --conf spark.flint.job.type=wait "; + return parameters + " --conf spark.flint.job.type=streaming "; } private DataSourceMetadata constructMyGlueDataSourceMetadata() {