Skip to content

Commit

Permalink
Darwin: PlatformManagerImpl improvements (#32904)
Browse files Browse the repository at this point in the history
* Small improvements to TestPlatformMgr.cpp

Use NL_TEST_ASSERT_SUCCESS when checking errors
Add a few more assertions

* Add AtomicGlobal<T> as a thread-safe variant of Global<T>

It is simply an alias for the same type if CHIP_CONFIG_GLOBALS_LAZY_INIT is not
enabled, as the eager implementation of Global<T> is thread-safe anyway.

Use "friend" instead of "friend class" where the type might be an alias.

* Darwin: PlatformManagerImpl improvements

Make PlatformMgr[Impl]() thread-safe by using an AtomicGlobal
Make GetWorkQueue() thread-safe by creating the queue in the constructor
Make {Start,Stop}EventLoopTask thread-safe using an atomic for state
Signal the sempahore only if non-null, no matter where stop is called from
Other minor tweaks

* Don't reference std::call_once unless CHIP_CONFIG_GLOBALS_LAZY_INIT=1
  • Loading branch information
ksperling-apple authored and shaoltan-amazon committed Apr 10, 2024
1 parent 85388a1 commit 8f676d7
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 117 deletions.
86 changes: 73 additions & 13 deletions src/lib/core/Global.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,65 @@

#include <lib/core/CHIPConfig.h>

#include <mutex>
#include <new>

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
#include <dispatch/dispatch.h>
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

namespace chip {
namespace detail {

#if CHIP_CONFIG_GLOBALS_LAZY_INIT

struct NonAtomicOnce
{
bool mInitialized = false;
void call(void (*func)(void *), void * context)
{
if (!mInitialized)
{
mInitialized = true;
func(context);
}
}
};

struct AtomicOnce
{
// dispatch_once (if available) is more efficient than std::call_once because
// it takes advantage of the additional assumption that the dispatch_once_t
// is allocated within a static / global.
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_once_t mOnce = 0;
void call(void (*func)(void *), void * context) { dispatch_once_f(&mOnce, context, func); }
#else // CHIP_SYSTEM_CONFIG_USE_DISPATCH
std::once_flag mOnce;
void call(void (*func)(void *), void * context) { std::call_once(mOnce, func, context); }
#endif
};

#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT

} // namespace detail

/**
* A wrapper for global object that enables initialization and destruction to
* be configured by the platform via `CHIP_CONFIG_GLOBALS_*` options.
*
* The contained object of type T is default constructed, possibly lazily.
*
* This class is generally NOT thread-safe; external synchronization is required.
* Values of this type MUST be globals or static class members.
*
* This class is not thread-safe; external synchronization is required.
* @see AtomicGlobal<T> for a thread-safe variant.
*/
#if CHIP_CONFIG_GLOBALS_LAZY_INIT
template <class T, class OnceStrategy = detail::NonAtomicOnce>
#else // CHIP_CONFIG_GLOBALS_LAZY_INIT
template <class T>
#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT
class Global
{
public:
Expand All @@ -40,6 +86,11 @@ class Global
T & get() { return _get(); }
T * operator->() { return &_get(); }

// Globals are not copyable or movable
Global(const Global &) = delete;
Global(const Global &&) = delete;
Global & operator=(const Global &) = delete;

#if CHIP_CONFIG_GLOBALS_LAZY_INIT
public:
constexpr Global() = default;
Expand All @@ -49,24 +100,22 @@ class Global
// Zero-initialize everything. We should technically leave mStorage uninitialized,
// but that can sometimes cause clang to be unable to constant-initialize the object.
alignas(T) unsigned char mStorage[sizeof(T)] = {};
bool mInitialized = false;

T & _value() { return *reinterpret_cast<T *>(mStorage); }
OnceStrategy mOnce;

T & _get()
{
if (!mInitialized)
{
new (mStorage) T();
mInitialized = true;
T * value = reinterpret_cast<T *>(mStorage);
mOnce.call(&create, value);
return *value;
}
static void create(void * value)
{
new (value) T();
#if !CHIP_CONFIG_GLOBALS_NO_DESTRUCT
CHIP_CXA_ATEXIT(&destroy, this);
CHIP_CXA_ATEXIT(&destroy, value);
#endif // CHIP_CONFIG_GLOBALS_NO_DESTRUCT
}
return _value();
}

static void destroy(void * context) { static_cast<Global<T> *>(context)->_value().~T(); }
static void destroy(void * value) { static_cast<T *>(value)->~T(); }

#else // CHIP_CONFIG_GLOBALS_LAZY_INIT
public:
Expand Down Expand Up @@ -100,4 +149,15 @@ class Global
#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT
};

/**
* A variant of Global<T> that is thread-safe.
*/
template <class T>
using AtomicGlobal =
#if CHIP_CONFIG_GLOBALS_LAZY_INIT
Global<T, detail::AtomicOnce>;
#else // CHIP_CONFIG_GLOBALS_LAZY_INIT
Global<T>; // eager globals are already thread-safe
#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT

} // namespace chip
2 changes: 1 addition & 1 deletion src/lib/dnssd/Discovery_ImplPlatform.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class DiscoveryImplPlatform : public ServiceAdvertiser, public Resolver
uint8_t mCommissionableInstanceName[sizeof(uint64_t)];
OperationalResolveDelegate * mOperationalDelegate = nullptr;

friend class Global<DiscoveryImplPlatform>;
friend Global<DiscoveryImplPlatform>;
static Global<DiscoveryImplPlatform> sManager;
};

Expand Down
2 changes: 1 addition & 1 deletion src/platform/Darwin/DnssdImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class MdnsContexts

private:
MdnsContexts() = default;
friend class Global<MdnsContexts>;
friend Global<MdnsContexts>;
static Global<MdnsContexts> sInstance;

std::vector<GenericContext *> mContexts;
Expand Down
108 changes: 37 additions & 71 deletions src/platform/Darwin/PlatformManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#endif

#include <platform/Darwin/DiagnosticDataProviderImpl.h>
#include <platform/Darwin/PlatformMetricKeys.h>
#include <platform/PlatformManager.h>

// Include the non-inline definitions for the GenericPlatformManagerImpl<> template,
Expand All @@ -39,22 +40,28 @@
#include <CoreFoundation/CoreFoundation.h>
#include <tracing/metric_event.h>

#import "PlatformMetricKeys.h"
using namespace chip::Tracing::DarwinPlatform;

namespace chip {
namespace DeviceLayer {

Global<PlatformManagerImpl> PlatformManagerImpl::sInstance;
AtomicGlobal<PlatformManagerImpl> PlatformManagerImpl::sInstance;

CHIP_ERROR PlatformManagerImpl::_InitChipStack()
PlatformManagerImpl::PlatformManagerImpl() :
mWorkQueue(dispatch_queue_create("org.csa-iot.matter.workqueue",
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL,
QOS_CLASS_USER_INITIATED, QOS_MIN_RELATIVE_PRIORITY)))
{
CHIP_ERROR err;
// Tag our queue for IsWorkQueueCurrentQueue()
dispatch_queue_set_specific(mWorkQueue, this, this, nullptr);
dispatch_suspend(mWorkQueue);
}

CHIP_ERROR PlatformManagerImpl::_InitChipStack()
{
// Initialize the configuration system.
#if !CHIP_DISABLE_PLATFORM_KVS
err = Internal::PosixConfig::Init();
SuccessOrExit(err);
ReturnErrorOnFailure(Internal::PosixConfig::Init());
#endif // CHIP_DISABLE_PLATFORM_KVS

#if !CHIP_SYSTEM_CONFIG_USE_LIBEV
Expand All @@ -64,8 +71,7 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()

// Call _InitChipStack() on the generic implementation base class
// to finish the initialization process.
err = Internal::GenericPlatformManagerImpl<PlatformManagerImpl>::_InitChipStack();
SuccessOrExit(err);
ReturnErrorOnFailure(Internal::GenericPlatformManagerImpl<PlatformManagerImpl>::_InitChipStack());

#if !CHIP_DISABLE_PLATFORM_KVS
// Now set up our device instance info provider. We couldn't do that
Expand All @@ -74,53 +80,36 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()
#endif // CHIP_DISABLE_PLATFORM_KVS

mStartTime = System::SystemClock().GetMonotonicTimestamp();

exit:
return err;
return CHIP_NO_ERROR;
}

CHIP_ERROR PlatformManagerImpl::_StartEventLoopTask()
{
if (mIsWorkQueueSuspended)
{
mIsWorkQueueSuspended = false;
dispatch_resume(mWorkQueue);
}

auto expected = WorkQueueState::kSuspended;
VerifyOrReturnError(mWorkQueueState.compare_exchange_strong(expected, WorkQueueState::kRunning), CHIP_ERROR_INCORRECT_STATE);
dispatch_resume(mWorkQueue);
return CHIP_NO_ERROR;
};

CHIP_ERROR PlatformManagerImpl::_StopEventLoopTask()
{
if (!mIsWorkQueueSuspended && !mIsWorkQueueSuspensionPending)
{
mIsWorkQueueSuspensionPending = true;
if (!IsWorkQueueCurrentQueue())
{
// dispatch_sync is used in order to guarantee serialization of the caller with
// respect to any tasks that might already be on the queue, or running.
dispatch_sync(mWorkQueue, ^{
dispatch_suspend(mWorkQueue);
});

mIsWorkQueueSuspended = true;
mIsWorkQueueSuspensionPending = false;
}
else
auto expected = WorkQueueState::kRunning;
VerifyOrReturnError(mWorkQueueState.compare_exchange_strong(expected, WorkQueueState::kSuspensionPending),
CHIP_ERROR_INCORRECT_STATE);

// We need to dispatch to the work queue to ensure any currently queued jobs
// finish executing. When called from outside the work queue we also need to
// wait for them to complete before returning to the caller, so we use
// dispatch_sync in that case.
(IsWorkQueueCurrentQueue() ? dispatch_async : dispatch_sync)(mWorkQueue, ^{
dispatch_suspend(mWorkQueue);
mWorkQueueState.store(WorkQueueState::kSuspended);
auto * semaphore = mRunLoopSem;
if (semaphore != nullptr)
{
// We are called from a task running on our work queue. Dispatch async,
// so we don't deadlock ourselves. Note that we do have to dispatch to
// guarantee that we don't signal the semaphore until we have ensured
// that no more tasks will run on the queue.
dispatch_async(mWorkQueue, ^{
dispatch_suspend(mWorkQueue);
mIsWorkQueueSuspended = true;
mIsWorkQueueSuspensionPending = false;
dispatch_semaphore_signal(mRunLoopSem);
});
dispatch_semaphore_signal(semaphore);
}
}

});
return CHIP_NO_ERROR;
}

Expand All @@ -147,47 +136,24 @@ void PlatformManagerImpl::_Shutdown()

CHIP_ERROR PlatformManagerImpl::_PostEvent(const ChipDeviceEvent * event)
{
if (mWorkQueue == nullptr)
{
return CHIP_ERROR_INCORRECT_STATE;
}

const ChipDeviceEvent eventCopy = *event;
dispatch_async(mWorkQueue, ^{
Impl()->DispatchEvent(&eventCopy);
DispatchEvent(&eventCopy);
});
return CHIP_NO_ERROR;
}

#if CHIP_STACK_LOCK_TRACKING_ENABLED
bool PlatformManagerImpl::_IsChipStackLockedByCurrentThread() const
{
// If we have no work queue, or it's suspended, then we assume our caller
// knows what they are doing in terms of their own concurrency.
return !mWorkQueue || mIsWorkQueueSuspended || IsWorkQueueCurrentQueue();
// Assume our caller knows what they are doing in terms of concurrency if the work queue is suspended.
return IsWorkQueueCurrentQueue() || mWorkQueueState.load() == WorkQueueState::kSuspended;
};
#endif

static int sPlatformManagerKey; // We use pointer to this as key.

dispatch_queue_t PlatformManagerImpl::GetWorkQueue()
{
if (mWorkQueue == nullptr)
{
mWorkQueue =
dispatch_queue_create(CHIP_CONTROLLER_QUEUE,
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL,
QOS_CLASS_USER_INITIATED, QOS_MIN_RELATIVE_PRIORITY));
dispatch_suspend(mWorkQueue);
dispatch_queue_set_specific(mWorkQueue, &sPlatformManagerKey, this, nullptr);
mIsWorkQueueSuspended = true;
}
return mWorkQueue;
}

bool PlatformManagerImpl::IsWorkQueueCurrentQueue() const
{
return dispatch_get_specific(&sPlatformManagerKey) == this;
return dispatch_get_specific(this) == this;
}

CHIP_ERROR PlatformManagerImpl::StartBleScan(BleScannerDelegate * delegate)
Expand Down
32 changes: 18 additions & 14 deletions src/platform/Darwin/PlatformManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
#include <lib/core/Global.h>
#include <platform/internal/GenericPlatformManagerImpl.h>

#include <atomic>
#include <dispatch/dispatch.h>

static constexpr const char * const CHIP_CONTROLLER_QUEUE = "org.csa-iot.matter.framework.controller.workqueue";

namespace chip {
namespace DeviceLayer {

Expand All @@ -47,7 +46,7 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
public:
// ===== Platform-specific members that may be accessed directly by the application.

dispatch_queue_t GetWorkQueue();
dispatch_queue_t GetWorkQueue() { return mWorkQueue; }
bool IsWorkQueueCurrentQueue() const;

CHIP_ERROR StartBleScan(BleScannerDelegate * delegate = nullptr);
Expand Down Expand Up @@ -80,21 +79,26 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
friend PlatformManager & PlatformMgr(void);
friend PlatformManagerImpl & PlatformMgrImpl(void);

static Global<PlatformManagerImpl> sInstance;
friend AtomicGlobal<PlatformManagerImpl>;
static AtomicGlobal<PlatformManagerImpl> sInstance;

PlatformManagerImpl();

System::Clock::Timestamp mStartTime = System::Clock::kZero;

dispatch_queue_t mWorkQueue = nullptr;
// Semaphore used to implement blocking behavior in _RunEventLoop.
dispatch_semaphore_t mRunLoopSem;
dispatch_queue_t mWorkQueue;

bool mIsWorkQueueSuspended = false;
// TODO: mIsWorkQueueSuspensionPending might need to be an atomic and use
// atomic ops, if we're worried about calls to StopEventLoopTask() from
// multiple threads racing somehow...
bool mIsWorkQueueSuspensionPending = false;
enum class WorkQueueState
{
kSuspended,
kRunning,
kSuspensionPending,
};

inline ImplClass * Impl() { return static_cast<PlatformManagerImpl *>(this); }
std::atomic<WorkQueueState> mWorkQueueState = WorkQueueState::kSuspended;

// Semaphore used to implement blocking behavior in _RunEventLoop.
dispatch_semaphore_t mRunLoopSem;
};

/**
Expand All @@ -112,7 +116,7 @@ inline PlatformManager & PlatformMgr(void)
* Returns the platform-specific implementation of the PlatformManager singleton object.
*
* chip applications can use this to gain access to features of the PlatformManager
* that are specific to the ESP32 platform.
* that are specific to the platform.
*/
inline PlatformManagerImpl & PlatformMgrImpl(void)
{
Expand Down
Loading

0 comments on commit 8f676d7

Please sign in to comment.