scheduler changes
Laxman Dhulipala committed Sep 5, 2023
1 parent 6b4a4cd commit c011f16
Showing 3 changed files with 261 additions and 19 deletions.
7 changes: 1 addition & 6 deletions include/parlay/internal/uninitialized_sequence.h
@@ -11,14 +11,9 @@

namespace parlay {
namespace internal {

#ifndef PARLAY_USE_STD_ALLOC
template<typename T>
using _uninitialized_sequence_default_allocator = parlay::allocator<T>;
#else

template<typename T>
using _uninitialized_sequence_default_allocator = std::allocator<T>;
#endif

// An uninitialized fixed-size sequence container.
//
268 changes: 260 additions & 8 deletions include/parlay/scheduler.h
@@ -32,7 +32,7 @@
//
// Default: true
#ifndef PARLAY_ELASTIC_PARALLELISM
#define PARLAY_ELASTIC_PARALLELISM true
#define PARLAY_ELASTIC_PARALLELISM false
#endif
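The #ifndef guard above still allows a compile-time override; a minimal sketch (assuming the header path used elsewhere in this repository):

// Keep elastic parallelism enabled despite the new default: define the flag
// before the first include of the header, or pass -DPARLAY_ELASTIC_PARALLELISM=true.
#define PARLAY_ELASTIC_PARALLELISM true
#include <parlay/scheduler.h>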


@@ -52,6 +52,16 @@

namespace parlay {

// kMaxThreadId represents an invalid thread id. A thread whose thread_id
// equals kMaxThreadId is an uninitialized thread from Parlay's perspective.
inline constexpr unsigned int kMaxThreadId =
std::numeric_limits<unsigned int>::max();

inline unsigned int& GetThreadId() {
static thread_local unsigned int thread_id = kMaxThreadId;
return thread_id;
}
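The sentinel above is what lets code test whether the calling thread has been registered with a Parlay scheduler; a minimal sketch of that check (the same test backs IsSchedulerInitialized() further down in this file):

// True iff the calling thread has been assigned a worker id by a scheduler.
bool registered = (parlay::GetThreadId() != parlay::kMaxThreadId);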

template <typename Job>
struct scheduler {
static_assert(std::is_invocable_r_v<void, Job&>);
@@ -69,8 +79,6 @@ struct scheduler {
public:
unsigned int num_threads;

static thread_local unsigned int thread_id;

explicit scheduler(size_t num_workers)
: num_threads(num_workers),
num_deques(num_threads),
@@ -81,7 +89,7 @@ struct scheduler {
finished_flag(false) {

// Spawn num_threads many threads on startup
thread_id = 0; // thread-local write
GetThreadId() = 0; // thread-local write
for (unsigned int i = 1; i < num_threads; i++) {
spawned_threads.emplace_back([&, i]() {
worker(i);
@@ -130,7 +138,21 @@
}

unsigned int num_workers() { return num_threads; }
unsigned int worker_id() { return thread_id; }
unsigned int worker_id() { return GetThreadId(); }

// Sets the amount of sleep delay when a worker finds no work to steal. This
// controls a tradeoff between latency (how long before a worker wakes up and
// discovers a pending job to steal) and cpu load when idle (how often a
// worker tries to find a job to steal). Returns the previous value.
// NOTE: It is normally not necessary to adjust this, since the initial value
// is chosen to incur minimal load with only a small latency impact. But this
// function is available for tuning performance in special cases.
std::chrono::nanoseconds set_sleep_delay_when_idle(
std::chrono::nanoseconds new_sleep_delay_when_idle) {
auto old_value = sleep_delay_when_idle.load();
sleep_delay_when_idle.store(new_sleep_delay_when_idle);
return old_value;
}
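// Illustrative usage sketch (assuming `s` is a live scheduler instance):
//   // Trade a little extra idle polling for lower wake-up latency.
//   auto previous = s.set_sleep_delay_when_idle(std::chrono::microseconds(500));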

bool finished() const noexcept {
return finished_flag.load(std::memory_order_acquire);
@@ -152,12 +174,20 @@
std::atomic<size_t> wake_up_counter{0};
std::atomic<size_t> num_finished_workers{0};

// When a worker fails to find work to steal, it performs 100 * num_queues
// sleeps of this duration (checking finished() and work_available after each
// one) before trying again to find work to steal.
std::atomic<std::chrono::nanoseconds> sleep_delay_when_idle =
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::milliseconds(1));


// Start an individual worker task, stealing work if no local
// work is available. May go to sleep if no work is available
// for a long time, until woken up again when notified that
// new work is available.
void worker(size_t id) {
thread_id = id; // thread-local write
GetThreadId() = id; // thread-local write
#if PARLAY_ELASTIC_PARALLELISM
wait_for_work();
#endif
@@ -288,23 +318,181 @@
for (unsigned int i = 1; i < num_threads; i++) {
spawned_threads[i - 1].join();
}
// Reset thread_local thread_id for the main thread before recycling it.
GetThreadId() = kMaxThreadId;
}
};

template <typename T>
thread_local unsigned int scheduler<T>::thread_id = 0;
class fork_join_scheduler;

// scheduler_internal_pointer is a data class that defines pointer types used
// internally for scheduler lifecycle management. It is conceptually equivalent
// to `scheduler_pointer` except that the latter is used externally (as a return
// type of `initialize_scheduler`).
//
// The intended use of `scheduler_internal_pointer` is to maintain a
// thread-local instance of this class for all Parlay threads (both main and
// child threads).
struct scheduler_internal_pointer {
fork_join_scheduler* raw;
std::weak_ptr<fork_join_scheduler> weak;

scheduler_internal_pointer() : raw(nullptr) {}
};

// scheduler_pointer is the external pointer type for the scheduler. It
// supports the dereference operation.
struct scheduler_pointer {
fork_join_scheduler* raw;
std::shared_ptr<fork_join_scheduler> shared;

scheduler_pointer() : raw(nullptr) {}

scheduler_pointer(const scheduler_pointer& other) {
raw = other.raw;
shared = other.shared;
}

scheduler_pointer& operator=(const scheduler_pointer& other) {
raw = other.raw;
shared = other.shared;
return *this;
}

// When dereferencing, the raw pointer must be valid. Do *not* use `shared`,
// which is for lifecycle management only.
fork_join_scheduler* operator->() { return raw; }
};
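A sketch of the access pattern this comment prescribes, assuming `ptr` was obtained from GetScheduler() or initialize_scheduler() (both defined later in this file) and that the scheduler is still live:

void use_scheduler(parlay::scheduler_pointer ptr) {
  // Dereferencing always goes through the raw pointer.
  unsigned int p = ptr->num_workers();
  // `ptr.shared`, when non-null, is held only to keep the scheduler alive.
  (void)p;
}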

class fork_join_scheduler {
using Job = WorkStealingJob;

// Underlying scheduler object
std::unique_ptr<scheduler<Job>> sched;

inline static thread_local unsigned int num_workers_to_start = 0;

public:
explicit fork_join_scheduler(size_t p) : sched(std::make_unique<scheduler<Job>>(p)) {}

static void set_num_workers_to_start(unsigned int num_workers) {
num_workers_to_start = num_workers;
}

// Returns a scheduler_pointer instance managing the lifecycle of an underlying
// Parlay scheduler.
//
// Properties of the returned scheduler_pointer instance:
//
// (1) `raw` points to a valid scheduler instance.
//
// (2) `shared` is nullptr except for the following two cases, in which it
// points to the same scheduler instance as the raw pointer.
//
// (2.1) If the current call creates a scheduler instance.
//
// (2.2) If the current call has `increment_ref_count` set to true and is made
// by the thread which is the root thread of a live scheduler instance. This
// represents a case where the caller explicitly maintains the shared
// ownership of the scheduler.
static scheduler_pointer GetThreadLocalScheduler(
bool increment_ref_count) {
auto& scheduler = GetInternalScheduler();
scheduler_pointer result;
unsigned int thread_id = GetThreadId();
if (thread_id == kMaxThreadId) {
// We have not yet been initialized, so we are not part of an existing Parlay
// scheduler. Create a scheduler and perform ref-counting.

std::cerr << "Currently not expecting to create scheduler instance from GetThreadLocalScheduler()" << std::endl;
exit(-1);

// // Create the scheduler instance.
// result.shared = std::make_shared<fork_join_scheduler>();
//
// // Assign it to the thread_local weak pointer. The scheduler is stored in
// // the thread_local weak pointer variable for the root thread only.
// scheduler.weak = result.shared;
//
// // Also store the raw pointer. This allows faster access when ref-counting
// // is unnecessary based on the call context.
// fork_join_scheduler* raw_pointer = result.shared.get();
// scheduler.raw = raw_pointer;
//
// // The raw pointer needs to be stored in child threads' thread_local
// // variable as well. Thus the need to capture it in the initialization
// // function.
// std::function<void(unsigned int)> init_func =
// [raw_pointer](unsigned int child_thread_id) {
// GetThreadId() = child_thread_id;
// SetInternalScheduler(raw_pointer);
// };
//
// std::function<void()> cleanup_func = []() {
// ResetInternalScheduler();
// GetThreadId() = kMaxThreadId;
// };
// raw_pointer->start_workers(std::move(init_func), std::move(cleanup_func));

} else if (thread_id == 0 && increment_ref_count) {
// We are in the already-initialized main thread and we are requested to
// increase the ref-count. This is the case where the external caller
// would like to explicitly manage the lifecycle of the scheduler.
result.shared = scheduler.weak.lock();
}

// When we arrive here, the raw pointer in the thread_local variable
// `scheduler` must have already been set properly.
result.raw = scheduler.raw;

// Invariants on return value
// (1) Raw pointer must not be nullptr
// (2) If (2.1) we create a new scheduler instance in this call or (2.2)
// `increment_ref_count` is set to true, then the returned shared pointer
// must not be nullptr and it must point to the same scheduler object as the
// raw pointer. Otherwise, it must be nullptr.
assert(result.raw != nullptr);

// Note that we must use the previously stored thread_id instead of calling
// GetThreadId again, because the thread-local value in GetThreadId may have
// been changed within this function. The invariant is checked against the
// initial thread_id value upon entry.
if ((thread_id == kMaxThreadId) ||
(thread_id == 0 && increment_ref_count)) {
assert(result.shared != nullptr);
assert(result.shared.get() == result.raw);
} else {
assert(result.shared == nullptr);
}

return result;
}

unsigned int num_workers() { return sched->num_workers(); }
unsigned int worker_id() { return sched->worker_id(); }
std::chrono::nanoseconds set_sleep_delay_when_idle(
std::chrono::nanoseconds new_sleep_delay_when_idle) {
return sched->set_sleep_delay_when_idle(new_sleep_delay_when_idle);
}

// Determine the number of workers to spawn
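// (The PARLAY_NUM_THREADS environment variable, when set, takes precedence;
// e.g. running with PARLAY_NUM_THREADS=16 forces 16 workers.)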
unsigned int init_num_workers() {
if (const auto env_p = std::getenv("PARLAY_NUM_THREADS")) {
return std::stoi(env_p);
} else {
if (num_workers_to_start == 0) {
std::cerr
<< "WARNING: Initializing parlay scheduler from "
"parlay/scheduler.h. Expected scheduler to be explicitly "
"initialized by the client before use. In the future, we may "
"change the default initialization behavior to result in a "
"runtime error."
<< std::endl;
num_workers_to_start = std::thread::hardware_concurrency();
}
return num_workers_to_start;
}
}

// Fork two thunks and wait until they both finish.
template <typename L, typename R>
@@ -369,8 +557,72 @@
}
}

static scheduler_internal_pointer& GetInternalScheduler() {
static thread_local scheduler_internal_pointer scheduler_ptr;
return scheduler_ptr;
}

static void SetInternalScheduler(fork_join_scheduler* input_scheduler) {
auto& scheduler = GetInternalScheduler();
scheduler.raw = input_scheduler; // thread-local write
// There is no need to populate the weak pointer for child threads.
scheduler.weak.reset();
}

static void ResetInternalScheduler() {
auto& scheduler = GetInternalScheduler();
scheduler.raw = nullptr;
// This is only needed by the main thread. But resetting is harmless for
// child threads.
scheduler.weak.reset();
}

};


inline bool IsSchedulerInitialized() {
return GetThreadId() != kMaxThreadId;
}

// Returns the thread-local scheduler pointer. Creates the thread-local
// scheduler if necessary.
//
// If `increment_ref_count` is true, then we return the shared pointer in
// addition to the raw pointer if we are called with thread_id==0 (i.e., we are
// in an already-initialized Parlay main thread).
//
// If we are called with thread_id==kMaxThreadId, then a new scheduler instance
// will be created with this call. In this case, we always return the shared
// pointer along with the raw pointer, regardless of the value of
// `increment_ref_count`.
inline scheduler_pointer GetScheduler(bool increment_ref_count = false) {
return fork_join_scheduler::GetThreadLocalScheduler(increment_ref_count);
}
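A minimal sketch of the guarded call pattern, combining IsSchedulerInitialized() with GetScheduler(); the calling thread is assumed to already be registered with a live scheduler:

if (parlay::IsSchedulerInitialized()) {
  // No ref-count change: only the raw pointer in the result is populated.
  parlay::scheduler_pointer sp = parlay::GetScheduler();
  unsigned int id = sp->worker_id();
  (void)id;
}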


// Initializes the scheduler to use num_workers, and starts the workers. Calling
// this function is optional, but recommended: if it is not called, the
// scheduler will be initialized with a default num_workers the first time it is
// used via a parallel_for() or pardo(). Since the default num_workers is
// unlikely to be the best choice for the algorithm and hardware environment,
// calling initialize_scheduler is preferred.
//
// `num_workers` will have no effect if the scheduler is already running, either
// from a previous call to initialize_scheduler, or from a call to
// GetScheduler() (via parallel_for() or pardo()). But initialize_scheduler will
// always extend the lifetime of the scheduler to include that of the returned
// scheduler_pointer if it is called within a root thread (i.e., thread id ==
// 0).
//
// Returns a scheduler_pointer instance pointing to the new scheduler instance.
inline scheduler_pointer initialize_scheduler(
unsigned int num_workers) {
fork_join_scheduler::set_num_workers_to_start(num_workers);
return GetScheduler(/*increment_ref_count=*/true);
}
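A sketch of the startup pattern this comment documents (the worker count of 8 and the names are illustrative; in this revision the in-header scheduler-creation path inside GetThreadLocalScheduler() is disabled, so this reflects the documented intent rather than behavior guaranteed by this commit):

#include <chrono>
#include <parlay/scheduler.h>

int main() {
  auto sched_ptr = parlay::initialize_scheduler(8);
  // On the root thread, sched_ptr.shared keeps the scheduler alive for as long
  // as sched_ptr exists; members are reached through the raw pointer.
  unsigned int p = sched_ptr->num_workers();
  sched_ptr->set_sleep_delay_when_idle(std::chrono::microseconds(500));
  return p > 0 ? 0 : 1;
}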



} // namespace parlay

#endif // PARLAY_SCHEDULER_H_
5 changes: 0 additions & 5 deletions include/parlay/sequence.h
@@ -44,13 +44,8 @@ namespace parlay {
// If the macro PARLAY_USE_STD_ALLOC is defined, sequences will default
// to using std::allocator instead of parlay::allocator.
namespace internal {
#ifndef PARLAY_USE_STD_ALLOC
template<typename T>
using sequence_default_allocator = parlay::allocator<T>;
#else
template<typename T>
using sequence_default_allocator = std::allocator<T>;
#endif
} // namespace internal


