1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "oneapi/tbb/detail/_config.h" 18 #include "oneapi/tbb/detail/_template_helpers.h" 19 20 #include "oneapi/tbb/global_control.h" 21 #include "oneapi/tbb/tbb_allocator.h" 22 #include "oneapi/tbb/spin_mutex.h" 23 24 #include "governor.h" 25 #include "market.h" 26 #include "misc.h" 27 28 #include <atomic> 29 #include <set> 30 31 namespace tbb { 32 namespace detail { 33 namespace r1 { 34 35 //! Comparator for a set of global_control objects 36 struct control_storage_comparator { 37 bool operator()(const global_control* lhs, const global_control* rhs) const; 38 }; 39 40 class control_storage { 41 friend struct global_control_impl; 42 friend std::size_t global_control_active_value(int); 43 protected: 44 std::size_t my_active_value{0}; 45 std::set<global_control*, control_storage_comparator, tbb_allocator<global_control*>> my_list{}; 46 spin_mutex my_list_mutex{}; 47 public: 48 virtual std::size_t default_value() const = 0; 49 virtual void apply_active(std::size_t new_active) { 50 my_active_value = new_active; 51 } 52 virtual bool is_first_arg_preferred(std::size_t a, std::size_t b) const { 53 return a>b; // prefer max by default 54 } 55 virtual std::size_t active_value() { 56 spin_mutex::scoped_lock lock(my_list_mutex); // protect my_list.empty() call 57 return !my_list.empty() ? my_active_value : default_value(); 58 } 59 }; 60 61 class alignas(max_nfs_size) allowed_parallelism_control : public control_storage { 62 std::size_t default_value() const override { 63 return max(1U, governor::default_num_threads()); 64 } 65 bool is_first_arg_preferred(std::size_t a, std::size_t b) const override { 66 return a<b; // prefer min allowed parallelism 67 } 68 void apply_active(std::size_t new_active) override { 69 control_storage::apply_active(new_active); 70 __TBB_ASSERT( my_active_value>=1, NULL ); 71 // -1 to take external thread into account 72 market::set_active_num_workers( my_active_value-1 ); 73 } 74 std::size_t active_value() override { 75 spin_mutex::scoped_lock lock(my_list_mutex); // protect my_list.empty() call 76 if (my_list.empty()) 77 return default_value(); 78 // non-zero, if market is active 79 const std::size_t workers = market::max_num_workers(); 80 // We can't exceed market's maximal number of workers. 81 // +1 to take external thread into account 82 return workers? min(workers+1, my_active_value): my_active_value; 83 } 84 public: 85 std::size_t active_value_if_present() const { 86 return !my_list.empty() ? my_active_value : 0; 87 } 88 }; 89 90 class alignas(max_nfs_size) stack_size_control : public control_storage { 91 std::size_t default_value() const override { 92 #if _WIN32_WINNT >= 0x0602 /* _WIN32_WINNT_WIN8 */ 93 static auto ThreadStackSizeDefault = [] { 94 ULONG_PTR hi, lo; 95 GetCurrentThreadStackLimits(&lo, &hi); 96 return hi - lo; 97 }(); 98 return ThreadStackSizeDefault; 99 #else 100 return ThreadStackSize; 101 #endif 102 } 103 void apply_active(std::size_t new_active) override { 104 control_storage::apply_active(new_active); 105 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) 106 __TBB_ASSERT( false, "For Windows 8 Store* apps we must not set stack size" ); 107 #endif 108 } 109 }; 110 111 class alignas(max_nfs_size) terminate_on_exception_control : public control_storage { 112 std::size_t default_value() const override { 113 return 0; 114 } 115 }; 116 117 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 118 class alignas(max_nfs_size) lifetime_control : public control_storage { 119 bool is_first_arg_preferred(std::size_t, std::size_t) const override { 120 return false; // not interested 121 } 122 std::size_t default_value() const override { 123 return 0; 124 } 125 void apply_active(std::size_t new_active) override { 126 if (new_active == 1) { 127 // reserve the market reference 128 market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex ); 129 if (market::theMarket) { 130 market::add_ref_unsafe(lock, /*is_public*/ true); 131 } 132 } else if (new_active == 0) { // new_active == 0 133 // release the market reference 134 market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex ); 135 if (market::theMarket != nullptr) { 136 lock.release(); 137 market::theMarket->release(/*is_public*/ true, /*blocking_terminate*/ false); 138 } 139 } 140 control_storage::apply_active(new_active); 141 } 142 143 public: 144 bool is_empty() { 145 spin_mutex::scoped_lock lock(my_list_mutex); 146 return my_list.empty(); 147 } 148 }; 149 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 150 151 static allowed_parallelism_control allowed_parallelism_ctl; 152 static stack_size_control stack_size_ctl; 153 static terminate_on_exception_control terminate_on_exception_ctl; 154 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 155 static lifetime_control lifetime_ctl; 156 static control_storage *controls[] = {&allowed_parallelism_ctl, &stack_size_ctl, &terminate_on_exception_ctl, &lifetime_ctl}; 157 #else 158 static control_storage *controls[] = {&allowed_parallelism_ctl, &stack_size_ctl, &terminate_on_exception_ctl}; 159 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 160 161 //! Comparator for a set of global_control objects 162 inline bool control_storage_comparator::operator()(const global_control* lhs, const global_control* rhs) const { 163 __TBB_ASSERT_RELEASE(lhs->my_param < global_control::parameter_max , NULL); 164 return lhs->my_value < rhs->my_value || (lhs->my_value == rhs->my_value && lhs < rhs); 165 } 166 167 unsigned market::app_parallelism_limit() { 168 return allowed_parallelism_ctl.active_value_if_present(); 169 } 170 171 bool terminate_on_exception() { 172 return global_control::active_value(global_control::terminate_on_exception) == 1; 173 } 174 175 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 176 unsigned market::is_lifetime_control_present() { 177 return !lifetime_ctl.is_empty(); 178 } 179 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 180 181 struct global_control_impl { 182 private: 183 static bool erase_if_present(control_storage* const c, d1::global_control& gc) { 184 auto it = c->my_list.find(&gc); 185 if (it != c->my_list.end()) { 186 c->my_list.erase(it); 187 return true; 188 } 189 return false; 190 } 191 192 public: 193 194 static void create(d1::global_control& gc) { 195 __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL); 196 control_storage* const c = controls[gc.my_param]; 197 198 spin_mutex::scoped_lock lock(c->my_list_mutex); 199 if (c->my_list.empty() || c->is_first_arg_preferred(gc.my_value, c->my_active_value)) { 200 // to guarantee that apply_active() is called with current active value, 201 // calls it here and in internal_destroy() under my_list_mutex 202 c->apply_active(gc.my_value); 203 } 204 c->my_list.insert(&gc); 205 } 206 207 static void destroy(d1::global_control& gc) { 208 __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL); 209 control_storage* const c = controls[gc.my_param]; 210 // Concurrent reading and changing global parameter is possible. 211 spin_mutex::scoped_lock lock(c->my_list_mutex); 212 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 213 __TBB_ASSERT(gc.my_param == global_control::scheduler_handle || !c->my_list.empty(), NULL); 214 #else 215 __TBB_ASSERT(!c->my_list.empty(), NULL); 216 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 217 std::size_t new_active = (std::size_t)(-1), old_active = c->my_active_value; 218 219 if (!erase_if_present(c, gc)) { 220 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 221 __TBB_ASSERT(gc.my_param == global_control::scheduler_handle , NULL); 222 return; 223 #else 224 __TBB_ASSERT(false, "Unreachable code"); 225 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 226 } 227 if (c->my_list.empty()) { 228 __TBB_ASSERT(new_active == (std::size_t) - 1, NULL); 229 new_active = c->default_value(); 230 } else { 231 new_active = (*c->my_list.begin())->my_value; 232 } 233 if (new_active != old_active) { 234 c->apply_active(new_active); 235 } 236 } 237 238 static bool remove_and_check_if_empty(d1::global_control& gc) { 239 __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL); 240 control_storage* const c = controls[gc.my_param]; 241 242 spin_mutex::scoped_lock lock(c->my_list_mutex); 243 __TBB_ASSERT(!c->my_list.empty(), NULL); 244 erase_if_present(c, gc); 245 return c->my_list.empty(); 246 } 247 #if TBB_USE_ASSERT 248 static bool is_present(d1::global_control& gc) { 249 __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL); 250 control_storage* const c = controls[gc.my_param]; 251 252 spin_mutex::scoped_lock lock(c->my_list_mutex); 253 auto it = c->my_list.find(&gc); 254 if (it != c->my_list.end()) { 255 return true; 256 } 257 return false; 258 } 259 #endif // TBB_USE_ASSERT 260 }; 261 262 void __TBB_EXPORTED_FUNC create(d1::global_control& gc) { 263 global_control_impl::create(gc); 264 } 265 void __TBB_EXPORTED_FUNC destroy(d1::global_control& gc) { 266 global_control_impl::destroy(gc); 267 } 268 269 bool remove_and_check_if_empty(d1::global_control& gc) { 270 return global_control_impl::remove_and_check_if_empty(gc); 271 } 272 #if TBB_USE_ASSERT 273 bool is_present(d1::global_control& gc) { 274 return global_control_impl::is_present(gc); 275 } 276 #endif // TBB_USE_ASSERT 277 std::size_t __TBB_EXPORTED_FUNC global_control_active_value(int param) { 278 __TBB_ASSERT_RELEASE(param < global_control::parameter_max, NULL); 279 return controls[param]->active_value(); 280 } 281 282 } // namespace r1 283 } // namespace detail 284 } // namespace tbb 285