diff --git a/include/parlay/internal/uninitialized_sequence.h b/include/parlay/internal/uninitialized_sequence.h
index bc9a2e9b..8949f375 100644
--- a/include/parlay/internal/uninitialized_sequence.h
+++ b/include/parlay/internal/uninitialized_sequence.h
@@ -11,14 +11,9 @@
 
 namespace parlay {
 namespace internal {
-
-#ifndef PARLAY_USE_STD_ALLOC
-template<typename T>
-using _uninitialized_sequence_default_allocator = parlay::allocator<T>;
-#else
+
 template<typename T>
 using _uninitialized_sequence_default_allocator = std::allocator<T>;
-#endif
 
 // An uninitialized fixed-size sequence container.
 //
diff --git a/include/parlay/scheduler.h b/include/parlay/scheduler.h
index 5fc882f8..aeff0c30 100644
--- a/include/parlay/scheduler.h
+++ b/include/parlay/scheduler.h
@@ -32,7 +32,7 @@
 //
 // Default: true
 #ifndef PARLAY_ELASTIC_PARALLELISM
-#define PARLAY_ELASTIC_PARALLELISM true
+#define PARLAY_ELASTIC_PARALLELISM false
 #endif
 
@@ -52,6 +52,16 @@
 
 namespace parlay {
 
+// kMaxThreadId represents an invalid thread id. A thread whose thread_id
+// equals kMaxThreadId is an uninitialized thread from Parlay's perspective.
+inline constexpr unsigned int kMaxThreadId =
+    std::numeric_limits<unsigned int>::max();
+
+inline unsigned int& GetThreadId() {
+  static thread_local unsigned int thread_id = kMaxThreadId;
+  return thread_id;
+}
+
 template <typename Job>
 struct scheduler {
   static_assert(std::is_invocable_r_v<void, Job&>);
@@ -69,8 +79,6 @@ struct scheduler {
  public:
   unsigned int num_threads;
 
-  static thread_local unsigned int thread_id;
-
   explicit scheduler(size_t num_workers)
       : num_threads(num_workers),
         num_deques(num_threads),
@@ -81,7 +89,7 @@ struct scheduler {
         finished_flag(false) {
 
     // Spawn num_threads many threads on startup
-    thread_id = 0;  // thread-local write
+    GetThreadId() = 0;  // thread-local write
     for (unsigned int i = 1; i < num_threads; i++) {
       spawned_threads.emplace_back([&, i]() {
         worker(i);
@@ -130,7 +138,21 @@ struct scheduler {
   }
 
   unsigned int num_workers() { return num_threads; }
-  unsigned int worker_id() { return thread_id; }
+  unsigned int worker_id() { return GetThreadId(); }
+
+  // Sets the sleep delay used when a worker finds no work to steal. This
+  // controls a tradeoff between latency (how long before a worker wakes up and
+  // discovers a pending job to steal) and CPU load when idle (how often a
+  // worker tries to find a job to steal). Returns the previous value.
+  // NOTE: It is normally not necessary to adjust this, since the initial value
+  // is chosen to incur minimal load with only a small latency impact. But this
+  // function is available for tuning performance in special cases.
+  std::chrono::nanoseconds set_sleep_delay_when_idle(
+      std::chrono::nanoseconds new_sleep_delay_when_idle) {
+    auto old_value = sleep_delay_when_idle.load();
+    sleep_delay_when_idle.store(new_sleep_delay_when_idle);
+    return old_value;
+  }
 
   bool finished() const noexcept {
     return finished_flag.load(std::memory_order_acquire);
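For readers unfamiliar with the idiom, the following standalone sketch (not part of the patch; get_id and the sentinel are hypothetical stand-ins for GetThreadId and kMaxThreadId) shows why a function-local thread_local replaces the old static thread_local member: each thread lazily gets its own copy, initialized to the sentinel, and no out-of-class definition is needed.

    #include <cassert>
    #include <limits>
    #include <thread>

    inline unsigned int& get_id() {                   // analogous to GetThreadId()
      static thread_local unsigned int id =
          std::numeric_limits<unsigned int>::max();   // analogous to kMaxThreadId
      return id;
    }

    int main() {
      get_id() = 0;  // the "main" thread claims id 0
      std::thread t([] {
        // A freshly spawned thread still sees the sentinel until assigned.
        assert(get_id() == std::numeric_limits<unsigned int>::max());
        get_id() = 1;
      });
      t.join();
      assert(get_id() == 0);  // the main thread's value is unaffected
      return 0;
    }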
@@ -152,12 +174,20 @@
   std::atomic<size_t> wake_up_counter{0};
   std::atomic<size_t> num_finished_workers{0};
 
+  // When a worker fails to find work to steal, it performs 100 * num_deques
+  // sleeps of this duration (checking finished() and work_available after each
+  // one) before trying again to find work to steal.
+  std::atomic<std::chrono::nanoseconds> sleep_delay_when_idle =
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::milliseconds(1));
+
+
   // Start an individual worker task, stealing work if no local
   // work is available. May go to sleep if no work is available
   // for a long time, until woken up again when notified that
   // new work is available.
   void worker(size_t id) {
-    thread_id = id;  // thread-local write
+    GetThreadId() = id;  // thread-local write
 #if PARLAY_ELASTIC_PARALLELISM
     wait_for_work();
 #endif
@@ -288,11 +318,51 @@
     for (unsigned int i = 1; i < num_threads; i++) {
       spawned_threads[i - 1].join();
     }
+    // Reset the thread_local thread_id for the main thread before recycling it.
+    GetThreadId() = kMaxThreadId;
   }
 };
 
-template <typename Job>
-thread_local unsigned int scheduler<Job>::thread_id = 0;
+class fork_join_scheduler;
+
+// scheduler_internal_pointer is a data class that defines the pointer types
+// used internally for scheduler lifecycle management. It is conceptually
+// equivalent to `scheduler_pointer` except that the latter is used externally
+// (as the return type of `initialize_scheduler`).
+//
+// The intended use of `scheduler_internal_pointer` is to maintain a
+// thread-local instance of this class for all Parlay threads (both main and
+// child threads).
+struct scheduler_internal_pointer {
+  fork_join_scheduler* raw;
+  std::weak_ptr<fork_join_scheduler> weak;
+
+  scheduler_internal_pointer() : raw(nullptr) {}
+};
+
+// scheduler_pointer is the external pointer type for the scheduler. It
+// supports the dereference operation.
+struct scheduler_pointer {
+  fork_join_scheduler* raw;
+  std::shared_ptr<fork_join_scheduler> shared;
+
+  scheduler_pointer() : raw(nullptr) {}
+
+  scheduler_pointer(const scheduler_pointer& other) {
+    raw = other.raw;
+    shared = other.shared;
+  }
+
+  scheduler_pointer& operator=(const scheduler_pointer& other) {
+    raw = other.raw;
+    shared = other.shared;
+    return *this;
+  }
+
+  // When dereferencing, the raw pointer must be valid. Do *not* use `shared`,
+  // which is for lifecycle management only.
+  fork_join_scheduler* operator->() { return raw; }
+};
 
 class fork_join_scheduler {
   using Job = WorkStealingJob;
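The raw/weak/shared split introduced above can be summarized with a small self-contained sketch (not part of the patch; Sched is a hypothetical placeholder type): external holders own the scheduler through a shared_ptr, the creating root thread keeps only a weak_ptr it can lock on demand, and worker threads keep raw pointers for cheap access.

    #include <cassert>
    #include <memory>

    struct Sched { int workers = 4; };

    int main() {
      std::shared_ptr<Sched> external = std::make_shared<Sched>();  // like scheduler_pointer::shared
      std::weak_ptr<Sched> root = external;                         // like scheduler_internal_pointer::weak
      Sched* raw = external.get();                                  // like the raw pointers workers hold

      // While an external owner exists, the root thread can mint another owner.
      assert(root.lock() != nullptr && raw->workers == 4);

      external.reset();                // last shared owner goes away
      assert(root.lock() == nullptr);  // the weak pointer can no longer be locked
      return 0;
    }

Once the last shared owner releases the object, lock() returns an empty pointer, which is what lets the root thread detect that the scheduler has been torn down.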
@@ -300,11 +370,129 @@ class fork_join_scheduler {
 
   // Underlying scheduler object
   std::unique_ptr<scheduler<Job>> sched;
 
+  inline static thread_local unsigned int num_workers_to_start = 0;
+
  public:
 
   explicit fork_join_scheduler(size_t p) : sched(std::make_unique<scheduler<Job>>(p)) {}
 
+  static void set_num_workers_to_start(unsigned int num_workers) {
+    num_workers_to_start = num_workers;
+  }
+
+  // Returns a scheduler_pointer instance managing the lifecycle of an
+  // underlying Parlay scheduler.
+  //
+  // Properties of the returned scheduler_pointer instance:
+  //
+  // (1) `raw` points to a valid scheduler instance.
+  //
+  // (2) `shared` is nullptr except for the following two cases, in which it
+  // points to the same scheduler instance as the raw pointer.
+  //
+  // (2.1) If the current call creates a scheduler instance.
+  //
+  // (2.2) If the current call has `increment_ref_count` set to true and is
+  // made by the thread which is the root thread of a live scheduler instance.
+  // This represents the case where the caller explicitly maintains shared
+  // ownership of the scheduler.
+  static scheduler_pointer GetThreadLocalScheduler(bool increment_ref_count) {
+    auto& scheduler = GetInternalScheduler();
+    scheduler_pointer result;
+    unsigned int thread_id = GetThreadId();
+    if (thread_id == kMaxThreadId) {
+      // We have not yet been initialized, so we are not part of an existing
+      // parlay scheduler. Create a scheduler and perform ref-counting.
+
+      std::cerr << "Currently not expecting to create scheduler instance from GetThreadLocalScheduler()" << std::endl;
+      exit(-1);
+
+//      // Create the scheduler instance.
+//      result.shared = std::make_shared<fork_join_scheduler>();
+//
+//      // Assign it to the thread_local weak pointer. The scheduler is stored
+//      // in the thread_local weak pointer variable for the root thread only.
+//      scheduler.weak = result.shared;
+//
+//      // Also store the raw pointer. This allows faster access when
+//      // ref-counting is unnecessary based on the call context.
+//      fork_join_scheduler* raw_pointer = result.shared.get();
+//      scheduler.raw = raw_pointer;
+//
+//      // The raw pointer needs to be stored in the child threads' thread_local
+//      // variables as well. Thus the need to capture it in the initialization
+//      // function.
+//      std::function<void(unsigned int)> init_func =
+//          [raw_pointer](unsigned int child_thread_id) {
+//            GetThreadId() = child_thread_id;
+//            SetInternalScheduler(raw_pointer);
+//          };
+//
+//      std::function<void()> cleanup_func = []() {
+//        ResetInternalScheduler();
+//        GetThreadId() = kMaxThreadId;
+//      };
+//      raw_pointer->start_workers(std::move(init_func), std::move(cleanup_func));
+
+    } else if (thread_id == 0 && increment_ref_count) {
+      // We are in the already-initialized main thread and we are requested to
+      // increase the ref-count. This is the case where the external caller
+      // would like to explicitly manage the lifecycle of the scheduler.
+      result.shared = scheduler.weak.lock();
+    }
+
+    // When we arrive here, the raw pointer in the thread_local variable
+    // `scheduler` must have already been set properly.
+    result.raw = scheduler.raw;
+
+    // Invariants on the return value:
+    // (1) The raw pointer must not be nullptr.
+    // (2) If (2.1) we created a new scheduler instance in this call, or (2.2)
+    // `increment_ref_count` is true and we are the root thread, then the
+    // returned shared pointer must not be nullptr and must point to the same
+    // scheduler object as the raw pointer. Otherwise, it must be nullptr.
+    assert(result.raw != nullptr);
+
+    // Note that we must use the previously stored thread_id instead of calling
+    // GetThreadId() again, because the thread-local value in GetThreadId() may
+    // have been changed within this function. The invariant is checked against
+    // the initial thread_id value upon entry.
+    if ((thread_id == kMaxThreadId) ||
+        (thread_id == 0 && increment_ref_count)) {
+      assert(result.shared != nullptr);
+      assert(result.shared.get() == result.raw);
+    } else {
+      assert(result.shared == nullptr);
+    }
+
+    return result;
+  }
+
   unsigned int num_workers() { return sched->num_workers(); }
   unsigned int worker_id() { return sched->worker_id(); }
 
+  std::chrono::nanoseconds set_sleep_delay_when_idle(
+      std::chrono::nanoseconds new_sleep_delay_when_idle) {
+    return sched->set_sleep_delay_when_idle(new_sleep_delay_when_idle);
+  }
+
+  // Determine the number of workers to spawn.
+  unsigned int init_num_workers() {
+    if (const auto env_p = std::getenv("PARLAY_NUM_THREADS")) {
+      return std::stoi(env_p);
+    } else {
+      if (num_workers_to_start == 0) {
+        std::cerr
+            << "WARNING: Initializing parlay scheduler from "
+               "parlay/scheduler.h. Expected scheduler to be explicitly "
+               "initialized by the client before use. In the future, we may "
+               "change the default initialization behavior to result in a "
+               "runtime error."
+            << std::endl;
+        num_workers_to_start = std::thread::hardware_concurrency();
+      }
+      return num_workers_to_start;
+    }
+  }
 
   // Fork two thunks and wait until they both finish.
   template <typename L, typename R>
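The worker-count resolution implemented by init_num_workers() can be mirrored by the following standalone sketch (not part of the patch; resolve_num_workers is a hypothetical name): the PARLAY_NUM_THREADS environment variable takes precedence, then an explicitly requested count, and only then the hardware default with a warning.

    #include <cstdlib>
    #include <iostream>
    #include <string>
    #include <thread>

    unsigned int resolve_num_workers(unsigned int requested) {
      if (const char* env_p = std::getenv("PARLAY_NUM_THREADS")) {
        return static_cast<unsigned int>(std::stoi(env_p));  // 1. environment override
      }
      if (requested != 0) {
        return requested;                                    // 2. explicit request
      }
      std::cerr << "warning: falling back to hardware_concurrency()\n";
      return std::thread::hardware_concurrency();            // 3. hardware default
    }

    int main() {
      std::cout << resolve_num_workers(8) << "\n";  // prints 8 unless PARLAY_NUM_THREADS is set
      return 0;
    }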
@@ -369,8 +557,72 @@ class fork_join_scheduler {
     }
   }
 
+  static scheduler_internal_pointer& GetInternalScheduler() {
+    static thread_local scheduler_internal_pointer scheduler_ptr;
+    return scheduler_ptr;
+  }
+
+  static void SetInternalScheduler(fork_join_scheduler* input_scheduler) {
+    auto& scheduler = GetInternalScheduler();
+    scheduler.raw = input_scheduler;  // thread-local write
+    // There is no need to populate the weak pointer for child threads.
+    scheduler.weak.reset();
+  }
+
+  static void ResetInternalScheduler() {
+    auto& scheduler = GetInternalScheduler();
+    scheduler.raw = nullptr;
+    // This is only needed by the main thread, but resetting is harmless for
+    // child threads.
+    scheduler.weak.reset();
+  }
+
 };
 
+inline bool IsSchedulerInitialized() {
+  return GetThreadId() != kMaxThreadId;
+}
+
+// Returns the thread-local scheduler pointer. Creates the thread-local
+// scheduler if necessary.
+//
+// If `increment_ref_count` is true, then we return the shared pointer in
+// addition to the raw pointer if we are called with thread_id == 0 (i.e., we
+// are in an already-initialized Parlay main thread).
+//
+// If we are called with thread_id == kMaxThreadId, then a new scheduler
+// instance will be created by this call. In this case, we always return the
+// shared pointer along with the raw pointer, regardless of the value of
+// `increment_ref_count`.
+inline scheduler_pointer GetScheduler(bool increment_ref_count = false) {
+  return fork_join_scheduler::GetThreadLocalScheduler(increment_ref_count);
+}
+
+// Initializes the scheduler to use num_workers, and starts the workers.
+// Calling this function is optional, but recommended: if it is not called, the
+// scheduler will be initialized with a default num_workers the first time it
+// is used via a parallel_for() or pardo(). Since the default num_workers is
+// unlikely to be the best choice for the algorithm and hardware environment,
+// calling initialize_scheduler is preferred.
+//
+// `num_workers` has no effect if the scheduler is already running, either from
+// a previous call to initialize_scheduler, or from a call to GetScheduler()
+// (via parallel_for() or pardo()). But initialize_scheduler will always extend
+// the lifetime of the scheduler to include that of the returned
+// scheduler_pointer if it is called within a root thread (i.e., thread_id ==
+// 0).
+//
+// Returns a scheduler_pointer instance pointing to the scheduler instance.
+inline scheduler_pointer initialize_scheduler(unsigned int num_workers) {
+  fork_join_scheduler::set_num_workers_to_start(num_workers);
+  return GetScheduler(/*increment_ref_count=*/true);
+}
+
 }  // namespace parlay
 
 #endif  // PARLAY_SCHEDULER_H_
diff --git a/include/parlay/sequence.h b/include/parlay/sequence.h
index 33896ddb..18275e50 100644
--- a/include/parlay/sequence.h
+++ b/include/parlay/sequence.h
@@ -44,13 +44,8 @@ namespace parlay {
 
 // If the macro PARLAY_USE_STD_ALLOC is defined, sequences will default
 // to using std::allocator instead of parlay::allocator.
 namespace internal {
-#ifndef PARLAY_USE_STD_ALLOC
-template<typename T>
-using sequence_default_allocator = parlay::allocator<T>;
-#else
 template<typename T>
 using sequence_default_allocator = std::allocator<T>;
-#endif
 }  // namespace internal
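A usage sketch of the new entry point (assuming the API added by this patch, and that parlay::parallel_for from parlay/parallel.h routes through the thread-local scheduler): call initialize_scheduler once, keep the returned scheduler_pointer alive for as long as parallel calls are made, and use it to query the scheduler.

    #include <cstdio>
    #include "parlay/parallel.h"
    #include "parlay/scheduler.h"

    int main() {
      // Start the scheduler with an explicit worker count; the returned handle
      // participates in the scheduler's lifetime management.
      auto sched = parlay::initialize_scheduler(/*num_workers=*/8);
      std::printf("workers: %u\n", sched->num_workers());

      parlay::parallel_for(0, 100, [](size_t i) {
        // ... parallel work on index i ...
        (void)i;
      });
      return 0;
    }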