1 /* 2 Copyright (c) 2022-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef _TBB_threading_control_H 18 #define _TBB_threading_control_H 19 20 #include "oneapi/tbb/mutex.h" 21 #include "oneapi/tbb/global_control.h" 22 23 #include "threading_control_client.h" 24 #include "intrusive_list.h" 25 #include "main.h" 26 #include "permit_manager.h" 27 #include "pm_client.h" 28 #include "thread_dispatcher.h" 29 #include "cancellation_disseminator.h" 30 #include "thread_request_serializer.h" 31 #include "scheduler_common.h" 32 33 namespace tbb { 34 namespace detail { 35 namespace r1 { 36 37 class arena; 38 class thread_data; 39 40 class threading_control; 41 42 class threading_control_impl { 43 public: 44 threading_control_impl(threading_control*); 45 46 public: 47 void release(bool blocking_terminate); 48 49 threading_control_client create_client(arena& a); 50 void publish_client(threading_control_client client, d1::constraints& constraints); 51 52 struct client_snapshot { 53 std::uint64_t aba_epoch; 54 unsigned priority_level; 55 thread_dispatcher_client* my_td_client; 56 pm_client* my_pm_client; 57 }; 58 59 client_snapshot prepare_client_destruction(threading_control_client client); 60 bool try_destroy_client(client_snapshot deleter); 61 62 void register_thread(thread_data& td); 63 void unregister_thread(thread_data& td); 64 void propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state, 65 d1::task_group_context& src, uint32_t new_state); 66 67 void set_active_num_workers(unsigned soft_limit); 68 std::size_t worker_stack_size(); 69 unsigned max_num_workers(); 70 71 void adjust_demand(threading_control_client, int mandatory_delta, int workers_delta); 72 73 thread_control_monitor& get_waiting_threads_monitor(); 74 75 private: 76 static unsigned calc_workers_soft_limit(unsigned workers_hard_limit); 77 static std::pair<unsigned, unsigned> calculate_workers_limits(); 78 static cache_aligned_unique_ptr<permit_manager> make_permit_manager(unsigned workers_soft_limit); 79 static cache_aligned_unique_ptr<thread_dispatcher> make_thread_dispatcher(threading_control& control, 80 unsigned workers_soft_limit, 81 unsigned workers_hard_limit); 82 83 // TODO: Consider allocation one chunk of memory and construct objects on it 84 cache_aligned_unique_ptr<permit_manager> my_permit_manager{nullptr}; 85 cache_aligned_unique_ptr<thread_dispatcher> my_thread_dispatcher{nullptr}; 86 cache_aligned_unique_ptr<thread_request_serializer_proxy> my_thread_request_serializer{nullptr}; 87 cache_aligned_unique_ptr<cancellation_disseminator> my_cancellation_disseminator{nullptr}; 88 cache_aligned_unique_ptr<thread_control_monitor> my_waiting_threads_monitor{nullptr}; 89 }; 90 91 92 class threading_control { 93 using global_mutex_type = d1::mutex; 94 public: 95 using client_snapshot = threading_control_impl::client_snapshot; 96 97 static threading_control* register_public_reference(); 98 static bool unregister_public_reference(bool blocking_terminate); 99 100 static bool is_present(); 101 static void set_active_num_workers(unsigned soft_limit); 102 static bool register_lifetime_control(); 103 static bool unregister_lifetime_control(bool blocking_terminate); 104 105 threading_control_client create_client(arena& a); 106 void publish_client(threading_control_client client, d1::constraints& constraints); 107 client_snapshot prepare_client_destruction(threading_control_client client); 108 bool try_destroy_client(client_snapshot deleter); 109 110 void register_thread(thread_data& td); 111 void unregister_thread(thread_data& td); 112 void propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state, 113 d1::task_group_context& src, uint32_t new_state); 114 115 std::size_t worker_stack_size(); 116 static unsigned max_num_workers(); 117 118 void adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta); 119 120 thread_control_monitor& get_waiting_threads_monitor(); 121 122 private: 123 threading_control(unsigned public_ref, unsigned ref); 124 void add_ref(bool is_public); 125 bool remove_ref(bool is_public); 126 127 static threading_control* get_threading_control(bool is_public); 128 static threading_control* create_threading_control(); 129 130 bool release(bool is_public, bool blocking_terminate); 131 void wait_last_reference(global_mutex_type::scoped_lock& lock); 132 void destroy(); 133 134 friend class thread_dispatcher; 135 136 static threading_control* g_threading_control; 137 //! Mutex guarding creation/destruction of g_threading_control, insertions/deletions in my_arenas, and cancellation propagation 138 static global_mutex_type g_threading_control_mutex; 139 140 cache_aligned_unique_ptr<threading_control_impl> my_pimpl{nullptr}; 141 //! Count of external threads attached 142 std::atomic<unsigned> my_public_ref_count{0}; 143 //! Reference count controlling threading_control object lifetime 144 std::atomic<unsigned> my_ref_count{0}; 145 }; 146 147 } // r1 148 } // detail 149 } // tbb 150 151 152 #endif // _TBB_threading_control_H 153