1 /* 2 Copyright (c) 2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_collaborative_call_once_H 18 #define __TBB_collaborative_call_once_H 19 20 #include "task_arena.h" 21 #include "task_group.h" 22 23 #include <atomic> 24 25 namespace tbb { 26 namespace detail { 27 namespace d1 { 28 29 #if _MSC_VER && !defined(__INTEL_COMPILER) 30 // Suppress warning: structure was padded due to alignment specifier 31 #pragma warning (push) 32 #pragma warning (disable: 4324) 33 #endif 34 35 constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size; 36 constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1; 37 38 class alignas(max_nfs_size) collaborative_once_runner : no_copy { 39 40 struct storage_t { 41 task_arena m_arena{ task_arena::attach{} }; 42 wait_context m_wait_context{1}; 43 }; 44 45 std::atomic<std::int64_t> m_ref_count{0}; 46 std::atomic<bool> m_is_ready{false}; 47 48 // Storage with task_arena and wait_context must be initialized only by winner thread 49 union { 50 storage_t m_storage; 51 }; 52 53 template<typename Fn> 54 void isolated_execute(Fn f) { 55 auto func = [f] { 56 f(); 57 // delegate_base requires bool returning functor while isolate_within_arena ignores the result 58 return true; 59 }; 60 61 delegated_function<decltype(func)> delegate(func); 62 63 r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this)); 64 } 65 66 public: 67 class lifetime_guard : no_copy { 68 collaborative_once_runner& m_runner; 69 public: 70 lifetime_guard(collaborative_once_runner& r) : m_runner(r) { 71 m_runner.m_ref_count++; 72 } 73 ~lifetime_guard() { 74 m_runner.m_ref_count--; 75 } 76 }; 77 78 collaborative_once_runner() {} 79 80 ~collaborative_once_runner() { 81 spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire); 82 if (m_is_ready.load(std::memory_order_relaxed)) { 83 m_storage.~storage_t(); 84 } 85 } 86 87 std::uintptr_t to_bits() { 88 return reinterpret_cast<std::uintptr_t>(this); 89 } 90 91 static collaborative_once_runner* from_bits(std::uintptr_t bits) { 92 __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" ); 93 return reinterpret_cast<collaborative_once_runner*>(bits); 94 } 95 96 template <typename F> 97 void run_once(F&& f) { 98 __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized"); 99 // Initialize internal state 100 new(&m_storage) storage_t(); 101 m_storage.m_arena.execute([&] { 102 isolated_execute([&] { 103 task_group_context context{ task_group_context::bound, 104 task_group_context::default_traits | task_group_context::concurrent_wait }; 105 106 function_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context }; 107 108 // Set the ready flag after entering the execute body to prevent 109 // moonlighting threads from occupying all slots inside the arena. 110 m_is_ready.store(true, std::memory_order_release); 111 execute_and_wait(t, context, m_storage.m_wait_context, context); 112 }); 113 }); 114 } 115 116 void assist() noexcept { 117 // Do not join the arena until the winner thread takes the slot 118 spin_wait_while_eq(m_is_ready, false); 119 m_storage.m_arena.execute([&] { 120 isolated_execute([&] { 121 // We do not want to get an exception from user functor on moonlighting threads. 122 // The exception is handled with the winner thread 123 task_group_context stub_context; 124 wait(m_storage.m_wait_context, stub_context); 125 }); 126 }); 127 } 128 129 }; 130 131 class collaborative_once_flag : no_copy { 132 enum state : std::uintptr_t { 133 uninitialized, 134 done, 135 #if TBB_USE_ASSERT 136 dead 137 #endif 138 }; 139 std::atomic<std::uintptr_t> m_state{ state::uninitialized }; 140 141 template <typename Fn, typename... Args> 142 friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args); 143 144 void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) { 145 std::uintptr_t expected = runner_bits; 146 do { 147 expected = runner_bits; 148 // Possible inefficiency: when we start waiting, 149 // some moonlighting threads might continue coming that will prolong our waiting. 150 // Fortunately, there are limited number of threads on the system so wait time is limited. 151 spin_wait_until_eq(m_state, expected); 152 } while (!m_state.compare_exchange_strong(expected, desired)); 153 } 154 155 template <typename Fn> 156 void do_collaborative_call_once(Fn&& f) { 157 std::uintptr_t expected = m_state.load(std::memory_order_acquire); 158 collaborative_once_runner runner; 159 160 do { 161 if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) { 162 // Winner thread 163 runner.run_once([&] { 164 try_call([&] { 165 std::forward<Fn>(f)(); 166 }).on_exception([&] { 167 // Reset the state to uninitialized to allow other threads to try initialization again 168 set_completion_state(runner.to_bits(), state::uninitialized); 169 }); 170 // We successfully executed functor 171 set_completion_state(runner.to_bits(), state::done); 172 }); 173 break; 174 } else { 175 // Moonlighting thread: we need to add a reference to the state to prolong runner lifetime. 176 // However, the maximum number of references are limited with runner alignment. 177 // So, we use CAS loop and spin_wait to guarantee that references never exceed "max_value". 178 do { 179 auto max_value = expected | collaborative_once_references_mask; 180 expected = spin_wait_while_eq(m_state, max_value); 181 // "expected > state::done" prevents storing values, when state is uninitialized or done 182 } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1)); 183 184 if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) { 185 collaborative_once_runner::lifetime_guard guard{*shared_runner}; 186 m_state.fetch_sub(1); 187 188 // The moonlighting threads are not expected to handle exceptions from user functor. 189 // Therefore, no exception is expected from assist(). 190 shared_runner->assist(); 191 } 192 } 193 __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead, 194 "collaborative_once_flag has been prematurely destroyed"); 195 } while (expected != state::done); 196 } 197 198 #if TBB_USE_ASSERT 199 public: 200 ~collaborative_once_flag() { 201 m_state.store(state::dead, std::memory_order_relaxed); 202 } 203 #endif 204 }; 205 206 207 template <typename Fn, typename... Args> 208 void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) { 209 __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead, 210 "collaborative_once_flag has been prematurely destroyed"); 211 if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) { 212 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN 213 // Using stored_pack to suppress bug in GCC 4.8 214 // with parameter pack expansion in lambda 215 auto stored_pack = save_pack(std::forward<Args>(args)...); 216 auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); }; 217 #else 218 auto func = [&] { fn(std::forward<Args>(args)...); }; 219 #endif 220 flag.do_collaborative_call_once(func); 221 } 222 } 223 224 #if _MSC_VER && !defined(__INTEL_COMPILER) 225 #pragma warning (pop) // 4324 warning 226 #endif 227 228 } // namespace d1 229 } // namespace detail 230 231 using detail::d1::collaborative_call_once; 232 using detail::d1::collaborative_once_flag; 233 } // namespace tbb 234 235 #endif // __TBB_collaborative_call_once_H 236