From 3c90d9d309935333098a9f447368f8e6dc87147f Mon Sep 17 00:00:00 2001 From: mingkuang Date: Mon, 20 May 2024 17:23:43 +0800 Subject: [PATCH] =?UTF-8?q?Fea=20#29,=20=E6=B6=88=E9=99=A4Chromium?= =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E4=B8=ADWindows=20XP=E4=B8=8D=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E7=9A=84=E7=BA=BF=E7=A8=8B=E6=B1=A0=E5=88=9B=E5=BB=BA?= =?UTF-8?q?API=20=20=20-=20=E6=B7=BB=E5=8A=A0=20CreateThreadpool=20=20=20-?= =?UTF-8?q?=20=E6=B7=BB=E5=8A=A0=20CloseThreadpool=20=20=20-=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20SetThreadpoolThreadMaximum=20=20=20-=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20SetThreadpoolThreadMinimum=20=20=20-=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20CallbackMayRunLong?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ThunksList.md | 5 + src/Shared/InterlockedQueue.h | 80 +++ src/Thunks/api-ms-win-core-threadpool.hpp | 458 ++++++++++++++++-- .../YY-Thunks.UnitTest.vcxproj | 2 + .../YY-Thunks.UnitTest.vcxproj.filters | 6 + .../api-ms-win-core-threadpool.UnitTest.cpp | 84 ++++ 6 files changed, 587 insertions(+), 48 deletions(-) create mode 100644 src/Shared/InterlockedQueue.h diff --git a/ThunksList.md b/ThunksList.md index 8dbee4e..1afd49c 100644 --- a/ThunksList.md +++ b/ThunksList.md @@ -63,6 +63,11 @@ | StartThreadpoolIo | 不存在时,内部实现。 | CancelThreadpoolIo | 不存在时,内部实现。 | WaitForThreadpoolIoCallbacks | 不存在时,调用WaitForSingleObject。 +| CreateThreadpool | 不存在时,内部实现。 +| CloseThreadpool | 不存在时,内部实现。 +| SetThreadpoolThreadMaximum | 不存在时,内部实现,自己控制最大并行数量。 +| SetThreadpoolThreadMinimum | 不存在时,忽略,并总是返回成功。 +| CallbackMayRunLong | 不存在时,自己估算系统剩余可用线程数。 ## api-ms-win-core-winrt-l1-1-0.dll | 函数 | Fallback diff --git a/src/Shared/InterlockedQueue.h b/src/Shared/InterlockedQueue.h new file mode 100644 index 0000000..13492de --- /dev/null +++ b/src/Shared/InterlockedQueue.h @@ -0,0 +1,80 @@ +#pragma once + +namespace YY::Thunks::internal +{ + namespace + { + template + class InterlockedQueue + { + private: + struct Block + { + size_t uLastReadIndex = 0; + size_t uLastWriteIndex = 0; + Block* pNextBlock = nullptr; + Entry* arrLoopBuffer[kMaxBlockSize]; + + bool IsEmpty() + { + return uLastReadIndex == uLastWriteIndex; + } + + bool IsFull() + { + return uLastReadIndex + kMaxBlockSize == uLastWriteIndex; + } + }; + + Block* pFirstReadBlock = nullptr; + Block* pLastWriteBlock = nullptr; + + public: + Entry* Pop() noexcept + { + if (!pFirstReadBlock) + return nullptr; + + for (;;) + { + // 当前块任然有元素? + if (!pFirstReadBlock->IsEmpty()) + { + auto _pTmp = pFirstReadBlock->arrLoopBuffer[pFirstReadBlock->uLastReadIndex % kMaxBlockSize]; + pFirstReadBlock->uLastReadIndex += 1; + return _pTmp; + } + + // 尝试流转到下一块 + if (!pFirstReadBlock->pNextBlock) + return nullptr; + + auto _pPendingDelete = pFirstReadBlock; + pFirstReadBlock = pFirstReadBlock->pNextBlock; + Delete(_pPendingDelete); + } + + return nullptr; + } + + void Push(_In_ Entry* _pEntry) + { + if (!pLastWriteBlock) + { + pFirstReadBlock = pLastWriteBlock = New(); + } + + // 如果满了就尝试链接到下一块 + if (pLastWriteBlock->IsFull()) + { + auto _pNextBlock = New(); + pLastWriteBlock->pNextBlock = _pNextBlock; + pLastWriteBlock = _pNextBlock; + } + + pLastWriteBlock->arrLoopBuffer[pLastWriteBlock->uLastWriteIndex % kMaxBlockSize] = _pEntry; + pLastWriteBlock->uLastWriteIndex += 1; + } + }; + } +} diff --git a/src/Thunks/api-ms-win-core-threadpool.hpp b/src/Thunks/api-ms-win-core-threadpool.hpp index bda2363..dfe0857 100644 --- a/src/Thunks/api-ms-win-core-threadpool.hpp +++ b/src/Thunks/api-ms-win-core-threadpool.hpp @@ -2,6 +2,7 @@ #include #ifdef YY_Thunks_Implemented +#include struct _TP_CLEANUP_GROUP_FUNCS { @@ -164,6 +165,9 @@ struct _TP_IO : public _TP_WORK //不完善的定义,我们用多少,处理多少。 struct _TP_CALLBACK_INSTANCE { + //0x25, TpCallbackMayRunLong会修改这个值 + bool bCallbackMayRunLong; + //0x40 PCRITICAL_SECTION CriticalSection; //0x44 @@ -294,6 +298,260 @@ namespace YY::Thunks using TP_Timer = TP_BASE<_TP_TIMER>; using TP_Wait = TP_BASE<_TP_WAIT>; + static void __fastcall DoWhenCallbackReturns(_TP_CALLBACK_INSTANCE* Instance); + + // XP系统默认值是500,我们难以知道 + volatile LONG uAvailableWorkerCount = 500; + + // 结构跟微软完全不同!!! + class TP_Pool + { + static constexpr DWORD kDefaultParallelMaximum = 200; + + volatile ULONG uRef = 1; + + // 允许并行执行的最大个数,0代表默认值,Vista系统默认值为200。 + volatile DWORD uParallelMaximum = 0; + + internal::InterlockedQueue oTaskQueue; + + // |uWeakCount| bPushLock | bStopWakeup | bPushLock | + // | 31 ~ 3 | 2 | 1 | 0 | + union TaskRunnerFlagsType + { + uint64_t fFlags64; + uint32_t uWakeupCountAndPushLock; + + struct + { + volatile uint32_t bPushLock : 1; + volatile uint32_t bStopWakeup : 1; + volatile uint32_t bPopLock : 1; + int32_t uWakeupCount : 29; + // 当前已经启动的线程数 + uint32_t uParallelCurrent; + }; + }; + TaskRunnerFlagsType TaskRunnerFlags = { 0 }; + enum : uint32_t + { + LockedQueuePushBitIndex = 0, + StopWakeupBitIndex, + LockedQueuePopBitIndex, + WakeupCountStartBitIndex, + WakeupOnceRaw = 1 << WakeupCountStartBitIndex, + UnlockQueuePushLockBitAndWakeupOnceRaw = WakeupOnceRaw - (1u << LockedQueuePushBitIndex), + }; + + static_assert(sizeof(TaskRunnerFlags) == sizeof(uint64_t)); + + public: + TP_Pool(bool _bDefault = false) + : uRef(_bDefault? UINT32_MAX : 1) + { + } + + TP_Pool(const TP_Pool&) = delete; + + bool __fastcall PostWork(_In_ TP_Work* _pWork) noexcept + { + if (TaskRunnerFlags.bStopWakeup) + { + return false; + } + + _pWork->AddRef(); + _pWork->AddPendingCount(); + auto _uParallelMaximum = uParallelMaximum; + if (_uParallelMaximum == 0) + _uParallelMaximum = kDefaultParallelMaximum; + + // 将任务提交到任务队列 + AddRef(); + for (;;) + { + if (!_interlockedbittestandset((volatile LONG*)&TaskRunnerFlags.uWakeupCountAndPushLock, LockedQueuePushBitIndex)) + { + oTaskQueue.Push(_pWork); + + // 后面交换略 + _interlockedbittestandreset((volatile LONG*)&TaskRunnerFlags.uWakeupCountAndPushLock, LockedQueuePushBitIndex); + break; + } + } + + // 解除锁定,并且 WeakupCount + 1,也尝试提升 uParallelCurrent + TaskRunnerFlagsType _uOldFlags = TaskRunnerFlags; + TaskRunnerFlagsType _uNewFlags; + for (;;) + { + _uNewFlags = _uOldFlags; + _uNewFlags.uWakeupCountAndPushLock += WakeupOnceRaw; + + if (_uNewFlags.uParallelCurrent < _uParallelMaximum && _uNewFlags.uWakeupCount >= (int32_t)_uNewFlags.uParallelCurrent) + { + _uNewFlags.uParallelCurrent += 1; + } + + auto _uLast = InterlockedCompareExchange(&TaskRunnerFlags.fFlags64, _uNewFlags.fFlags64, _uOldFlags.fFlags64); + if (_uLast == _uOldFlags.fFlags64) + break; + + _uOldFlags.fFlags64 = _uLast; + } + + // 并发送数量没有提升,所以无需从线程池拉发起新任务 + if (_uOldFlags.uParallelCurrent == _uNewFlags.uParallelCurrent) + return true; + + // 如果是第一次那么再额外 AddRef,避免ExecuteTaskRunner调用时 TP_Pool 被释放了 + // ExecuteTaskRunner内部负责重新减少引用计数 + if (_uOldFlags.uParallelCurrent == 0u) + { + AddRef(); + } + + InterlockedDecrement(&uAvailableWorkerCount); + auto _bRet = QueueUserWorkItem([](LPVOID lpThreadParameter) -> DWORD + { + auto _pPool = (TP_Pool*)lpThreadParameter; + + _pPool->ExecuteTaskRunner(); + InterlockedIncrement(&uAvailableWorkerCount); + return 0; + + }, this, 0); + + if (!_bRet) + { + InterlockedIncrement(&uAvailableWorkerCount); + Release(); + } + + return true; + } + + static _Ret_notnull_ TP_Pool* __fastcall GetDefaultPool() noexcept + { + static TP_Pool s_Pool(true); + return &s_Pool; + } + + void __fastcall AddRef() noexcept + { + if (uRef != UINT32_MAX) + { + InterlockedIncrement(&uRef); + } + } + + void __fastcall Release() noexcept + { + if (uRef != UINT32_MAX) + { + if (InterlockedDecrement(&uRef) == 0) + { + internal::Delete(this); + } + } + } + + void __fastcall Close() noexcept + { + if (_interlockedbittestandset((volatile LONG*)&TaskRunnerFlags.uWakeupCountAndPushLock, StopWakeupBitIndex)) + { + return; + } + + for (;;) + { + auto _pTask = PopTask(); + if (!_pTask) + break; + + _pTask->TryReleasePendingount(); + _pTask->Release(); + Release(); + if (InterlockedAdd((volatile LONG*)&TaskRunnerFlags.uWakeupCountAndPushLock, -(LONG)WakeupOnceRaw) < WakeupOnceRaw) + break; + } + + Release(); + } + + void __fastcall SetParallelMaximum(DWORD _uParallelMaximum) + { + uParallelMaximum = _uParallelMaximum; + } + + private: + _Ret_maybenull_ TP_Work* __fastcall PopTask() noexcept + { + for (;;) + { + if (!_interlockedbittestandset((volatile LONG*)&TaskRunnerFlags.uWakeupCountAndPushLock, LockedQueuePopBitIndex)) + { + auto _pWork = oTaskQueue.Pop(); + + _interlockedbittestandreset((volatile LONG*)&TaskRunnerFlags.uWakeupCountAndPushLock, LockedQueuePopBitIndex); + return _pWork; + } + } + } + + void __fastcall ExecuteTaskRunner() noexcept + { + __START: + for (;;) + { + auto _pWork = PopTask(); + if (!_pWork) + break; + + if (_pWork->TryReleaseWorkingCountAddWorkingCount()) + { + TP_CALLBACK_INSTANCE Instance = {}; + _pWork->pTaskVFuncs->pExecuteCallback(&Instance, _pWork); + Fallback::DoWhenCallbackReturns(&Instance); + + _pWork->ReleaseWorkingCount(); + } + + _pWork->Release(); + Release(); + if (InterlockedAdd((volatile LONG*)&TaskRunnerFlags.uWakeupCountAndPushLock, -(LONG)WakeupOnceRaw) < WakeupOnceRaw) + break; + } + + // 尝试释放 uParallelCurrent + TaskRunnerFlagsType _uOldFlags = TaskRunnerFlags; + TaskRunnerFlagsType _uNewFlags; + for (;;) + { + // 任然有任务,重新执行 ExecuteTaskRunner + if (_uOldFlags.uWakeupCount > 0) + { + goto __START; + } + + _uNewFlags = _uOldFlags; + _uNewFlags.uParallelCurrent -= 1; + + auto _uLast = InterlockedCompareExchange(&TaskRunnerFlags.fFlags64, _uNewFlags.fFlags64, _uOldFlags.fFlags64); + if (_uLast == _uOldFlags.fFlags64) + break; + + _uOldFlags.fFlags64 = _uLast; + } + + if (_uNewFlags.uParallelCurrent == 0u) + { + // 对应上面 QueueUserWorkItem 的AddRef(); + Release(); + } + } + }; + static void __fastcall DoWhenCallbackReturns(_TP_CALLBACK_INSTANCE* Instance) { if (auto CriticalSection = Instance->CriticalSection) @@ -583,36 +841,10 @@ namespace YY::Thunks return &TppWorkpTaskVFuncs; } - static void __fastcall TppWorkPost(TP_Work* pwk) + static void __fastcall TppWorkPost(TP_Work* _pWork) { - //增加一次引用。 - pwk->AddRef(); - pwk->AddPendingCount(); - - auto bRet = QueueUserWorkItem([](LPVOID lpThreadParameter) -> DWORD - { - auto pwk = (TP_Work*)lpThreadParameter; - - if (pwk->TryReleaseWorkingCountAddWorkingCount()) - { - TP_CALLBACK_INSTANCE Instance = {}; - pwk->pTaskVFuncs->pExecuteCallback(&Instance, pwk); - Fallback::DoWhenCallbackReturns(&Instance); - - pwk->ReleaseWorkingCount(); - } - - pwk->Release(); - return 0; - - }, pwk, 0); - - if (!bRet) - { - //QueueUserWorkItem失败,重新减少引用计数 - pwk->TryReleasePendingount(); - pwk->Release(); - } + auto _pPool = _pWork->Pool ? reinterpret_cast(_pWork->Pool) : TP_Pool::GetDefaultPool(); + _pPool->PostWork(_pWork); } //暂时我们不需要实现 GetTppSimplepCleanupGroupMemberVFuncs @@ -756,7 +988,7 @@ namespace YY::Thunks void WINAPI TpReleaseWork( - _TP_WORK* Work) + TP_Work* Work) { if (Work == nullptr || (Work->uFlags1 & 0x10000) || Work->VFuncs != Fallback::GetTppWorkpCleanupGroupMemberVFuncs()) { @@ -768,10 +1000,7 @@ namespace YY::Thunks { Work->un58 = _ReturnAddress(); - if (InterlockedExchangeAdd(&Work->nRef, -1) == 0) - { - Work->VFuncs->pfnTppWorkpFree(Work); - } + Work->Release(); } } @@ -876,7 +1105,6 @@ namespace YY::Thunks return Status; } - } } #endif @@ -935,7 +1163,7 @@ namespace YY::Thunks return pCloseThreadpoolWork(pwk); } - Fallback::TpReleaseWork(pwk); + Fallback::TpReleaseWork(static_cast(pwk)); } #endif @@ -1540,30 +1768,25 @@ namespace YY::Thunks return pCloseThreadpoolWait(Wait); } - if (Wait == nullptr || (Wait->uFlags1 & 0x10000) || Wait->VFuncs != Fallback::GetTppWaitpCleanupGroupMemberVFuncs()) + auto _pWait = static_cast(Wait); + if (_pWait == nullptr || (_pWait->uFlags1 & 0x10000) || _pWait->VFuncs != Fallback::GetTppWaitpCleanupGroupMemberVFuncs()) { internal::RaiseStatus(STATUS_INVALID_PARAMETER); return; } - if (Fallback::TppCleanupGroupMemberRelease(Wait, true)) + if (Fallback::TppCleanupGroupMemberRelease(_pWait, true)) { - Wait->un58 = _ReturnAddress(); + _pWait->un58 = _ReturnAddress(); - if (auto hOrgWaitObject = InterlockedExchangePointer(&Wait->hWaitObject, nullptr)) + if (auto hOrgWaitObject = InterlockedExchangePointer(&_pWait->hWaitObject, nullptr)) { UnregisterWait(hOrgWaitObject); - if (InterlockedExchangeAdd(&Wait->nRef, -1) == 0) - { - Wait->VFuncs->pfnTppWorkpFree(Wait); - } + _pWait->Release(); } - if (InterlockedExchangeAdd(&Wait->nRef, -1) == 0) - { - Wait->VFuncs->pfnTppWorkpFree(Wait); - } + _pWait->Release(); } } #endif @@ -1977,4 +2200,143 @@ namespace YY::Thunks Fallback::TppWorkWait(_pIo, _bCancelPendingCallbacks, true); } #endif + + +#if (YY_Thunks_Support_Version < NTDDI_WIN6) + + // 最低受支持的客户端 Windows Vista [桌面应用 | UWP 应用] + // 最低受支持的服务器 Windows Server 2008[桌面应用 | UWP 应用] + __DEFINE_THUNK( + kernel32, + 4, + PTP_POOL, + WINAPI, + CreateThreadpool, + _Reserved_ PVOID _pReserved + ) + { + if (auto const _pfnCreateThreadpool = try_get_CreateThreadpool()) + { + return _pfnCreateThreadpool(_pReserved); + } + + auto _pPool = internal::New(); + if (!_pPool) + { + SetLastError(ERROR_NOT_ENOUGH_MEMORY); + } + return reinterpret_cast(_pPool); + } +#endif + + +#if (YY_Thunks_Support_Version < NTDDI_WIN6) + + // 最低受支持的客户端 Windows Vista [桌面应用 | UWP 应用] + // 最低受支持的服务器 Windows Server 2008[桌面应用 | UWP 应用] + __DEFINE_THUNK( + kernel32, + 4, + VOID, + WINAPI, + CloseThreadpool, + _Inout_ PTP_POOL _pPool2 + ) + { + if (auto const _pfnCloseThreadpool = try_get_CloseThreadpool()) + { + return _pfnCloseThreadpool(_pPool2); + } + auto _pPool = reinterpret_cast(_pPool2); + if (_pPool) + { + _pPool->Close(); + } + } +#endif + + +#if (YY_Thunks_Support_Version < NTDDI_WIN6) + + // 最低受支持的客户端 Windows Vista [桌面应用 | UWP 应用] + // 最低受支持的服务器 Windows Server 2008[桌面应用 | UWP 应用] + __DEFINE_THUNK( + kernel32, + 8, + VOID, + WINAPI, + SetThreadpoolThreadMaximum, + _Inout_ PTP_POOL _pPool2, + _In_ DWORD _cthrdMost + ) + { + if (auto const _pfnSetThreadpoolThreadMaximum = try_get_SetThreadpoolThreadMaximum()) + { + return _pfnSetThreadpoolThreadMaximum(_pPool2, _cthrdMost); + } + + auto _pPool = reinterpret_cast(_pPool2); + if (!_pPool) + _pPool = Fallback::TP_Pool::GetDefaultPool(); + + _pPool->SetParallelMaximum(_cthrdMost); + } +#endif + + +#if (YY_Thunks_Support_Version < NTDDI_WIN6) + + // 最低受支持的客户端 Windows Vista [桌面应用 | UWP 应用] + // 最低受支持的服务器 Windows Server 2008[桌面应用 | UWP 应用] + __DEFINE_THUNK( + kernel32, + 8, + BOOL, + WINAPI, + SetThreadpoolThreadMinimum, + _Inout_ PTP_POOL _pPool2, + _In_ DWORD _cthrdMic + ) + { + if (auto const _pfnSetThreadpoolThreadMinimum = try_get_SetThreadpoolThreadMinimum()) + { + return _pfnSetThreadpoolThreadMinimum(_pPool2, _cthrdMic); + } + + // YY-Thunks因为底层调用的是QueueUserWorkItem + // 无法限制最小线程数量,先忽略把…… + return TRUE; + } +#endif + + +#if (YY_Thunks_Support_Version < NTDDI_WIN6) + + // 最低受支持的客户端 Windows Vista [桌面应用 | UWP 应用] + // 最低受支持的服务器 Windows Server 2008[桌面应用 | UWP 应用] + __DEFINE_THUNK( + kernel32, + 4, + BOOL, + WINAPI, + CallbackMayRunLong, + _Inout_ PTP_CALLBACK_INSTANCE pci + ) + { + if (auto const _pfnCallbackMayRunLong = try_get_CallbackMayRunLong()) + { + return _pfnCallbackMayRunLong(pci); + } + + if (pci == nullptr || pci->bCallbackMayRunLong) + { + internal::RaiseStatus(STATUS_INVALID_PARAMETER); + } + + pci->bCallbackMayRunLong = true; + // 底层调用的是QueueUserWorkItem,我们无法完全知道现在可用的线程情况 + // 自己记录一个 AvailableWorkerCount凑合用吧…… + return Fallback::uAvailableWorkerCount > 0; + } +#endif } diff --git a/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj b/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj index 9c892c7..37658ae 100644 --- a/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj +++ b/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj @@ -204,7 +204,9 @@ + + diff --git a/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj.filters b/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj.filters index d2f9a08..4bb5ff1 100644 --- a/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj.filters +++ b/src/YY-Thunks.UnitTest/YY-Thunks.UnitTest.vcxproj.filters @@ -269,6 +269,12 @@ 头文件 + + Shared + + + Shared + diff --git a/src/YY-Thunks.UnitTest/api-ms-win-core-threadpool.UnitTest.cpp b/src/YY-Thunks.UnitTest/api-ms-win-core-threadpool.UnitTest.cpp index 206c23b..59905b3 100644 --- a/src/YY-Thunks.UnitTest/api-ms-win-core-threadpool.UnitTest.cpp +++ b/src/YY-Thunks.UnitTest/api-ms-win-core-threadpool.UnitTest.cpp @@ -1869,4 +1869,88 @@ namespace api_ms_win_core_threadpool ::CloseThreadpoolIo(_pIo); } }; + + + TEST_CLASS(SetThreadpoolThreadMaximum) + { + AwaysNullGuard Guard; + + public: + SetThreadpoolThreadMaximum() + { + Guard |= YY::Thunks::aways_null_try_get_CreateThreadpool; + Guard |= YY::Thunks::aways_null_try_get_CloseThreadpool; + Guard |= YY::Thunks::aways_null_try_get_SetThreadpoolThreadMaximum; + Guard |= YY::Thunks::aways_null_try_get_CreateThreadpoolWork; + Guard |= YY::Thunks::aways_null_try_get_CloseThreadpoolWork; + Guard |= YY::Thunks::aways_null_try_get_SubmitThreadpoolWork; + Guard |= YY::Thunks::aways_null_try_get_WaitForThreadpoolWorkCallbacks; + } + + TEST_METHOD(最大上限测试) + { + constexpr DWORD kThreadMaximum = 3; + constexpr DWORD kWorkPostCount = 100; + + struct TestInfo + { + volatile DWORD uCount = 0; + volatile DWORD uMax = 0; + volatile DWORD uRunCount = 0; + }; + + TestInfo _TestInfo; + + auto _pPool = ::CreateThreadpool(nullptr); + Assert::IsNotNull(_pPool); + + + ::SetThreadpoolThreadMaximum(_pPool, kThreadMaximum); + TP_CALLBACK_ENVIRON CallbackEnviron; + ::InitializeThreadpoolEnvironment(&CallbackEnviron); + ::SetThreadpoolCallbackPool(&CallbackEnviron, _pPool); + + auto _pWork = ::CreateThreadpoolWork([](_Inout_ PTP_CALLBACK_INSTANCE Instance, + _Inout_ PVOID Context, + _Inout_ PTP_WORK Work) + { + auto& _TestInfo = *(TestInfo*)Context; + InterlockedIncrement(&_TestInfo.uRunCount); + auto _uNew = InterlockedIncrement(&_TestInfo.uCount); + + for (auto _uMax = _TestInfo.uMax;;) + { + if (_uMax >= _uNew) + break; + + auto _uLast = InterlockedCompareExchange(&_TestInfo.uMax, _uNew, _uMax); + if (_uLast == _uMax) + break; + _uMax = _uLast; + } + + Sleep(10); + + InterlockedDecrement(&_TestInfo.uCount); + + }, & _TestInfo, & CallbackEnviron); + + Assert::IsNotNull(_pWork); + + for (int i = 0; i != kWorkPostCount; ++i) + { + ::SubmitThreadpoolWork(_pWork); + } + + ::WaitForThreadpoolWorkCallbacks(_pWork, FALSE); + + ::CloseThreadpoolWork(_pWork); + + ::CloseThreadpool(_pPool); + + Assert::AreEqual((DWORD)_TestInfo.uCount, 0ul); + Assert::AreEqual((DWORD)_TestInfo.uMax, 3ul); + Assert::AreEqual((DWORD)_TestInfo.uRunCount, kWorkPostCount); + } + }; }