/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"

#include "observer_proxy.h"
#include "arena.h"
#include "main.h"
#include "thread_data.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
extern std::atomic<int> the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

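// Constructs a proxy for the given observer. The initial reference (my_ref_count == 1)
// is held on behalf of the observer and is released when the observer is deactivated
// or the list is cleared.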
observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
    : my_ref_count(1), my_list(nullptr), my_next(nullptr), my_prev(nullptr), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

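// A proxy must no longer be referenced when it is destroyed; its link fields are
// poisoned to make stale accesses easier to detect.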
observer_proxy::~observer_proxy() {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
#if TBB_USE_ASSERT
    --the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

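// Detaches and deletes all proxies whose observers are still attached, then waits
// until concurrent observe(false) calls have removed the remaining proxies from the list.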
void observer_list::clear() {
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head.load(std::memory_order_relaxed);
        while ( observer_proxy *p = next ) {
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                continue;
            }
            // Accessing 'obs' after detaching obs->my_proxy would race with the observer destruction.
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // we need to wait until all proxies are removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head.load(std::memory_order_relaxed) == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head.load(std::memory_order_relaxed) == nullptr && my_tail.load(std::memory_order_relaxed) == nullptr, nullptr);
}

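// Appends the proxy to the tail of the list under an exclusive lock.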
void observer_list::insert( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if (my_head.load(std::memory_order_relaxed)) {
        p->my_prev = my_tail.load(std::memory_order_relaxed);
        my_tail.load(std::memory_order_relaxed)->my_next = p;
    } else {
        my_head.store(p, std::memory_order_relaxed);
    }
    my_tail.store(p, std::memory_order_relaxed);
}

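// Unlinks the proxy from the doubly linked list; callers must hold the list mutex for writing.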
void observer_list::remove(observer_proxy* p) {
    __TBB_ASSERT(my_head.load(std::memory_order_relaxed), "Attempt to remove an item from an empty list");
    __TBB_ASSERT(!my_tail.load(std::memory_order_relaxed)->my_next, "Last item's my_next must be nullptr");
    if (p == my_tail.load(std::memory_order_relaxed)) {
        __TBB_ASSERT(!p->my_next, nullptr);
        my_tail.store(p->my_prev, std::memory_order_relaxed);
    } else {
        __TBB_ASSERT(p->my_next, nullptr);
        p->my_next->my_prev = p->my_prev;
    }
    if (p == my_head.load(std::memory_order_relaxed)) {
        __TBB_ASSERT(!p->my_prev, nullptr);
        my_head.store(p->my_next, std::memory_order_relaxed);
    } else {
        __TBB_ASSERT(p->my_prev, nullptr);
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT((my_head.load(std::memory_order_relaxed) && my_tail.load(std::memory_order_relaxed)) ||
                 (!my_head.load(std::memory_order_relaxed) && !my_tail.load(std::memory_order_relaxed)), nullptr);
}

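// Releases one reference to the proxy. When the last reference is dropped, the proxy is
// unlinked under an exclusive lock (to prevent resurrection by list walkers) and deleted.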
void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
    }
    __TBB_ASSERT(r == 1, nullptr);
    // Reference count might go to zero
    {
        // Use lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
        delete p;
    }
}

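// Calls on_scheduler_entry() for every observer registered after 'last'. The proxy being
// processed is pinned with a reference so that no list lock is held during the callback;
// on return 'last' points to the last proxy reached, which stays referenced for the
// matching exit notification.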
void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches through the list from last (exclusively) to the end.
    observer_proxy* p = last, * prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in the scheduler
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            ++p->my_ref_count;
                            if (prev) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting a pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    if (!p) {
                        return;
                    }
                }
                tso = p->my_observer;
            } while (!tso);
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

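// Calls on_scheduler_exit() for every observer from the head of the list up to 'last'
// (inclusively) and releases the reference that the entry notification left on 'last'.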
void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches through the list from the beginning to last (inclusively).
    observer_proxy* p = nullptr, * prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item
                        remove_ref_fast(p);
                        if (p) {
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting a pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted
            if (p != last) // the last item is already referenced since the entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

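// Activates or deactivates the observer. Activation creates a proxy, attaches it to the
// observed arena's list, and notifies pending entry observers if the list belongs to the
// current thread's arena; deactivation detaches the proxy and waits until no callback is in flight.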
void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
    if( enable ) {
        if( !tso.my_proxy.load(std::memory_order_relaxed) ) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena.load(std::memory_order_acquire);
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena.load(std::memory_order_relaxed);
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &a->my_observers;
            }
            p->my_list->insert(p);
            // Notify the newly activated observer and other pending ones if it belongs to the current arena
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr);
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "reference for observer missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on the observer pointer validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // Proxy may still be held by other threads (to track the last notified observer)
                if( !--proxy->my_ref_count ) { // nobody can increase it under the exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, nullptr);
                    delete proxy;
                }
            }
            spin_wait_until_eq(tso.my_busy_count, 0); // other threads may still be accessing the callback
        }
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb