/*
    Copyright (c) 2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_collaborative_call_once_H
#define __TBB_collaborative_call_once_H

#if __TBB_PREVIEW_COLLABORATIVE_CALL_ONCE

#include "tbb/task_arena.h"
#include "tbb/task_group.h"
#include "tbb/task.h"

#include <atomic>
#include <cstdint>  // std::uintptr_t, std::intptr_t, std::int64_t
#include <new>      // placement new
#include <utility>  // std::forward, std::move

namespace tbb {
namespace detail {
namespace d1 {

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress warning: structure was padded due to alignment specifier
    #pragma warning (push)
    #pragma warning (disable: 4324)
#endif

// collaborative_once_runner is aligned to max_nfs_size, so the low bits of a runner
// address are always zero. collaborative_once_flag stores the runner address in its
// state word and uses those low bits as a counter of threads that are in the middle
// of attaching to the runner; the mask below selects exactly those bits.
constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;

// Execution context of a single collaborative_call_once attempt.
// The winner thread calls run_once() to execute the user functor inside an arena;
// other ("moonlighting") threads call assist() to join that arena and wait on the
// same wait_context. The destructor blocks until every lifetime_guard is released.
class alignas(max_nfs_size) collaborative_once_runner : no_copy {

    struct storage_t {
        task_arena m_arena{ task_arena::attach{} };
        wait_context m_wait_context{1};
    };

    // Number of live lifetime_guard objects referring to this runner
    std::atomic<std::int64_t> m_ref_count{0};
    // Set by the winner inside run_once(); also tells the destructor that m_storage was constructed
    std::atomic<bool> m_is_ready{false};

    // Storage with task_arena and wait_context must be initialized only by winner thread
    union {
        storage_t m_storage;
    };

    // Runs f through r1::isolate_within_arena, passing the address of this runner
    // as the second argument, so the winner and the assisting threads use the same value.
    template<typename Fn>
    void isolated_execute(Fn f) {
        auto func = [f] {
            f();
            // delegate_base requires bool returning functor while isolate_within_arena ignores the result
            return true;
        };

        delegated_function<decltype(func)> delegate(func);

        r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
    }

public:
    // Scoped reference to a runner: increments m_ref_count on construction and
    // decrements it on destruction. ~collaborative_once_runner() spins until the
    // count drops back to zero.
    class lifetime_guard : no_copy {
        collaborative_once_runner& m_runner;
    public:
        // explicit: a runner must never be silently converted into a guard
        explicit lifetime_guard(collaborative_once_runner& r) : m_runner(r) {
            m_runner.m_ref_count++;
        }
        ~lifetime_guard() {
            m_runner.m_ref_count--;
        }
    };

    // m_storage is intentionally left unconstructed; see run_once()
    collaborative_once_runner() {}

    // Waits for all lifetime_guard holders to leave, then destroys m_storage
    // if (and only if) run_once() marked the runner as ready.
    ~collaborative_once_runner() {
        spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire);
        if (m_is_ready.load(std::memory_order_relaxed)) {
            m_storage.~storage_t();
        }
    }

    // Address of this runner as an integer, suitable for storing in the flag state
    std::uintptr_t to_bits() {
        return reinterpret_cast<std::uintptr_t>(this);
    }

    // Inverse of to_bits(). The caller must clear the reference-counter bits first;
    // a zero argument yields a null pointer.
    static collaborative_once_runner* from_bits(std::uintptr_t bits) {
        __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" );
        return reinterpret_cast<collaborative_once_runner*>(bits);
    }

    // Winner-thread entry point: constructs the arena/wait_context storage and
    // executes f inside the arena, waiting on the wait_context.
    // Must be called at most once per runner (asserted via m_is_ready).
    template <typename F>
    void run_once(F&& f) {
        __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized");
        // Initialize internal state
        new(&m_storage) storage_t();
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                task_group_context context{ task_group_context::bound,
                    task_group_context::default_traits | task_group_context::concurrent_wait };

                function_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };

                // Set the ready flag after entering the execute body to prevent
                // moonlighting threads from occupying all slots inside the arena.
                m_is_ready.store(true, std::memory_order_release);
                execute_and_wait(t, context, m_storage.m_wait_context, context);
            });
        });
    }

    // Moonlighting-thread entry point: waits until the winner has published the
    // storage, then joins the arena and waits on the shared wait_context.
    void assist() noexcept {
        // Do not join the arena until the winner thread takes the slot
        spin_wait_while_eq(m_is_ready, false);
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                // We do not want to get an exception from user functor on moonlighting threads.
                // The exception is handled with the winner thread
                task_group_context stub_context;
                wait(m_storage.m_wait_context, stub_context);
            });
        });
    }

};

// Flag object passed to collaborative_call_once().
// m_state holds one of the enumerators below or, while an attempt is in progress,
// the address of the winner's collaborative_once_runner with a counter of attaching
// threads in the low (alignment) bits.
class collaborative_once_flag : no_copy {
    enum state : std::uintptr_t {
        uninitialized,
        done,
#if TBB_USE_ASSERT
        dead
#endif
    };
    std::atomic<std::uintptr_t> m_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

    // Replaces the runner address stored in m_state with `desired`.
    // The exchange succeeds only while m_state equals runner_bits exactly, i.e. while
    // no thread holds a reference in the low bits; otherwise it waits and retries.
    void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) {
        std::uintptr_t expected = runner_bits;
        do {
            // compare_exchange_strong overwrites `expected` on failure, so reset it every iteration
            expected = runner_bits;
            // Possible inefficiency: when we start waiting,
            // some moonlighting threads might continue coming that will prolong our waiting.
            // Fortunately, there are limited number of threads on the system so wait time is limited.
            spin_wait_until_eq(m_state, expected);
        } while (!m_state.compare_exchange_strong(expected, desired));
    }

    // Slow path of collaborative_call_once(): loops until the flag is observed as done.
    // A thread that installs its runner into m_state becomes the winner and executes f;
    // any other thread attaches to the installed runner and assists it.
    // An exception thrown by f resets the flag to uninitialized and propagates to the winner's caller.
    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = m_state.load(std::memory_order_acquire);
        collaborative_once_runner runner;

        do {
            if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) {
                // Winner thread
                runner.run_once([&] {
                    try_call([&] {
                        std::forward<Fn>(f)();
                    }).on_exception([&] {
                        // Reset the state to uninitialized to allow other threads to try initialization again
                        set_completion_state(runner.to_bits(), state::uninitialized);
                    });
                    // We successfully executed functor
                    set_completion_state(runner.to_bits(), state::done);
                });
                break;
            } else {
                // Moonlighting thread: we need to add a reference to the state to prolong runner lifetime.
                // However, the maximum number of references are limited with runner alignment.
                // So, we use CAS loop and spin_wait to guarantee that references never exceed "max_value".
                do {
                    auto max_value = expected | collaborative_once_references_mask;
                    expected = spin_wait_while_eq(m_state, max_value);
                // "expected > state::done" prevents storing values, when state is uninitialized or done
                } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1));

                // Stripping the counter bits yields the runner address, or null when
                // the state turned out to be uninitialized or done.
                if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) {
                    // The guard now keeps the runner alive, so the temporary
                    // reference held in m_state can be returned.
                    collaborative_once_runner::lifetime_guard guard{*shared_runner};
                    m_state.fetch_sub(1);

                    // The moonlighting threads are not expected to handle exceptions from user functor.
                    // Therefore, no exception is expected from assist().
                    shared_runner->assist();
                }
            }
            __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead,
                         "collaborative_once_flag has been prematurely destroyed");
        } while (expected != state::done);
    }

#if TBB_USE_ASSERT
public:
    // Debug aid: marks the flag as dead so that later use trips the assertions
    ~collaborative_once_flag() {
        m_state.store(state::dead, std::memory_order_relaxed);
    }
#endif
};


// Invokes fn(args...) unless `flag` is already in the done state.
// Threads that call this concurrently on the same flag while an invocation is in
// progress join the arena of the invoking thread and wait there until it finishes.
// If fn throws, the flag is reset so that a later call can try again.
template <typename Fn, typename... Args>
void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
    __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead,
                 "collaborative_once_flag has been prematurely destroyed");
    // Fast path: nothing to do once the flag is done
    if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) {
    #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        // Using stored_pack to suppress bug in GCC 4.8
        // with parameter pack expansion in lambda
        auto stored_pack = save_pack(std::forward<Args>(args)...);
        auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); };
    #else
        // Forward fn as well as the arguments, consistently with the workaround branch above
        auto func = [&] { std::forward<Fn>(fn)(std::forward<Args>(args)...); };
    #endif
        flag.do_collaborative_call_once(func);
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning (pop) // 4324 warning
#endif

} // namespace d1
} // namespace detail

using detail::d1::collaborative_call_once;
using detail::d1::collaborative_once_flag;
} // namespace tbb

#endif // __TBB_PREVIEW_COLLABORATIVE_CALL_ONCE
#endif // __TBB_collaborative_call_once_H