1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "oneapi/tbb/detail/_config.h" 18 #include "oneapi/tbb/detail/_utils.h" 19 20 #include "observer_proxy.h" 21 #include "arena.h" 22 #include "main.h" 23 #include "thread_data.h" 24 25 #include <atomic> 26 27 namespace tbb { 28 namespace detail { 29 namespace r1 { 30 31 #if TBB_USE_ASSERT 32 extern std::atomic<int> the_observer_proxy_count; 33 #endif /* TBB_USE_ASSERT */ 34 35 observer_proxy::observer_proxy( d1::task_scheduler_observer& tso ) 36 : my_ref_count(1), my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso) 37 { 38 #if TBB_USE_ASSERT 39 ++the_observer_proxy_count; 40 #endif /* TBB_USE_ASSERT */ 41 } 42 43 observer_proxy::~observer_proxy() { 44 __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" ); 45 poison_value(my_ref_count); 46 poison_pointer(my_prev); 47 poison_pointer(my_next); 48 #if TBB_USE_ASSERT 49 --the_observer_proxy_count; 50 #endif /* TBB_USE_ASSERT */ 51 } 52 53 void observer_list::clear() { 54 // Though the method will work fine for the empty list, we require the caller 55 // to check for the list emptiness before invoking it to avoid extra overhead. 56 __TBB_ASSERT( !empty(), NULL ); 57 { 58 scoped_lock lock(mutex(), /*is_writer=*/true); 59 observer_proxy *next = my_head; 60 while ( observer_proxy *p = next ) { 61 next = p->my_next; 62 // Both proxy p and observer p->my_observer (if non-null) are guaranteed 63 // to be alive while the list is locked. 64 d1::task_scheduler_observer *obs = p->my_observer; 65 // Make sure that possible concurrent observer destruction does not 66 // conflict with the proxy list cleanup. 67 if (!obs || !(p = obs->my_proxy.exchange(nullptr))) { 68 continue; 69 } 70 // accessing 'obs' after detaching of obs->my_proxy leads to the race with observer destruction 71 __TBB_ASSERT(!next || p == next->my_prev, nullptr); 72 __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely"); 73 __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing"); 74 poison_pointer(p->my_observer); 75 remove(p); 76 --p->my_ref_count; 77 delete p; 78 } 79 } 80 81 // If observe(false) is called concurrently with the destruction of the arena, 82 // need to wait until all proxies are removed. 83 for (atomic_backoff backoff; ; backoff.pause()) { 84 scoped_lock lock(mutex(), /*is_writer=*/false); 85 if (my_head == nullptr) { 86 break; 87 } 88 } 89 90 __TBB_ASSERT(my_head == nullptr && my_tail == nullptr, nullptr); 91 } 92 93 void observer_list::insert( observer_proxy* p ) { 94 scoped_lock lock(mutex(), /*is_writer=*/true); 95 if (my_head) { 96 p->my_prev = my_tail; 97 my_tail->my_next = p; 98 } else { 99 my_head = p; 100 } 101 my_tail = p; 102 } 103 104 void observer_list::remove(observer_proxy* p) { 105 __TBB_ASSERT(my_head, "Attempt to remove an item from an empty list"); 106 __TBB_ASSERT(!my_tail->my_next, "Last item's my_next must be NULL"); 107 if (p == my_tail) { 108 __TBB_ASSERT(!p->my_next, nullptr); 109 my_tail = p->my_prev; 110 } else { 111 __TBB_ASSERT(p->my_next, nullptr); 112 p->my_next->my_prev = p->my_prev; 113 } 114 if (p == my_head) { 115 __TBB_ASSERT(!p->my_prev, nullptr); 116 my_head = p->my_next; 117 } else { 118 __TBB_ASSERT(p->my_prev, nullptr); 119 p->my_prev->my_next = p->my_next; 120 } 121 __TBB_ASSERT((my_head && my_tail) || (!my_head && !my_tail), nullptr); 122 } 123 124 void observer_list::remove_ref(observer_proxy* p) { 125 std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire); 126 __TBB_ASSERT(is_alive(r), nullptr); 127 while (r > 1) { 128 if (p->my_ref_count.compare_exchange_strong(r, r - 1)) { 129 return; 130 } 131 } 132 __TBB_ASSERT(r == 1, nullptr); 133 // Reference count might go to zero 134 { 135 // Use lock to avoid resurrection by a thread concurrently walking the list 136 observer_list::scoped_lock lock(mutex(), /*is_writer=*/true); 137 r = --p->my_ref_count; 138 if (!r) { 139 remove(p); 140 } 141 } 142 __TBB_ASSERT(r || !p->my_ref_count, nullptr); 143 if (!r) { 144 delete p; 145 } 146 } 147 148 void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) { 149 // Pointer p marches though the list from last (exclusively) to the end. 150 observer_proxy* p = last, * prev = p; 151 for (;;) { 152 d1::task_scheduler_observer* tso = nullptr; 153 // Hold lock on list only long enough to advance to the next proxy in the list. 154 { 155 scoped_lock lock(mutex(), /*is_writer=*/false); 156 do { 157 if (p) { 158 // We were already processing the list. 159 if (observer_proxy* q = p->my_next) { 160 if (p == prev) { 161 remove_ref_fast(prev); // sets prev to NULL if successful 162 } 163 p = q; 164 } else { 165 // Reached the end of the list. 166 if (p == prev) { 167 // Keep the reference as we store the 'last' pointer in scheduler 168 __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr); 169 } else { 170 // The last few proxies were empty 171 __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr); 172 ++p->my_ref_count; 173 if (prev) { 174 lock.release(); 175 remove_ref(prev); 176 } 177 } 178 last = p; 179 return; 180 } 181 } else { 182 // Starting pass through the list 183 p = my_head; 184 if (!p) { 185 return; 186 } 187 } 188 tso = p->my_observer; 189 } while (!tso); 190 ++p->my_ref_count; 191 ++tso->my_busy_count; 192 } 193 __TBB_ASSERT(!prev || p != prev, nullptr); 194 // Release the proxy pinned before p 195 if (prev) { 196 remove_ref(prev); 197 } 198 // Do not hold any locks on the list while calling user's code. 199 // Do not intercept any exceptions that may escape the callback so that 200 // they are either handled by the TBB scheduler or passed to the debugger. 201 tso->on_scheduler_entry(worker); 202 __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr); 203 intptr_t bc = --tso->my_busy_count; 204 __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed"); 205 prev = p; 206 } 207 } 208 209 void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) { 210 // Pointer p marches though the list from the beginning to last (inclusively). 211 observer_proxy* p = nullptr, * prev = nullptr; 212 for (;;) { 213 d1::task_scheduler_observer* tso = nullptr; 214 // Hold lock on list only long enough to advance to the next proxy in the list. 215 { 216 scoped_lock lock(mutex(), /*is_writer=*/false); 217 do { 218 if (p) { 219 // We were already processing the list. 220 if (p != last) { 221 __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer"); 222 if (p == prev) 223 remove_ref_fast(prev); // sets prev to NULL if successful 224 p = p->my_next; 225 } else { 226 // remove the reference from the last item 227 remove_ref_fast(p); 228 if (p) { 229 lock.release(); 230 if (p != prev && prev) { 231 remove_ref(prev); 232 } 233 remove_ref(p); 234 } 235 return; 236 } 237 } else { 238 // Starting pass through the list 239 p = my_head; 240 __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty"); 241 } 242 tso = p->my_observer; 243 } while (!tso); 244 // The item is already refcounted 245 if (p != last) // the last is already referenced since entry notification 246 ++p->my_ref_count; 247 ++tso->my_busy_count; 248 } 249 __TBB_ASSERT(!prev || p != prev, nullptr); 250 if (prev) 251 remove_ref(prev); 252 // Do not hold any locks on the list while calling user's code. 253 // Do not intercept any exceptions that may escape the callback so that 254 // they are either handled by the TBB scheduler or passed to the debugger. 255 tso->on_scheduler_exit(worker); 256 __TBB_ASSERT(p->my_ref_count || p == last, nullptr); 257 intptr_t bc = --tso->my_busy_count; 258 __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed"); 259 prev = p; 260 } 261 } 262 263 void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) { 264 if( enable ) { 265 if( !tso.my_proxy.load(std::memory_order_relaxed) ) { 266 observer_proxy* p = new observer_proxy(tso); 267 tso.my_proxy.store(p, std::memory_order_relaxed); 268 tso.my_busy_count.store(0, std::memory_order_relaxed); 269 270 thread_data* td = governor::get_thread_data_if_initialized(); 271 if (p->my_observer->my_task_arena == nullptr) { 272 if (!(td && td->my_arena)) { 273 td = governor::get_thread_data(); 274 } 275 __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr); 276 __TBB_ASSERT(td && td->my_arena, nullptr); 277 p->my_list = &td->my_arena->my_observers; 278 } else { 279 d1::task_arena* ta = p->my_observer->my_task_arena; 280 arena* a = ta->my_arena; 281 if (a == nullptr) { // Avoid recursion during arena initialization 282 ta->initialize(); 283 a = ta->my_arena; 284 } 285 __TBB_ASSERT(a != nullptr, nullptr); 286 p->my_list = &ta->my_arena->my_observers; 287 } 288 p->my_list->insert(p); 289 // Notify newly activated observer and other pending ones if it belongs to current arena 290 if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) { 291 p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker); 292 } 293 } 294 } else { 295 // Make sure that possible concurrent proxy list cleanup does not conflict 296 // with the observer destruction here. 297 if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) { 298 // List destruction should not touch this proxy after we've won the above interlocked exchange. 299 __TBB_ASSERT( proxy->my_observer == &tso, nullptr); 300 __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" ); 301 __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "reference for observer missing" ); 302 observer_list &list = *proxy->my_list; 303 { 304 // Ensure that none of the list walkers relies on observer pointer validity 305 observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true); 306 proxy->my_observer = nullptr; 307 // Proxy may still be held by other threads (to track the last notified observer) 308 if( !--proxy->my_ref_count ) {// nobody can increase it under exclusive lock 309 list.remove(proxy); 310 __TBB_ASSERT( !proxy->my_ref_count, NULL ); 311 delete proxy; 312 } 313 } 314 spin_wait_until_eq(tso.my_busy_count, 0); // other threads are still accessing the callback 315 } 316 } 317 } 318 319 } // namespace r1 320 } // namespace detail 321 } // namespace tbb 322