From 94d4145f9242b9a190031a20db584177431be9f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Thu, 27 Apr 2023 15:55:56 +0200 Subject: [PATCH 1/3] Provide metadata of an asynchronous query --- .../net/snowflake/client/core/SFSession.java | 13 +++++++-- .../client/jdbc/SFAsyncResultSet.java | 29 +++++++++++++++++++ .../client/jdbc/SnowflakeResultSet.java | 10 +++++++ .../client/jdbc/SnowflakeResultSetV1.java | 8 +++++ 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/snowflake/client/core/SFSession.java b/src/main/java/net/snowflake/client/core/SFSession.java index 4f4f67ef7..be7c2ed0f 100644 --- a/src/main/java/net/snowflake/client/core/SFSession.java +++ b/src/main/java/net/snowflake/client/core/SFSession.java @@ -153,7 +153,7 @@ public void addQueryToActiveQueryList(String queryID) { * @return enum of type QueryStatus indicating the query's status * @throws SQLException */ - public QueryStatus getQueryStatus(String queryID) throws SQLException { + public JsonNode getQueryMetadata(String queryID) throws SQLException { // create the URL to check the query monitoring endpoint String statusUrl = ""; String sessionUrl = getUrl(); @@ -228,7 +228,16 @@ else if (ex instanceof SFException) { } } } while (sessionRenewed); - JsonNode queryNode = jsonNode.path("data").path("queries"); + return jsonNode.path("data").path("queries"); + } + + /** + * @param queryID query ID of the query whose status is being investigated + * @return enum of type QueryStatus indicating the query's status + * @throws SQLException + */ + public QueryStatus getQueryStatus(String queryID) throws SQLException { + JsonNode queryNode = getQueryMetadata(queryID); String queryStatus = ""; String errorMessage = ""; int errorCode = 0; diff --git a/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java b/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java index cc55a19b0..7919275ba 100644 --- a/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java +++ b/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java @@ -5,7 +5,10 @@ package net.snowflake.client.jdbc; import static net.snowflake.client.core.QueryStatus.NO_DATA; +import static net.snowflake.client.core.QueryStatus.getStatusFromString; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; import com.google.api.client.util.Strings; import java.math.BigDecimal; import java.sql.*; @@ -25,6 +28,7 @@ class SFAsyncResultSet extends SnowflakeBaseResultSet implements SnowflakeResult private String queryID; private SFSession session; private Statement extraStatement; + private JsonNode lastQueriedMetadata = NullNode.instance; private QueryStatus lastQueriedStatus = NO_DATA; /** @@ -109,6 +113,31 @@ public String getQueryErrorMessage() throws SQLException { return this.lastQueriedStatus.getErrorMessage(); } + private boolean isLastQueriedMetadataStatusSuccess() { + JsonNode queryNode = this.lastQueriedMetadata; + if (queryNode.isEmpty()) { + return false; + } + String queryStatus = queryNode.get(0).path("status").asText(); + return getStatusFromString(queryStatus) == QueryStatus.SUCCESS; + } + + @Override + public JsonNode getQueryMetadata() throws SQLException { + if (session == null) { + throw new SQLException("Session not set"); + } + if (this.queryID == null) { + throw new SQLException("QueryID unknown"); + } + if (isLastQueriedMetadataStatusSuccess()) { + return this.lastQueriedMetadata; + } + this.lastQueriedMetadata = session.getQueryMetadata(this.queryID); + // if query has completed successfully, cache its metadata to avoid unnecessary future server calls + return this.lastQueriedMetadata; + } + /** * helper function for next() and getMetaData(). Calls result_scan to get resultSet after * asynchronous query call diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java index e517f2edf..177b78a5c 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java @@ -6,6 +6,8 @@ import java.sql.SQLException; import java.util.List; + +import com.fasterxml.jackson.databind.JsonNode; import net.snowflake.client.core.QueryStatus; /** This interface defines Snowflake specific APIs for ResultSet */ @@ -35,6 +37,14 @@ public interface SnowflakeResultSet { */ String getQueryErrorMessage() throws SQLException; + /** + * This function retrieves the metadata of an asynchronous query. + * + * @return JsonNode of query metadata + * @throws SQLException + */ + JsonNode getQueryMetadata() throws SQLException; + /** * Get a list of ResultSetSerializables for the ResultSet in order to parallel processing * diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java index cd8e5a787..ca7997549 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; + +import com.fasterxml.jackson.databind.JsonNode; import net.snowflake.client.core.QueryStatus; import net.snowflake.client.core.SFBaseResultSet; import net.snowflake.client.core.SFException; @@ -62,6 +64,12 @@ public String getQueryErrorMessage() throws SQLException { session, "This function is only supported for asynchronous queries."); } + @Override + public JsonNode getQueryMetadata() throws SQLException { + throw new SnowflakeLoggedFeatureNotSupportedException( + session, "This function is only supported for asynchronous queries."); + } + /** * Constructor takes a result set serializable object to create a sessionless result set. * From 5e77c608693e73d436e73fb1c9f6c2c70a69eed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Fri, 28 Apr 2023 12:12:08 +0200 Subject: [PATCH 2/3] Provide QueryStatusV2 API --- .../net/snowflake/client/core/SFSession.java | 31 +++++ .../snowflake/client/jdbc/QueryStatusV2.java | 115 ++++++++++++++++++ .../client/jdbc/SFAsyncResultSet.java | 21 +--- .../client/jdbc/SnowflakeResultSet.java | 11 +- .../client/jdbc/SnowflakeResultSetV1.java | 2 +- .../client/jdbc/ConnectionLatestIT.java | 15 +++ 6 files changed, 176 insertions(+), 19 deletions(-) create mode 100644 src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java diff --git a/src/main/java/net/snowflake/client/core/SFSession.java b/src/main/java/net/snowflake/client/core/SFSession.java index be7c2ed0f..d5d7a39ef 100644 --- a/src/main/java/net/snowflake/client/core/SFSession.java +++ b/src/main/java/net/snowflake/client/core/SFSession.java @@ -276,6 +276,37 @@ else if (isAnError(result)) { return result; } + /** + * @param queryID query ID of the query whose status is being investigated + * @return a QueryStatusV2 instance indicating the query's status + * @throws SQLException + */ + public QueryStatusV2 getQueryStatusV2(String queryID) throws SQLException { + JsonNode queryNode = getQueryMetadata(queryID); + logger.debug("Query status: {}", queryNode.asText()); + if (queryNode.isEmpty()) { + return QueryStatusV2.empty(); + } + JsonNode node = queryNode.get(0); + long endTime = node.path("endTime").asLong(0); + int errorCode = node.path("errorCode").asInt(0); + String errorMessage = node.path("errorMessage").asText("No error reported"); + String name = node.path("status").asText(""); + String sqlText = node.path("sqlText").asText(""); + long startTime = node.path("startTime").asLong(0); + String state = node.path("state").asText(""); + int totalDuration = node.path("totalDuration").asInt(0); + String warehouseExternalSize = node.path("warehouseExternalSize").asText(null); + int warehouseId = node.path("warehouseId").asInt(0); + String warehouseName = node.path("warehouseName").asText(null); + String warehouseServerType = node.path("warehouseServerType").asText(null); + QueryStatusV2 result = new QueryStatusV2(endTime, errorCode, errorMessage, name, sqlText, startTime, state, totalDuration, warehouseExternalSize, warehouseId, warehouseName, warehouseServerType); + if (!result.isStillRunning()) { + activeAsyncQueries.remove(queryID); + } + return result; + } + /** * Add a property If a property is known for connection, add it to connection properties If not, * add it as a dynamic session parameters diff --git a/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java b/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java new file mode 100644 index 000000000..757707c05 --- /dev/null +++ b/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java @@ -0,0 +1,115 @@ +package net.snowflake.client.jdbc; + +import net.snowflake.client.core.QueryStatus; + +public final class QueryStatusV2 { + private final long endTime; + private final int errorCode; + private final String errorMessage; + private final String name; + private final String sqlText; + private final long startTime; + private final String state; + private final QueryStatus status; + private final int totalDuration; + private final String warehouseExternalSize; + private final int warehouseId; + private final String warehouseName; + private final String warehouseServerType; + + public QueryStatusV2( + long endTime, + int errorCode, + String errorMessage, + String name, + String sqlText, + long startTime, + String state, + int totalDuration, + String warehouseExternalSize, + int warehouseId, + String warehouseName, + String warehouseServerType) { + this.endTime = endTime; + this.errorCode = errorCode; + this.errorMessage = errorMessage; + this.name = name; + this.sqlText = sqlText; + this.startTime = startTime; + this.state = state; + this.status = QueryStatus.getStatusFromString(name); + this.totalDuration = totalDuration; + this.warehouseExternalSize = warehouseExternalSize; + this.warehouseId = warehouseId; + this.warehouseName = warehouseName; + this.warehouseServerType = warehouseServerType; + } + + public static QueryStatusV2 empty() { + return new QueryStatusV2(0, 0, "", "", "", 0, "", 0, "", 0, "", ""); + } + + public boolean isEmpty() { + return name.isEmpty(); + } + + public boolean isStillRunning() { + return QueryStatus.isStillRunning(status); + } + + public boolean isSuccess() { + return status == QueryStatus.SUCCESS; + } + + public boolean isAnError() { + return QueryStatus.isAnError(status); + } + + public long getEndTime() { + return endTime; + } + + public int getErrorCode() { + return errorCode; + } + + public String getErrorMessage() { + return errorMessage; + } + + public String getName() { + return name; + } + + public String getSqlText() { + return sqlText; + } + + public long getStartTime() { + return startTime; + } + + public String getState() { + return state; + } + + public int getTotalDuration() { + return totalDuration; + } + + public String getWarehouseExternalSize() { + return warehouseExternalSize; + } + + public int getWarehouseId() { + return warehouseId; + } + + public String getWarehouseName() { + return warehouseName; + } + + public String getWarehouseServerType() { + return warehouseServerType; + } +} diff --git a/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java b/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java index 7919275ba..f92cf06ff 100644 --- a/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java +++ b/src/main/java/net/snowflake/client/jdbc/SFAsyncResultSet.java @@ -28,8 +28,8 @@ class SFAsyncResultSet extends SnowflakeBaseResultSet implements SnowflakeResult private String queryID; private SFSession session; private Statement extraStatement; - private JsonNode lastQueriedMetadata = NullNode.instance; private QueryStatus lastQueriedStatus = NO_DATA; + private QueryStatusV2 lastQueriedStatusV2 = QueryStatusV2.empty(); /** * Constructor takes an inputstream from the API response that we get from executing a SQL @@ -113,29 +113,20 @@ public String getQueryErrorMessage() throws SQLException { return this.lastQueriedStatus.getErrorMessage(); } - private boolean isLastQueriedMetadataStatusSuccess() { - JsonNode queryNode = this.lastQueriedMetadata; - if (queryNode.isEmpty()) { - return false; - } - String queryStatus = queryNode.get(0).path("status").asText(); - return getStatusFromString(queryStatus) == QueryStatus.SUCCESS; - } - @Override - public JsonNode getQueryMetadata() throws SQLException { + public QueryStatusV2 getQueryStatus() throws SQLException { if (session == null) { throw new SQLException("Session not set"); } if (this.queryID == null) { throw new SQLException("QueryID unknown"); } - if (isLastQueriedMetadataStatusSuccess()) { - return this.lastQueriedMetadata; + if (this.lastQueriedStatusV2.isSuccess()) { + return this.lastQueriedStatusV2; } - this.lastQueriedMetadata = session.getQueryMetadata(this.queryID); + this.lastQueriedStatusV2 = session.getQueryStatusV2(this.queryID); // if query has completed successfully, cache its metadata to avoid unnecessary future server calls - return this.lastQueriedMetadata; + return this.lastQueriedStatusV2; } /** diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java index 177b78a5c..239976183 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSet.java @@ -38,12 +38,17 @@ public interface SnowflakeResultSet { String getQueryErrorMessage() throws SQLException; /** - * This function retrieves the metadata of an asynchronous query. + * This function retrieves the status of an asynchronous query. An empty ResultSet object has + * already been returned, but the query may still be running. This function can be used to query + * whether it is possible to retrieve results from the ResultSet already. + *

+ * status.isSuccess() means that results can be retrieved. + *

* - * @return JsonNode of query metadata + * @return an instance containing query metadata * @throws SQLException */ - JsonNode getQueryMetadata() throws SQLException; + QueryStatusV2 getQueryStatus() throws SQLException; /** * Get a list of ResultSetSerializables for the ResultSet in order to parallel processing diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java index ca7997549..64c8bc795 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeResultSetV1.java @@ -65,7 +65,7 @@ public String getQueryErrorMessage() throws SQLException { } @Override - public JsonNode getQueryMetadata() throws SQLException { + public QueryStatusV2 getQueryStatus() throws SQLException { throw new SnowflakeLoggedFeatureNotSupportedException( session, "This function is only supported for asynchronous queries."); } diff --git a/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java index 3b650d28d..e3a0fdfad 100644 --- a/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java @@ -173,9 +173,11 @@ public void testAsyncQueryOpenAndCloseConnection() // Retrieve query ID for part 2 of test, check status of query String queryID = rs1.unwrap(SnowflakeResultSet.class).getQueryID(); QueryStatus status = null; + QueryStatusV2 statusV2 = null; for (int retry = 0; retry < 5; ++retry) { Thread.sleep(100); status = rs1.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs1.unwrap(SnowflakeResultSet.class).getQueryStatus(); // Sometimes 100 millis is too short for GS to get query status with provided queryID, in // which case we will get NO_DATA. if (status != QueryStatus.NO_DATA) { @@ -184,6 +186,7 @@ public void testAsyncQueryOpenAndCloseConnection() } // Query should take 60 seconds so should be running assertEquals(QueryStatus.RUNNING, status); + assertEquals(QueryStatus.RUNNING.name(), statusV2.getName()); // close connection and wait for 1 minute while query finishes running statement.close(); con.close(); @@ -199,11 +202,15 @@ public void testAsyncQueryOpenAndCloseConnection() } ResultSet rs = con.unwrap(SnowflakeConnection.class).createResultSet(queryID); status = rs.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs.unwrap(SnowflakeResultSet.class).getQueryStatus(); // Assert status of query is a success assertEquals(QueryStatus.SUCCESS, status); assertEquals("No error reported", status.getErrorMessage()); assertEquals(0, status.getErrorCode()); assertEquals(1, getSizeOfResultSet(rs)); + assertEquals(QueryStatus.SUCCESS.name(), statusV2.getName()); + assertEquals("No error reported", statusV2.getErrorMessage()); + assertEquals(0, statusV2.getErrorCode()); statement = con.createStatement(); // Create another query that will not be successful (querying table that does not exist) rs1 = @@ -212,11 +219,13 @@ public void testAsyncQueryOpenAndCloseConnection() .executeAsyncQuery("select * from nonexistentTable"); Thread.sleep(100); status = rs1.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs1.unwrap(SnowflakeResultSet.class).getQueryStatus(); // when GS response is slow, allow up to 1 second of retries to get final query status int counter = 0; while ((status == QueryStatus.NO_DATA || status == QueryStatus.RUNNING) && counter < 10) { Thread.sleep(100); status = rs1.unwrap(SnowflakeResultSet.class).getStatus(); + statusV2 = rs1.unwrap(SnowflakeResultSet.class).getQueryStatus(); } // If GS response is too slow to return data, do nothing to avoid flaky test failure. If // response has returned, @@ -228,6 +237,12 @@ public void testAsyncQueryOpenAndCloseConnection() "SQL compilation error:\n" + "Object 'NONEXISTENTTABLE' does not exist or not authorized.", status.getErrorMessage()); + assertEquals(QueryStatus.FAILED_WITH_ERROR.name(), statusV2.getName()); + assertEquals(2003, statusV2.getErrorCode()); + assertEquals( + "SQL compilation error:\n" + + "Object 'NONEXISTENTTABLE' does not exist or not authorized.", + statusV2.getErrorMessage()); } statement.close(); con.close(); From 151425e383bfa8d3cb2b02c9fef1f0dc576187fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Tue, 2 May 2023 16:11:38 +0200 Subject: [PATCH 3/3] Add `id` and `sessionId` to `QueryStatusV2` --- .../net/snowflake/client/core/SFSession.java | 4 +++- .../net/snowflake/client/jdbc/QueryStatusV2.java | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/snowflake/client/core/SFSession.java b/src/main/java/net/snowflake/client/core/SFSession.java index d5d7a39ef..0757bf854 100644 --- a/src/main/java/net/snowflake/client/core/SFSession.java +++ b/src/main/java/net/snowflake/client/core/SFSession.java @@ -291,7 +291,9 @@ public QueryStatusV2 getQueryStatusV2(String queryID) throws SQLException { long endTime = node.path("endTime").asLong(0); int errorCode = node.path("errorCode").asInt(0); String errorMessage = node.path("errorMessage").asText("No error reported"); + String id = node.path("id").asText(""); String name = node.path("status").asText(""); + long sessionId = node.path("sessionId").asLong(0); String sqlText = node.path("sqlText").asText(""); long startTime = node.path("startTime").asLong(0); String state = node.path("state").asText(""); @@ -300,7 +302,7 @@ public QueryStatusV2 getQueryStatusV2(String queryID) throws SQLException { int warehouseId = node.path("warehouseId").asInt(0); String warehouseName = node.path("warehouseName").asText(null); String warehouseServerType = node.path("warehouseServerType").asText(null); - QueryStatusV2 result = new QueryStatusV2(endTime, errorCode, errorMessage, name, sqlText, startTime, state, totalDuration, warehouseExternalSize, warehouseId, warehouseName, warehouseServerType); + QueryStatusV2 result = new QueryStatusV2(endTime, errorCode, errorMessage, id, name, sessionId, sqlText, startTime, state, totalDuration, warehouseExternalSize, warehouseId, warehouseName, warehouseServerType); if (!result.isStillRunning()) { activeAsyncQueries.remove(queryID); } diff --git a/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java b/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java index 757707c05..0dd93d903 100644 --- a/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java +++ b/src/main/java/net/snowflake/client/jdbc/QueryStatusV2.java @@ -6,7 +6,9 @@ public final class QueryStatusV2 { private final long endTime; private final int errorCode; private final String errorMessage; + private final String id; private final String name; + private final long sessionId; private final String sqlText; private final long startTime; private final String state; @@ -21,7 +23,9 @@ public QueryStatusV2( long endTime, int errorCode, String errorMessage, + String id, String name, + long sessionId, String sqlText, long startTime, String state, @@ -33,7 +37,9 @@ public QueryStatusV2( this.endTime = endTime; this.errorCode = errorCode; this.errorMessage = errorMessage; + this.id = id; this.name = name; + this.sessionId = sessionId; this.sqlText = sqlText; this.startTime = startTime; this.state = state; @@ -46,7 +52,7 @@ public QueryStatusV2( } public static QueryStatusV2 empty() { - return new QueryStatusV2(0, 0, "", "", "", 0, "", 0, "", 0, "", ""); + return new QueryStatusV2(0, 0, "", "", "", 0, "", 0, "", 0, "", 0, "", ""); } public boolean isEmpty() { @@ -77,10 +83,18 @@ public String getErrorMessage() { return errorMessage; } + public String getId() { + return id; + } + public String getName() { return name; } + public long getSessionId() { + return sessionId; + } + public String getSqlText() { return sqlText; }