xref: /oneTBB/src/tbb/observer_proxy.cpp (revision 51c0b2f7)
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "tbb/detail/_config.h"
#include "tbb/detail/_utils.h"

#include "observer_proxy.h"
#include "arena.h"
#include "main.h"
#include "thread_data.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

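// Debug-only counter of live proxies, balanced in the constructor and
// destructor below; it allows leaked proxies to be detected at shutdown.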
#if TBB_USE_ASSERT
extern std::atomic<int> the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

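// A proxy starts with a single reference, owned by the observer itself;
// observe() below stores the proxy in tso.my_proxy.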
observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
    : my_ref_count(1), my_list(nullptr), my_next(nullptr), my_prev(nullptr), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

observer_proxy::~observer_proxy() {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
#if TBB_USE_ASSERT
    --the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

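// Cleanup proceeds in two phases: first, under the write lock, each proxy is
// atomically detached from its observer and deleted; then the caller spins
// until any proxies being removed by concurrent observe(false) calls are
// gone as well.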
void observer_list::clear() {
    // Though the method would work fine for an empty list, we require the caller
    // to check that the list is non-empty before invoking it, to avoid extra overhead.
    __TBB_ASSERT( !empty(), nullptr );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head;
        while ( observer_proxy *p = next ) {
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                continue;
            }
            // Accessing 'obs' after detaching obs->my_proxy would race with the
            // observer's destruction.
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // we need to wait until all proxies are removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head == nullptr && my_tail == nullptr, nullptr);
}

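// Appends the proxy to the tail of the list under the write lock.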
void observer_list::insert( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if (my_head) {
        p->my_prev = my_tail;
        my_tail->my_next = p;
    } else {
        my_head = p;
    }
    my_tail = p;
}

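// Unlinks the proxy from the doubly-linked list, fixing up my_head/my_tail
// as needed. The caller must hold the write lock.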
void observer_list::remove(observer_proxy* p) {
    __TBB_ASSERT(my_head, "Attempt to remove an item from an empty list");
    __TBB_ASSERT(!my_tail->my_next, "Last item's my_next must be NULL");
    if (p == my_tail) {
        __TBB_ASSERT(!p->my_next, nullptr);
        my_tail = p->my_prev;
    } else {
        __TBB_ASSERT(p->my_next, nullptr);
        p->my_next->my_prev = p->my_prev;
    }
    if (p == my_head) {
        __TBB_ASSERT(!p->my_prev, nullptr);
        my_head = p->my_next;
    } else {
        __TBB_ASSERT(p->my_prev, nullptr);
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT((my_head && my_tail) || (!my_head && !my_tail), nullptr);
}

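// Releases one reference to the proxy. The fast path is a lock-free CAS
// decrement while the count stays above 1; when the count may drop to zero,
// the write lock is taken so that a concurrent list walker cannot resurrect
// the proxy between the final decrement and its removal from the list.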
void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
    }
    __TBB_ASSERT(r == 1, nullptr);
    // Reference count might go to zero
    {
        // Use lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
        delete p;
    }
}

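// Walks the list from 'last' (exclusive) to the tail, invoking
// on_scheduler_entry for each live observer. The read lock is held only while
// advancing: before a callback the current proxy is pinned by incrementing
// its reference count, and the previously pinned proxy is released afterwards,
// so user code always runs with no list lock held. On return, 'last' points
// at the tail proxy, which stays referenced as the caller's resume position.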
void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches through the list from last (exclusively) to the end.
    observer_proxy* p = last, * prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in the scheduler
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            ++p->my_ref_count;
                            if (prev) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting a pass through the list
                    p = my_head;
                    if (!p) {
                        return;
                    }
                }
                tso = p->my_observer;
            } while (!tso);
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

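// The exit-notification counterpart: walks from the head of the list up to
// 'last' (inclusive), invoking on_scheduler_exit with the same pin/release
// protocol. The reference held on 'last' since entry notification is dropped
// once the walk completes.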
void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches through the list from the beginning to last (inclusively).
    observer_proxy* p = nullptr, * prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item
                        remove_ref_fast(p);
                        if (p) {
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting a pass through the list
                    p = my_head;
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted
            if (p != last) // the last item has been referenced since entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

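// Entry point behind the public task_scheduler_observer::observe(bool) call.
// Enabling creates a proxy, links it into the observer list of the proper
// arena, and notifies threads already present there; disabling atomically
// detaches the proxy and waits for in-flight callbacks to drain. A minimal
// usage sketch (hypothetical user-side code, not part of this file; it
// assumes the public tbb::task_scheduler_observer wrapper forwards here):
//
//     class my_observer : public tbb::task_scheduler_observer {
//     public:
//         my_observer() { observe(true); }            // 'enable' branch below
//         ~my_observer() override { observe(false); } // 'disable' branch: detach and drain
//         void on_scheduler_entry(bool worker) override { /* thread joins arena */ }
//         void on_scheduler_exit(bool worker) override { /* thread leaves arena */ }
//     };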
void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
    if( enable ) {
        if( !tso.my_proxy.load(std::memory_order_relaxed) ) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena;
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena;
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &a->my_observers;
            }
            p->my_list->insert(p);
            // Notify the newly activated observer and other pending ones if it belongs to the current arena
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr);
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "Reference for observer is missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on the observer pointer validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // The proxy may still be held by other threads (to track the last notified observer)
                if( !--proxy->my_ref_count ) { // nobody can increase it under the exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, nullptr );
                    delete proxy;
                }
            }
            spin_wait_until_eq(tso.my_busy_count, 0); // other threads may still be running the callbacks
        }
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb