/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "tbb/detail/_config.h"
#include "tbb/detail/_utils.h"

#include "observer_proxy.h"
#include "arena.h"
#include "main.h"
#include "thread_data.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
extern std::atomic<int> the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
    : my_ref_count(1), my_list(nullptr), my_next(nullptr), my_prev(nullptr), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

observer_proxy::~observer_proxy() {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
#if TBB_USE_ASSERT
    --the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

void observer_list::clear() {
    // Though the method would work fine for an empty list, we require the caller
    // to check for emptiness before invoking it to avoid extra overhead.
    __TBB_ASSERT( !empty(), nullptr );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head;
        while ( observer_proxy *p = next ) {
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
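            // The exchange below pairs with the one in observe(false): whichever side
            // nulls out my_proxy first takes responsibility for detaching and deleting the proxy.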
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                continue;
            }
            // Accessing 'obs' after detaching obs->my_proxy would race with the observer's destruction.
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // we need to wait until all proxies are removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head == nullptr && my_tail == nullptr, nullptr);
}

void observer_list::insert( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if (my_head) {
        p->my_prev = my_tail;
        my_tail->my_next = p;
    } else {
        my_head = p;
    }
    my_tail = p;
}

void observer_list::remove(observer_proxy* p) {
    __TBB_ASSERT(my_head, "Attempt to remove an item from an empty list");
    __TBB_ASSERT(!my_tail->my_next, "Last item's my_next must be nullptr");
    if (p == my_tail) {
        __TBB_ASSERT(!p->my_next, nullptr);
        my_tail = p->my_prev;
    } else {
        __TBB_ASSERT(p->my_next, nullptr);
        p->my_next->my_prev = p->my_prev;
    }
    if (p == my_head) {
        __TBB_ASSERT(!p->my_prev, nullptr);
        my_head = p->my_next;
    } else {
        __TBB_ASSERT(p->my_prev, nullptr);
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT((my_head && my_tail) || (!my_head && !my_tail), nullptr);
}

void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
    }
    __TBB_ASSERT(r == 1, nullptr);
    // The reference count might go to zero.
    {
        // Take the lock to prevent resurrection by a thread concurrently walking the list.
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
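    // 'r' now holds the count observed under the lock; if it reached zero, this thread
    // unlinked the proxy and owns it exclusively, so it can be deleted outside the lock.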
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
        delete p;
    }
}

void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches through the list from last (exclusively) to the end.
    observer_proxy* p = last, *prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in the scheduler.
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty.
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            ++p->my_ref_count;
                            if (prev) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting a pass through the list.
                    p = my_head;
                    if (!p) {
                        return;
                    }
                }
                tso = p->my_observer;
            } while (!tso);
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p.
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches through the list from the beginning to last (inclusively).
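    // Unlike entry notifications, which resume from the stored 'last' proxy, exit
    // notifications restart from my_head so that every observer notified of this
    // thread's entry is also notified of its exit.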
    observer_proxy* p = nullptr, *prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have a valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item.
                        remove_ref_fast(p);
                        if (p) {
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting a pass through the list.
                    p = my_head;
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted.
            if (p != last) // the last item is already referenced since the entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
    if (enable) {
        if (!tso.my_proxy.load(std::memory_order_relaxed)) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena;
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena;
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &a->my_observers;
            }
            p->my_list->insert(p);
            // Notify the newly activated observer and other pending ones if it belongs to the current arena.
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if (observer_proxy* proxy = tso.my_proxy.exchange(nullptr)) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr );
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "Reference for observer is missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on the observer pointer's validity.
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // The proxy may still be held by other threads (to track the last notified observer).
                if (!--proxy->my_ref_count) { // nobody can increase it under the exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, nullptr );
                    delete proxy;
                }
            }
            // Other threads may still be executing the callbacks; wait for them to finish.
            spin_wait_until_eq(tso.my_busy_count, 0);
        }
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb
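
/* Illustrative usage of the public API served by the observe() entry point above
   (a minimal sketch for orientation, not part of this translation unit; the class
   name and callback bodies are hypothetical):

       class my_observer : public tbb::task_scheduler_observer {
       public:
           my_observer() { observe(true); }   // forwards to r1::observe(*this, true)
           ~my_observer() { observe(false); } // forwards to r1::observe(*this, false)
           void on_scheduler_entry(bool is_worker) override {
               // Runs in each thread joining an arena; no list locks are held here.
           }
           void on_scheduler_exit(bool is_worker) override {
               // Runs in each thread leaving an arena.
           }
       };
*/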