diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index 6da7812bb..81f9eb4ed 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -13,6 +13,7 @@ set(ttg-util-headers ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/macro.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/meta.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/print.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/region_probe.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/span.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/trace.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/tree.h diff --git a/ttg/ttg/madness/ttg.h b/ttg/ttg/madness/ttg.h index 685c9b854..23d6bbab5 100644 --- a/ttg/ttg/madness/ttg.h +++ b/ttg/ttg/madness/ttg.h @@ -21,6 +21,7 @@ #include "ttg/util/meta.h" #include "ttg/util/void.h" #include "ttg/world.h" +#include "ttg/util/region_probe.h" #include #include @@ -126,6 +127,11 @@ namespace ttg_madness { std::shared_ptr world_sptr{static_cast(world_ptr)}; ttg::World world{std::move(world_sptr)}; ttg::detail::set_default_world(std::move(world)); + + /* initialize probes */ + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); } inline void ttg_finalize() { ttg::detail::set_default_world(ttg::World{}); // reset the default world diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 3fe2bec9a..a14c4c1cf 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -60,6 +60,8 @@ #include "ttg/parsec/ttg_data_copy.h" +#include "ttg/util/region_probe.h" + /* PaRSEC function declarations */ extern "C" { void parsec_taskpool_termination_detected(parsec_taskpool_t *tp); @@ -92,6 +94,26 @@ namespace ttg_parsec { namespace detail { + inline ttg::detail::region_probe schedule_probe("TTG::PARSEC SCHEDULE"); + inline ttg::detail::region_probe release_probe("TTG::CREATE TASK"); + inline ttg::detail::region_probe copyhandler_probe("TTG::HANDLE COPY"); + inline ttg::detail::region_probe createtask_probe("TTG::CREATE TASK"); + inline ttg::detail::region_probe setarg_probe("TTG::SETARG_LOCAL"); + inline ttg::detail::region_probe setargmsg_probe("TTG::SETARG FROM MSG"); + inline ttg::detail::region_probe sendam_probe("TTG::SEND AM", sizeof(int32_t), + "msg_size{uint32_t}"); + inline ttg::detail::region_probe get_probe("TTG::GET", sizeof(int64_t), + "size{uint64_t}"); + inline ttg::detail::region_probe reducer_probe("TTG::REDUCE"); + inline ttg::detail::region_probe lock_probe("TTG::LOCK HASHTABLE"); + inline ttg::detail::region_probe find_probe("TTG::FIND HASHTABLE"); + inline ttg::detail::region_probe insert_probe("TTG::INSERT HASHTABLE"); + inline ttg::detail::region_probe remove_probe("TTG::REMOVE HASHTABLE"); + inline ttg::detail::region_probe unlock_probe("TTG::UNLOCK HASHTABLE"); + inline ttg::detail::region_probe splitmdbcast_probe("TTG::SPLITMD BCAST"); + inline ttg::detail::region_probe bcast_probe("TTG::BCAST"); + + static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size, int src_rank, void *obj) { static_set_arg_fct_type static_set_arg_fct; @@ -313,7 +335,7 @@ namespace ttg_parsec { auto *execution_stream() { return parsec_ttg_es == nullptr ? es : parsec_ttg_es; } auto *taskpool() { return tpool; } - void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); } + void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1);} void increment_sent_to_sched() { parsec_atomic_fetch_inc_int32(&sent_to_sched_counter()); } void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_nb_pa(taskpool(), 1); } @@ -665,6 +687,11 @@ namespace ttg_parsec { std::shared_ptr world_sptr{static_cast(world_ptr)}; ttg::World world{std::move(world_sptr)}; ttg::detail::set_default_world(std::move(world)); + + /* initialize probes */ + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); } inline void ttg_finalize() { ttg::detail::set_default_world(ttg::World{}); // reset the default world @@ -800,6 +827,8 @@ namespace ttg_parsec { using task_t = detail::parsec_ttg_task_t; + ttg::detail::region_probe task_probe; + /* the offset of the key placed after the task structure in the memory from mempool */ constexpr static const size_t task_key_offset = sizeof(task_t); @@ -866,6 +895,8 @@ namespace ttg_parsec { static void static_op(parsec_task_t *parsec_task) { task_t *task = (task_t *)parsec_task; opT *baseobj = (opT *)task->object_ptr; + + baseobj->task_probe.enter(); derivedT *obj = (derivedT *)task->object_ptr; assert(parsec_ttg_caller == NULL); parsec_ttg_caller = parsec_task; @@ -896,6 +927,7 @@ namespace ttg_parsec { else ttg::print(obj->get_world().rank(), ":", obj->get_name(), " : done executing"); } + baseobj->task_probe.exit(); } template @@ -994,6 +1026,71 @@ namespace ttg_parsec { } } + inline + void taskstable_lock_bucket(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::lock_probe); + parsec_hash_table_lock_bucket(&tasks_table, hk); + } + + inline + void taskstable_unlock_bucket(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::unlock_probe); + parsec_hash_table_unlock_bucket(&tasks_table, hk); + } + + inline + task_t* taskstable_nolock_find(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::find_probe); + return (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk); + } + + inline + void taskstable_nolock_insert(task_t* task) + { + ttg::detail::region_probe_event ev(detail::insert_probe); + parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); + } + + inline + void taskstable_nolock_remove(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::remove_probe); + parsec_hash_table_nolock_remove(&tasks_table, hk); + } + + template + inline + task_t* taskstable_find_or_insert(Key &&key, parsec_key_t hk) + { + task_t *task; + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { + task = create_new_task(key); + world.impl().increment_created(); + taskstable_nolock_insert(task); + } + taskstable_unlock_bucket(hk); + return task; + } + + inline + void send_am(int target, void* msg, size_t size) + { + auto& world_impl = world.impl(); + uint32_t size32 = size; + detail::sendam_probe.enter(size32); + parsec_taskpool_t *tp = world_impl.taskpool(); + tp->tdm.module->outgoing_message_start(tp, target, NULL); + tp->tdm.module->outgoing_message_pack(tp, target, NULL, NULL, 0); + // std::cout << "Sending AM with " << msg->op_id.num_keys << " keys " << std::endl; + parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), target, msg, size); + detail::sendam_probe.exit(0); + } + + /** Returns the task memory pool owned by the calling thread */ inline parsec_thread_mempool_t *get_task_mempool(void) @@ -1048,6 +1145,7 @@ namespace ttg_parsec { void set_arg_from_msg(void *data, std::size_t size) { using valueT = typename std::tuple_element::type::value_type; using msg_t = detail::msg_t; + ttg::detail::region_probe_event ev(detail::setargmsg_probe); msg_t *msg = static_cast(data); if constexpr (!ttg::meta::is_void_v) { /* unpack the keys */ @@ -1139,10 +1237,12 @@ namespace ttg_parsec { iov.num_bytes, &lreg, &lreg_size); world.impl().increment_inflight_msg(); /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */ + detail::get_probe.enter(static_cast(iov.num_bytes)); parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote, &detail::get_complete_cb, activation, /*world.impl().parsec_ttg_rma_tag()*/ cbtag, &fn_ptr, sizeof(std::intptr_t)); + detail::get_probe.exit(static_cast(0)); } assert(num_iovecs == nv); @@ -1256,6 +1356,7 @@ namespace ttg_parsec { template task_t *create_new_task(const Key &key) { + ttg::detail::region_probe_event ev(detail::createtask_probe); constexpr const bool keyT_is_Void = ttg::meta::is_void_v; auto &world_impl = world.impl(); task_t *newtask; @@ -1285,6 +1386,7 @@ namespace ttg_parsec { } if (tracing()) ttg::print(world.rank(), ":", get_name(), " : ", key, ": creating task"); + return newtask; } @@ -1295,6 +1397,8 @@ namespace ttg_parsec { constexpr const bool valueT_is_Void = ttg::meta::is_void_v; constexpr const bool keyT_is_Void = ttg::meta::is_void_v; + ttg::detail::region_probe_event ev(detail::setarg_probe); + if (tracing()) { if constexpr (!valueT_is_Void) { ttg::print(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i, @@ -1317,13 +1421,7 @@ namespace ttg_parsec { bool remove_from_hash = true; /* If we have only one input and no reducer on that input we can skip the hash table */ if (numins > 1 || reducer) { - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { - task = create_new_task(key); - world_impl.increment_created(); - parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); - } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + task = taskstable_find_or_insert(key, hk); } else { task = create_new_task(key); world_impl.increment_created(); @@ -1336,12 +1434,12 @@ namespace ttg_parsec { if (reducer) { // is this a streaming input? reduce the received value // N.B. Right now reductions are done eagerly, without spawning tasks // this means we must lock - parsec_hash_table_lock_bucket(&tasks_table, hk); + taskstable_lock_bucket(hk); + detail::reducer_probe.enter(); if constexpr (!ttg::meta::is_void_v) { // for data values // have a value already? if not, set, otherwise reduce if (nullptr == (copy = reinterpret_cast(task->parsec_task.data[i].data_in))) { - using decay_valueT = std::decay_t; copy = detail::create_new_datacopy(std::forward(value)); task->parsec_task.data[i].data_in = copy; } else { @@ -1355,17 +1453,19 @@ namespace ttg_parsec { } else { reducer(); // even if this was a control input, must execute the reducer for possible side effects } + detail::reducer_probe.exit(); task->stream[i].size++; release = (task->stream[i].size == task->stream[i].goal); if (release) { - parsec_hash_table_nolock_remove(&tasks_table, hk); + taskstable_nolock_remove(hk); remove_from_hash = false; } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); } else { /* whether the task needs to be deferred or not */ bool needs_deferring = false; if constexpr (!valueT_is_Void) { + detail::copyhandler_probe.enter(); if (nullptr != task->parsec_task.data[i].data_in) { ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i); throw std::logic_error("bad set arg"); @@ -1386,6 +1486,7 @@ namespace ttg_parsec { copy = detail::create_new_datacopy(std::forward(value)); } task->parsec_task.data[i].data_in = copy; + detail::copyhandler_probe.exit(); } if (needs_deferring) { if (nullptr == task->deferred_release) { @@ -1414,6 +1515,7 @@ namespace ttg_parsec { constexpr const bool keyT_is_Void = ttg::meta::is_void_v; task_t *task = static_cast(base_task); opT &op = *reinterpret_cast(op_ptr); + ttg::detail::region_probe_event ev(detail::release_probe); int32_t count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1; assert(count <= op.self.dependencies_goal); auto &world_impl = op.world.impl(); @@ -1438,6 +1540,7 @@ namespace ttg_parsec { } if (RemoveFromHash) parsec_hash_table_remove(&op.tasks_table, hk); if (nullptr == task_list) { + ttg::detail::region_probe_event ev(detail::schedule_probe); __parsec_schedule(es, &task->parsec_task, 0); } else { parsec_list_prepend(task_list, &task->parsec_task.super); @@ -1569,12 +1672,7 @@ namespace ttg_parsec { pos += sizeof(fn_ptr); } } - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - // std::cout << "Sending AM with " << msg->op_id.num_keys << " keys " << std::endl; - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } // case 3 @@ -1604,7 +1702,9 @@ namespace ttg_parsec { world_impl.increment_created(); if (tracing()) ttg::print(world.rank(), ":", get_name(), " : ", key, ": submitting task for op "); world_impl.increment_sent_to_sched(); + detail::schedule_probe.enter(); __parsec_schedule(es, &task->parsec_task, 0); + detail::schedule_probe.exit(); } else { using msg_t = detail::msg_t; // We pass -1 to signal that we just need to call set_arg(key) on the other end @@ -1613,11 +1713,7 @@ namespace ttg_parsec { uint64_t pos = 0; pos = pack(key, msg->bytes, pos); - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } } @@ -1646,7 +1742,9 @@ namespace ttg_parsec { world_impl.increment_created(); if (tracing()) ttg::print(world.rank(), ":", get_name(), " : submitting task for op "); world_impl.increment_sent_to_sched(); + detail::schedule_probe.enter(); __parsec_schedule(es, &task->parsec_task, 0); + detail::schedule_probe.exit(); } } @@ -1660,6 +1758,7 @@ namespace ttg_parsec { /* submit all ready tasks at once */ if (!parsec_list_nolock_is_empty(&task_list)) { auto ring = (parsec_task_t*) parsec_list_unchain(&task_list); + ttg::detail::region_probe_event ev(detail::schedule_probe); __parsec_schedule(world.impl().execution_stream(), ring, 0); } } @@ -1672,6 +1771,8 @@ namespace ttg_parsec { auto world = ttg_default_execution_context(); int rank = world.rank(); + detail::bcast_probe.enter(); + bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(), [&](const Key &key) { return keymap(key) != rank; }); @@ -1722,10 +1823,7 @@ namespace ttg_parsec { pos = pack(value, msg->bytes, pos); /* Send the message */ - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } /* handle local keys */ broadcast_arg_local(local_begin, local_end, value); @@ -1733,6 +1831,7 @@ namespace ttg_parsec { /* only local keys */ broadcast_arg_local(keylist.begin(), keylist.end(), value); } + detail::bcast_probe.exit(); } template @@ -1743,6 +1842,7 @@ namespace ttg_parsec { using valueT = typename std::tuple_element::type; auto world = ttg_default_execution_context(); int rank = world.rank(); + detail::splitmdbcast_probe.enter(); bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(), [&](const Key &key) { return keymap(key) != rank; }); @@ -1864,10 +1964,7 @@ namespace ttg_parsec { pos += sizeof(fn_ptr); ++idx; } - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } /* handle local keys */ broadcast_arg_local(local_begin, local_end, value); @@ -1875,6 +1972,7 @@ namespace ttg_parsec { /* handle local keys */ broadcast_arg_local(keylist.begin(), keylist.end(), value); } + detail::splitmdbcast_probe.exit(); } // Used by invoke to set all arguments associated with a task @@ -1925,11 +2023,7 @@ namespace ttg_parsec { pos = pack(key, msg->bytes, pos); msg->op_id.num_keys = 1; pos = pack(size, msg->bytes, pos); - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i); @@ -1937,22 +2031,23 @@ namespace ttg_parsec { auto hk = reinterpret_cast(&key); task_t *task; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { task = create_new_task(key); world.impl().increment_created(); - parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); + taskstable_nolock_insert(task); } - // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes task->stream[i].goal = size; bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); - if (release) release_task(this, task); + if (release) { + release_task(this, task); + } } } @@ -1979,11 +2074,7 @@ namespace ttg_parsec { /* pack the key */ msg->op_id.num_keys = 0; pos = pack(size, msg->bytes, pos); - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i); @@ -1991,22 +2082,24 @@ namespace ttg_parsec { parsec_key_t hk = 0; task_t *task; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + taskstable_lock_bucket(hk); + detail::find_probe.enter(); + if (nullptr == (task = taskstable_nolock_find(hk))) { task = create_new_task(ttg::Void{}); world.impl().increment_created(); - parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); + taskstable_nolock_insert(task); } - // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes task->stream[i].goal = size; bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); - if (release) release_task(this, task); + if (release) { + release_task(this, task); + } } } @@ -2031,11 +2124,7 @@ namespace ttg_parsec { /* pack the key */ pos = pack(key, msg->bytes, pos); msg->op_id.num_keys = 1; - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i); @@ -2043,8 +2132,9 @@ namespace ttg_parsec { auto hk = reinterpret_cast(&key); task_t *task = nullptr; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { + taskstable_unlock_bucket(hk); ttg::print_error(world.rank(), ":", get_name(), ":", key, " : error finalize called on stream that never received an input data: ", i); throw std::runtime_error("Op::finalize called on stream that never received an input data"); @@ -2055,7 +2145,7 @@ namespace ttg_parsec { // commit changes task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); release_task(this, task); } @@ -2080,20 +2170,17 @@ namespace ttg_parsec { std::make_unique(get_instance_id(), world_impl.taskpool()->taskpool_id, msg_header_t::MSG_FINALIZE_ARGSTREAM_SIZE, i, 1); msg->op_id.num_keys = 0; - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i); } auto hk = static_cast(0); - task_t *task = nullptr; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + task_t *task; + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { + taskstable_unlock_bucket(hk); ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on stream that never received an input data: ", i); throw std::runtime_error("Op::finalize called on stream that never received an input data"); @@ -2104,7 +2191,7 @@ namespace ttg_parsec { // commit changes task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); release_task(this, task); } @@ -2311,7 +2398,8 @@ namespace ttg_parsec { ? decltype(keymap)(ttg::detail::default_keymap(world)) : decltype(keymap)(std::forward(keymap_))) , priomap(decltype(keymap)(std::forward(priomap_))) - , static_stream_goal() { + , static_stream_goal() + , task_probe(name){ // Cannot call these in base constructor since terminals not yet constructed if (innames.size() != std::tuple_size::value) throw std::logic_error("ttg_parsec::OP: #input names != #input terminals"); diff --git a/ttg/ttg/util/region_probe.h b/ttg/ttg/util/region_probe.h new file mode 100644 index 000000000..4442493a1 --- /dev/null +++ b/ttg/ttg/util/region_probe.h @@ -0,0 +1,264 @@ +#ifndef TTG_REGION_PROBE_H +#define TTG_REGION_PROBE_H + +#include +#include +#include + +#include + +namespace ttg { + + namespace detail { + + enum region_probe_types { + TTG_REGION_PROBE_USER = 0, + TTG_REGION_PROBE_TASKS = 1, + TTG_REGION_PROBE_INTERNAL = 2 + }; + + template + struct ttg_enable_region_probe : std::false_type + { }; + +#if defined(TTG_ENABLE_USER_PROBES) + template<> + struct ttg_enable_region_probe : std::true_type + { }; +#endif + +#if defined(TTG_ENABLE_TASK_PROBES) + template<> + struct ttg_enable_region_probe : std::true_type + { }; +#endif + +#if defined(TTG_ENABLE_INTERNAL_PROBES) + template<> + struct ttg_enable_region_probe : std::true_type + { }; +#endif + + template::value> + struct region_probe { + private: + int enter_, exit_; + bool initialized = false; + + /* Storage: {probe, name, info_length, converter} */ + using deferred_inits_t = std::vector*, std::string, size_t, std::string>>; + + static deferred_inits_t& deferred_inits() { + static deferred_inits_t di; + return di; + } + + static bool& defer_inits() { + static bool v = true; + return v; + }; + + public: + + static void register_deferred_probes() + { + if (defer_inits()) { + for (auto&& it : deferred_inits()) { + std::get<0>(it)->init(std::get<1>(it).c_str(), std::get<2>(it), std::get<3>(it).c_str()); + } + deferred_inits().clear(); + defer_inits() = false; + } + } + + /** + * Default constructor, does not initialize the probe. + * The probe has to be initialized using \ref init before usage. + */ + region_probe() + { } + + /** + * Create and initialize a probe with the given name. + */ + region_probe(const char *name) : region_probe(name, 0, "") + { } + + /** + * Create and initialize a probe with the given name, the size of an info + * object, and the converter description. + * The info object matching the converter description may be passed to + * enter and exit to provide additional information on the event in the + * trace. + * An example for a valid converter description would be: + * "a{int32_t};b{int64_t}" + * and the corresponding structure passed to enter/exit might look like + * struct { int32_t a; int64_t b; }; + */ + region_probe(const char *name, size_t info_length, const char *converter) + { + if (defer_inits()) { + deferred_inits().emplace_back(this, name, info_length, converter); + } else { + init(name, info_length, converter); + } + } + + region_probe(const std::string& name) + : region_probe(name.c_str()) + { } + + region_probe(const std::string& name, size_t info_length, const char *converter) + : region_probe(name.c_str(), info_length, converter) + { } + + void init(const char *name) { + init(name, 0, ""); + } + + void init(const char *name, size_t info_length, const char *converter) { + assert(!initialized); + if (!initialized) { + parsec_profiling_add_dictionary_keyword(name, "#000000", info_length, converter, &enter_, &exit_); + initialized = true; + } + } + + void init(const std::string& name) { + init(name.c_str()); + } + + void init(const std::string& name, size_t info_length, const char *converter) { + init(name.c_str(), info_length, converter); + } + + void enter() { + assert(initialized); + parsec_profiling_ts_trace(enter_, 0, PROFILE_OBJECT_ID_NULL, NULL); + } + + void exit() { + parsec_profiling_ts_trace(exit_, 0, PROFILE_OBJECT_ID_NULL, NULL); + } + + template + void enter(Arg&& arg) { + assert(initialized); + parsec_profiling_ts_trace(enter_, 0, PROFILE_OBJECT_ID_NULL, &arg); + } + + template + void exit(Arg&& arg) { + parsec_profiling_ts_trace(exit_, 0, PROFILE_OBJECT_ID_NULL, &arg); + } + }; + + /* Fallback implementation if the probe was disabled */ + template + struct region_probe + { + static void register_deferred_probes() + { } + + region_probe() + { } + + region_probe(const char *) + { } + + region_probe(const char *, size_t, const char *) + { } + + region_probe(const std::string&) + { } + + region_probe(const std::string&, size_t, const char *) + { } + + void init(const char *) + { } + + void init(const char *, size_t, const char *) + { } + + void init(const std::string& ) + { } + + void init(const std::string& , size_t , const char *) + { } + + void enter() + { } + + void exit() + { } + + template + void enter(Arg&&) + { } + + template + void exit(Arg&&) + { } + }; + + template + struct region_probe_event + { + private: + region_probe& probe_; + + public: + region_probe_event(region_probe& probe) : probe_(probe) + { + probe_.enter(); + } + + template + region_probe_event(region_probe& probe, Arg&& arg) : probe_(probe) + { + probe_.enter(std::forward(arg)); + } + + ~region_probe_event() + { + probe_.exit(); + } + }; + + } // namespace detail + + /** + * TTG user probe that allows users to define custom probes that + * are inserted into PaRSEC traces. + * \see ttg::detail::region_probe for details. + * + * The probe may be defined statically with a name and TTG will take care of + * proper initialization during \ref ttg_initialize. + * Alternatively, probes can be created after \ref ttg_initialize was called, + * either with our without a name. In the latter case, the probe remains + * uninitialized until it is unitilized using the \c init() member function. + * + * Once initialized, a the member functions \c enter and \c exit can be + * used to signal the begin and end of a region. Note that it is the users + * responsibility to ensure proper balancing of enter and exit events. + * + * User probes are disabled by default. Compile with \c -DTTG_ENABLE_USER_PROBES=1 + * to enable them. + * + * NOTE: probes must be defined in the same order on all processes! + * + */ + using user_probe = detail::region_probe; + + /** + * A scoped user probe event. Upon construction, the \c enter of the + * \sa user_probe will be called. Once the event goes out of scope, + * the probe's \c exit will be called. + */ + using user_probe_event = detail::region_probe_event; + +} // namespace ttg + + +#endif // TTG_REGION_PROBE_H