xref: /oneTBB/src/tbb/observer_proxy.cpp (revision b15aabb3)
151c0b2f7Stbbdev /*
2*b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1749e08aacStbbdev #include "oneapi/tbb/detail/_config.h"
1849e08aacStbbdev #include "oneapi/tbb/detail/_utils.h"
1951c0b2f7Stbbdev 
2051c0b2f7Stbbdev #include "observer_proxy.h"
2151c0b2f7Stbbdev #include "arena.h"
2251c0b2f7Stbbdev #include "main.h"
2351c0b2f7Stbbdev #include "thread_data.h"
2451c0b2f7Stbbdev 
2551c0b2f7Stbbdev #include <atomic>
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev namespace tbb {
2851c0b2f7Stbbdev namespace detail {
2951c0b2f7Stbbdev namespace r1 {
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev #if TBB_USE_ASSERT
3251c0b2f7Stbbdev extern std::atomic<int> the_observer_proxy_count;
3351c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
3451c0b2f7Stbbdev 
3551c0b2f7Stbbdev observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
3651c0b2f7Stbbdev     : my_ref_count(1), my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso)
3751c0b2f7Stbbdev {
3851c0b2f7Stbbdev #if TBB_USE_ASSERT
3951c0b2f7Stbbdev     ++the_observer_proxy_count;
4051c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
4151c0b2f7Stbbdev }
4251c0b2f7Stbbdev 
4351c0b2f7Stbbdev observer_proxy::~observer_proxy() {
4451c0b2f7Stbbdev     __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
4551c0b2f7Stbbdev     poison_value(my_ref_count);
4651c0b2f7Stbbdev     poison_pointer(my_prev);
4751c0b2f7Stbbdev     poison_pointer(my_next);
4851c0b2f7Stbbdev #if TBB_USE_ASSERT
4951c0b2f7Stbbdev     --the_observer_proxy_count;
5051c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
5151c0b2f7Stbbdev }
5251c0b2f7Stbbdev 
// Detaches and deletes every proxy in the list, then waits until concurrent
// observe(false) calls have drained any proxies they won ownership of.
// Ownership of each proxy is decided by the atomic exchange on the observer's
// my_proxy pointer: whoever reads the non-null value is responsible for it.
void observer_list::clear() {
    // Though the method will work fine for the empty list, we require the caller
    // to check for the list emptiness before invoking it to avoid extra overhead.
    __TBB_ASSERT( !empty(), NULL );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head.load(std::memory_order_relaxed);
        while ( observer_proxy *p = next ) {
            // Advance before p can be unlinked/deleted below.
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                // Lost the exchange: a concurrent observe(false) now owns the
                // proxy and will unlink and delete it itself.
                continue;
            }
            // accessing 'obs' after detaching of obs->my_proxy leads to the race with observer destruction
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            // Drop the observer's reference (must hit zero before deletion).
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // need to wait until all proxies are removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head.load(std::memory_order_relaxed) == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head.load(std::memory_order_relaxed) == nullptr && my_tail.load(std::memory_order_relaxed) == nullptr, nullptr);
}
9251c0b2f7Stbbdev 
9351c0b2f7Stbbdev void observer_list::insert( observer_proxy* p ) {
9451c0b2f7Stbbdev     scoped_lock lock(mutex(), /*is_writer=*/true);
95*b15aabb3Stbbdev     if (my_head.load(std::memory_order_relaxed)) {
96*b15aabb3Stbbdev         p->my_prev = my_tail.load(std::memory_order_relaxed);
97*b15aabb3Stbbdev         my_tail.load(std::memory_order_relaxed)->my_next = p;
9851c0b2f7Stbbdev     } else {
99*b15aabb3Stbbdev         my_head.store(p, std::memory_order_relaxed);
10051c0b2f7Stbbdev     }
101*b15aabb3Stbbdev     my_tail.store(p, std::memory_order_relaxed);
10251c0b2f7Stbbdev }
10351c0b2f7Stbbdev 
10451c0b2f7Stbbdev void observer_list::remove(observer_proxy* p) {
105*b15aabb3Stbbdev     __TBB_ASSERT(my_head.load(std::memory_order_relaxed), "Attempt to remove an item from an empty list");
106*b15aabb3Stbbdev     __TBB_ASSERT(!my_tail.load(std::memory_order_relaxed)->my_next, "Last item's my_next must be NULL");
107*b15aabb3Stbbdev     if (p == my_tail.load(std::memory_order_relaxed)) {
10851c0b2f7Stbbdev         __TBB_ASSERT(!p->my_next, nullptr);
109*b15aabb3Stbbdev         my_tail.store(p->my_prev, std::memory_order_relaxed);
11051c0b2f7Stbbdev     } else {
11151c0b2f7Stbbdev         __TBB_ASSERT(p->my_next, nullptr);
11251c0b2f7Stbbdev         p->my_next->my_prev = p->my_prev;
11351c0b2f7Stbbdev     }
114*b15aabb3Stbbdev     if (p == my_head.load(std::memory_order_relaxed)) {
11551c0b2f7Stbbdev         __TBB_ASSERT(!p->my_prev, nullptr);
116*b15aabb3Stbbdev         my_head.store(p->my_next, std::memory_order_relaxed);
11751c0b2f7Stbbdev     } else {
11851c0b2f7Stbbdev         __TBB_ASSERT(p->my_prev, nullptr);
11951c0b2f7Stbbdev         p->my_prev->my_next = p->my_next;
12051c0b2f7Stbbdev     }
121*b15aabb3Stbbdev     __TBB_ASSERT((my_head.load(std::memory_order_relaxed) && my_tail.load(std::memory_order_relaxed)) ||
122*b15aabb3Stbbdev         (!my_head.load(std::memory_order_relaxed) && !my_tail.load(std::memory_order_relaxed)), nullptr);
12351c0b2f7Stbbdev }
12451c0b2f7Stbbdev 
// Releases one reference to the proxy. When the last reference is dropped,
// the proxy is unlinked from the list (under the lock, to prevent
// resurrection by concurrent list walkers) and deleted.
void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
    // Fast path: while another reference remains, a lock-free decrement
    // cannot drop the count to zero, so no list manipulation is needed.
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
        // CAS failure reloaded r; loop exits if it dropped to 1.
    }
    __TBB_ASSERT(r == 1, nullptr);
    // Reference count might go to zero
    {
        // Use lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
        // Deletion happens outside the lock; p is already unlinked.
        delete p;
    }
}
14851c0b2f7Stbbdev 
// Invokes on_scheduler_entry for every observer after 'last' (exclusive) up
// to the current end of the list, updating 'last' to the final proxy visited.
// 'worker' is forwarded to the callbacks. Proxies are pinned via refcounts so
// that no lock is held while user code runs.
void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches though the list from last (exclusively) to the end.
    observer_proxy* p = last, * prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to NULL if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in scheduler
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            // Pin p: it becomes the new 'last' stored by the caller.
                            ++p->my_ref_count;
                            if (prev) {
                                // Release the lock before a potentially blocking unref.
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    if (!p) {
                        return;
                    }
                }
                // Skip proxies whose observer has already been detached.
                tso = p->my_observer;
            } while (!tso);
            // Pin both the proxy and the observer before dropping the lock.
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}
20951c0b2f7Stbbdev 
// Invokes on_scheduler_exit for every observer from the head of the list up
// to 'last' (inclusive) — mirroring the entry notifications already issued.
// 'worker' is forwarded to the callbacks. Like the entry pass, proxies are
// pinned via refcounts so no lock is held while user code runs.
void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches though the list from the beginning to last (inclusively).
    observer_proxy* p = nullptr, * prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to NULL if successful
                        p = p->my_next;
                    } else {
                        // remove the reference from the last item
                        remove_ref_fast(p);
                        if (p) {
                            // Slow-path unref must run without the list lock.
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                // Skip proxies whose observer has already been detached.
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted
            if (p != last) // the last is already referenced since entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}
26351c0b2f7Stbbdev 
// Entry point for task_scheduler_observer::observe(bool): enables (creates a
// proxy and inserts it into the target arena's observer list) or disables
// (detaches the proxy and waits for in-flight callbacks) the observer.
// Enable/disable race with observer_list::clear is resolved by the atomic
// exchange on tso.my_proxy: whichever side reads the non-null proxy owns it.
void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
    if( enable ) {
        // No-op if already enabled (proxy present).
        if( !tso.my_proxy.load(std::memory_order_relaxed) ) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                // No explicit arena: attach to the calling thread's arena,
                // initializing the scheduler for this thread if needed.
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                // Explicit task_arena: attach to its observer list.
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena.load(std::memory_order_acquire);
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena.load(std::memory_order_relaxed);
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &a->my_observers;
            }
            p->my_list->insert(p);
            // Notify newly activated observer and other pending ones if it belongs to current arena
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr);
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "reference for observer missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on observer pointer validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // Proxy may still be held by other threads (to track the last notified observer)
                if( !--proxy->my_ref_count ) {// nobody can increase it under exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, NULL );
                    delete proxy;
                }
            }
            spin_wait_until_eq(tso.my_busy_count, 0); // other threads are still accessing the callback
        }
    }
}
31951c0b2f7Stbbdev 
32051c0b2f7Stbbdev } // namespace r1
32151c0b2f7Stbbdev } // namespace detail
32251c0b2f7Stbbdev } // namespace tbb
323