1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "oneapi/tbb/detail/_config.h" 18 #include "oneapi/tbb/detail/_utils.h" 19 20 #include "observer_proxy.h" 21 #include "arena.h" 22 #include "main.h" 23 #include "thread_data.h" 24 25 #include <atomic> 26 27 namespace tbb { 28 namespace detail { 29 namespace r1 { 30 31 #if TBB_USE_ASSERT 32 extern std::atomic<int> the_observer_proxy_count; 33 #endif /* TBB_USE_ASSERT */ 34 35 observer_proxy::observer_proxy( d1::task_scheduler_observer& tso ) 36 : my_ref_count(1), my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso) 37 { 38 #if TBB_USE_ASSERT 39 ++the_observer_proxy_count; 40 #endif /* TBB_USE_ASSERT */ 41 } 42 43 observer_proxy::~observer_proxy() { 44 __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" ); 45 poison_value(my_ref_count); 46 poison_pointer(my_prev); 47 poison_pointer(my_next); 48 #if TBB_USE_ASSERT 49 --the_observer_proxy_count; 50 #endif /* TBB_USE_ASSERT */ 51 } 52 53 void observer_list::clear() { 54 // Though the method will work fine for the empty list, we require the caller 55 // to check for the list emptiness before invoking it to avoid extra overhead. 56 __TBB_ASSERT( !empty(), NULL ); 57 { 58 scoped_lock lock(mutex(), /*is_writer=*/true); 59 observer_proxy *next = my_head.load(std::memory_order_relaxed); 60 while ( observer_proxy *p = next ) { 61 next = p->my_next; 62 // Both proxy p and observer p->my_observer (if non-null) are guaranteed 63 // to be alive while the list is locked. 64 d1::task_scheduler_observer *obs = p->my_observer; 65 // Make sure that possible concurrent observer destruction does not 66 // conflict with the proxy list cleanup. 67 if (!obs || !(p = obs->my_proxy.exchange(nullptr))) { 68 continue; 69 } 70 // accessing 'obs' after detaching of obs->my_proxy leads to the race with observer destruction 71 __TBB_ASSERT(!next || p == next->my_prev, nullptr); 72 __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely"); 73 __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing"); 74 poison_pointer(p->my_observer); 75 remove(p); 76 --p->my_ref_count; 77 delete p; 78 } 79 } 80 81 // If observe(false) is called concurrently with the destruction of the arena, 82 // need to wait until all proxies are removed. 83 for (atomic_backoff backoff; ; backoff.pause()) { 84 scoped_lock lock(mutex(), /*is_writer=*/false); 85 if (my_head.load(std::memory_order_relaxed) == nullptr) { 86 break; 87 } 88 } 89 90 __TBB_ASSERT(my_head.load(std::memory_order_relaxed) == nullptr && my_tail.load(std::memory_order_relaxed) == nullptr, nullptr); 91 } 92 93 void observer_list::insert( observer_proxy* p ) { 94 scoped_lock lock(mutex(), /*is_writer=*/true); 95 if (my_head.load(std::memory_order_relaxed)) { 96 p->my_prev = my_tail.load(std::memory_order_relaxed); 97 my_tail.load(std::memory_order_relaxed)->my_next = p; 98 } else { 99 my_head.store(p, std::memory_order_relaxed); 100 } 101 my_tail.store(p, std::memory_order_relaxed); 102 } 103 104 void observer_list::remove(observer_proxy* p) { 105 __TBB_ASSERT(my_head.load(std::memory_order_relaxed), "Attempt to remove an item from an empty list"); 106 __TBB_ASSERT(!my_tail.load(std::memory_order_relaxed)->my_next, "Last item's my_next must be NULL"); 107 if (p == my_tail.load(std::memory_order_relaxed)) { 108 __TBB_ASSERT(!p->my_next, nullptr); 109 my_tail.store(p->my_prev, std::memory_order_relaxed); 110 } else { 111 __TBB_ASSERT(p->my_next, nullptr); 112 p->my_next->my_prev = p->my_prev; 113 } 114 if (p == my_head.load(std::memory_order_relaxed)) { 115 __TBB_ASSERT(!p->my_prev, nullptr); 116 my_head.store(p->my_next, std::memory_order_relaxed); 117 } else { 118 __TBB_ASSERT(p->my_prev, nullptr); 119 p->my_prev->my_next = p->my_next; 120 } 121 __TBB_ASSERT((my_head.load(std::memory_order_relaxed) && my_tail.load(std::memory_order_relaxed)) || 122 (!my_head.load(std::memory_order_relaxed) && !my_tail.load(std::memory_order_relaxed)), nullptr); 123 } 124 125 void observer_list::remove_ref(observer_proxy* p) { 126 std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire); 127 __TBB_ASSERT(is_alive(r), nullptr); 128 while (r > 1) { 129 if (p->my_ref_count.compare_exchange_strong(r, r - 1)) { 130 return; 131 } 132 } 133 __TBB_ASSERT(r == 1, nullptr); 134 // Reference count might go to zero 135 { 136 // Use lock to avoid resurrection by a thread concurrently walking the list 137 observer_list::scoped_lock lock(mutex(), /*is_writer=*/true); 138 r = --p->my_ref_count; 139 if (!r) { 140 remove(p); 141 } 142 } 143 __TBB_ASSERT(r || !p->my_ref_count, nullptr); 144 if (!r) { 145 delete p; 146 } 147 } 148 149 void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) { 150 // Pointer p marches though the list from last (exclusively) to the end. 151 observer_proxy* p = last, * prev = p; 152 for (;;) { 153 d1::task_scheduler_observer* tso = nullptr; 154 // Hold lock on list only long enough to advance to the next proxy in the list. 155 { 156 scoped_lock lock(mutex(), /*is_writer=*/false); 157 do { 158 if (p) { 159 // We were already processing the list. 160 if (observer_proxy* q = p->my_next) { 161 if (p == prev) { 162 remove_ref_fast(prev); // sets prev to NULL if successful 163 } 164 p = q; 165 } else { 166 // Reached the end of the list. 167 if (p == prev) { 168 // Keep the reference as we store the 'last' pointer in scheduler 169 __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr); 170 } else { 171 // The last few proxies were empty 172 __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr); 173 ++p->my_ref_count; 174 if (prev) { 175 lock.release(); 176 remove_ref(prev); 177 } 178 } 179 last = p; 180 return; 181 } 182 } else { 183 // Starting pass through the list 184 p = my_head.load(std::memory_order_relaxed); 185 if (!p) { 186 return; 187 } 188 } 189 tso = p->my_observer; 190 } while (!tso); 191 ++p->my_ref_count; 192 ++tso->my_busy_count; 193 } 194 __TBB_ASSERT(!prev || p != prev, nullptr); 195 // Release the proxy pinned before p 196 if (prev) { 197 remove_ref(prev); 198 } 199 // Do not hold any locks on the list while calling user's code. 200 // Do not intercept any exceptions that may escape the callback so that 201 // they are either handled by the TBB scheduler or passed to the debugger. 202 tso->on_scheduler_entry(worker); 203 __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr); 204 intptr_t bc = --tso->my_busy_count; 205 __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed"); 206 prev = p; 207 } 208 } 209 210 void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) { 211 // Pointer p marches though the list from the beginning to last (inclusively). 212 observer_proxy* p = nullptr, * prev = nullptr; 213 for (;;) { 214 d1::task_scheduler_observer* tso = nullptr; 215 // Hold lock on list only long enough to advance to the next proxy in the list. 216 { 217 scoped_lock lock(mutex(), /*is_writer=*/false); 218 do { 219 if (p) { 220 // We were already processing the list. 221 if (p != last) { 222 __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer"); 223 if (p == prev) 224 remove_ref_fast(prev); // sets prev to NULL if successful 225 p = p->my_next; 226 } else { 227 // remove the reference from the last item 228 remove_ref_fast(p); 229 if (p) { 230 lock.release(); 231 if (p != prev && prev) { 232 remove_ref(prev); 233 } 234 remove_ref(p); 235 } 236 return; 237 } 238 } else { 239 // Starting pass through the list 240 p = my_head.load(std::memory_order_relaxed); 241 __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty"); 242 } 243 tso = p->my_observer; 244 } while (!tso); 245 // The item is already refcounted 246 if (p != last) // the last is already referenced since entry notification 247 ++p->my_ref_count; 248 ++tso->my_busy_count; 249 } 250 __TBB_ASSERT(!prev || p != prev, nullptr); 251 if (prev) 252 remove_ref(prev); 253 // Do not hold any locks on the list while calling user's code. 254 // Do not intercept any exceptions that may escape the callback so that 255 // they are either handled by the TBB scheduler or passed to the debugger. 256 tso->on_scheduler_exit(worker); 257 __TBB_ASSERT(p->my_ref_count || p == last, nullptr); 258 intptr_t bc = --tso->my_busy_count; 259 __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed"); 260 prev = p; 261 } 262 } 263 264 void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) { 265 if( enable ) { 266 if( !tso.my_proxy.load(std::memory_order_relaxed) ) { 267 observer_proxy* p = new observer_proxy(tso); 268 tso.my_proxy.store(p, std::memory_order_relaxed); 269 tso.my_busy_count.store(0, std::memory_order_relaxed); 270 271 thread_data* td = governor::get_thread_data_if_initialized(); 272 if (p->my_observer->my_task_arena == nullptr) { 273 if (!(td && td->my_arena)) { 274 td = governor::get_thread_data(); 275 } 276 __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr); 277 __TBB_ASSERT(td && td->my_arena, nullptr); 278 p->my_list = &td->my_arena->my_observers; 279 } else { 280 d1::task_arena* ta = p->my_observer->my_task_arena; 281 arena* a = ta->my_arena.load(std::memory_order_acquire); 282 if (a == nullptr) { // Avoid recursion during arena initialization 283 ta->initialize(); 284 a = ta->my_arena.load(std::memory_order_relaxed); 285 } 286 __TBB_ASSERT(a != nullptr, nullptr); 287 p->my_list = &a->my_observers; 288 } 289 p->my_list->insert(p); 290 // Notify newly activated observer and other pending ones if it belongs to current arena 291 if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) { 292 p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker); 293 } 294 } 295 } else { 296 // Make sure that possible concurrent proxy list cleanup does not conflict 297 // with the observer destruction here. 298 if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) { 299 // List destruction should not touch this proxy after we've won the above interlocked exchange. 300 __TBB_ASSERT( proxy->my_observer == &tso, nullptr); 301 __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" ); 302 __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "reference for observer missing" ); 303 observer_list &list = *proxy->my_list; 304 { 305 // Ensure that none of the list walkers relies on observer pointer validity 306 observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true); 307 proxy->my_observer = nullptr; 308 // Proxy may still be held by other threads (to track the last notified observer) 309 if( !--proxy->my_ref_count ) {// nobody can increase it under exclusive lock 310 list.remove(proxy); 311 __TBB_ASSERT( !proxy->my_ref_count, NULL ); 312 delete proxy; 313 } 314 } 315 spin_wait_until_eq(tso.my_busy_count, 0); // other threads are still accessing the callback 316 } 317 } 318 } 319 320 } // namespace r1 321 } // namespace detail 322 } // namespace tbb 323