/*
    Copyright (c) 2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_collaborative_call_once_H
#define __TBB_collaborative_call_once_H

#include "task_arena.h"
#include "task_group.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d1 {

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress warning: structure was padded due to alignment specifier
    #pragma warning (push)
    #pragma warning (disable: 4324)
#endif

constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;

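// A collaborative_once_runner is aligned on max_nfs_size, so the low
// log2(max_nfs_size) bits of its address are always zero. collaborative_once_flag
// below packs the runner pointer together with a small reference count of joining
// (moonlighting) threads into its single m_state word using those bits;
// collaborative_once_references_mask selects that reference-count field.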
class alignas(max_nfs_size) collaborative_once_runner : no_copy {

    struct storage_t {
        task_arena m_arena{ task_arena::attach{} };
        wait_context m_wait_context{1};
    };

    std::atomic<std::int64_t> m_ref_count{0};
    std::atomic<bool> m_is_ready{false};

    // Storage with task_arena and wait_context must be initialized only by the winner thread
    union {
        storage_t m_storage;
    };

    template<typename Fn>
    void isolated_execute(Fn f) {
        auto func = [f] {
            f();
            // delegate_base requires a bool-returning functor while isolate_within_arena ignores the result
            return true;
        };

        delegated_function<decltype(func)> delegate(func);

        r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
    }

public:
    class lifetime_guard : no_copy {
        collaborative_once_runner& m_runner;
    public:
        lifetime_guard(collaborative_once_runner& r) : m_runner(r) {
            m_runner.m_ref_count++;
        }
        ~lifetime_guard() {
            m_runner.m_ref_count--;
        }
    };

    collaborative_once_runner() {}

    ~collaborative_once_runner() {
        spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire);
        if (m_is_ready.load(std::memory_order_relaxed)) {
            m_storage.~storage_t();
        }
    }

    std::uintptr_t to_bits() {
        return reinterpret_cast<std::uintptr_t>(this);
    }

    static collaborative_once_runner* from_bits(std::uintptr_t bits) {
        __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" );
        return reinterpret_cast<collaborative_once_runner*>(bits);
    }

    template <typename F>
    void run_once(F&& f) {
        __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized");
        // Initialize internal state
        new(&m_storage) storage_t();
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                task_group_context context{ task_group_context::bound,
                    task_group_context::default_traits | task_group_context::concurrent_wait };

                function_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };

                // Set the ready flag after entering the execute body to prevent
                // moonlighting threads from occupying all slots inside the arena.
                m_is_ready.store(true, std::memory_order_release);
                execute_and_wait(t, context, m_storage.m_wait_context, context);
            });
        });
    }

    void assist() noexcept {
        // Do not join the arena until the winner thread takes the slot
        spin_wait_while_eq(m_is_ready, false);
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                // We do not want to get an exception from the user functor on moonlighting threads.
                // The exception is handled by the winner thread.
                task_group_context stub_context;
                wait(m_storage.m_wait_context, stub_context);
            });
        });
    }

};

class collaborative_once_flag : no_copy {
    enum state : std::uintptr_t {
        uninitialized,
        done,
#if TBB_USE_ASSERT
        dead
#endif
    };
    std::atomic<std::uintptr_t> m_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

    void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) {
        std::uintptr_t expected = runner_bits;
        do {
            expected = runner_bits;
            // Possible inefficiency: when we start waiting,
            // some moonlighting threads might keep arriving, which prolongs our waiting.
            // Fortunately, there is a limited number of threads on the system, so the wait time is limited.
            spin_wait_until_eq(m_state, expected);
        } while (!m_state.compare_exchange_strong(expected, desired));
    }

    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = m_state.load(std::memory_order_acquire);
        collaborative_once_runner runner;

        do {
            if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) {
                // Winner thread
                runner.run_once([&] {
                    try_call([&] {
                        std::forward<Fn>(f)();
                    }).on_exception([&] {
                        // Reset the state to uninitialized to allow other threads to try the initialization again
                        set_completion_state(runner.to_bits(), state::uninitialized);
                    });
                    // We successfully executed the functor
                    set_completion_state(runner.to_bits(), state::done);
                });
                break;
            } else {
                // Moonlighting thread: we need to add a reference to the state to prolong the runner's lifetime.
                // However, the maximum number of references is limited by the runner alignment.
                // So, we use a CAS loop and spin_wait to guarantee that the references never exceed "max_value".
                do {
                    auto max_value = expected | collaborative_once_references_mask;
                    expected = spin_wait_while_eq(m_state, max_value);
                // "expected > state::done" prevents storing values when the state is uninitialized or done
                } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1));

                if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) {
                    collaborative_once_runner::lifetime_guard guard{*shared_runner};
                    m_state.fetch_sub(1);

                    // The moonlighting threads are not expected to handle exceptions from the user functor.
                    // Therefore, no exception is expected from assist().
                    shared_runner->assist();
                }
            }
            __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead,
                         "collaborative_once_flag has been prematurely destroyed");
        } while (expected != state::done);
    }

#if TBB_USE_ASSERT
public:
    ~collaborative_once_flag() {
        m_state.store(state::dead, std::memory_order_relaxed);
    }
#endif
};


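// Usage sketch (illustrative only, assuming the public oneTBB header layout
// "oneapi/tbb/collaborative_call_once.h"; `compute_pi`, `pi_value`, and `pi_flag`
// are hypothetical names). collaborative_call_once executes the functor exactly
// once per flag; threads that lose the race join the winner's arena and help
// execute its parallel work instead of blocking idly.
//
//     #include "oneapi/tbb/collaborative_call_once.h"
//
//     double compute_pi();                          // expensive, possibly parallel
//     static double pi_value;
//     static tbb::collaborative_once_flag pi_flag;
//
//     double get_pi() {
//         tbb::collaborative_call_once(pi_flag, [] {
//             pi_value = compute_pi();              // runs exactly once
//         });
//         return pi_value;                          // safe after the call returns
//     }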
template <typename Fn, typename... Args>
void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
    __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead,
                 "collaborative_once_flag has been prematurely destroyed");
    if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) {
    #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        // Using stored_pack to work around a GCC 4.8 bug
        // with parameter pack expansion in lambdas
        auto stored_pack = save_pack(std::forward<Args>(args)...);
        auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); };
    #else
        auto func = [&] { fn(std::forward<Args>(args)...); };
    #endif
        flag.do_collaborative_call_once(func);
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning (pop) // 4324 warning
#endif

} // namespace d1
} // namespace detail

using detail::d1::collaborative_call_once;
using detail::d1::collaborative_once_flag;
} // namespace tbb

#endif // __TBB_collaborative_call_once_H