/*
Copyright (c) 2021 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef __TBB_collaborative_call_once_H
#define __TBB_collaborative_call_once_H

#include "task_arena.h"
#include "task_group.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d1 {

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress warning: structure was padded due to alignment specifier
#pragma warning (push)
#pragma warning (disable: 4324)
#endif

constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;

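// Executes the once-functor collaboratively: the "winner" thread runs the functor
// as a task inside an attached arena, while other threads that observe an
// initialization in progress ("moonlighting" threads) join that arena and help
// execute pending tasks instead of blocking.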
class alignas(max_nfs_size) collaborative_once_runner : no_copy {

    struct storage_t {
        task_arena m_arena{ task_arena::attach{} };
        wait_context m_wait_context{1};
    };

    std::atomic<std::int64_t> m_ref_count{0};
    std::atomic<bool> m_is_ready{false};

    // Storage with task_arena and wait_context must be initialized only by the winner thread
    union {
        storage_t m_storage;
    };

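    // Executes the functor inside an isolation region keyed by this runner's address,
    // so that a thread waiting here only takes tasks that belong to this once-invocation.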
    template<typename Fn>
    void isolated_execute(Fn f) {
        auto func = [f] {
            f();
            // delegate_base requires a bool-returning functor, while isolate_within_arena ignores the result
            return true;
        };

        delegated_function<decltype(func)> delegate(func);

        r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
    }

public:
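    // RAII helper: holds a reference to the runner so that its destructor (which waits
    // for m_ref_count to reach zero) cannot complete while a moonlighting thread is
    // still using the runner.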
    class lifetime_guard : no_copy {
        collaborative_once_runner& m_runner;
    public:
        lifetime_guard(collaborative_once_runner& r) : m_runner(r) {
            m_runner.m_ref_count++;
        }
        ~lifetime_guard() {
            m_runner.m_ref_count--;
        }
    };

    collaborative_once_runner() {}

    ~collaborative_once_runner() {
        spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire);
        if (m_is_ready.load(std::memory_order_relaxed)) {
            m_storage.~storage_t();
        }
    }

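    // The flag stores a pointer to the active runner in its state word. Because the
    // runner is aligned to max_nfs_size, the low log2(max_nfs_size) bits of that
    // pointer are zero and can be reused as a counter of pending moonlighting threads.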
    std::uintptr_t to_bits() {
        return reinterpret_cast<std::uintptr_t>(this);
    }

    static collaborative_once_runner* from_bits(std::uintptr_t bits) {
        __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" );
        return reinterpret_cast<collaborative_once_runner*>(bits);
    }

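    // Called by the winner thread only: constructs the storage, then runs the functor
    // as a task inside the attached arena. The ready flag is published once the winner
    // has entered the arena, which releases the moonlighting threads waiting in assist().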
    template <typename F>
    void run_once(F&& f) {
        __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized");
        // Initialize internal state
        new(&m_storage) storage_t();
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                task_group_context context{ task_group_context::bound,
                                            task_group_context::default_traits | task_group_context::concurrent_wait };

                function_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };

                // Set the ready flag after entering the execute body to prevent
                // moonlighting threads from occupying all slots inside the arena.
                m_is_ready.store(true, std::memory_order_release);
                execute_and_wait(t, context, m_storage.m_wait_context, context);
            });
        });
    }

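    // Called by moonlighting threads: once the winner has signalled readiness, join the
    // winner's arena and help with pending tasks until the once-functor completes.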
    void assist() noexcept {
        // Do not join the arena until the winner thread takes the slot
        spin_wait_while_eq(m_is_ready, false);
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                // We do not want to get an exception from the user functor on moonlighting threads.
                // The exception is handled by the winner thread.
                task_group_context stub_context;
                wait(m_storage.m_wait_context, stub_context);
            });
        });
    }

};

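// The once-flag. Its state word is either state::uninitialized, state::done, or the
// address of the winner's collaborative_once_runner with the low bits counting
// moonlighting threads that are about to take a reference to that runner.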
class collaborative_once_flag : no_copy {
    enum state : std::uintptr_t {
        uninitialized,
        done,
#if TBB_USE_ASSERT
        dead
#endif
    };
    std::atomic<std::uintptr_t> m_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

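    // Waits until all moonlighting references encoded in the low bits are released
    // (the state equals the bare runner pointer again), then atomically publishes the
    // final state: state::done on success or state::uninitialized after an exception.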
    void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) {
        std::uintptr_t expected = runner_bits;
        do {
            expected = runner_bits;
            // Possible inefficiency: when we start waiting,
            // moonlighting threads might keep arriving, which prolongs our waiting.
            // Fortunately, the number of threads on the system is limited, so the wait time is bounded.
            spin_wait_until_eq(m_state, expected);
        } while (!m_state.compare_exchange_strong(expected, desired));
    }

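    // Slow path: either win the race and execute the functor via a runner, or repeatedly
    // take a reference to the current winner's runner and assist it until the state
    // becomes state::done (or state::uninitialized again, if the winner's functor threw).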
    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = m_state.load(std::memory_order_acquire);
        collaborative_once_runner runner;

        do {
            if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) {
                // Winner thread
                runner.run_once([&] {
                    try_call([&] {
                        std::forward<Fn>(f)();
                    }).on_exception([&] {
                        // Reset the state to uninitialized to allow other threads to try the initialization again
                        set_completion_state(runner.to_bits(), state::uninitialized);
                    });
                    // We successfully executed the functor
                    set_completion_state(runner.to_bits(), state::done);
                });
                break;
            } else {
                // Moonlighting thread: we need to add a reference to the state to prolong the runner's lifetime.
                // However, the maximum number of references is limited by the runner alignment.
                // So, we use a CAS loop and spin_wait to guarantee that the reference count never exceeds "max_value".
                do {
                    auto max_value = expected | collaborative_once_references_mask;
                    expected = spin_wait_while_eq(m_state, max_value);
                    // "expected > state::done" prevents storing values when the state is uninitialized or done
                } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1));

                if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) {
                    collaborative_once_runner::lifetime_guard guard{*shared_runner};
                    m_state.fetch_sub(1);

                    // The moonlighting threads are not expected to handle exceptions from the user functor.
                    // Therefore, no exception is expected from assist().
                    shared_runner->assist();
                }
            }
            __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead,
                         "collaborative_once_flag has been prematurely destroyed");
        } while (expected != state::done);
    }

#if TBB_USE_ASSERT
public:
    ~collaborative_once_flag() {
        m_state.store(state::dead, std::memory_order_relaxed);
    }
#endif
};


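// Executes the functor exactly once, even when called concurrently from several threads;
// threads that lose the race help execute the winner's work instead of blocking.
// Illustrative usage (a sketch; `compute` and `cached` are hypothetical names):
//
//     tbb::collaborative_once_flag flag;
//     double cached;
//     tbb::collaborative_call_once(flag, [&] { cached = compute(); });
//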
template <typename Fn, typename... Args>
void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
    __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead,
                 "collaborative_once_flag has been prematurely destroyed");
    if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        // Using stored_pack to work around a GCC 4.8 bug
        // with parameter pack expansion in lambdas
        auto stored_pack = save_pack(std::forward<Args>(args)...);
        auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); };
#else
        auto func = [&] { fn(std::forward<Args>(args)...); };
#endif
        flag.do_collaborative_call_once(func);
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning (pop) // 4324 warning
#endif

} // namespace d1
} // namespace detail

using detail::d1::collaborative_call_once;
using detail::d1::collaborative_once_flag;
} // namespace tbb

#endif // __TBB_collaborative_call_once_H