Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Oct 3, 2023
1 parent b1da712 commit 06f7a74
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private URI parseUri(String opensearchUri, String datasourceName) {
}

public Builder structuredStreaming() {
config.put("spark.flint.job.type", "wait");
config.put("spark.flint.job.type", "streaming");

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
*/
@Data
public class StartJobRequest {

public static final Long DEFAULT_JOB_TIMEOUT = 120L;

private final String query;
private final String jobName;
private final String applicationId;
Expand All @@ -27,6 +30,6 @@ public class StartJobRequest {
private final boolean isStructuredStreaming;

public Long executionTimeout() {
return isStructuredStreaming ? 0L : 60L;
return isStructuredStreaming ? 0L : DEFAULT_JOB_TIMEOUT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.spark.client;

import static org.junit.jupiter.api.Assertions.*;
import static org.opensearch.sql.spark.client.StartJobRequest.DEFAULT_JOB_TIMEOUT;

import java.util.Map;
import org.junit.jupiter.api.Test;
Expand All @@ -14,15 +15,15 @@ class StartJobRequestTest {

@Test
void executionTimeout() {
assertEquals(60L, onDemandJob().executionTimeout());
assertEquals(DEFAULT_JOB_TIMEOUT, onDemandJob().executionTimeout());
assertEquals(0L, streamingJob().executionTimeout());
}

private StartJobRequest onDemandJob() {
return new StartJobRequest("","","","","", Map.of(), false);
return new StartJobRequest("", "", "", "", "", Map.of(), false);
}

private StartJobRequest streamingJob() {
return new StartJobRequest("","","","","", Map.of(), true);
return new StartJobRequest("", "", "", "", "", Map.of(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private String constructExpectedSparkSubmitParameterString() {
}

private String withStructuredStreaming(String parameters) {
return parameters + " --conf spark.flint.job.type=wait ";
return parameters + " --conf spark.flint.job.type=streaming ";
}

private DataSourceMetadata constructMyGlueDataSourceMetadata() {
Expand Down

0 comments on commit 06f7a74

Please sign in to comment.