xref: /oneTBB/src/tbb/observer_proxy.cpp (revision 8dcbd5b1)
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"

#include "observer_proxy.h"
#include "arena.h"
#include "main.h"
#include "thread_data.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
extern std::atomic<int> the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

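// Constructs a proxy for the given observer with an initial reference count of one.
// In debug builds the global observer_proxy counter is incremented here and decremented in the destructor.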
observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
    : my_ref_count(1), my_list(nullptr), my_next(nullptr), my_prev(nullptr), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

observer_proxy::~observer_proxy() {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
#if TBB_USE_ASSERT
    --the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

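// Detaches and deletes every proxy in the list. A proxy that a concurrent observe(false)
// call is already detaching (that call wins the exchange on my_proxy) is left for it to
// remove; the method then waits until the list becomes empty.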
void observer_list::clear() {
    // Although the method would work fine for an empty list, we require the caller
    // to check for emptiness before invoking it to avoid extra overhead.
    __TBB_ASSERT( !empty(), nullptr );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head;
        while ( observer_proxy *p = next ) {
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                continue;
            }
            // Accessing 'obs' after detaching obs->my_proxy would race with the observer's destruction.
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // we need to wait until all proxies have been removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head == nullptr && my_tail == nullptr, nullptr);
}

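// Appends a proxy to the tail of the list under an exclusive lock.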
void observer_list::insert( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if (my_head) {
        p->my_prev = my_tail;
        my_tail->my_next = p;
    } else {
        my_head = p;
    }
    my_tail = p;
}

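// Unlinks a proxy from the doubly linked list; the caller is expected to hold the list mutex for writing.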
void observer_list::remove(observer_proxy* p) {
    __TBB_ASSERT(my_head, "Attempt to remove an item from an empty list");
    __TBB_ASSERT(!my_tail->my_next, "Last item's my_next must be NULL");
    if (p == my_tail) {
        __TBB_ASSERT(!p->my_next, nullptr);
        my_tail = p->my_prev;
    } else {
        __TBB_ASSERT(p->my_next, nullptr);
        p->my_next->my_prev = p->my_prev;
    }
    if (p == my_head) {
        __TBB_ASSERT(!p->my_prev, nullptr);
        my_head = p->my_next;
    } else {
        __TBB_ASSERT(p->my_prev, nullptr);
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT((my_head && my_tail) || (!my_head && !my_tail), nullptr);
}

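// Releases one reference to the proxy. The last reference is dropped under an exclusive
// lock so that threads concurrently walking the list cannot resurrect the proxy; once the
// count reaches zero the proxy is unlinked and deleted.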
void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
    }
    __TBB_ASSERT(r == 1, nullptr);
    // Reference count might go to zero
    {
        // Use lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
        delete p;
    }
}

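// Calls on_scheduler_entry for every observer that appears after 'last' in the list and
// advances 'last' to the final proxy visited. The list lock is held only while stepping
// between proxies and is never held across the user callback.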
void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches through the list from last (exclusively) to the end.
    observer_proxy* p = last, * prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in scheduler
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            ++p->my_ref_count;
                            if (prev) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head;
                    if (!p) {
                        return;
                    }
                }
                tso = p->my_observer;
            } while (!tso);
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

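// Calls on_scheduler_exit for every observer from the head of the list up to and including
// 'last', dropping the reference on 'last' that was retained by the entry notification.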
void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches through the list from the beginning to last (inclusively).
    observer_proxy* p = nullptr, * prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        p = p->my_next;
                    } else {
                        // remove the reference from the last item
                        remove_ref_fast(p);
                        if (p) {
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head;
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted
            if (p != last) // the last is already referenced since entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

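// Enables or disables observation for the given observer. Enabling creates a proxy and
// registers it with the observer list of the target arena (the observer's task_arena, or
// the calling thread's arena when none is set); disabling detaches the proxy from the
// observer and waits for callbacks that are still in flight to complete.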
void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
    if( enable ) {
        if( !tso.my_proxy.load(std::memory_order_relaxed) ) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena;
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena;
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &ta->my_arena->my_observers;
            }
            p->my_list->insert(p);
            // Notify the newly activated observer and other pending ones if it belongs to the current arena
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr);
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "reference for observer missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on observer pointer validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // Proxy may still be held by other threads (to track the last notified observer)
                if( !--proxy->my_ref_count ) { // nobody can increase it under exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, nullptr );
                    delete proxy;
                }
            }
            spin_wait_until_eq(tso.my_busy_count, 0); // other threads are still accessing the callback
        }
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb