// /oneTBB/src/tbb/observer_proxy.cpp (revision b15aabb3)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"

#include "observer_proxy.h"
#include "arena.h"
#include "main.h"
#include "thread_data.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
extern std::atomic<int> the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
    : my_ref_count(1), my_list(nullptr), my_next(nullptr), my_prev(nullptr), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

observer_proxy::~observer_proxy() {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
#if TBB_USE_ASSERT
    --the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

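// Detaches and deletes every proxy in the list. Proxies whose observer is concurrently
// being disabled via observe(false) are skipped here and released by that thread; the
// final spin wait below ensures the list is empty before returning.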
void observer_list::clear() {
    // Though the method would work fine for an empty list, we require the caller
    // to check that the list is non-empty before invoking it to avoid extra overhead.
    __TBB_ASSERT( !empty(), nullptr );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head.load(std::memory_order_relaxed);
        while ( observer_proxy *p = next ) {
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                continue;
            }
            // Accessing 'obs' after detaching obs->my_proxy would race with observer destruction.
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // we need to wait until all proxies are removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head.load(std::memory_order_relaxed) == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head.load(std::memory_order_relaxed) == nullptr && my_tail.load(std::memory_order_relaxed) == nullptr, nullptr);
}

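// Appends the proxy at the tail of the list under an exclusive lock.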
void observer_list::insert( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if (my_head.load(std::memory_order_relaxed)) {
        p->my_prev = my_tail.load(std::memory_order_relaxed);
        my_tail.load(std::memory_order_relaxed)->my_next = p;
    } else {
        my_head.store(p, std::memory_order_relaxed);
    }
    my_tail.store(p, std::memory_order_relaxed);
}

void observer_list::remove(observer_proxy* p) {
    __TBB_ASSERT(my_head.load(std::memory_order_relaxed), "Attempt to remove an item from an empty list");
    __TBB_ASSERT(!my_tail.load(std::memory_order_relaxed)->my_next, "Last item's my_next must be NULL");
    if (p == my_tail.load(std::memory_order_relaxed)) {
        __TBB_ASSERT(!p->my_next, nullptr);
        my_tail.store(p->my_prev, std::memory_order_relaxed);
    } else {
        __TBB_ASSERT(p->my_next, nullptr);
        p->my_next->my_prev = p->my_prev;
    }
    if (p == my_head.load(std::memory_order_relaxed)) {
        __TBB_ASSERT(!p->my_prev, nullptr);
        my_head.store(p->my_next, std::memory_order_relaxed);
    } else {
        __TBB_ASSERT(p->my_prev, nullptr);
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT((my_head.load(std::memory_order_relaxed) && my_tail.load(std::memory_order_relaxed)) ||
        (!my_head.load(std::memory_order_relaxed) && !my_tail.load(std::memory_order_relaxed)), nullptr);
}

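// Releases one reference to the proxy. When the count may drop to zero, the decrement
// is done under an exclusive lock so that a concurrent list walker cannot resurrect
// the proxy; the proxy is then unlinked from the list and deleted.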
void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
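    // Fast path: while at least one other reference exists, try to decrement without taking
    // the lock. On failure compare_exchange_strong reloads 'r', so the loop either retries
    // or falls through once only our own reference remains.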
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
    }
    __TBB_ASSERT(r == 1, nullptr);
    // Reference count might go to zero
    {
        // Use lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
        delete p;
    }
}

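// Notifies observers that have not yet seen this thread: walks the list from 'last'
// (exclusive) to the end, calling on_scheduler_entry for each proxy with a live observer.
// Proxies are pinned via their reference counts and the list lock is released before each
// callback, so user code never runs under the lock. On return, 'last' points to the tail
// proxy reached, which records how far this thread has been notified.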
void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches through the list from last (exclusive) to the end.
    observer_proxy* p = last, * prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in the scheduler
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            ++p->my_ref_count;
                            if (prev) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    if (!p) {
                        return;
                    }
                }
                tso = p->my_observer;
            } while (!tso);
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

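// Mirror of the entry notification: walks the list from the head up to 'last' (inclusive),
// calling on_scheduler_exit for each proxy with a live observer, and releases the extra
// reference that entry notification left on 'last'.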
void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches through the list from the beginning to last (inclusive).
    observer_proxy* p = nullptr, * prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item
                        remove_ref_fast(p);
                        if (p) {
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted
            if (p != last) // 'last' is already referenced since entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

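// Entry point behind d1::task_scheduler_observer::observe(bool). A minimal user-side
// sketch (illustrative only, not part of this file; class and callback bodies are
// assumptions, requires #include <oneapi/tbb/task_scheduler_observer.h>):
//
//     class pinning_observer : public tbb::task_scheduler_observer {
//     public:
//         pinning_observer() { observe(true); }               // start observing the current arena
//         ~pinning_observer() override { observe(false); }    // stop observing before destruction
//         void on_scheduler_entry(bool is_worker) override { /* per-thread setup, e.g. affinity */ }
//         void on_scheduler_exit(bool is_worker) override  { /* per-thread teardown */ }
//     };
//
// Enabling allocates a proxy and links it into the observer list of the target arena;
// disabling atomically detaches the proxy and then waits until no thread is still
// executing the observer's callbacks.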
void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
    if( enable ) {
        if( !tso.my_proxy.load(std::memory_order_relaxed) ) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

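            // The proxy is linked either into the observers of the calling thread's current
            // arena (when the observer has no explicit task_arena) or into those of the
            // observer's task_arena.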
            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena.load(std::memory_order_acquire);
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena.load(std::memory_order_relaxed);
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &a->my_observers;
            }
            p->my_list->insert(p);
            // Notify the newly activated observer and other pending ones if it belongs to the current arena
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr);
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "Reference for observer is missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on the observer pointer's validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // The proxy may still be held by other threads (to track the last notified observer)
                if( !--proxy->my_ref_count ) { // nobody can increase it under the exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, nullptr );
                    delete proxy;
                }
            }
            spin_wait_until_eq(tso.my_busy_count, 0); // other threads may still be inside the callbacks
        }
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb