Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add lock on qcc to fix sync issue #814

Merged
3 commits merged into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/SFDbCommandIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,51 @@ public void TestExecAsyncAPI()
}
}

[Test]
public void TestExecAsyncAPIParallel()
{
SnowflakeDbConnectionPool.ClearAllPools();
using (DbConnection conn = new SnowflakeDbConnection())
{
conn.ConnectionString = ConnectionString;

Task connectTask = conn.OpenAsync(CancellationToken.None);
connectTask.Wait();
Assert.AreEqual(ConnectionState.Open, conn.State);

Task[] taskArray = new Task[5];
for (int i = 0; i < taskArray.Length; i++)
{
taskArray[i] = Task.Factory.StartNew(() =>
{
using (DbCommand cmd = conn.CreateCommand())
{
long queryResult = 0;
cmd.CommandText = "select count(seq4()) from table(generator(timelimit => 3)) v";
Task<DbDataReader> execution = cmd.ExecuteReaderAsync();
Task readCallback = execution.ContinueWith((t) =>
{
using (DbDataReader reader = t.Result)
{
Assert.IsTrue(reader.Read());
queryResult = reader.GetInt64(0);
Assert.IsFalse(reader.Read());
}
});
// query is not finished yet, result is still 0;
Assert.AreEqual(0, queryResult);
// block till query finished
readCallback.Wait();
// queryResult should be updated by callback
Assert.AreNotEqual(0, queryResult);
}
});
}
Task.WaitAll(taskArray);
conn.Close();
}
}

[Test]
public void TestCancelExecuteAsync()
{
Expand Down
19 changes: 19 additions & 0 deletions Snowflake.Data.Tests/UnitTests/QueryContextCacheTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,25 @@ public void TestMoreThanCapacity()
AssertCacheData();
}

[Test]
public void TestChangingCapacity()
{
InitCacheWithData();

// Add one more element at the end
int i = MaxCapacity;
_qcc.SetCapacity(MaxCapacity + 1);
_qcc.Merge(BaseId + i, BaseReadTimestamp + i, BasePriority + i, Context);
_qcc.SyncPriorityMap();
_qcc.CheckCacheCapacity();
Assert.IsTrue(_qcc.GetSize() == MaxCapacity + 1);

// reduce the capacity back
_qcc.SetCapacity(MaxCapacity);
// Compare elements
AssertCacheData();
}

[Test]
public void TestUpdateTimestamp()
{
Expand Down
79 changes: 48 additions & 31 deletions Snowflake.Data/Core/QueryContextCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

internal class QueryContextCache
{
private readonly object _qccLock;
private int _capacity; // Capacity of the cache
private Dictionary<long, QueryContextElement> _idMap; // Map for id and QCC
private Dictionary<long, QueryContextElement> _priorityMap; // Map for priority and QCC
Expand All @@ -78,6 +79,7 @@

public QueryContextCache(int capacity)
{
_qccLock = new object();
_capacity = capacity;
_idMap = new Dictionary<long, QueryContextElement>();
_priorityMap = new Dictionary<long, QueryContextElement>();
Expand Down Expand Up @@ -192,11 +194,16 @@
// check without locking first for performance reason
if (_capacity == cap)
return;
lock (_qccLock)
{
if (_capacity == cap)
return;

Check warning on line 200 in Snowflake.Data/Core/QueryContextCache.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/QueryContextCache.cs#L200

Added line #L200 was not covered by tests

_logger.Debug($"set capacity from {_capacity} to {cap}");
_capacity = cap;
CheckCacheCapacity();
LogCacheEntries();
_logger.Debug($"set capacity from {_capacity} to {cap}");
_capacity = cap;
CheckCacheCapacity();
LogCacheEntries();
}
}

/**
Expand All @@ -221,26 +228,29 @@
*/
public void Update(ResponseQueryContext queryContext)
{
// Log existing cache entries
LogCacheEntries();

if (queryContext == null || queryContext.Entries == null)
{
// Clear the cache
ClearCache();
return;
}
foreach (ResponseQueryContextElement entry in queryContext.Entries)
lock(_qccLock)
{
Merge(entry.Id, entry.ReadTimestamp, entry.Priority, entry.Context);
}
// Log existing cache entries
LogCacheEntries();
This conversation was marked as resolved.
Show resolved Hide resolved

SyncPriorityMap();
if (queryContext == null || queryContext.Entries == null)
{
// Clear the cache
ClearCache();
return;
}
foreach (ResponseQueryContextElement entry in queryContext.Entries)
{
Merge(entry.Id, entry.ReadTimestamp, entry.Priority, entry.Context);
}

SyncPriorityMap();

// After merging all entries, truncate to capacity
CheckCacheCapacity();
// Log existing cache entries
LogCacheEntries();
// After merging all entries, truncate to capacity
CheckCacheCapacity();
// Log existing cache entries
LogCacheEntries();
}
}

/**
Expand All @@ -251,10 +261,13 @@
{
RequestQueryContext reqQCC = new RequestQueryContext();
reqQCC.Entries = new List<RequestQueryContextElement>();
foreach (QueryContextElement elem in _cacheSet)
lock(_qccLock)
sfc-gh-knozderko marked this conversation as resolved.
Show resolved Hide resolved
{
RequestQueryContextElement reqElem = new RequestQueryContextElement(elem);
reqQCC.Entries.Add(reqElem);
foreach (QueryContextElement elem in _cacheSet)
{
RequestQueryContextElement reqElem = new RequestQueryContextElement(elem);
reqQCC.Entries.Add(reqElem);
}
}

return reqQCC;
Expand All @@ -268,10 +281,13 @@
{
ResponseQueryContext rspQCC = new ResponseQueryContext();
rspQCC.Entries = new List<ResponseQueryContextElement>();
foreach (QueryContextElement elem in _cacheSet)
lock (_qccLock)
{
ResponseQueryContextElement rspElem = new ResponseQueryContextElement(elem);
rspQCC.Entries.Add(rspElem);
foreach (QueryContextElement elem in _cacheSet)
{
ResponseQueryContextElement rspElem = new ResponseQueryContextElement(elem);
rspQCC.Entries.Add(rspElem);
}
}

return rspQCC;
Expand Down Expand Up @@ -321,12 +337,13 @@
/** Debugging purpose, log the all entries in the cache. */
private void LogCacheEntries()
{
#if DEBUG
foreach (QueryContextElement elem in _cacheSet)
if (_logger.IsDebugEnabled())
{
_logger.Debug($"Cache Entry: id: {elem.Id} readTimestamp: {elem.ReadTimestamp} priority: {elem.Priority}");
foreach (QueryContextElement elem in _cacheSet)
{
_logger.Debug($"Cache Entry: id: {elem.Id} readTimestamp: {elem.ReadTimestamp} priority: {elem.Priority}");
}

Check warning on line 345 in Snowflake.Data/Core/QueryContextCache.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/QueryContextCache.cs#L343-L345

Added lines #L343 - L345 were not covered by tests
}
#endif
}
}
}
Loading