Skip to content

Commit

Permalink
add lock on qcc to fix sync issue
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-hx committed Nov 17, 2023
1 parent 62f2206 commit 8137f56
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 28 deletions.
45 changes: 45 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/SFDbCommandIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,51 @@ public void TestExecAsyncAPI()
}
}

[Test]
public void TestExecAsyncAPIParallel()
{
SnowflakeDbConnectionPool.ClearAllPools();
using (DbConnection conn = new SnowflakeDbConnection())
{
conn.ConnectionString = ConnectionString;

Task connectTask = conn.OpenAsync(CancellationToken.None);
connectTask.Wait();
Assert.AreEqual(ConnectionState.Open, conn.State);

Task[] taskArray = new Task[100];
for (int i = 0; i < taskArray.Length; i++)
{
taskArray[i] = Task.Factory.StartNew(() =>
{
using (DbCommand cmd = conn.CreateCommand())
{
long queryResult = 0;
cmd.CommandText = "select count(seq4()) from table(generator(timelimit => 3)) v";
Task<DbDataReader> execution = cmd.ExecuteReaderAsync();
Task readCallback = execution.ContinueWith((t) =>
{
using (DbDataReader reader = t.Result)
{
Assert.IsTrue(reader.Read());
queryResult = reader.GetInt64(0);
Assert.IsFalse(reader.Read());
}
});
// query is not finished yet, result is still 0;
Assert.AreEqual(0, queryResult);
// block till query finished
readCallback.Wait();
// queryResult should be updated by callback
Assert.AreNotEqual(0, queryResult);
}
});
}
Task.WaitAll(taskArray);
conn.Close();
}
}

[Test]
public void TestCancelExecuteAsync()
{
Expand Down
67 changes: 39 additions & 28 deletions Snowflake.Data/Core/QueryContextCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public int Compare(QueryContextElement x, QueryContextElement y)

internal class QueryContextCache
{
private readonly object _qccLock;
private int _capacity; // Capacity of the cache
private Dictionary<long, QueryContextElement> _idMap; // Map for id and QCC
private Dictionary<long, QueryContextElement> _priorityMap; // Map for priority and QCC
Expand All @@ -78,6 +79,7 @@ internal class QueryContextCache

public QueryContextCache(int capacity)
{
_qccLock = new object();
_capacity = capacity;
_idMap = new Dictionary<long, QueryContextElement>();
_priorityMap = new Dictionary<long, QueryContextElement>();
Expand Down Expand Up @@ -189,14 +191,17 @@ public void ClearCache()

public void SetCapacity(int cap)
{
// check without locking first for performance reason
if (_capacity == cap)
return;

_logger.Debug($"set capacity from {_capacity} to {cap}");
_capacity = cap;
CheckCacheCapacity();
LogCacheEntries();
lock(_qccLock)
{
// check without locking first for performance reason
if (_capacity == cap)
return;

_logger.Debug($"set capacity from {_capacity} to {cap}");
_capacity = cap;
CheckCacheCapacity();
LogCacheEntries();
}

Check warning on line 204 in Snowflake.Data/Core/QueryContextCache.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/QueryContextCache.cs#L200-L204

Added lines #L200 - L204 were not covered by tests
}

/**
Expand All @@ -221,26 +226,29 @@ public int GetSize()
*/
public void Update(ResponseQueryContext queryContext)
{
// Log existing cache entries
LogCacheEntries();

if (queryContext == null || queryContext.Entries == null)
{
// Clear the cache
ClearCache();
return;
}
foreach (ResponseQueryContextElement entry in queryContext.Entries)
lock(_qccLock)
{
Merge(entry.Id, entry.ReadTimestamp, entry.Priority, entry.Context);
}
// Log existing cache entries
LogCacheEntries();

SyncPriorityMap();
if (queryContext == null || queryContext.Entries == null)
{
// Clear the cache
ClearCache();
return;
}
foreach (ResponseQueryContextElement entry in queryContext.Entries)
{
Merge(entry.Id, entry.ReadTimestamp, entry.Priority, entry.Context);
}

// After merging all entries, truncate to capacity
CheckCacheCapacity();
// Log existing cache entries
LogCacheEntries();
SyncPriorityMap();

// After merging all entries, truncate to capacity
CheckCacheCapacity();
// Log existing cache entries
LogCacheEntries();
}
}

/**
Expand All @@ -251,10 +259,13 @@ public RequestQueryContext GetQueryContextRequest()
{
RequestQueryContext reqQCC = new RequestQueryContext();
reqQCC.Entries = new List<RequestQueryContextElement>();
foreach (QueryContextElement elem in _cacheSet)
lock(_qccLock)
{
RequestQueryContextElement reqElem = new RequestQueryContextElement(elem);
reqQCC.Entries.Add(reqElem);
foreach (QueryContextElement elem in _cacheSet)
{
RequestQueryContextElement reqElem = new RequestQueryContextElement(elem);
reqQCC.Entries.Add(reqElem);
}
}

return reqQCC;
Expand Down

0 comments on commit 8137f56

Please sign in to comment.