Skip to content

Commit

Permalink
Merge pull request #3 from TsReaper/sleep
Browse files Browse the repository at this point in the history
Set result fetching max sleep time according to query elapsed time
  • Loading branch information
tsreaper authored Mar 31, 2020
2 parents d9ac0da + 1c6c8e5 commit 36dc701
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions src/main/java/com/ververica/flink/table/jdbc/FlinkResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@
public class FlinkResultSet implements ResultSet {

// If an empty array is fetched as result, at least sleep this long millis before next attempt
private static final int DEFAULT_INIT_SLEEP_MILLIS = 10;
private static final long DEFAULT_INIT_SLEEP_MILLIS = 200L;
// If an empty array is fetched as result, at most sleep this long millis before next attempt
private static final int DEFAULT_MAX_SLEEP_MILLIS = 1000;
private static final long DEFAULT_MAX_SLEEP_MILLIS = 60000L;
// If an empty array is fetched as result, at most sleep [query elapsed time] * this fraction before next attempt
private static final double DEFAULT_MAX_SLEEP_FRACTION = 0.1;

private final SessionClient session;
private final Either<JobID, com.ververica.flink.table.gateway.rest.result.ResultSet> jobIdOrResultSet;
Expand All @@ -82,6 +84,8 @@ public class FlinkResultSet implements ResultSet {
private boolean wasNull;
private boolean closed;

private long resultSetCreateMillis;

public FlinkResultSet(
SessionClient session,
Either<JobID, com.ververica.flink.table.gateway.rest.result.ResultSet> jobIdOrResultSet,
Expand All @@ -100,6 +104,7 @@ public FlinkResultSet(
this.wasNull = false;
this.closed = false;

this.resultSetCreateMillis = System.currentTimeMillis();
}

@Override
Expand Down Expand Up @@ -1435,7 +1440,7 @@ private boolean fetchNextResponse(boolean needData) throws SQLException {
}

// do the actual remote fetching work
int sleepMillis = DEFAULT_INIT_SLEEP_MILLIS;
long sleepMillis = DEFAULT_INIT_SLEEP_MILLIS;
while (true) {
currentToken++;
ResultFetchResponseBody response;
Expand All @@ -1457,7 +1462,12 @@ private boolean fetchNextResponse(boolean needData) throws SQLException {
// empty array as result but we need data, sleep before next attempt
try {
Thread.sleep(sleepMillis);
sleepMillis = Math.min(sleepMillis * 2, DEFAULT_MAX_SLEEP_MILLIS);
long elapsedMillis = System.currentTimeMillis() - resultSetCreateMillis;
long maxSleepMillis = Math.min(
DEFAULT_MAX_SLEEP_MILLIS, Math.round(elapsedMillis * DEFAULT_MAX_SLEEP_FRACTION));
sleepMillis = Math.min(sleepMillis * 2, maxSleepMillis);
// we do not want the sleep time to be too short, so we should have a lower bound
sleepMillis = Math.max(sleepMillis, DEFAULT_INIT_SLEEP_MILLIS);
} catch (InterruptedException e) {
throw new SQLException(
"Interrupted while fetching more results for job " + jobIdOrResultSet.left(), e);
Expand Down

0 comments on commit 36dc701

Please sign in to comment.