1 /*
2     Copyright (c) 2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_collaborative_call_once_H
18 #define __TBB_collaborative_call_once_H
19 
20 #if __TBB_PREVIEW_COLLABORATIVE_CALL_ONCE
21 
22 #include "tbb/task_arena.h"
23 #include "tbb/task_group.h"
24 #include "tbb/task.h"
25 
26 #include <atomic>
27 
28 namespace tbb {
29 namespace detail {
30 namespace d1 {
31 
32 #if _MSC_VER && !defined(__INTEL_COMPILER)
33     // Suppress warning: structure was padded due to alignment specifier
34     #pragma warning (push)
35     #pragma warning (disable: 4324)
36 #endif
37 
38 constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
39 constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;
40 
41 class alignas(max_nfs_size) collaborative_once_runner : no_copy {
42 
43     struct storage_t {
44         task_arena m_arena{ task_arena::attach{} };
45         wait_context m_wait_context{1};
46     };
47 
48     std::atomic<std::int64_t> m_ref_count{0};
49     std::atomic<bool> m_is_ready{false};
50 
51     // Storage with task_arena and wait_context must be initialized only by winner thread
52     union {
53         storage_t m_storage;
54     };
55 
56     template<typename Fn>
57     void isolated_execute(Fn f) {
58         auto func = [f] {
59             f();
60            // delegate_base requires bool returning functor while isolate_within_arena ignores the result
61             return true;
62         };
63 
64         delegated_function<decltype(func)> delegate(func);
65 
66         r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
67     }
68 
69 public:
70     class lifetime_guard : no_copy {
71         collaborative_once_runner& m_runner;
72     public:
73         lifetime_guard(collaborative_once_runner& r) : m_runner(r) {
74             m_runner.m_ref_count++;
75         }
76         ~lifetime_guard() {
77             m_runner.m_ref_count--;
78         }
79     };
80 
81     collaborative_once_runner() {}
82 
83     ~collaborative_once_runner() {
84         spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire);
85         if (m_is_ready.load(std::memory_order_relaxed)) {
86             m_storage.~storage_t();
87         }
88     }
89 
90     std::uintptr_t to_bits() {
91         return reinterpret_cast<std::uintptr_t>(this);
92     }
93 
94     static collaborative_once_runner* from_bits(std::uintptr_t bits) {
95         __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" );
96         return reinterpret_cast<collaborative_once_runner*>(bits);
97     }
98 
99     template <typename F>
100     void run_once(F&& f) {
101         __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized");
102         // Initialize internal state
103         new(&m_storage) storage_t();
104         m_storage.m_arena.execute([&] {
105             isolated_execute([&] {
106                 task_group_context context{ task_group_context::bound,
107                     task_group_context::default_traits | task_group_context::concurrent_wait };
108 
109                 function_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };
110 
111                 // Set the ready flag after entering the execute body to prevent
112                 // moonlighting threads from occupying all slots inside the arena.
113                 m_is_ready.store(true, std::memory_order_release);
114                 execute_and_wait(t, context, m_storage.m_wait_context, context);
115             });
116         });
117     }
118 
119     void assist() noexcept {
120         // Do not join the arena until the winner thread takes the slot
121         spin_wait_while_eq(m_is_ready, false);
122         m_storage.m_arena.execute([&] {
123             isolated_execute([&] {
124                 // We do not want to get an exception from user functor on moonlighting threads.
125                 // The exception is handled with the winner thread
126                 task_group_context stub_context;
127                 wait(m_storage.m_wait_context, stub_context);
128             });
129         });
130     }
131 
132 };
133 
/// Once-flag used by collaborative_call_once().
///
/// The whole protocol lives in one atomic word, m_state, which holds one of:
///   - state::uninitialized (0): the functor has not completed; any caller may try to run it;
///   - state::done (1): the functor completed successfully; later calls return immediately;
///   - state::dead (assert builds only): written by the destructor to detect use after destruction;
///   - otherwise: the address of the winner's collaborative_once_runner OR'ed with the number of
///     moonlighting threads that have bumped the counter but not yet dropped it again
///     (kept in the low bits selected by collaborative_once_references_mask).
class collaborative_once_flag : no_copy {
    enum state : std::uintptr_t {
        uninitialized,
        done,
#if TBB_USE_ASSERT
        dead
#endif
    };
    std::atomic<std::uintptr_t> m_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

    // Called by the winner to publish a final state (done, or uninitialized after an exception).
    // Waits until m_state equals runner_bits exactly, i.e. no moonlighting thread is between its
    // counter increment and decrement, and then CASes it to `desired`. If another thread bumps
    // the counter in between, the CAS fails and the wait starts over.
    void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) {
        std::uintptr_t expected = runner_bits;
        do {
            expected = runner_bits;
            // Possible inefficiency: while we are waiting, more moonlighting threads
            // may keep arriving and prolong the wait.
            // Fortunately, the number of threads in the system is limited, so the wait time is limited too.
            spin_wait_until_eq(m_state, expected);
        } while (!m_state.compare_exchange_strong(expected, desired));
    }

    // Core of collaborative_call_once(). On each iteration the calling thread either:
    //  - wins the CAS uninitialized -> its own runner's address and runs f through that runner, or
    //  - finds another thread's runner address in m_state, takes a lifetime_guard on that runner,
    //    and helps through assist().
    // The loop ends when the winner finishes (break) or when m_state is observed as done. If the
    // winner's f throws, the state is reset to uninitialized so another thread can retry.
    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = m_state.load(std::memory_order_acquire);
        // Every caller builds a runner, but it is only published if this thread wins. Its
        // destructor blocks until no lifetime_guard refers to it any more.
        collaborative_once_runner runner;

        do {
            if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) {
                // Winner thread
                runner.run_once([&] {
                    try_call([&] {
                        std::forward<Fn>(f)();
                    }).on_exception([&] {
                        // Reset the state to uninitialized to allow other threads to try initialization again
                        set_completion_state(runner.to_bits(), state::uninitialized);
                    });
                    // We successfully executed the functor
                    set_completion_state(runner.to_bits(), state::done);
                });
                break;
            } else {
                // Moonlighting thread: we need to add a reference to the state to prolong the runner's lifetime.
                // However, the maximum number of references is limited by the runner's alignment.
                // So we use a CAS loop and spin_wait to guarantee that references never exceed "max_value".
                do {
                    // Wait while the reference counter in the low bits is saturated.
                    auto max_value = expected | collaborative_once_references_mask;
                    expected = spin_wait_while_eq(m_state, max_value);
                // "expected > state::done" prevents storing values when the state is uninitialized or done
                } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1));

                // Null when the state was uninitialized or done: there is no runner to help.
                if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) {
                    // Take the long-lived guard first, then drop the transient counter in m_state;
                    // the runner stays alive because its destructor waits for the guard.
                    collaborative_once_runner::lifetime_guard guard{*shared_runner};
                    m_state.fetch_sub(1);

                    // The moonlighting threads are not expected to handle exceptions from the user functor.
                    // Therefore, no exception is expected from assist().
                    shared_runner->assist();
                }
            }
            __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead,
                         "collaborative_once_flag has been prematurely destroyed");
        } while (expected != state::done);
    }

#if TBB_USE_ASSERT
public:
    // Assert builds only: poison the state so that later use of a destroyed flag trips the
    // "prematurely destroyed" assertions.
    ~collaborative_once_flag() {
        m_state.store(state::dead, std::memory_order_relaxed);
    }
#endif
};
208 
209 
210 template <typename Fn, typename... Args>
211 void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
212     __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead,
213                  "collaborative_once_flag has been prematurely destroyed");
214     if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) {
215     #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
216         // Using stored_pack to suppress bug in GCC 4.8
217         // with parameter pack expansion in lambda
218         auto stored_pack = save_pack(std::forward<Args>(args)...);
219         auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); };
220     #else
221         auto func = [&] { fn(std::forward<Args>(args)...); };
222     #endif
223         flag.do_collaborative_call_once(func);
224     }
225 }
226 
227 #if _MSC_VER && !defined(__INTEL_COMPILER)
228     #pragma warning (pop) // 4324 warning
229 #endif
230 
231 } // namespace d1
232 } // namespace detail
233 
234 using detail::d1::collaborative_call_once;
235 using detail::d1::collaborative_once_flag;
236 } // namespace tbb
237 
238 #endif // __TBB_PREVIEW_COLLABORATIVE_CALL_ONCE
239 #endif // __TBB_collaborative_call_once_H
240