diff --git a/src/main/java/net/snowflake/client/core/SFSession.java b/src/main/java/net/snowflake/client/core/SFSession.java index 4f4f67ef7..0757bf854 100644 --- a/src/main/java/net/snowflake/client/core/SFSession.java +++ b/src/main/java/net/snowflake/client/core/SFSession.java @@ -153,7 +153,7 @@ public void addQueryToActiveQueryList(String queryID) { * @return enum of type QueryStatus indicating the query's status * @throws SQLException */ - public QueryStatus getQueryStatus(String queryID) throws SQLException { + public JsonNode getQueryMetadata(String queryID) throws SQLException { // create the URL to check the query monitoring endpoint String statusUrl = ""; String sessionUrl = getUrl(); @@ -228,7 +228,16 @@ else if (ex instanceof SFException) { } } } while (sessionRenewed); - JsonNode queryNode = jsonNode.path("data").path("queries"); + return jsonNode.path("data").path("queries"); + } + + /** + * @param queryID query ID of the query whose status is being investigated + * @return enum of type QueryStatus indicating the query's status + * @throws SQLException + */ + public QueryStatus getQueryStatus(String queryID) throws SQLException { + JsonNode queryNode = getQueryMetadata(queryID); String queryStatus = ""; String errorMessage = ""; int errorCode = 0; @@ -267,6 +276,39 @@ else if (isAnError(result)) { return result; } + /** + * @param queryID query ID of the query whose status is being investigated + * @return a QueryStatusV2 instance indicating the query's status + * @throws SQLException + */ + public QueryStatusV2 getQueryStatusV2(String queryID) throws SQLException { + JsonNode queryNode = getQueryMetadata(queryID); + logger.debug("Query status: {}", queryNode.asText()); + if (queryNode.isEmpty()) { + return QueryStatusV2.empty(); + } + JsonNode node = queryNode.get(0); + long endTime = node.path("endTime").asLong(0); + int errorCode = node.path("errorCode").asInt(0); + String errorMessage = node.path("errorMessage").asText("No error reported"); + String id = node.path("id").asText(""); + String name = node.path("status").asText(""); + long sessionId = node.path("sessionId").asLong(0); + String sqlText = node.path("sqlText").asText(""); + long startTime = node.path("startTime").asLong(0); + String state = node.path("state").asText(""); + int totalDuration = node.path("totalDuration").asInt(0); + String warehouseExternalSize = node.path("warehouseExternalSize").asText(null); + int warehouseId = node.path("warehouseId").asInt(0); + String warehouseName = node.path("warehouseName").asText(null); + String warehouseServerType = node.path("warehouseServerType").asText(null); + QueryStatusV2 result = new QueryStatusV2(endTime, errorCode, errorMessage, id, name, sessionId, sqlText, startTime, state, totalDuration, warehouseExternalSize, warehouseId, warehouseName, warehouseServerType); + if (!result.isStillRunning()) { + activeAsyncQueries.remove(queryID); + } + return result; + } + /** * Add a property If a property is known for connection, add it to connection properties If not, * add it as a dynamic session parameters diff --git a/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java b/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java new file mode 100644 index 000000000..0dd93d903 --- /dev/null +++ b/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java @@ -0,0 +1,129 @@ +package net.snowflake.client.jdbc; + +import net.snowflake.client.core.QueryStatus; + +public final class QueryStatusV2 { + private final long endTime; + private final int errorCode; + private final String errorMessage; + private final String id; + private final String name; + private final long sessionId; + private final String sqlText; + private final long startTime; + private final String state; + private final QueryStatus status; + private final int totalDuration; + private final String warehouseExternalSize; + private final int warehouseId; + private final String warehouseName; + private final String warehouseServerType; + + public QueryStatusV2( + long endTime, + int errorCode, + String errorMessage, + String id, + String name, + long sessionId, + String sqlText, + long startTime, + String state, + int totalDuration, + String warehouseExternalSize, + int warehouseId, + String warehouseName, + String warehouseServerType) { + this.endTime = endTime; + this.errorCode = errorCode; + this.errorMessage = errorMessage; + this.id = id; + this.name = name; + this.sessionId = sessionId; + this.sqlText = sqlText; + this.startTime = startTime; + this.state = state; + this.status = QueryStatus.getStatusFromString(name); + this.totalDuration = totalDuration; + this.warehouseExternalSize = warehouseExternalSize; + this.warehouseId = warehouseId; + this.warehouseName = warehouseName; + this.warehouseServerType = warehouseServerType; + } + + public static QueryStatusV2 empty() { + return new QueryStatusV2(0, 0, "", "", "", 0, "", 0, "", 0, "", 0, "", ""); + } + + public boolean isEmpty() { + return name.isEmpty(); + } + + public boolean isStillRunning() { + return QueryStatus.isStillRunning(status); + } + + public boolean isSuccess() { + return status == QueryStatus.SUCCESS; + } + + public boolean isAnError() { + return QueryStatus.isAnError(status); + } + + public long getEndTime() { + return endTime; + } + + public int getErrorCode() { + return errorCode; + } + + public String getErrorMessage() { + return errorMessage; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public long getSessionId() { + return sessionId; + } + + public String getSqlText() { + return sqlText; + } + + public long getStartTime() { + return startTime; + } + + public String getState() { + return state; + } + + public int getTotalDuration() { + return totalDuration; + } + + public String getWarehouseExternalSize() { + return warehouseExternalSize; + } + + public int getWarehouseId() { + return warehouseId; + } + + public String getWarehouseName() { + return warehouseName; + } + + public String getWarehouseServerType() { + return warehouseServerType; + } +} diff --git a/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java b/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java index cc55a19b0..f92cf06ff 100644 --- a/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java +++ b/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java @@ -5,7 +5,10 @@ package net.snowflake.client.jdbc; import static net.snowflake.client.core.QueryStatus.NO_DATA; +import static net.snowflake.client.core.QueryStatus.getStatusFromString; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; import com.google.api.client.util.Strings; import java.math.BigDecimal; import java.sql.*; @@ -26,6 +29,7 @@ class SFAsyncResultSet extends SnowflakeBaseResultSet implements SnowflakeResult private SFSession session; private Statement extraStatement; private QueryStatus lastQueriedStatus = NO_DATA; + private QueryStatusV2 lastQueriedStatusV2 = QueryStatusV2.empty(); /** * Constructor takes an inputstream from the API response that we get from executing a SQL @@ -109,6 +113,22 @@ public String getQueryErrorMessage() throws SQLException { return this.lastQueriedStatus.getErrorMessage(); } + @Override + public QueryStatusV2 getQueryStatus() throws SQLException { + if (session == null) { + throw new SQLException("Session not set"); + } + if (this.queryID == null) { + throw new SQLException("QueryID unknown"); + } + if (this.lastQueriedStatusV2.isSuccess()) { + return this.lastQueriedStatusV2; + } + this.lastQueriedStatusV2 = session.getQueryStatusV2(this.queryID); + // if query has completed successfully, cache its metadata to avoid unnecessary future server calls + return this.lastQueriedStatusV2; + } + /** * helper function for next() and getMetaData(). Calls result_scan to get resultSet after * asynchronous query call diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java index e517f2edf..239976183 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java @@ -6,6 +6,8 @@ import java.sql.SQLException; import java.util.List; + +import com.fasterxml.jackson.databind.JsonNode; import net.snowflake.client.core.QueryStatus; /** This interface defines Snowflake specific APIs for ResultSet */ @@ -35,6 +37,19 @@ public interface SnowflakeResultSet { */ String getQueryErrorMessage() throws SQLException; + /** + * This function retrieves the status of an asynchronous query. An empty ResultSet object has + * already been returned, but the query may still be running. This function can be used to query + * whether it is possible to retrieve results from the ResultSet already. + *

+ * status.isSuccess() means that results can be retrieved. + *

+ * + * @return an instance containing query metadata + * @throws SQLException + */ + QueryStatusV2 getQueryStatus() throws SQLException; + /** * Get a list of ResultSetSerializables for the ResultSet in order to parallel processing * diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java index cd8e5a787..64c8bc795 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; + +import com.fasterxml.jackson.databind.JsonNode; import net.snowflake.client.core.QueryStatus; import net.snowflake.client.core.SFBaseResultSet; import net.snowflake.client.core.SFException; @@ -62,6 +64,12 @@ public String getQueryErrorMessage() throws SQLException { session, "This function is only supported for asynchronous queries."); } + @Override + public QueryStatusV2 getQueryStatus() throws SQLException { + throw new SnowflakeLoggedFeatureNotSupportedException( + session, "This function is only supported for asynchronous queries."); + } + /** * Constructor takes a result set serializable object to create a sessionless result set. * diff --git a/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java index 3b650d28d..e3a0fdfad 100644 --- a/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java @@ -173,9 +173,11 @@ public void testAsyncQueryOpenAndCloseConnection() // Retrieve query ID for part 2 of test, check status of query String queryID = rs1.unwrap(SnowflakeResultSet.class).getQueryID(); QueryStatus status = null; + QueryStatusV2 statusV2 = null; for (int retry = 0; retry < 5; ++retry) { Thread.sleep(100); status = rs1.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs1.unwrap(SnowflakeResultSet.class).getQueryStatus(); // Sometimes 100 millis is too short for GS to get query status with provided queryID, in // which case we will get NO_DATA. if (status != QueryStatus.NO_DATA) { @@ -184,6 +186,7 @@ public void testAsyncQueryOpenAndCloseConnection() } // Query should take 60 seconds so should be running assertEquals(QueryStatus.RUNNING, status); + assertEquals(QueryStatus.RUNNING.name(), statusV2.getName()); // close connection and wait for 1 minute while query finishes running statement.close(); con.close(); @@ -199,11 +202,15 @@ public void testAsyncQueryOpenAndCloseConnection() } ResultSet rs = con.unwrap(SnowflakeConnection.class).createResultSet(queryID); status = rs.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs.unwrap(SnowflakeResultSet.class).getQueryStatus(); // Assert status of query is a success assertEquals(QueryStatus.SUCCESS, status); assertEquals("No error reported", status.getErrorMessage()); assertEquals(0, status.getErrorCode()); assertEquals(1, getSizeOfResultSet(rs)); + assertEquals(QueryStatus.SUCCESS.name(), statusV2.getName()); + assertEquals("No error reported", statusV2.getErrorMessage()); + assertEquals(0, statusV2.getErrorCode()); statement = con.createStatement(); // Create another query that will not be successful (querying table that does not exist) rs1 = @@ -212,11 +219,13 @@ public void testAsyncQueryOpenAndCloseConnection() .executeAsyncQuery("select * from nonexistentTable"); Thread.sleep(100); status = rs1.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs1.unwrap(SnowflakeResultSet.class).getQueryStatus(); // when GS response is slow, allow up to 1 second of retries to get final query status int counter = 0; while ((status == QueryStatus.NO_DATA || status == QueryStatus.RUNNING) && counter < 10) { Thread.sleep(100); status = rs1.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs1.unwrap(SnowflakeResultSet.class).getQueryStatus(); } // If GS response is too slow to return data, do nothing to avoid flaky test failure. If // response has returned, @@ -228,6 +237,12 @@ public void testAsyncQueryOpenAndCloseConnection() "SQL compilation error:\n" + "Object 'NONEXISTENTTABLE' does not exist or not authorized.", status.getErrorMessage()); + assertEquals(QueryStatus.FAILED_WITH_ERROR.name(), statusV2.getName()); + assertEquals(2003, statusV2.getErrorCode()); + assertEquals( + "SQL compilation error:\n" + + "Object 'NONEXISTENTTABLE' does not exist or not authorized.", + statusV2.getErrorMessage()); } statement.close(); con.close();