From a175090ac7066190f9eb6af1bc7bc4f9fae85a83 Mon Sep 17 00:00:00 2001 From: Harry Xi <44687000+sfc-gh-ext-simba-hx@users.noreply.github.com> Date: Mon, 11 Sep 2023 09:17:53 -0700 Subject: [PATCH] SNOW-683712 [HTAP] query context cache (#757) ### Description sdk issue15 did manual e2e test with log output query request/response body, the query context there seems good. --- .../UnitTests/QueryContextCacheTest.cs | 270 ++++++++++++++ .../UnitTests/SFSessionPropertyTest.cs | 52 ++- Snowflake.Data/Core/QueryContextCache.cs | 332 ++++++++++++++++++ Snowflake.Data/Core/RestRequest.cs | 72 ++++ Snowflake.Data/Core/RestResponse.cs | 42 +++ .../Core/SFMultiStatementsResultSet.cs | 1 + Snowflake.Data/Core/SFResultSet.cs | 1 + Snowflake.Data/Core/SFStatement.cs | 1 + Snowflake.Data/Core/Session/SFSession.cs | 37 ++ .../Core/Session/SFSessionParameter.cs | 1 + .../Core/Session/SFSessionProperty.cs | 2 + 11 files changed, 803 insertions(+), 8 deletions(-) create mode 100644 Snowflake.Data.Tests/UnitTests/QueryContextCacheTest.cs create mode 100644 Snowflake.Data/Core/QueryContextCache.cs diff --git a/Snowflake.Data.Tests/UnitTests/QueryContextCacheTest.cs b/Snowflake.Data.Tests/UnitTests/QueryContextCacheTest.cs new file mode 100644 index 000000000..407ef9c0a --- /dev/null +++ b/Snowflake.Data.Tests/UnitTests/QueryContextCacheTest.cs @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved. + */ + +namespace Snowflake.Data.Tests.UnitTests +{ + using Newtonsoft.Json; + using NUnit.Framework; + using Snowflake.Data.Core; + using System; + using System.Collections.Generic; + + [TestFixture] + class QueryContextCacheTest + { + private QueryContextCache _qcc = null; + private const long BaseReadTimestamp = 1668727958; + private const string Context = "Some query context"; + private const long BaseId = 0; + private const long BasePriority = 0; + private const int MaxCapacity = 5; + + private long[] _expectedIDs; + private long[] _expectedReadTimestamp; + private long[] _expectedPriority; + + private void InitCache() + { + _qcc = new QueryContextCache(MaxCapacity); + } + + private void InitCacheWithData() + { + InitCacheWithDataWithContext(Context); + } + + private void InitCacheWithDataWithContext(String context) + { + _qcc = new QueryContextCache(MaxCapacity); + _expectedIDs = new long[MaxCapacity]; + _expectedReadTimestamp = new long[MaxCapacity]; + _expectedPriority = new long[MaxCapacity]; + for (int i = 0; i < MaxCapacity; i++) + { + _expectedIDs[i] = BaseId + i; + _expectedReadTimestamp[i] = BaseReadTimestamp + i; + _expectedPriority[i] = BasePriority + i; + _qcc.Merge(_expectedIDs[i], _expectedReadTimestamp[i], _expectedPriority[i], context); + } + _qcc.SyncPriorityMap(); + } + + private void InitCacheWithDataInRandomOrder() + { + _qcc = new QueryContextCache(MaxCapacity); + _expectedIDs = new long[MaxCapacity]; + _expectedReadTimestamp = new long[MaxCapacity]; + _expectedPriority = new long[MaxCapacity]; + for (int i = 0; i < MaxCapacity; i++) + { + _expectedIDs[i] = BaseId + i; + _expectedReadTimestamp[i] = BaseReadTimestamp + i; + _expectedPriority[i] = BasePriority + i; + } + + _qcc.Merge(_expectedIDs[3], _expectedReadTimestamp[3], _expectedPriority[3], Context); + _qcc.Merge(_expectedIDs[2], _expectedReadTimestamp[2], _expectedPriority[2], Context); + _qcc.Merge(_expectedIDs[4], _expectedReadTimestamp[4], _expectedPriority[4], Context); + _qcc.Merge(_expectedIDs[0], _expectedReadTimestamp[0], _expectedPriority[0], Context); + _qcc.Merge(_expectedIDs[1], _expectedReadTimestamp[1], _expectedPriority[1], Context); + _qcc.SyncPriorityMap(); + } + + private void AssertCacheData() + { + AssertCacheDataWithContext(Context); + } + + private void AssertCacheDataWithContext(string context) + { + int size = _qcc.GetSize(); + Assert.IsTrue(size == MaxCapacity); + + // Compare elements + SortedSet elements = _qcc.getElements(); + int i = 0; + foreach (QueryContextElement elem in elements) + { + Assert.AreEqual(_expectedIDs[i], elem.Id); + Assert.AreEqual(_expectedReadTimestamp[i], elem.ReadTimestamp); + Assert.AreEqual(_expectedPriority[i], elem.Priority); + Assert.AreEqual(context, elem.Context); + i++; + } + Assert.AreEqual(i, MaxCapacity); + } + + [Test] + public void TestIsEmpty() + { + InitCache(); + Assert.IsTrue(_qcc.GetSize() == 0); + } + + [Test] + public void TestWithSomeData() + { + InitCacheWithData(); + // Compare elements + AssertCacheData(); + } + + [Test] + public void TestWithSomeDataInRandomOrder() + { + InitCacheWithDataInRandomOrder(); + // Compare elements + AssertCacheData(); + } + + [Test] + public void TestMoreThanCapacity() + { + InitCacheWithData(); + + // Add one more element at the end + int i = MaxCapacity; + _qcc.Merge(BaseId + i, BaseReadTimestamp + i, BasePriority + i, Context); + _qcc.SyncPriorityMap(); + _qcc.CheckCacheCapacity(); + + // Compare elements + AssertCacheData(); + } + + [Test] + public void TestUpdateTimestamp() + { + InitCacheWithData(); + + // Add one more element with new TS with existing id + int updatedID = 1; + _expectedReadTimestamp[updatedID] = BaseReadTimestamp + updatedID + 10; + _qcc.Merge( + BaseId + updatedID, _expectedReadTimestamp[updatedID], BasePriority + updatedID, Context); + _qcc.SyncPriorityMap(); + _qcc.CheckCacheCapacity(); + + // Compare elements + AssertCacheData(); + } + + [Test] + public void TestUpdatePriority() + { + InitCacheWithData(); + + // Add one more element with new priority with existing id + int updatedID = 3; + long updatedPriority = BasePriority + updatedID + 7; + + _expectedPriority[updatedID] = updatedPriority; + _qcc.Merge( + BaseId + updatedID, BaseReadTimestamp + updatedID, _expectedPriority[updatedID], Context); + _qcc.SyncPriorityMap(); + _qcc.CheckCacheCapacity(); + + for (int i = updatedID; i < MaxCapacity - 1; i++) + { + _expectedIDs[i] = _expectedIDs[i + 1]; + _expectedReadTimestamp[i] = _expectedReadTimestamp[i + 1]; + _expectedPriority[i] = _expectedPriority[i + 1]; + } + + _expectedIDs[MaxCapacity - 1] = BaseId + updatedID; + _expectedReadTimestamp[MaxCapacity - 1] = BaseReadTimestamp + updatedID; + _expectedPriority[MaxCapacity - 1] = updatedPriority; + + AssertCacheData(); + } + + [Test] + public void TestAddSamePriority() + { + InitCacheWithData(); + + // Add one more element with same priority + int i = MaxCapacity; + long updatedPriority = BasePriority + 1; + _qcc.Merge(BaseId + i, BaseReadTimestamp + i, updatedPriority, Context); + _qcc.SyncPriorityMap(); + _qcc.CheckCacheCapacity(); + _expectedIDs[1] = BaseId + i; + _expectedReadTimestamp[1] = BaseReadTimestamp + i; + + // Compare elements + AssertCacheData(); + } + + [Test] + public void TestAddSameIDButStaleTimestamp() + { + InitCacheWithData(); + + // Add one more element with same priority + int i = 2; + _qcc.Merge(BaseId + i, BaseReadTimestamp + i - 10, BasePriority + i, Context); + _qcc.SyncPriorityMap(); + _qcc.CheckCacheCapacity(); + + // Compare elements + AssertCacheData(); + } + + [Test] + public void TestEmptyCacheWithNullData() + { + InitCacheWithData(); + + _qcc.Update(null); + Assert.AreEqual(_qcc.GetSize(), 0); + } + + [Test] + public void TestEmptyCacheWithEmptyResponseData() + { + InitCacheWithData(); + + ResponseQueryContext rsp = JsonConvert.DeserializeObject("", JsonUtils.JsonSettings); + _qcc.Update(rsp); + Assert.AreEqual(_qcc.GetSize(), 0); + } + + [Test] + public void TestSerializeRequestAndDeserializeResponseData() + { + // Init _qcc + InitCacheWithData(); + AssertCacheData(); + + var json = JsonConvert.SerializeObject(_qcc.GetQueryContextResponse(), JsonUtils.JsonSettings); + + // Clear _qcc + _qcc.ClearCache(); + Assert.AreEqual(_qcc.GetSize(), 0); + + ResponseQueryContext rsp = JsonConvert.DeserializeObject(json, JsonUtils.JsonSettings); + _qcc.Update(rsp); + AssertCacheData(); + } + + [Test] + public void TestSerializeRequestAndDeserializeResponseDataWithNullContext() + { + // Init _qcc + InitCacheWithDataWithContext(null); + AssertCacheDataWithContext(null); + + var json = JsonConvert.SerializeObject(_qcc.GetQueryContextResponse(), JsonUtils.JsonSettings); + + // Clear _qcc + _qcc.ClearCache(); + Assert.AreEqual(_qcc.GetSize(), 0); + + ResponseQueryContext rsp = JsonConvert.DeserializeObject(json, JsonUtils.JsonSettings); + _qcc.Update(rsp); + AssertCacheDataWithContext(null); + } + } +} diff --git a/Snowflake.Data.Tests/UnitTests/SFSessionPropertyTest.cs b/Snowflake.Data.Tests/UnitTests/SFSessionPropertyTest.cs index 4b19baffc..cdf6296e2 100644 --- a/Snowflake.Data.Tests/UnitTests/SFSessionPropertyTest.cs +++ b/Snowflake.Data.Tests/UnitTests/SFSessionPropertyTest.cs @@ -59,6 +59,7 @@ public static IEnumerable ConnectionStringTestCases() string defMaxHttpRetries = "7"; string defIncludeRetryReason = "true"; + string defDisableQueryContextCache = "false"; var simpleTestCase = new TestCase() { @@ -82,7 +83,8 @@ public static IEnumerable ConnectionStringTestCases() { SFSessionProperty.FORCEPARSEERROR, "false" }, { SFSessionProperty.BROWSER_RESPONSE_TIMEOUT, defBrowserResponseTime }, { SFSessionProperty.MAXHTTPRETRIES, defMaxHttpRetries }, - { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason } + { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason }, + { SFSessionProperty.DISABLEQUERYCONTEXTCACHE, defDisableQueryContextCache } } }; var testCaseWithBrowserResponseTimeout = new TestCase() @@ -106,9 +108,10 @@ public static IEnumerable ConnectionStringTestCases() { SFSessionProperty.FORCEPARSEERROR, "false" }, { SFSessionProperty.BROWSER_RESPONSE_TIMEOUT, "180" }, { SFSessionProperty.MAXHTTPRETRIES, defMaxHttpRetries }, - { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason } + { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason }, + { SFSessionProperty.DISABLEQUERYCONTEXTCACHE, defDisableQueryContextCache } } - }; + }; var testCaseWithProxySettings = new TestCase() { ExpectedProperties = new SFSessionProperties() @@ -133,7 +136,8 @@ public static IEnumerable ConnectionStringTestCases() { SFSessionProperty.NONPROXYHOSTS, defNonProxyHosts }, { SFSessionProperty.BROWSER_RESPONSE_TIMEOUT, defBrowserResponseTime }, { SFSessionProperty.MAXHTTPRETRIES, defMaxHttpRetries }, - { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason } + { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason }, + { SFSessionProperty.DISABLEQUERYCONTEXTCACHE, defDisableQueryContextCache } }, ConnectionString = $"ACCOUNT={defAccount};USER={defUser};PASSWORD={defPassword};useProxy=true;proxyHost=proxy.com;proxyPort=1234;nonProxyHosts=localhost" @@ -162,7 +166,8 @@ public static IEnumerable ConnectionStringTestCases() { SFSessionProperty.NONPROXYHOSTS, defNonProxyHosts }, { SFSessionProperty.BROWSER_RESPONSE_TIMEOUT, defBrowserResponseTime }, { SFSessionProperty.MAXHTTPRETRIES, defMaxHttpRetries }, - { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason } + { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason }, + { SFSessionProperty.DISABLEQUERYCONTEXTCACHE, defDisableQueryContextCache } }, ConnectionString = $"ACCOUNT={defAccount};USER={defUser};PASSWORD={defPassword};proxyHost=proxy.com;proxyPort=1234;nonProxyHosts=localhost" @@ -190,7 +195,8 @@ public static IEnumerable ConnectionStringTestCases() { SFSessionProperty.BROWSER_RESPONSE_TIMEOUT, defBrowserResponseTime }, { SFSessionProperty.MAXHTTPRETRIES, defMaxHttpRetries }, { SFSessionProperty.FILE_TRANSFER_MEMORY_THRESHOLD, "25" }, - { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason } + { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason }, + { SFSessionProperty.DISABLEQUERYCONTEXTCACHE, defDisableQueryContextCache } } }; var testCaseWithIncludeRetryReason = new TestCase() @@ -215,16 +221,46 @@ public static IEnumerable ConnectionStringTestCases() { SFSessionProperty.FORCEPARSEERROR, "false" }, { SFSessionProperty.BROWSER_RESPONSE_TIMEOUT, defBrowserResponseTime }, { SFSessionProperty.MAXHTTPRETRIES, defMaxHttpRetries }, - { SFSessionProperty.INCLUDERETRYREASON, "false" } + { SFSessionProperty.INCLUDERETRYREASON, "false" }, + { SFSessionProperty.DISABLEQUERYCONTEXTCACHE, defDisableQueryContextCache } } }; + var testCaseWithDisableQueryContextCache = new TestCase() + { + ExpectedProperties = new SFSessionProperties() + { + { SFSessionProperty.ACCOUNT, defAccount }, + { SFSessionProperty.USER, defUser }, + { SFSessionProperty.HOST, defHost }, + { SFSessionProperty.AUTHENTICATOR, defAuthenticator }, + { SFSessionProperty.SCHEME, defScheme }, + { SFSessionProperty.CONNECTION_TIMEOUT, defConnectionTimeout }, + { SFSessionProperty.PASSWORD, defPassword }, + { SFSessionProperty.PORT, defPort }, + { SFSessionProperty.VALIDATE_DEFAULT_PARAMETERS, "true" }, + { SFSessionProperty.USEPROXY, "false" }, + { SFSessionProperty.INSECUREMODE, "false" }, + { SFSessionProperty.DISABLERETRY, "false" }, + { SFSessionProperty.FORCERETRYON404, "false" }, + { SFSessionProperty.CLIENT_SESSION_KEEP_ALIVE, "false" }, + { SFSessionProperty.FORCEPARSEERROR, "false" }, + { SFSessionProperty.BROWSER_RESPONSE_TIMEOUT, defBrowserResponseTime }, + { SFSessionProperty.MAXHTTPRETRIES, defMaxHttpRetries }, + { SFSessionProperty.INCLUDERETRYREASON, defIncludeRetryReason }, + { SFSessionProperty.DISABLEQUERYCONTEXTCACHE, "true" } + }, + ConnectionString = + $"ACCOUNT={defAccount};USER={defUser};PASSWORD={defPassword};DISABLEQUERYCONTEXTCACHE=true" + }; return new TestCase[] { simpleTestCase, testCaseWithBrowserResponseTimeout, testCaseWithProxySettings, testCaseThatDefaultForUseProxyIsFalse, - testCaseWithFileTransferMaxBytesInMemory + testCaseWithFileTransferMaxBytesInMemory, + testCaseWithIncludeRetryReason, + testCaseWithDisableQueryContextCache }; } diff --git a/Snowflake.Data/Core/QueryContextCache.cs b/Snowflake.Data/Core/QueryContextCache.cs new file mode 100644 index 000000000..ac2ba6d01 --- /dev/null +++ b/Snowflake.Data/Core/QueryContextCache.cs @@ -0,0 +1,332 @@ +using System; +using System.Collections.Generic; +using Newtonsoft.Json; +using System.Linq; +using Snowflake.Data.Log; + +namespace Snowflake.Data.Core +{ + internal class QueryContextElement + { + // database id as key. (bigint) + public long Id { get; set; } + + // When the query context read (bigint). Compare for same id. + public long ReadTimestamp { get; set; } + + // Priority of the query context (bigint). Compare for different ids. + public long Priority { get; set; } + + // Opaque information (object with a value of base64 encoded string). + public string Context { get; set; } + + public QueryContextElement(long id, long timestamp, + long pri, string context) + { + Id = id; + ReadTimestamp = timestamp; + Priority = pri; + Context = context; + } + + public override bool Equals(Object obj) + { + if (obj == null || !(obj is QueryContextElement)) + { + return false; + } + + QueryContextElement other = (QueryContextElement)obj; + return (Id == other.Id) && + (Priority == other.Priority) && + (ReadTimestamp == other.ReadTimestamp) && + (Context.Equals(other.Context)); + } + + public override int GetHashCode() + { + int hash = 31; + + hash = hash * 31 + (int)Id; + hash += (hash * 31) + (int)ReadTimestamp; + hash += (hash * 31) + (int)Priority; + hash += (hash * 31) + Context.GetHashCode(); + + return hash; + } + }; + + class ElementComparer : IComparer + { + public int Compare(QueryContextElement x, QueryContextElement y) + { + if (x.Priority != y.Priority) return x.Priority > y.Priority ? 1 : -1; + if (x.Id != y.Id) return x.Id > y.Id ? 1 : -1; + if (x.ReadTimestamp != y.ReadTimestamp) return x.ReadTimestamp > y.ReadTimestamp ? 1 : -1; + return 0; + } + } + + internal class QueryContextCache + { + private int _capacity; // Capacity of the cache + private Dictionary _idMap; // Map for id and QCC + private Dictionary _priorityMap; // Map for priority and QCC + private Dictionary _newPriorityMap; // Intermediate map for priority and QCC for current round of merging + private SortedSet _cacheSet; // Order data as per priority + private SFLogger _logger = SFLoggerFactory.GetLogger(); + + public QueryContextCache(int capacity) + { + _capacity = capacity; + _idMap = new Dictionary(); + _priorityMap = new Dictionary(); + _newPriorityMap = new Dictionary(); + _cacheSet = new SortedSet(new ElementComparer()); + } + + public void Merge(long id, long readTimestamp, long priority, string context) + { + if (_idMap.ContainsKey(id)) + { + _logger.Debug( + $"Merge with existing id in cache = {id}, priority = {priority}"); + // ID found in the cache + QueryContextElement qce = _idMap[id]; + if (readTimestamp > qce.ReadTimestamp) + { + if (qce.Priority == priority) + { + // Same priority, overwrite new data at same place + qce.ReadTimestamp = readTimestamp; + qce.Context = context; + } + else + { + // Change in priority + QueryContextElement newQCE = + new QueryContextElement(id, readTimestamp, priority, context); + + ReplaceQCE(qce, newQCE); + } // new priority + } // new data is recent + else if (readTimestamp == qce.ReadTimestamp && qce.Priority != priority) + { + // Same read timestamp but change in priority + QueryContextElement newQCE = new QueryContextElement(id, readTimestamp, priority, context); + ReplaceQCE(qce, newQCE); + } + } // id found + else + { + // new id + if (_priorityMap.ContainsKey(priority)) + { + _logger.Debug( + $"Merge with existing priority in cache = {id}, priority = {priority}"); + // Same priority with different id + QueryContextElement qce = _priorityMap[priority]; + // Replace with new data + QueryContextElement newQCE = new QueryContextElement(id, readTimestamp, priority, context); + ReplaceQCE(qce, newQCE); + } + else + { + // new priority + // Add new element in the cache + _logger.Debug( + $"Adding new QCC item with either id nor priority found in cache id = {id}, priority = {priority}"); + QueryContextElement newQCE = new QueryContextElement(id, readTimestamp, priority, context); + AddQCE(newQCE); + } + } + } + + /** Sync the newPriorityMap with the priorityMap at the end of current round of merge */ + public void SyncPriorityMap() + { + _logger.Debug( + $"syncPriorityMap called priorityMap size = {_priorityMap.Count}, newPrioirtyMap size = {_newPriorityMap.Count}"); + foreach (KeyValuePair entry in _newPriorityMap) + { + _priorityMap.Add(entry.Key, entry.Value); + } + // clear the newPriorityMap for next round of QCC merge(a round consists of multiple entries) + _newPriorityMap.Clear(); + } + + /** + * After the merge, loop through priority list and make sure cache is at most capacity. Remove all + * other elements from the list based on priority. + */ + public void CheckCacheCapacity() + { + _logger.Debug( + $"checkCacheCapacity() called. cacheSet size {_cacheSet.Count} cache capacity {_capacity}"); + if (_cacheSet.Count > _capacity) + { + // remove elements based on priority + while (_cacheSet.Count > _capacity) + { + QueryContextElement qce = _cacheSet.Last(); + RemoveQCE(qce); + } + } + + _logger.Debug( + $"checkCacheCapacity() returns. cacheSet size {_cacheSet.Count} cache capacity {_capacity}"); + } + + /** Clear the cache. */ + public void ClearCache() + { + _logger.Debug("clearCache() called"); + _idMap.Clear(); + _priorityMap.Clear(); + _cacheSet.Clear(); + _logger.Debug($"clearCache() returns. Number of entries in cache now {_cacheSet.Count}"); + } + + public void SetCapacity(int cap) + { + // check without locking first for performance reason + if (_capacity == cap) + return; + + _logger.Debug($"set capacity from {_capacity} to {cap}"); + _capacity = cap; + CheckCacheCapacity(); + LogCacheEntries(); + } + + /** + * Get all elements in the cache in the order of the priority. + * Make it public for test purpose. + * + * @return TreeSet containing cache elements + */ + public SortedSet getElements() + { + return _cacheSet; + } + + public int GetSize() + { + return _cacheSet.Count; + } + + /** + * Update query context chache with the query context received in query response + * @param queryContext: the QueryContext body parsed from query response + */ + public void Update(ResponseQueryContext queryContext) + { + // Log existing cache entries + LogCacheEntries(); + + if (queryContext == null || queryContext.Entries == null) + { + // Clear the cache + ClearCache(); + return; + } + foreach (ResponseQueryContextElement entry in queryContext.Entries) + { + Merge(entry.Id, entry.ReadTimestamp, entry.Priority, entry.Context); + } + + SyncPriorityMap(); + + // After merging all entries, truncate to capacity + CheckCacheCapacity(); + // Log existing cache entries + LogCacheEntries(); + } + + /** + * Get the query context can be sent through query request + * @return the QueryContext body + */ + public RequestQueryContext GetQueryContextRequest() + { + RequestQueryContext reqQCC = new RequestQueryContext(); + reqQCC.Entries = new List(); + foreach (QueryContextElement elem in _cacheSet) + { + RequestQueryContextElement reqElem = new RequestQueryContextElement(elem); + reqQCC.Entries.Add(reqElem); + } + + return reqQCC; + } + + /** + * Get the query context in response format, for test purpose + * @return the QueryContext body + */ + public ResponseQueryContext GetQueryContextResponse() + { + ResponseQueryContext rspQCC = new ResponseQueryContext(); + rspQCC.Entries = new List(); + foreach (QueryContextElement elem in _cacheSet) + { + ResponseQueryContextElement rspElem = new ResponseQueryContextElement(elem); + rspQCC.Entries.Add(rspElem); + } + + return rspQCC; + } + + /** + * Add an element in the cache. + * + * @param qce element to add + */ + private void AddQCE(QueryContextElement qce) + { + _idMap.Add(qce.Id, qce); + // In a round of merge operations, we should save the new priority->qce mapping in an additional map + // and sync `newPriorityMap` to `priorityMap` at the end of a for loop of `merge` operations + _newPriorityMap.Add(qce.Priority, qce); + _cacheSet.Add(qce); + } + + /** + * Remove an element from the cache. + * + * @param qce element to remove. + */ + private void RemoveQCE(QueryContextElement qce) + { + _cacheSet.Remove(qce); + _priorityMap.Remove(qce.Priority); + _idMap.Remove(qce.Id); + } + + /** + * Replace the cache element with a new response element. Remove old element exist in the cache + * and add a new element received. + * + * @param oldQCE an element exist in the cache + * @param newQCE a new element just received. + */ + private void ReplaceQCE(QueryContextElement oldQCE, QueryContextElement newQCE) + { + // Remove old element from the cache + RemoveQCE(oldQCE); + // Add new element in the cache + AddQCE(newQCE); + } + + /** Debugging purpose, log the all entries in the cache. */ + private void LogCacheEntries() + { +#if DEBUG + foreach (QueryContextElement elem in _cacheSet) + { + _logger.Debug($"Cache Entry: id: {elem.Id} readTimestamp: {elem.ReadTimestamp} priority: {elem.Priority}"); + } +#endif + } + } +} diff --git a/Snowflake.Data/Core/RestRequest.cs b/Snowflake.Data/Core/RestRequest.cs index f3cbbd5ff..49a53bc04 100644 --- a/Snowflake.Data/Core/RestRequest.cs +++ b/Snowflake.Data/Core/RestRequest.cs @@ -299,6 +299,78 @@ class QueryRequest [JsonProperty(PropertyName = "parameters")] internal Dictionary parameters { get; set; } + + [JsonProperty(PropertyName = "queryContextDTO", NullValueHandling = NullValueHandling.Ignore)] + internal RequestQueryContext QueryContextDTO { get; set; } + } + + // The query context in query response + internal class RequestQueryContext + { + [JsonProperty(PropertyName = "entries")] + internal List Entries { get; set; } + } + + // The empty query context value in request + internal class QueryContextValueEmpty + { + // empty object with no filed + } + + // The non-empty query context value in request + internal class QueryContextValue + { + // base64 encoded string of Opaque information + [JsonProperty(PropertyName = "base64Data")] + public string Base64Data { get; set; } + + public QueryContextValue(string context) + { + Base64Data = context; + } + } + + // The query context in query response + internal class RequestQueryContextElement + { + // database id as key. (bigint) + [JsonProperty(PropertyName = "id")] + public long Id { get; set; } + + // When the query context read (bigint). Compare for same id. + [JsonProperty(PropertyName = "timestamp")] + public long ReadTimestamp { get; set; } + + // Priority of the query context (bigint). Compare for different ids. + [JsonProperty(PropertyName = "priority")] + public long Priority { get; set; } + + // Opaque information (object with a value of base64 encoded string). + [JsonProperty(PropertyName = "context")] + public object Context{ get; set; } + + public void SetContext(string context) + { + if (context != null) + { + Context = new QueryContextValue(context); + } + else + { + Context = new QueryContextValueEmpty(); + } + } + + // default constructor for JSON converter + public RequestQueryContextElement() { } + + public RequestQueryContextElement(QueryContextElement elem) + { + Id = elem.Id; + Priority = elem.Priority; + ReadTimestamp = elem.ReadTimestamp; + SetContext(elem.Context); + } } class QueryCancelRequest diff --git a/Snowflake.Data/Core/RestResponse.cs b/Snowflake.Data/Core/RestResponse.cs index 6db550cfd..ae1055c28 100755 --- a/Snowflake.Data/Core/RestResponse.cs +++ b/Snowflake.Data/Core/RestResponse.cs @@ -229,6 +229,48 @@ internal class QueryExecResponseData : IQueryExecResponseData [JsonProperty(PropertyName = "rowsetBase64", NullValueHandling = NullValueHandling.Ignore)] internal string rowsetBase64 { get; set; } + + // query context + [JsonProperty(PropertyName = "queryContext", NullValueHandling = NullValueHandling.Ignore)] + internal ResponseQueryContext QueryContext { get; set; } + } + + // The query context in query response + internal class ResponseQueryContext + { + [JsonProperty(PropertyName = "entries")] + internal List Entries { get; set; } + } + + // The query context in query response + internal class ResponseQueryContextElement + { + // database id as key. (bigint) + [JsonProperty(PropertyName = "id")] + public long Id { get; set; } + + // When the query context read (bigint). Compare for same id. + [JsonProperty(PropertyName = "timestamp")] + public long ReadTimestamp { get; set; } + + // Priority of the query context (bigint). Compare for different ids. + [JsonProperty(PropertyName = "priority")] + public long Priority { get; set; } + + // Opaque information (object with a value of base64 encoded string). + [JsonProperty(PropertyName = "context", NullValueHandling = NullValueHandling.Ignore)] + public string Context { get; set; } + + // default constructor for JSON converter + public ResponseQueryContextElement() { } + + public ResponseQueryContextElement(QueryContextElement elem) + { + Id = elem.Id; + Priority = elem.Priority; + ReadTimestamp = elem.ReadTimestamp; + Context = elem.Context; + } } internal class ExecResponseRowType diff --git a/Snowflake.Data/Core/SFMultiStatementsResultSet.cs b/Snowflake.Data/Core/SFMultiStatementsResultSet.cs index daa37d961..735d912b4 100644 --- a/Snowflake.Data/Core/SFMultiStatementsResultSet.cs +++ b/Snowflake.Data/Core/SFMultiStatementsResultSet.cs @@ -117,6 +117,7 @@ private void updateSessionStatus(QueryExecResponseData responseData) SFSession session = this.sfStatement.SfSession; session.UpdateDatabaseAndSchema(responseData.finalDatabaseName, responseData.finalSchemaName); session.UpdateSessionParameterMap(responseData.parameters); + session.UpdateQueryContextCache(responseData.QueryContext); } private void updateResultMetadata() diff --git a/Snowflake.Data/Core/SFResultSet.cs b/Snowflake.Data/Core/SFResultSet.cs index 43256697b..0ce066d63 100755 --- a/Snowflake.Data/Core/SFResultSet.cs +++ b/Snowflake.Data/Core/SFResultSet.cs @@ -236,6 +236,7 @@ private void updateSessionStatus(QueryExecResponseData responseData) SFSession session = this.sfStatement.SfSession; session.UpdateDatabaseAndSchema(responseData.finalDatabaseName, responseData.finalSchemaName); session.UpdateSessionParameterMap(responseData.parameters); + session.UpdateQueryContextCache(responseData.QueryContext); } } } diff --git a/Snowflake.Data/Core/SFStatement.cs b/Snowflake.Data/Core/SFStatement.cs index 2414d6f68..3c48688ee 100644 --- a/Snowflake.Data/Core/SFStatement.cs +++ b/Snowflake.Data/Core/SFStatement.cs @@ -121,6 +121,7 @@ private SFRestRequest BuildQueryRequest(string sql, Dictionary parameterList) stopHeartBeatForThisSession(); } } + if ((!_disableQueryContextCache) && + (ParameterMap.ContainsKey(SFSessionParameter.QUERY_CONTEXT_CACHE_SIZE))) + { + string val = ParameterMap[SFSessionParameter.QUERY_CONTEXT_CACHE_SIZE].ToString(); + _queryContextCacheSize = Int32.Parse(val); + _queryContextCache.SetCapacity(_queryContextCacheSize); + } + } + + internal void UpdateQueryContextCache(ResponseQueryContext queryContext) + { + if (!_disableQueryContextCache) + { + _queryContextCache.Update(queryContext); + } + } + + internal RequestQueryContext GetQueryContextRequest() + { + if (_disableQueryContextCache) + { + return null; + } + return _queryContextCache.GetQueryContextRequest(); } internal void UpdateDatabaseAndSchema(string databaseName, string schemaName) diff --git a/Snowflake.Data/Core/Session/SFSessionParameter.cs b/Snowflake.Data/Core/Session/SFSessionParameter.cs index cb980c938..4212f5b35 100755 --- a/Snowflake.Data/Core/Session/SFSessionParameter.cs +++ b/Snowflake.Data/Core/Session/SFSessionParameter.cs @@ -11,5 +11,6 @@ internal enum SFSessionParameter CLIENT_VALIDATE_DEFAULT_PARAMETERS, CLIENT_STAGE_ARRAY_BINDING_THRESHOLD, CLIENT_SESSION_KEEP_ALIVE, + QUERY_CONTEXT_CACHE_SIZE, } } diff --git a/Snowflake.Data/Core/Session/SFSessionProperty.cs b/Snowflake.Data/Core/Session/SFSessionProperty.cs index 7403f3b31..993a9279b 100755 --- a/Snowflake.Data/Core/Session/SFSessionProperty.cs +++ b/Snowflake.Data/Core/Session/SFSessionProperty.cs @@ -84,6 +84,8 @@ internal enum SFSessionProperty FILE_TRANSFER_MEMORY_THRESHOLD, [SFSessionPropertyAttr(required = false, defaultValue = "true")] INCLUDERETRYREASON, + [SFSessionPropertyAttr(required = false, defaultValue = "false")] + DISABLEQUERYCONTEXTCACHE, } class SFSessionPropertyAttr : Attribute