From ab8682266d5a4a8955b1557534d0c8b658e795ab Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 26 May 2023 14:28:56 -0400 Subject: [PATCH] PaRSEC backend: offload reductions to dedicated task When we encounter a streaming terminal we schedule a dedicated reducer task. Subsequent inputs are put into a LIFO. Once the reducer task runs it will process all available inputs for that terminal but will not block waiting for all inputs. In the worst case, we create N tasks (one for each input), likely less. Signed-off-by: Joseph Schuchart --- ttg/ttg/parsec/ttg.h | 358 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 318 insertions(+), 40 deletions(-) diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 41ee228e5..b3b2f975c 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -233,7 +233,7 @@ namespace ttg_parsec { parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, this, PARSEC_TTG_MAX_AM_SIZE); parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, this, 128); } - + create_tpool(); } @@ -542,10 +542,36 @@ namespace ttg_parsec { typedef void (release_task_fn)(parsec_ttg_task_base_t*); - typedef struct { + struct stream_info_t { std::size_t goal; std::size_t size; - } size_goal_t; + parsec_lifo_t reduce_copies; + std::atomic reduce_count; + }; + + protected: + template + void init_stream_info_impl(TT *tt, std::array& streams) { + if constexpr (TT::numins > i) { + if (std::get(tt->input_reducers)) { + streams[i].goal = tt->static_stream_goal[i]; + streams[i].size = 0; + PARSEC_OBJ_CONSTRUCT(&streams[i].reduce_copies, parsec_lifo_t); + streams[i].reduce_count.store(0, std::memory_order_relaxed); + } + /* recursion */ + if constexpr((i + 1) < TT::numins) { + init_stream_info_impl(tt, streams); + } + } + } + + template + void init_stream_info(TT *tt, std::array& streams) { + init_stream_info_impl<0>(tt, streams); + } + + public: /* Poor-mans virtual function * We cannot use virtual inheritance or private visibility because we @@ -604,7 +630,7 @@ namespace ttg_parsec { static constexpr size_t num_streams = TT::numins; TT* tt; key_type key; - size_goal_t stream[num_streams] = {}; + std::array streams; parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class) : parsec_ttg_task_base_t(mempool, task_class, num_streams) { @@ -631,6 +657,8 @@ namespace ttg_parsec { parsec_task.data[i].data_in = nullptr; } + init_stream_info(tt, streams); + // We store the hash of the key and the address where it can be found in locals considered as a scratchpad uint64_t hv = ttg::hash>{}(key); *(uintptr_t*)&(parsec_task.locals[0]) = hv; @@ -650,7 +678,7 @@ namespace ttg_parsec { struct parsec_ttg_task_t : public parsec_ttg_task_base_t { static constexpr size_t num_streams = TT::numins; TT* tt; - size_goal_t stream[num_streams] = {}; + std::array streams; parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class) : parsec_ttg_task_base_t(mempool, task_class, num_streams) { @@ -671,6 +699,8 @@ namespace ttg_parsec { for (int i = 0; i < num_streams; ++i) { parsec_task.data[i].data_in = nullptr; } + + init_stream_info(tt, streams); } static void release_task(parsec_ttg_task_base_t* task_base) { @@ -1189,6 +1219,7 @@ namespace ttg_parsec { private: using task_t = detail::parsec_ttg_task_t; + friend detail::parsec_ttg_task_base_t; friend task_t; /* the offset of the key placed after the task structure in the memory from mempool */ @@ -1239,6 +1270,7 @@ namespace ttg_parsec { // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime ttg::meta::detail::input_reducers_t input_reducers; //!< Reducers for the input terminals (empty = expect single value) + std::array inpute_reducers_taskclass = { nullptr }; std::array static_stream_goal; int num_pullins = 0; @@ -1358,6 +1390,97 @@ namespace ttg_parsec { parsec_ttg_caller = NULL; } + struct reducer_task_t { + parsec_task_t parsec_task; + task_t *task; + + reducer_task_t(task_t* task, parsec_thread_mempool_t *mempool, + parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, + int32_t priority) + : task(task) + { + PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); + parsec_task.mempool_owner = mempool; + parsec_task.task_class = task_class; + parsec_task.status = PARSEC_TASK_STATUS_HOOK; + parsec_task.taskpool = taskpool; + parsec_task.priority = priority; + parsec_task.chore_mask = 1<<0; + } + }; + + template + static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) { + using rtask_t = reducer_task_t; + using value_t = std::tuple_element_t; + constexpr const bool val_is_void = ttg::meta::is_void_v; + rtask_t *rtask = (rtask_t*)parsec_task; + task_t *task = rtask->task; + ttT *baseobj = task->tt; + derivedT *obj = static_cast(baseobj); + + auto& reducer = std::get(baseobj->input_reducers); + + assert(parsec_ttg_caller == NULL); + parsec_ttg_caller = static_cast(task); + + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": reducer executing"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing"); + } + + /* the copy to reduce into */ + detail::ttg_data_copy_t *target_copy; + target_copy = static_cast(task->parsec_task.data[i].data_in); + assert(val_is_void || nullptr != target_copy); + /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */ + std::size_t c; + std::cout << "static_reducer_op startign up, size " << task->streams[i].size << " of " << task->streams[i].goal << std::endl; + do { + if constexpr(!val_is_void) { + /* the copies to reduce out of */ + detail::ttg_data_copy_t *source_copy; + parsec_list_item_t *item; + item = parsec_lifo_pop(&task->streams[i].reduce_copies); + if (nullptr == item) { + break; // maybe someone is changing the goal right now + } + source_copy = ((detail::ttg_data_copy_self_t *)(item))->self; + reducer(*reinterpret_cast *>(target_copy->device_private), + *reinterpret_cast *>(source_copy->device_private)); + detail::release_data_copy(source_copy); + } else if constexpr(val_is_void) { + reducer(); // invoke control reducer + } + // there is only one task working on this stream, so no need to be atomic here + task->streams[i].size++; + std::cout << "reducer " << task->streams[i].size << " of " << task->streams[i].goal << std::endl; + } while ((c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)) > 1); + + /* finalize_argstream sets goal to 1, so size may be larger than goal */ + bool complete = (task->streams[i].size >= task->streams[i].goal); + + if (complete && c == 1) { + /* task is still in the hash table, have release_task remove it */ + task->remove_from_hash = true; + task->release_task(task); + } + + parsec_ttg_caller = NULL; + + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing"); + } + + return PARSEC_HOOK_RETURN_DONE; + } + + protected: template uint64_t unpack(T &obj, void *_bytes, uint64_t pos) { @@ -1723,14 +1846,37 @@ namespace ttg_parsec { newtask->function_template_class_ptr[static_cast(ttg::ExecutionSpace::CUDA)] = reinterpret_cast(&TT::static_op); - for (int i = 0; i < static_stream_goal.size(); ++i) { - newtask->stream[i].goal = static_stream_goal[i]; + ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task"); + return newtask; + } + + + template + reducer_task_t *create_new_reducer_task(task_t *task) { + /* make sure we can reuse the existing memory pool and don't have to create a new one */ + static_assert(sizeof(task_t) >= sizeof(reducer_task_t)); + constexpr const bool keyT_is_Void = ttg::meta::is_void_v; + auto &world_impl = world.impl(); + reducer_task_t *newtask; + parsec_thread_mempool_t *mempool = get_task_mempool(); + char *taskobj = (char *)parsec_thread_mempool_allocate(mempool); + // use the priority of the task we stream into + int32_t priority = 0; + if constexpr (!keyT_is_Void) { + priority = priomap(task->key); + ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task"); + } else { + priority = priomap(); + ttg::trace(world.rank(), ":", get_name(), ": creating reducer task"); } + /* placement-new the task */ + newtask = new (taskobj) reducer_task_t(task, mempool, inpute_reducers_taskclass[i], + world_impl.taskpool(), priority); - ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task"); return newtask; } + // Used to set the i'th argument template void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr, @@ -1756,7 +1902,7 @@ namespace ttg_parsec { task_t *task; auto &world_impl = world.impl(); auto &reducer = std::get(input_reducers); - bool release = true; + bool release = false; bool remove_from_hash = true; bool discover_task = true; bool get_pull_data = false; @@ -1813,10 +1959,26 @@ namespace ttg_parsec { #endif } + std::cout << "set_arg_local_impl key " << key << " value " << value << std::endl; + if (reducer) { // is this a streaming input? reduce the received value // N.B. Right now reductions are done eagerly, without spawning tasks // this means we must lock - parsec_hash_table_lock_bucket(&tasks_table, hk); + //parsec_hash_table_lock_bucket(&tasks_table, hk); + + auto submit_reducer_task = [&](auto *task){ + /* check if we need to create a task */ + std::size_t c = task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); + std::cout << "submit_reducer_task c " << c << ", size " << task->streams[i].size << " of " << task->streams[i].goal << std::endl; + if (0 == c) { + /* we are responsible for creating the reduction task */ + reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(task); + parsec_task_t *vp_task_rings[1] = { &reduce_task->parsec_task }; + parsec_execution_stream_t *es = world_impl.execution_stream(); + __parsec_schedule_vp(es, vp_task_rings, 0); + } + }; if constexpr (!ttg::meta::is_void_v) { // for data values // have a value already? if not, set, otherwise reduce @@ -1827,19 +1989,35 @@ namespace ttg_parsec { * mechanism (it would release the task, not the reduction value). */ copy = detail::create_new_datacopy(std::forward(value)); task->parsec_task.data[i].data_in = copy; + task->streams[i].size++; + if (task->streams[i].size == task->streams[i].goal) { + release = true; + } } else { - reducer(*reinterpret_cast *>(copy->device_private), value); + if (nullptr != parsec_ttg_caller) { + copy = detail::find_copy_in_task(parsec_ttg_caller, &value); + } + if (nullptr != copy) { + /* retain the data copy */ + copy = detail::register_data_copy(copy, task, input_is_const); + } else { + /* create a new copy */ + copy = detail::create_new_datacopy(std::forward(value)); + } + /* enqueue the data copy to be reduced */ + parsec_lifo_push(&task->streams[i].reduce_copies, ©->super); + submit_reducer_task(task); + //reducer(*reinterpret_cast *>(copy->device_private), value); } } else { - reducer(); // even if this was a control input, must execute the reducer for possible side effects - } - task->stream[i].size++; - release = (task->stream[i].size == task->stream[i].goal); - if (release) { - parsec_hash_table_nolock_remove(&tasks_table, hk); - remove_from_hash = false; + /* submit reducer for void values to handle side effects */ + submit_reducer_task(task); } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + //if (release) { + // parsec_hash_table_nolock_remove(&tasks_table, hk); + // remove_from_hash = false; + //} + //parsec_hash_table_unlock_bucket(&tasks_table, hk); } else { /* whether the task needs to be deferred or not */ if constexpr (!valueT_is_Void) { @@ -1864,6 +2042,8 @@ namespace ttg_parsec { * make a copy of the original data */ release = (copy->push_task != &task->parsec_task); task->parsec_task.data[i].data_in = copy; + } else { + release = true; } } task->remove_from_hash = remove_from_hash; @@ -2438,16 +2618,22 @@ namespace ttg_parsec { #endif } } + parsec_hash_table_unlock_bucket(&tasks_table, hk); // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].goal = size; - bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - if (release) release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = size; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= size)) { + release_task(task); + } } } @@ -2492,16 +2678,22 @@ namespace ttg_parsec { #endif } } + parsec_hash_table_unlock_bucket(&tasks_table, hk); // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].goal = size; - bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - if (release) release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = size; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= size)) { + release_task(task); + } } } @@ -2533,8 +2725,8 @@ namespace ttg_parsec { auto hk = reinterpret_cast(&key); task_t *task = nullptr; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + //parsec_hash_table_lock_bucket(&tasks_table, hk); + if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) { ttg::print_error(world.rank(), ":", get_name(), ":", key, " : error finalize called on stream that never received an input data: ", i); throw std::runtime_error("TT::finalize called on stream that never received an input data"); @@ -2544,10 +2736,16 @@ namespace ttg_parsec { // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = 1; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= 1)) { + release_task(task); + } } } @@ -2588,10 +2786,16 @@ namespace ttg_parsec { // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = 1; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= 1)) { + release_task(task); + } } } @@ -2807,7 +3011,7 @@ namespace ttg_parsec { static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *t) { parsec_execution_stream_t *safe_es = parsec_ttg_es; parsec_ttg_es = es; - auto *task = (detail::parsec_ttg_task_base_t *)t; + task_t *task = (task_t *)t; for (int i = 0; i < task->data_count; i++) { detail::ttg_data_copy_t *copy = static_cast(task->parsec_task.data[i].data_in); if (nullptr == copy) continue; @@ -2985,6 +3189,12 @@ namespace ttg_parsec { free((void*)self.name); self.name = nullptr; } + for (std::size_t i = 0; i < numins; ++i) { + if (inpute_reducers_taskclass[i] != nullptr) { + std::free(inpute_reducers_taskclass[i]); + inpute_reducers_taskclass[i] = nullptr; + } + } release(); } @@ -3036,6 +3246,74 @@ namespace ttg_parsec { void set_input_reducer(Reducer &&reducer) { ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i); std::get(input_reducers) = reducer; + + parsec_task_class_t *tc = inpute_reducers_taskclass[i]; + if (nullptr == tc) { + tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc)); + inpute_reducers_taskclass[i] = tc; + + tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str()); + tc->task_class_id = get_instance_id(); + tc->nb_parameters = 0; + tc->nb_locals = 0; + tc->nb_flows = numflows; + + auto &world_impl = world.impl(); + + if( world_impl.profiling() ) { + // first two ints are used to store the hash of the key. + tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int); + // seconds two ints are used to store a pointer to the key of the task. + tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int); + + // If we have parameters and locals, we need to define the corresponding dereference arrays + tc->params[0] = &detail::parsec_taskclass_param0; + tc->params[1] = &detail::parsec_taskclass_param1; + + tc->locals[0] = &detail::parsec_taskclass_param0; + tc->locals[1] = &detail::parsec_taskclass_param1; + tc->locals[2] = &detail::parsec_taskclass_param2; + tc->locals[3] = &detail::parsec_taskclass_param3; + } + tc->make_key = make_key; + tc->key_functions = &tasks_hash_fcts; + tc->task_snprintf = parsec_ttg_task_snprintf; + +#if defined(PARSEC_PROF_TRACE) + tc->profile_info = &parsec_ttg_task_info; +#endif + + world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_castnb_task_classes)>(self.task_class_id+1)); + +#if 0 + // FIXME: currently only support reduction on the host + if constexpr (derived_has_cuda_op()) { + self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t)); + ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA; + ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL; + ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda; + ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU; + ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL; + ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook; + ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE; + ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL; + ((__parsec_chore_t *)self.incarnations)[2].hook = NULL; + } else +#endif // 0 + { + tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t)); + ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU; + ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL; + ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op; + ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE; + ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL; + ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL; + } + + /* the reduction task does not alter the termination detection because the target task will execute */ + tc->release_task = &parsec_release_task_to_mempool; + tc->complete_execution = NULL; + } } /// define the reducer function to be called when additional inputs are