/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"

#include "observer_proxy.h"
#include "arena.h"
#include "main.h"
#include "thread_data.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
extern std::atomic<int> the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
    : my_ref_count(1), my_list(nullptr), my_next(nullptr), my_prev(nullptr), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

observer_proxy::~observer_proxy() {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
#if TBB_USE_ASSERT
    --the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

void observer_list::clear() {
    // Though the method works fine for an empty list, we require the caller
    // to check that the list is non-empty before invoking it to avoid extra overhead.
    __TBB_ASSERT( !empty(), nullptr );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head;
        while ( observer_proxy *p = next ) {
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
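            // Whichever of observer_list::clear() and observe(false) wins the
            // exchange on obs->my_proxy owns the proxy from then on; the losing
            // side must not touch the proxy again.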
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                continue;
            }
            // Accessing 'obs' after detaching obs->my_proxy would race with the observer's destruction.
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // we need to wait until all proxies are removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head == nullptr && my_tail == nullptr, nullptr);
}

void observer_list::insert( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if (my_head) {
        p->my_prev = my_tail;
        my_tail->my_next = p;
    } else {
        my_head = p;
    }
    my_tail = p;
}

void observer_list::remove(observer_proxy* p) {
    __TBB_ASSERT(my_head, "Attempt to remove an item from an empty list");
    __TBB_ASSERT(!my_tail->my_next, "Last item's my_next must be NULL");
    if (p == my_tail) {
        __TBB_ASSERT(!p->my_next, nullptr);
        my_tail = p->my_prev;
    } else {
        __TBB_ASSERT(p->my_next, nullptr);
        p->my_next->my_prev = p->my_prev;
    }
    if (p == my_head) {
        __TBB_ASSERT(!p->my_prev, nullptr);
        my_head = p->my_next;
    } else {
        __TBB_ASSERT(p->my_prev, nullptr);
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT((my_head && my_tail) || (!my_head && !my_tail), nullptr);
}

void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
    }
    __TBB_ASSERT(r == 1, nullptr);
    // Reference count might go to zero
    {
        // Use the lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
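        // The proxy was already unlinked from the list under the writer lock above,
        // so no list walker can reach it anymore; deleting outside the lock is safe.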
        delete p;
    }
}

void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches through the list from last (exclusively) to the end.
    observer_proxy* p = last, * prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in the scheduler.
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty.
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            ++p->my_ref_count;
                            if (prev) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting a pass through the list.
                    p = my_head;
                    if (!p) {
                        return;
                    }
                }
                tso = p->my_observer;
            } while (!tso);
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p.
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches through the list from the beginning to last (inclusively).
    observer_proxy* p = nullptr, * prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
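        // As in do_notify_entry_observers(), at most two proxies (prev and p) are
        // kept pinned through their reference counts, so concurrent removals cannot
        // invalidate the traversal.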
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have a valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item.
                        remove_ref_fast(p);
                        if (p) {
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting a pass through the list.
                    p = my_head;
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted.
            if (p != last) // the last item is already referenced by the entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
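        // The busy count incremented under the lock keeps the observer safe to call:
        // observe(false) spin-waits for my_busy_count to drop to zero before returning.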
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
    if( enable ) {
        if( !tso.my_proxy.load(std::memory_order_relaxed) ) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena;
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena;
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &a->my_observers;
            }
            p->my_list->insert(p);
            // Notify the newly activated observer and other pending ones if it belongs to the current arena.
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
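            // (observer_list::clear() performs the same exchange, so exactly one of
            // the two cleanup paths dismantles the proxy.)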
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr );
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "Reference for observer is missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on the observer pointer's validity.
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // The proxy may still be held by other threads (to track the last notified observer).
                if( !--proxy->my_ref_count ) { // nobody can increase it under the exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, nullptr );
                    delete proxy;
                }
            }
            // Wait until other threads have finished any in-flight callback invocations.
            spin_wait_until_eq(tso.my_busy_count, 0);
        }
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb
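
/*
    Usage sketch for the machinery above: a minimal, hypothetical observer (user
    code, not part of this translation unit). The public class
    tbb::task_scheduler_observer forwards to r1::observe() defined here.

        class thread_tracker : public tbb::task_scheduler_observer {
            std::atomic<int> my_active{0};
        public:
            thread_tracker() { observe(true); }    // creates and registers a proxy
            ~thread_tracker() { observe(false); }  // detaches the proxy, waits for callbacks
            void on_scheduler_entry(bool /*is_worker*/) override { ++my_active; }
            void on_scheduler_exit(bool /*is_worker*/) override { --my_active; }
        };
*/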