Skip to content

Commit

Permalink
add lock on qcc to fix sync issue (#814)
Browse files Browse the repository at this point in the history
### Description
fix for sdk issue 755: Intermittent exception with query context cache

### Checklist
- [x] Code compiles correctly
- [x] Code is formatted according to [Coding
Conventions](../CodingConventions.md)
- [x] Created tests which fail without the change (if possible)
- [x] All tests passing (`dotnet test`)
- [ ] Extended the README / documentation, if necessary
- [x] Provide JIRA issue id (if possible) or GitHub issue id in PR name
  • Loading branch information
Harry Xi authored Nov 30, 2023
1 parent 1a0809a commit 2dce725
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 31 deletions.
45 changes: 45 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/SFDbCommandIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,51 @@ public void TestExecAsyncAPI()
}
}

[Test]
public void TestExecAsyncAPIParallel()
{
SnowflakeDbConnectionPool.ClearAllPools();
using (DbConnection conn = new SnowflakeDbConnection())
{
conn.ConnectionString = ConnectionString;

Task connectTask = conn.OpenAsync(CancellationToken.None);
connectTask.Wait();
Assert.AreEqual(ConnectionState.Open, conn.State);

Task[] taskArray = new Task[5];
for (int i = 0; i < taskArray.Length; i++)
{
taskArray[i] = Task.Factory.StartNew(() =>
{
using (DbCommand cmd = conn.CreateCommand())
{
long queryResult = 0;
cmd.CommandText = "select count(seq4()) from table(generator(timelimit => 3)) v";
Task<DbDataReader> execution = cmd.ExecuteReaderAsync();
Task readCallback = execution.ContinueWith((t) =>
{
using (DbDataReader reader = t.Result)
{
Assert.IsTrue(reader.Read());
queryResult = reader.GetInt64(0);
Assert.IsFalse(reader.Read());
}
});
// query is not finished yet, result is still 0;
Assert.AreEqual(0, queryResult);
// block till query finished
readCallback.Wait();
// queryResult should be updated by callback
Assert.AreNotEqual(0, queryResult);
}
});
}
Task.WaitAll(taskArray);
conn.Close();
}
}

[Test]
public void TestCancelExecuteAsync()
{
Expand Down
19 changes: 19 additions & 0 deletions Snowflake.Data.Tests/UnitTests/QueryContextCacheTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,25 @@ public void TestMoreThanCapacity()
AssertCacheData();
}

[Test]
public void TestChangingCapacity()
{
InitCacheWithData();

// Add one more element at the end
int i = MaxCapacity;
_qcc.SetCapacity(MaxCapacity + 1);
_qcc.Merge(BaseId + i, BaseReadTimestamp + i, BasePriority + i, Context);
_qcc.SyncPriorityMap();
_qcc.CheckCacheCapacity();
Assert.IsTrue(_qcc.GetSize() == MaxCapacity + 1);

// reduce the capacity back
_qcc.SetCapacity(MaxCapacity);
// Compare elements
AssertCacheData();
}

[Test]
public void TestUpdateTimestamp()
{
Expand Down
79 changes: 48 additions & 31 deletions Snowflake.Data/Core/QueryContextCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public int Compare(QueryContextElement x, QueryContextElement y)

internal class QueryContextCache
{
private readonly object _qccLock;
private int _capacity; // Capacity of the cache
private Dictionary<long, QueryContextElement> _idMap; // Map for id and QCC
private Dictionary<long, QueryContextElement> _priorityMap; // Map for priority and QCC
Expand All @@ -78,6 +79,7 @@ internal class QueryContextCache

public QueryContextCache(int capacity)
{
_qccLock = new object();
_capacity = capacity;
_idMap = new Dictionary<long, QueryContextElement>();
_priorityMap = new Dictionary<long, QueryContextElement>();
Expand Down Expand Up @@ -192,11 +194,16 @@ public void SetCapacity(int cap)
// check without locking first for performance reason
if (_capacity == cap)
return;
lock (_qccLock)
{
if (_capacity == cap)
return;

_logger.Debug($"set capacity from {_capacity} to {cap}");
_capacity = cap;
CheckCacheCapacity();
LogCacheEntries();
_logger.Debug($"set capacity from {_capacity} to {cap}");
_capacity = cap;
CheckCacheCapacity();
LogCacheEntries();
}
}

/**
Expand All @@ -221,26 +228,29 @@ public int GetSize()
*/
public void Update(ResponseQueryContext queryContext)
{
// Log existing cache entries
LogCacheEntries();

if (queryContext == null || queryContext.Entries == null)
{
// Clear the cache
ClearCache();
return;
}
foreach (ResponseQueryContextElement entry in queryContext.Entries)
lock(_qccLock)
{
Merge(entry.Id, entry.ReadTimestamp, entry.Priority, entry.Context);
}
// Log existing cache entries
LogCacheEntries();

SyncPriorityMap();
if (queryContext == null || queryContext.Entries == null)
{
// Clear the cache
ClearCache();
return;
}
foreach (ResponseQueryContextElement entry in queryContext.Entries)
{
Merge(entry.Id, entry.ReadTimestamp, entry.Priority, entry.Context);
}

SyncPriorityMap();

// After merging all entries, truncate to capacity
CheckCacheCapacity();
// Log existing cache entries
LogCacheEntries();
// After merging all entries, truncate to capacity
CheckCacheCapacity();
// Log existing cache entries
LogCacheEntries();
}
}

/**
Expand All @@ -251,10 +261,13 @@ public RequestQueryContext GetQueryContextRequest()
{
RequestQueryContext reqQCC = new RequestQueryContext();
reqQCC.Entries = new List<RequestQueryContextElement>();
foreach (QueryContextElement elem in _cacheSet)
lock(_qccLock)
{
RequestQueryContextElement reqElem = new RequestQueryContextElement(elem);
reqQCC.Entries.Add(reqElem);
foreach (QueryContextElement elem in _cacheSet)
{
RequestQueryContextElement reqElem = new RequestQueryContextElement(elem);
reqQCC.Entries.Add(reqElem);
}
}

return reqQCC;
Expand All @@ -268,10 +281,13 @@ public ResponseQueryContext GetQueryContextResponse()
{
ResponseQueryContext rspQCC = new ResponseQueryContext();
rspQCC.Entries = new List<ResponseQueryContextElement>();
foreach (QueryContextElement elem in _cacheSet)
lock (_qccLock)
{
ResponseQueryContextElement rspElem = new ResponseQueryContextElement(elem);
rspQCC.Entries.Add(rspElem);
foreach (QueryContextElement elem in _cacheSet)
{
ResponseQueryContextElement rspElem = new ResponseQueryContextElement(elem);
rspQCC.Entries.Add(rspElem);
}
}

return rspQCC;
Expand Down Expand Up @@ -321,12 +337,13 @@ private void ReplaceQCE(QueryContextElement oldQCE, QueryContextElement newQCE)
/** Debugging purpose, log the all entries in the cache. */
private void LogCacheEntries()
{
#if DEBUG
foreach (QueryContextElement elem in _cacheSet)
if (_logger.IsDebugEnabled())
{
_logger.Debug($"Cache Entry: id: {elem.Id} readTimestamp: {elem.ReadTimestamp} priority: {elem.Priority}");
foreach (QueryContextElement elem in _cacheSet)
{
_logger.Debug($"Cache Entry: id: {elem.Id} readTimestamp: {elem.ReadTimestamp} priority: {elem.Priority}");
}
}
#endif
}
}
}

0 comments on commit 2dce725

Please sign in to comment.